flip_in_mem_edsm_server.cc revision c407dc5cd9bdc5668497f21b26b09d988ab439de
1// Copyright (c) 2009 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <dirent.h>
6#include <linux/tcp.h>  // For TCP_NODELAY
7#include <sys/socket.h>
8#include <sys/types.h>
9#include <unistd.h>
10#include <openssl/err.h>
11#include <openssl/ssl.h>
12
13#include <deque>
14#include <iostream>
15#include <limits>
16#include <vector>
17#include <list>
18
19#include "base/logging.h"
20#include "base/simple_thread.h"
21#include "base/timer.h"
22#include "base/lock.h"
23#include "net/spdy/spdy_frame_builder.h"
24#include "net/spdy/spdy_framer.h"
25#include "net/spdy/spdy_protocol.h"
26#include "net/tools/flip_server/balsa_enums.h"
27#include "net/tools/flip_server/balsa_frame.h"
28#include "net/tools/flip_server/balsa_headers.h"
29#include "net/tools/flip_server/balsa_visitor_interface.h"
30#include "net/tools/flip_server/buffer_interface.h"
31#include "net/tools/flip_server/create_listener.h"
32#include "net/tools/flip_server/epoll_server.h"
33#include "net/tools/flip_server/other_defines.h"
34#include "net/tools/flip_server/ring_buffer.h"
35#include "net/tools/flip_server/simple_buffer.h"
36#include "net/tools/flip_server/split.h"
37#include "net/tools/flip_server/url_to_filename_encoder.h"
38#include "net/tools/flip_server/url_utilities.h"
39
40////////////////////////////////////////////////////////////////////////////////
41
42using std::cerr;
43using std::deque;
44using std::list;
45using std::map;
46using std::ostream;
47using std::pair;
48using std::string;
49using std::vector;
50
51////////////////////////////////////////////////////////////////////////////////
52
53
// Runtime configuration. These are plain globals (not registered command-line
// flags) initialized to their default values.

// If set to true, then the server will act as an SSL server for both
// HTTP and SPDY.
bool FLAGS_use_ssl = true;

// The name of the cert .pem file.
string FLAGS_ssl_cert_name = "cert.pem";

// The name of the key .pem file.
string FLAGS_ssl_key_name = "key.pem";

// The number of responses given before the server closes the
// connection.
int32 FLAGS_response_count_until_close = 1000*1000;

// If true, then disables the nagle algorithm.
bool FLAGS_no_nagle = true;

// The number of times that accept() will be called when the
// alarm goes off when the accept_using_alarm flag is set to true.
// If set to 0, accept() will be performed until the accept queue
// is completely drained and the accept() call returns an error.
int32 FLAGS_accepts_per_wake = 0;

// The port on which the spdy server listens.
int32 FLAGS_spdy_port = 10040;

// The port on which the http server listens.
int32 FLAGS_port = 16002;

// The size of the TCP accept backlog.
int32 FLAGS_accept_backlog_size = 1024;

// The directory in which the cache is located.
string FLAGS_cache_base_dir = ".";

// If true, then encode url to filename.
bool FLAGS_need_to_encode_url = true;

// If set to false a single socket will be used. If set to true
// then a new socket will be created for each accept thread.
// Note that this only works with kernels that support
// SO_REUSEPORT.
bool FLAGS_reuseport = false;

// The amount of time the server delays before sending back the
// reply.
double FLAGS_server_think_time_in_s = 0;

// Does the server send X-Subresource headers?
bool FLAGS_use_xsub = false;

// Does the server send X-Associated-Content headers?
bool FLAGS_use_xac = false;

// Does the server advance cwnd by sending no-op packets?
bool FLAGS_use_cwnd_opener = false;

// Does the server compress data frames?
bool FLAGS_use_compression = false;
113
114////////////////////////////////////////////////////////////////////////////////
115
116using base::StringPiece;
117using base::SimpleThread;
118// using base::Lock;  // heh, this isn't in base namespace?!
119// using base::AutoLock;  // ditto!
120using net::BalsaFrame;
121using net::BalsaFrameEnums;
122using net::BalsaHeaders;
123using net::BalsaHeadersEnums;
124using net::BalsaVisitorInterface;
125using net::EpollAlarmCallbackInterface;
126using net::EpollCallbackInterface;
127using net::EpollEvent;
128using net::EpollServer;
129using net::RingBuffer;
130using net::SimpleBuffer;
131using net::SplitStringPieceToVector;
132using net::UrlUtilities;
133using spdy::CONTROL_FLAG_NONE;
134using spdy::DATA_FLAG_COMPRESSED;
135using spdy::DATA_FLAG_FIN;
136using spdy::RST_STREAM;
137using spdy::SYN_REPLY;
138using spdy::SYN_STREAM;
139using spdy::SpdyControlFrame;
140using spdy::SpdyDataFlags;
141using spdy::SpdyDataFrame;
142using spdy::SpdyRstStreamControlFrame;
143using spdy::SpdyFrame;
144using spdy::SpdyFrameBuilder;
145using spdy::SpdyFramer;
146using spdy::SpdyFramerVisitorInterface;
147using spdy::SpdyHeaderBlock;
148using spdy::SpdyStreamId;
149using spdy::SpdySynReplyControlFrame;
150using spdy::SpdySynStreamControlFrame;
151
152
153////////////////////////////////////////////////////////////////////////////////
154
155void PrintSslError() {
156  char buf[128];  // this buffer must be at least 120 chars long.
157  int error_num = ERR_get_error();
158  while (error_num != 0) {
159    LOG(INFO)<< ERR_error_string(error_num, buf);
160    error_num = ERR_get_error();
161  }
162}
163
164////////////////////////////////////////////////////////////////////////////////
165
166// Creates a socket with domain, type and protocol parameters.
167// Assigns the return value of socket() to *fd.
168// Returns errno if an error occurs, else returns zero.
169int CreateSocket(int domain, int type, int protocol, int *fd) {
170  CHECK(fd != NULL);
171  *fd = ::socket(domain, type, protocol);
172  return (*fd == -1) ? errno : 0;
173}
174
175////////////////////////////////////////////////////////////////////////////////
176
177// Sets an FD to be nonblocking.
178void SetNonBlocking(int fd) {
179  DCHECK(fd >= 0);
180
181  int fcntl_return = fcntl(fd, F_GETFL, 0);
182  CHECK_NE(fcntl_return, -1)
183    << "error doing fcntl(fd, F_GETFL, 0) fd: " << fd
184    << " errno=" << errno;
185
186  if (fcntl_return & O_NONBLOCK)
187    return;
188
189  fcntl_return = fcntl(fd, F_SETFL, fcntl_return | O_NONBLOCK);
190  CHECK_NE(fcntl_return, -1)
191    << "error doing fcntl(fd, F_SETFL, fcntl_return) fd: " << fd
192    << " errno=" << errno;
193}
194
195// Encode the URL.
196string EncodeURL(string uri, string host, string method) {
197  if (!FLAGS_need_to_encode_url) {
198    // TODO(mbelshe): if uri is fully qualified, need to strip protocol/host.
199    return string(method + "_" + uri);
200  }
201
202  string filename;
203  if (uri[0] == '/') {
204    // uri is not fully qualified.
205    filename = net::UrlToFilenameEncoder::Encode(
206        "http://" + host + uri, method + "_/");
207  } else {
208    filename = net::UrlToFilenameEncoder::Encode(uri, method + "_/");
209  }
210  return filename;
211}
212
213////////////////////////////////////////////////////////////////////////////////
214
215
// OpenSSL objects shared by every connection. Both fields are filled in by
// spdy_init_ssl().
struct GlobalSSLState {
  SSL_METHOD* ssl_method;  // method passed to SSL_CTX_new()
  SSL_CTX* ssl_ctx;        // context from which per-connection SSL objects
                           // are created
};

////////////////////////////////////////////////////////////////////////////////

// Process-wide SSL state. While this is NULL, SMServerConnection does not
// create an SSL object for new connections.
GlobalSSLState* global_ssl_state = NULL;
224
225////////////////////////////////////////////////////////////////////////////////
226
227// SSL stuff
228void spdy_init_ssl(GlobalSSLState* state) {
229  SSL_library_init();
230  PrintSslError();
231
232  SSL_load_error_strings();
233  PrintSslError();
234
235  state->ssl_method = SSLv23_method();
236  state->ssl_ctx = SSL_CTX_new(state->ssl_method);
237  if (!state->ssl_ctx) {
238    PrintSslError();
239    LOG(FATAL) << "Unable to create SSL context";
240  }
241  // Disable SSLv2 support.
242  SSL_CTX_set_options(state->ssl_ctx, SSL_OP_NO_SSLv2);
243  if (SSL_CTX_use_certificate_file(state->ssl_ctx,
244                                   FLAGS_ssl_cert_name.c_str(),
245                                   SSL_FILETYPE_PEM) <= 0) {
246    PrintSslError();
247    LOG(FATAL) << "Unable to use cert.pem as SSL cert.";
248  }
249  if (SSL_CTX_use_PrivateKey_file(state->ssl_ctx,
250                                  FLAGS_ssl_key_name.c_str(),
251                                  SSL_FILETYPE_PEM) <= 0) {
252    PrintSslError();
253    LOG(FATAL) << "Unable to use key.pem as SSL key.";
254  }
255  if (!SSL_CTX_check_private_key(state->ssl_ctx)) {
256    PrintSslError();
257    LOG(FATAL) << "The cert.pem and key.pem files don't match";
258  }
259}
260
261SSL* spdy_new_ssl(SSL_CTX* ssl_ctx) {
262  SSL* ssl = SSL_new(ssl_ctx);
263  PrintSslError();
264
265  SSL_set_accept_state(ssl);
266  PrintSslError();
267  return ssl;
268}
269
270////////////////////////////////////////////////////////////////////////////////
271
// Segment size in bytes used to derive the data-frame payload limits below.
// NOTE(review): presumably the TCP MSS for a 1500-byte MTU -- confirm.
const int kMSS = 1460;
// Two segments' worth of bytes minus SpdyFrame::size(). Used as the initial
// MemCacheIter::max_segment_size.
const int kInitialDataSendersThreshold = (2 * kMSS) - SpdyFrame::size();
// Same value as above; no use of this constant appears in this part of the
// file.
const int kNormalSegmentSize = (2 * kMSS) - SpdyFrame::size();
275
276////////////////////////////////////////////////////////////////////////////////
277
// A span of bytes plus bookkeeping. Instances are stored by value in
// OutputList. The object never frees |data| on its own; call MaybeDelete().
class DataFrame {
 public:
  const char* data;       // start of the bytes
  size_t size;            // number of bytes at |data|
  bool delete_when_done;  // true if MaybeDelete() should delete[] |data|
  size_t index;           // starts at 0; not otherwise touched by this class

  DataFrame() : data(NULL), size(0), delete_when_done(false), index(0) {}

  // Releases |data| with delete[] when this frame was flagged as owning it.
  // The pointer itself is left unchanged.
  void MaybeDelete() {
    if (!delete_when_done) {
      return;
    }
    delete[] data;
  }
};
291
292////////////////////////////////////////////////////////////////////////////////
293
294class StoreBodyAndHeadersVisitor: public BalsaVisitorInterface {
295 public:
296  BalsaHeaders headers;
297  string body;
298  bool error_;
299
300  virtual void ProcessBodyInput(const char *input, size_t size) {}
301  virtual void ProcessBodyData(const char *input, size_t size) {
302    body.append(input, size);
303  }
304  virtual void ProcessHeaderInput(const char *input, size_t size) {}
305  virtual void ProcessTrailerInput(const char *input, size_t size) {}
306  virtual void ProcessHeaders(const BalsaHeaders& headers) {
307    // nothing to do here-- we're assuming that the BalsaFrame has
308    // been handed our headers.
309  }
310  virtual void ProcessRequestFirstLine(const char* line_input,
311                                       size_t line_length,
312                                       const char* method_input,
313                                       size_t method_length,
314                                       const char* request_uri_input,
315                                       size_t request_uri_length,
316                                       const char* version_input,
317                                       size_t version_length) {}
318  virtual void ProcessResponseFirstLine(const char *line_input,
319                                        size_t line_length,
320                                        const char *version_input,
321                                        size_t version_length,
322                                        const char *status_input,
323                                        size_t status_length,
324                                        const char *reason_input,
325                                        size_t reason_length) {}
326  virtual void ProcessChunkLength(size_t chunk_length) {}
327  virtual void ProcessChunkExtensions(const char *input, size_t size) {}
328  virtual void HeaderDone() {}
329  virtual void MessageDone() {}
330  virtual void HandleHeaderError(BalsaFrame* framer) { HandleError(); }
331  virtual void HandleHeaderWarning(BalsaFrame* framer) { HandleError(); }
332  virtual void HandleChunkingError(BalsaFrame* framer) { HandleError(); }
333  virtual void HandleBodyError(BalsaFrame* framer) { HandleError(); }
334
335  void HandleError() { error_ = true; }
336};
337
338////////////////////////////////////////////////////////////////////////////////
339
340struct FileData {
341  void CopyFrom(const FileData& file_data) {
342    headers = new BalsaHeaders;
343    headers->CopyFrom(*(file_data.headers));
344    filename = file_data.filename;
345    related_files = file_data.related_files;
346    body = file_data.body;
347  }
348  FileData(BalsaHeaders* h, const string& b) : headers(h), body(b) {}
349  FileData() {}
350  BalsaHeaders* headers;
351  string filename;
352  vector< pair<int, string> > related_files;   // priority, filename
353  string body;
354};
355
356////////////////////////////////////////////////////////////////////////////////
357
358class MemCacheIter {
359 public:
360  MemCacheIter() :
361      file_data(NULL),
362      priority(0),
363      transformed_header(false),
364      body_bytes_consumed(0),
365      stream_id(0),
366      max_segment_size(kInitialDataSendersThreshold),
367      bytes_sent(0) {}
368  explicit MemCacheIter(FileData* fd) :
369      file_data(fd),
370      priority(0),
371      transformed_header(false),
372      body_bytes_consumed(0),
373      stream_id(0),
374      max_segment_size(kInitialDataSendersThreshold),
375      bytes_sent(0) {}
376  FileData* file_data;
377  int priority;
378  bool transformed_header;
379  size_t body_bytes_consumed;
380  uint32 stream_id;
381  uint32 max_segment_size;
382  size_t bytes_sent;
383};
384
385////////////////////////////////////////////////////////////////////////////////
386
387class MemoryCache {
388 public:
389  typedef map<string, FileData> Files;
390
391 public:
392  Files files_;
393  string cwd_;
394
395  void CloneFrom(const MemoryCache& mc) {
396    for (Files::const_iterator i = mc.files_.begin();
397         i != mc.files_.end();
398         ++i) {
399      Files::iterator out_i =
400        files_.insert(make_pair(i->first, FileData())).first;
401      out_i->second.CopyFrom(i->second);
402      cwd_ = mc.cwd_;
403    }
404  }
405
406  void AddFiles() {
407    LOG(INFO) << "Adding files!";
408    deque<string> paths;
409    cwd_ = FLAGS_cache_base_dir;
410    paths.push_back(cwd_ + "/GET_");
411    DIR* current_dir = NULL;
412    while (!paths.empty()) {
413      while (current_dir == NULL && !paths.empty()) {
414        string current_dir_name = paths.front();
415        VLOG(1) << "Attempting to open dir: \"" << current_dir_name << "\"";
416        current_dir = opendir(current_dir_name.c_str());
417        paths.pop_front();
418
419        if (current_dir == NULL) {
420          perror("Unable to open directory. ");
421          current_dir_name.clear();
422          continue;
423        }
424
425        if (current_dir) {
426          VLOG(1) << "Succeeded opening";
427          for (struct dirent* dir_data = readdir(current_dir);
428               dir_data != NULL;
429               dir_data = readdir(current_dir)) {
430            string current_entry_name =
431              current_dir_name + "/" + dir_data->d_name;
432            if (dir_data->d_type == DT_REG) {
433              VLOG(1) << "Found file: " << current_entry_name;
434              ReadAndStoreFileContents(current_entry_name.c_str());
435            } else if (dir_data->d_type == DT_DIR) {
436              VLOG(1) << "Found subdir: " << current_entry_name;
437              if (string(dir_data->d_name) != "." &&
438                  string(dir_data->d_name) != "..") {
439                VLOG(1) << "Adding to search path: " << current_entry_name;
440                paths.push_front(current_entry_name);
441              }
442            }
443          }
444          VLOG(1) << "Oops, no data left. Closing dir.";
445          closedir(current_dir);
446          current_dir = NULL;
447        }
448      }
449    }
450  }
451
452  void ReadToString(const char* filename, string* output) {
453    output->clear();
454    int fd = open(filename, 0, "r");
455    if (fd == -1)
456      return;
457    char buffer[4096];
458    ssize_t read_status = read(fd, buffer, sizeof(buffer));
459    while (read_status > 0) {
460      output->append(buffer, static_cast<size_t>(read_status));
461      do {
462        read_status = read(fd, buffer, sizeof(buffer));
463      } while (read_status <= 0 && errno == EINTR);
464    }
465    close(fd);
466  }
467
468  void ReadAndStoreFileContents(const char* filename) {
469    StoreBodyAndHeadersVisitor visitor;
470    BalsaFrame framer;
471    framer.set_balsa_visitor(&visitor);
472    framer.set_balsa_headers(&(visitor.headers));
473    string filename_contents;
474    ReadToString(filename, &filename_contents);
475
476    // Ugly hack to make everything look like 1.1.
477    if (filename_contents.find("HTTP/1.0") == 0)
478      filename_contents[7] = '1';
479
480    size_t pos = 0;
481    size_t old_pos = 0;
482    while (true) {
483      old_pos = pos;
484      pos += framer.ProcessInput(filename_contents.data() + pos,
485                                 filename_contents.size() - pos);
486      if (framer.Error() || pos == old_pos) {
487        LOG(ERROR) << "Unable to make forward progress, or error"
488          " framing file: " << filename;
489        if (framer.Error()) {
490          LOG(INFO) << "********************************************ERROR!";
491          return;
492        }
493        return;
494      }
495      if (framer.MessageFullyRead()) {
496        // If no Content-Length or Transfer-Encoding was captured in the
497        // file, then the rest of the data is the body.  Many of the captures
498        // from within Chrome don't have content-lengths.
499        if (!visitor.body.length())
500          visitor.body = filename_contents.substr(pos);
501        break;
502      }
503    }
504    visitor.headers.RemoveAllOfHeader("content-length");
505    visitor.headers.RemoveAllOfHeader("transfer-encoding");
506    visitor.headers.RemoveAllOfHeader("connection");
507    visitor.headers.AppendHeader("transfer-encoding", "chunked");
508    visitor.headers.AppendHeader("connection", "keep-alive");
509
510    // Experiment with changing headers for forcing use of cached
511    // versions of content.
512    // TODO(mbelshe) REMOVE ME
513#if 0
514    // TODO(mbelshe) append current date.
515    visitor.headers.RemoveAllOfHeader("date");
516    if (visitor.headers.HasHeader("expires")) {
517      visitor.headers.RemoveAllOfHeader("expires");
518      visitor.headers.AppendHeader("expires",
519                                 "Fri, 30 Aug, 2019 12:00:00 GMT");
520    }
521#endif
522    BalsaHeaders* headers = new BalsaHeaders;
523    headers->CopyFrom(visitor.headers);
524    string filename_stripped = string(filename).substr(cwd_.size() + 1);
525//    LOG(INFO) << "Adding file (" << visitor.body.length() << " bytes): "
526//              << filename_stripped;
527    files_[filename_stripped] = FileData();
528    FileData& fd = files_[filename_stripped];
529    fd = FileData(headers, visitor.body);
530    fd.filename = string(filename_stripped,
531                         filename_stripped.find_first_of('/'));
532    if (headers->HasHeader("X-Associated-Content")) {
533      string content = headers->GetHeader("X-Associated-Content").as_string();
534      vector<StringPiece> urls_and_priorities;
535      SplitStringPieceToVector(content, "||", &urls_and_priorities, true);
536      VLOG(1) << "Examining X-Associated-Content header";
537      for (unsigned int i = 0; i < urls_and_priorities.size(); ++i) {
538        const StringPiece& url_and_priority_pair = urls_and_priorities[i];
539        vector<StringPiece> url_and_priority;
540        SplitStringPieceToVector(url_and_priority_pair, "??",
541                                 &url_and_priority, true);
542        if (url_and_priority.size() >= 2) {
543          string priority_string(url_and_priority[0].data(),
544                                 url_and_priority[0].size());
545          string filename_string(url_and_priority[1].data(),
546                                 url_and_priority[1].size());
547          long priority;
548          char* last_eaten_char;
549          priority = strtol(priority_string.c_str(), &last_eaten_char, 0);
550          if (last_eaten_char ==
551              priority_string.c_str() + priority_string.size()) {
552            pair<int, string> entry(priority, filename_string);
553            VLOG(1) << "Adding associated content: " << filename_string;
554            fd.related_files.push_back(entry);
555          }
556        }
557      }
558    }
559  }
560
561  // Called at runtime to update learned headers
562  // |url| is a url which contains a referrer header.
563  // |referrer| is the referring URL
564  // Adds an X-Subresource or X-Associated-Content to |referer| for |url|
565  void UpdateHeaders(string referrer, string file_url) {
566    if (!FLAGS_use_xac && !FLAGS_use_xsub)
567      return;
568
569    string referrer_host_path =
570      net::UrlToFilenameEncoder::Encode(referrer, "GET_/");
571
572    FileData* fd1 = GetFileData(string("GET_") + file_url);
573    if (!fd1) {
574      LOG(ERROR) << "Updating headers for unknown url: " << file_url;
575      return;
576    }
577    string url = fd1->headers->GetHeader("X-Original-Url").as_string();
578    string content_type = fd1->headers->GetHeader("Content-Type").as_string();
579    if (content_type.length() == 0) {
580      LOG(ERROR) << "Skipping subresource with unknown content-type";
581      return;
582    }
583
584    // Now, lets see if this is the same host or not
585    bool same_host = (UrlUtilities::GetUrlHost(referrer) ==
586                      UrlUtilities::GetUrlHost(url));
587
588    // This is a hacked algorithm for figuring out what priority
589    // to use with pushed content.
590    int priority = 4;
591    if (content_type.find("css") != string::npos)
592      priority = 1;
593    else if (content_type.find("cript") != string::npos)
594      priority = 1;
595    else if (content_type.find("html") != string::npos)
596      priority = 2;
597
598    LOG(ERROR) << "Attempting update for " << referrer_host_path;
599
600    FileData* fd2 = GetFileData(referrer_host_path);
601    if (fd2 != NULL) {
602      // If they are on the same host, we'll use X-Associated-Content
603      string header_name;
604      string new_value;
605      string delimiter;
606      bool related_files = false;
607      if (same_host && FLAGS_use_xac) {
608        header_name = "X-Associated-Content";
609        char pri_ch = priority + '0';
610        new_value = pri_ch + string("??") + url;
611        delimiter = "||";
612        related_files = true;
613      } else {
614        if (!FLAGS_use_xsub)
615          return;
616        header_name = "X-Subresource";
617        new_value = content_type + "!!" + url;
618        delimiter = "!!";
619      }
620
621      if (fd2->headers->HasNonEmptyHeader(header_name)) {
622        string existing_header =
623            fd2->headers->GetHeader(header_name).as_string();
624        if (existing_header.find(url) != string::npos)
625          return;  // header already recorded
626
627        // Don't let these lists grow too long for low pri stuff.
628        // TODO(mbelshe) We need better algorithms for this.
629        if (existing_header.length() > 256 && priority > 2)
630          return;
631
632        new_value = existing_header + delimiter + new_value;
633      }
634
635      LOG(INFO) << "Recording " << header_name << " for " << new_value;
636      fd2->headers->ReplaceOrAppendHeader(header_name, new_value);
637
638      // Add it to the related files so that it will actually get sent out.
639      if (related_files) {
640        pair<int, string> entry(4, file_url);
641        fd2->related_files.push_back(entry);
642      }
643    } else {
644      LOG(ERROR) << "Failed to update headers:";
645      LOG(ERROR) << "FAIL url: " << url;
646      LOG(ERROR) << "FAIL ref: " << referrer_host_path;
647    }
648  }
649
650  FileData* GetFileData(const string& filename) {
651    Files::iterator fi = files_.end();
652    if (filename.compare(filename.length() - 5, 5, ".html", 5) == 0) {
653      string new_filename(filename.data(), filename.size() - 5);
654      new_filename += ".http";
655      fi = files_.find(new_filename);
656    }
657    if (fi == files_.end())
658      fi = files_.find(filename);
659
660    if (fi == files_.end()) {
661      return NULL;
662    }
663    return &(fi->second);
664  }
665
666  bool AssignFileData(const string& filename, MemCacheIter* mci) {
667    mci->file_data = GetFileData(filename);
668    if (mci->file_data == NULL) {
669      LOG(ERROR) << "Could not find file data for " << filename;
670      return false;
671    }
672    return true;
673  }
674};
675
676////////////////////////////////////////////////////////////////////////////////
677
// Minimal callback interface with a single Notify() hook.
// SMServerConnection derives from it.
class NotifierInterface {
 public:
  virtual ~NotifierInterface() {}
  // Called to deliver the notification; its meaning is defined by the
  // implementer.
  virtual void Notify() = 0;
};
683
684////////////////////////////////////////////////////////////////////////////////
685
// Abstract protocol state machine driven by SMServerConnection: the
// connection feeds it received bytes through ProcessInput(), polls
// MessageFullyRead() / Error(), and calls Reset() and PostAcceptHook() when a
// connection is (re)initialized.
class SMInterface {
 public:
  // Consumes up to |len| bytes from |data| and returns the number of bytes
  // consumed. The caller stops feeding input when this returns 0.
  virtual size_t ProcessInput(const char* data, size_t len) = 0;
  // True once a complete message has been consumed.
  virtual bool MessageFullyRead() const = 0;
  // True if the framer hit an error; ErrorAsString() describes it.
  virtual bool Error() const = 0;
  virtual const char* ErrorAsString() const = 0;
  // Called by the connection after each fully read message and when the
  // connection is initialized.
  virtual void Reset() = 0;
  virtual void ResetForNewConnection() = 0;

  // Called at the end of connection initialization.
  virtual void PostAcceptHook() = 0;

  // The methods below are not called in this part of the file.
  // NOTE(review): presumably each emits the corresponding frame or response
  // for |stream_id| -- confirm against the implementations.
  virtual void NewStream(uint32 stream_id, uint32 priority,
                         const string& filename) = 0;
  virtual void SendEOF(uint32 stream_id) = 0;
  virtual void SendErrorNotFound(uint32 stream_id) = 0;
  virtual size_t SendSynStream(uint32 stream_id,
                              const BalsaHeaders& headers) = 0;
  virtual size_t SendSynReply(uint32 stream_id,
                              const BalsaHeaders& headers) = 0;
  virtual void SendDataFrame(uint32 stream_id, const char* data, int64 len,
                             uint32 flags, bool compress) = 0;
  virtual void GetOutput() = 0;

  virtual ~SMInterface() {}
};
711
712////////////////////////////////////////////////////////////////////////////////
713
class SMServerConnection;
// Signature of the factory that builds the SMInterface for a connection.
// SMServerConnection's constructor calls it with |this|.
typedef SMInterface*(SMInterfaceFactory)(SMServerConnection*);

////////////////////////////////////////////////////////////////////////////////

// Queue of DataFrames waiting to be written on a connection.
typedef list<DataFrame> OutputList;

////////////////////////////////////////////////////////////////////////////////

// (Repeated forward declaration; harmless.)
class SMServerConnection;
724
// Interface of whatever owns a set of SMServerConnections; a connection is
// handed a pointer to it in InitSMServerConnection().
class SMServerConnectionPoolInterface {
 public:
  virtual ~SMServerConnectionPoolInterface() {}
  // SMServerConnections will use this:
  // NOTE(review): presumably called when |connection| is finished and can be
  // reclaimed -- the call site is outside this part of the file.
  virtual void SMServerConnectionDone(SMServerConnection* connection) = 0;
};
731
732////////////////////////////////////////////////////////////////////////////////
733
734class SMServerConnection: public EpollCallbackInterface,
735                          public NotifierInterface {
736 private:
  // Private: instances are created through NewSMServerConnection().
  // Builds an uninitialized connection (no fd, not registered with epoll) and
  // asks |sm_interface_factory| for the state machine that will parse its
  // input. InitSMServerConnection() attaches the actual socket.
  SMServerConnection(SMInterfaceFactory* sm_interface_factory,
                     MemoryCache* memory_cache,
                     EpollServer* epoll_server) :
      fd_(-1),
      events_(0),

      registered_in_epoll_server_(false),
      initialized_(false),

      connection_pool_(NULL),
      epoll_server_(epoll_server),

      read_buffer_(4096*10),
      memory_cache_(memory_cache),
      sm_interface_(sm_interface_factory(this)),

      max_bytes_sent_per_dowrite_(4096),

      ssl_(NULL) {}

  int fd_;      // socket descriptor, or -1 when none is attached
  int events_;  // epoll event bits still to be handled / reported as ready

  bool registered_in_epoll_server_;  // kept up to date by the epoll callbacks
  bool initialized_;                 // set by InitSMServerConnection()

  SMServerConnectionPoolInterface* connection_pool_;
  EpollServer* epoll_server_;

  RingBuffer read_buffer_;  // bytes received but not yet consumed

  OutputList output_list_;  // frames queued by EnqueueDataFrame()
  MemoryCache* memory_cache_;
  SMInterface* sm_interface_;  // created by the factory in the constructor

  // NOTE(review): presumably the per-DoWrite() byte budget; DoWrite() is not
  // in this part of the file.
  size_t max_bytes_sent_per_dowrite_;

  SSL* ssl_;  // created in InitSMServerConnection() when SSL is enabled
775 public:
  // Plain accessors. The returned pointers are not owned by the caller.
  EpollServer* epoll_server() { return epoll_server_; }
  OutputList* output_list() { return &output_list_; }
  MemoryCache* memory_cache() { return memory_cache_; }
  // Marks this connection's fd as ready for both reading and writing in the
  // epoll server.
  void ReadyToSend() {
    epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT);
  }
  // Appends a copy of |df| to the output queue and marks the fd ready so the
  // queue gets serviced.
  void EnqueueDataFrame(const DataFrame& df) {
    output_list_.push_back(df);
    VLOG(2) << "EnqueueDataFrame. Setting FD ready.";
    ReadyToSend();
  }
787
788 public:
  // Calls Reset() if the connection was initialized. Reset() is defined
  // outside this part of the file.
  ~SMServerConnection() {
    if (initialized()) {
      Reset();
    }
  }
  // Factory for the (private) constructor. Returns a heap-allocated,
  // uninitialized connection.
  static SMServerConnection* NewSMServerConnection(SMInterfaceFactory* smif,
                                                   MemoryCache* memory_cache,
                                                   EpollServer* epoll_server) {
    return new SMServerConnection(smif, memory_cache, epoll_server);
  }
799
  // True once InitSMServerConnection() has attached a socket.
  bool initialized() const { return initialized_; }
801
802  void InitSMServerConnection(SMServerConnectionPoolInterface* connection_pool,
803                              EpollServer* epoll_server,
804                              int fd) {
805    if (initialized_) {
806      LOG(FATAL) << "Attempted to initialize already initialized server";
807      return;
808    }
809    if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) {
810      epoll_server_->UnregisterFD(fd_);
811    }
812    if (fd_ != -1) {
813      VLOG(2) << "Closing pre-existing fd";
814      close(fd_);
815      fd_ = -1;
816    }
817
818    fd_ = fd;
819
820    registered_in_epoll_server_ = false;
821    initialized_ = true;
822
823    connection_pool_ = connection_pool;
824    epoll_server_ = epoll_server;
825
826    sm_interface_->Reset();
827    read_buffer_.Clear();
828
829    epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET);
830
831    if (global_ssl_state) {
832      ssl_ = spdy_new_ssl(global_ssl_state->ssl_ctx);
833      SSL_set_fd(ssl_, fd_);
834      PrintSslError();
835    }
836    sm_interface_->PostAcceptHook();
837  }
838
  // Thin wrapper around send() on this connection's fd. It writes to the
  // socket directly and does not use |ssl_|. Returns send()'s result.
  int Send(const char* bytes, int len, int flags) {
    return send(fd_, bytes, len, flags);
  }
842
  // the following are from the EpollCallbackInterface

  // Records that the fd is now registered with an epoll server.
  virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) {
    registered_in_epoll_server_ = true;
  }
  // Nothing to do when the registered event mask changes.
  virtual void OnModification(int fd, int event_mask) { }
848  virtual void OnEvent(int fd, EpollEvent* event) {
849    events_ |= event->in_events;
850    HandleEvents();
851    if (events_) {
852      event->out_ready_mask = events_;
853      events_ = 0;
854    }
855  }
  // Records that the fd is no longer registered with the epoll server.
  virtual void OnUnregistration(int fd, bool replaced) {
    registered_in_epoll_server_ = false;
  }
  // Forwards epoll server shutdown to Cleanup(), which is defined outside
  // this part of the file.
  virtual void OnShutdown(EpollServer* eps, int fd) {
    Cleanup("OnShutdown");
    return;
  }
863
864 private:
865  void HandleEvents() {
866    VLOG(1) << "Received: " << EpollServer::EventMaskToString(events_);
867    if (events_ & EPOLLIN) {
868      if (!DoRead())
869        goto handle_close_or_error;
870    }
871
872    if (events_ & EPOLLOUT) {
873      if (!DoWrite())
874        goto handle_close_or_error;
875    }
876
877    if (events_ & (EPOLLHUP | EPOLLERR)) {
878      VLOG(2) << "!!!! Got HUP or ERR";
879      goto handle_close_or_error;
880    }
881    return;
882
883 handle_close_or_error:
884    Cleanup("HandleEvents");
885  }
886
887  bool DoRead() {
888    VLOG(2) << "DoRead()";
889    if (fd_ == -1) {
890      VLOG(2) << "DoRead(): fd_ == -1. Invalid FD. Returning false";
891      return false;
892    }
893    while (!read_buffer_.Full()) {
894      char* bytes;
895      int size;
896      read_buffer_.GetWritablePtr(&bytes, &size);
897      ssize_t bytes_read = 0;
898      if (ssl_) {
899        bytes_read = SSL_read(ssl_, bytes, size);
900        PrintSslError();
901      } else {
902        bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT);
903      }
904      int stored_errno = errno;
905      if (bytes_read == -1) {
906        switch (stored_errno) {
907          case EAGAIN:
908            events_ &= ~EPOLLIN;
909            VLOG(2) << "Got EAGAIN while reading";
910            goto done;
911          case EINTR:
912            VLOG(2) << "Got EINTR while reading";
913            continue;
914          default:
915            VLOG(2) << "While calling recv, got error: " << stored_errno
916              << " " << strerror(stored_errno);
917            goto error_or_close;
918        }
919      } else if (bytes_read > 0) {
920        VLOG(2) << "Read: " << bytes_read << " bytes from fd: " << fd_;
921        read_buffer_.AdvanceWritablePtr(bytes_read);
922        if (!DoConsumeReadData()) {
923          goto error_or_close;
924        }
925        continue;
926      } else {  // bytes_read == 0
927        VLOG(2) << "0 bytes read with recv call.";
928      }
929      goto error_or_close;
930    }
931   done:
932    return true;
933
934   error_or_close:
935    VLOG(2) << "DoRead(): error_or_close. Cleaning up, then returning false";
936    Cleanup("DoRead");
937    return false;
938  }
939
940  bool DoConsumeReadData() {
941    char* bytes;
942    int size;
943    read_buffer_.GetReadablePtr(&bytes, &size);
944    while (size != 0) {
945      size_t bytes_consumed = sm_interface_->ProcessInput(bytes, size);
946      VLOG(2) << "consumed: " << bytes_consumed << " from socket fd: " << fd_;
947      if (bytes_consumed == 0) {
948        break;
949      }
950      read_buffer_.AdvanceReadablePtr(bytes_consumed);
951      if (sm_interface_->MessageFullyRead()) {
952        VLOG(2) << "HandleRequestFullyRead";
953        HandleRequestFullyRead();
954        sm_interface_->Reset();
955        events_ |= EPOLLOUT;
956      } else if (sm_interface_->Error()) {
957        LOG(ERROR) << "Framer error detected: "
958                   << sm_interface_->ErrorAsString();
959        // this causes everything to be closed/cleaned up.
960        events_ |= EPOLLOUT;
961        return false;
962      }
963      read_buffer_.GetReadablePtr(&bytes, &size);
964    }
965    return true;
966  }
967
968  void WriteResponse() {
969    // this happens asynchronously from separate threads
970    // feeding files into the output buffer.
971  }
972
973  void HandleRequestFullyRead() {
974  }
975
976  void Notify() {
977  }
978
979  bool DoWrite() {
980    size_t bytes_sent = 0;
981    int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
982    if (fd_ == -1) {
983      VLOG(2) << "DoWrite: fd == -1. Returning false.";
984      return false;
985    }
986    if (output_list_.empty()) {
987      sm_interface_->GetOutput();
988      if (output_list_.empty())
989        events_ &= ~EPOLLOUT;
990    }
991    while (!output_list_.empty()) {
992      if (bytes_sent >= max_bytes_sent_per_dowrite_) {
993        events_ |= EPOLLOUT;
994        break;
995      }
996      if (output_list_.size() < 2) {
997        sm_interface_->GetOutput();
998      }
999      DataFrame& data_frame = output_list_.front();
1000      const char*  bytes = data_frame.data;
1001      int size = data_frame.size;
1002      bytes += data_frame.index;
1003      size -= data_frame.index;
1004      DCHECK_GE(size, 0);
1005      if (size <= 0) {
1006        data_frame.MaybeDelete();
1007        output_list_.pop_front();
1008        continue;
1009      }
1010
1011      flags = MSG_NOSIGNAL | MSG_DONTWAIT;
1012      if (output_list_.size() > 1) {
1013        flags |= MSG_MORE;
1014      }
1015      ssize_t bytes_written = 0;
1016      if (ssl_) {
1017        bytes_written = SSL_write(ssl_, bytes, size);
1018        PrintSslError();
1019      } else {
1020        bytes_written = send(fd_, bytes, size, flags);
1021      }
1022      int stored_errno = errno;
1023      if (bytes_written == -1) {
1024        switch (stored_errno) {
1025          case EAGAIN:
1026            events_ &= ~EPOLLOUT;
1027            VLOG(2) << " Got EAGAIN while writing";
1028            goto done;
1029          case EINTR:
1030            VLOG(2) << " Got EINTR while writing";
1031            continue;
1032          default:
1033            VLOG(2) << "While calling send, got error: " << stored_errno
1034              << " " << strerror(stored_errno);
1035            goto error_or_close;
1036        }
1037      } else if (bytes_written > 0) {
1038        VLOG(1) << "Wrote: " << bytes_written  << " bytes to socket fd: "
1039          << fd_;
1040        data_frame.index += bytes_written;
1041        bytes_sent += bytes_written;
1042        continue;
1043      }
1044      VLOG(2) << "0 bytes written to socket " << fd_ << " with send call.";
1045      goto error_or_close;
1046    }
1047   done:
1048    return true;
1049
1050   error_or_close:
1051    VLOG(2) << "DoWrite: error_or_close. Returning false after cleaning up";
1052    Cleanup("DoWrite");
1053    return false;
1054  }
1055
1056  friend ostream& operator<<(ostream& os, const SMServerConnection& c) {
1057    os << &c << "\n";
1058    return os;
1059  }
1060
1061  void Reset() {
1062    VLOG(2) << "Resetting";
1063    if (ssl_) {
1064      SSL_shutdown(ssl_);
1065      PrintSslError();
1066      SSL_free(ssl_);
1067      PrintSslError();
1068    }
1069    if (registered_in_epoll_server_) {
1070      epoll_server_->UnregisterFD(fd_);
1071      registered_in_epoll_server_ = false;
1072    }
1073    if (fd_ >= 0) {
1074      VLOG(2) << "Closing connection";
1075      close(fd_);
1076      fd_ = -1;
1077    }
1078    sm_interface_->ResetForNewConnection();
1079    read_buffer_.Clear();
1080    initialized_ = false;
1081    events_ = 0;
1082    output_list_.clear();
1083  }
1084
1085  void Cleanup(const char* cleanup) {
1086    VLOG(2) << "Cleaning up: " << cleanup;
1087    if (!initialized_) {
1088      return;
1089    }
1090    Reset();
1091    connection_pool_->SMServerConnectionDone(this);
1092  }
1093};
1094
1095////////////////////////////////////////////////////////////////////////////////
1096
1097class OutputOrdering {
1098 public:
1099  typedef list<MemCacheIter> PriorityRing;
1100
1101  typedef map<uint32, PriorityRing> PriorityMap;
1102
1103  struct PriorityMapPointer {
1104    PriorityMapPointer(): ring(NULL), alarm_enabled(false) {}
1105    PriorityRing* ring;
1106    PriorityRing::iterator it;
1107    bool alarm_enabled;
1108    EpollServer::AlarmRegToken alarm_token;
1109  };
1110  typedef map<uint32, PriorityMapPointer> StreamIdToPriorityMap;
1111
1112  StreamIdToPriorityMap stream_ids_;
1113  PriorityMap priority_map_;
1114  PriorityRing first_data_senders_;
1115  uint32 first_data_senders_threshold_;  // when you've passed this, you're no
1116                                         // longer a first_data_sender...
1117  SMServerConnection* connection_;
1118  EpollServer* epoll_server_;
1119
1120  explicit OutputOrdering(SMServerConnection* connection) :
1121      first_data_senders_threshold_(kInitialDataSendersThreshold),
1122      connection_(connection),
1123      epoll_server_(connection->epoll_server()) {
1124  }
1125
1126  void Reset() {
1127    while (!stream_ids_.empty()) {
1128      StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin();
1129      PriorityMapPointer& pmp = sitpmi->second;
1130      if (pmp.alarm_enabled) {
1131        epoll_server_->UnregisterAlarm(pmp.alarm_token);
1132      }
1133      stream_ids_.erase(sitpmi);
1134    }
1135    priority_map_.clear();
1136    first_data_senders_.clear();
1137  }
1138
1139  bool ExistsInPriorityMaps(uint32 stream_id) {
1140    StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id);
1141    return sitpmi != stream_ids_.end();
1142  }
1143
1144  struct BeginOutputtingAlarm : public EpollAlarmCallbackInterface {
1145   public:
1146    BeginOutputtingAlarm(OutputOrdering* oo,
1147                         OutputOrdering::PriorityMapPointer* pmp,
1148                         const MemCacheIter& mci) :
1149        output_ordering_(oo), pmp_(pmp), mci_(mci), epoll_server_(NULL) {}
1150
1151    int64 OnAlarm() {
1152      OnUnregistration();
1153      output_ordering_->MoveToActive(pmp_, mci_);
1154      VLOG(1) << "ON ALARM! Should now start to output...";
1155      delete this;
1156      return 0;
1157    }
1158    void OnRegistration(const EpollServer::AlarmRegToken& tok,
1159                        EpollServer* eps) {
1160      epoll_server_ = eps;
1161      pmp_->alarm_token = tok;
1162      pmp_->alarm_enabled = true;
1163    }
1164    void OnUnregistration() {
1165      pmp_->alarm_enabled = false;
1166    }
1167    void OnShutdown(EpollServer* eps) {
1168      OnUnregistration();
1169    }
1170    ~BeginOutputtingAlarm() {
1171      if (epoll_server_ && pmp_->alarm_enabled)
1172        epoll_server_->UnregisterAlarm(pmp_->alarm_token);
1173    }
1174   private:
1175    OutputOrdering* output_ordering_;
1176    OutputOrdering::PriorityMapPointer* pmp_;
1177    MemCacheIter mci_;
1178    EpollServer* epoll_server_;
1179  };
1180
1181  void MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) {
1182    VLOG(1) <<"Moving to active!";
1183    first_data_senders_.push_back(mci);
1184    pmp->ring = &first_data_senders_;
1185    pmp->it = first_data_senders_.end();
1186    --pmp->it;
1187    connection_->ReadyToSend();
1188  }
1189
1190  void AddToOutputOrder(const MemCacheIter& mci) {
1191    if (ExistsInPriorityMaps(mci.stream_id))
1192      LOG(FATAL) << "OOps, already was inserted here?!";
1193
1194    double think_time_in_s = FLAGS_server_think_time_in_s;
1195    string x_server_latency =
1196      mci.file_data->headers->GetHeader("X-Server-Latency").as_string();
1197    if (x_server_latency.size() != 0) {
1198      char* endp;
1199      double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp);
1200      if (endp != x_server_latency.c_str() + x_server_latency.size()) {
1201        LOG(ERROR) << "Unable to understand X-Server-Latency of: "
1202          << x_server_latency << " for resource: " << mci.file_data->filename;
1203      } else {
1204        think_time_in_s = tmp_think_time_in_s;
1205      }
1206    }
1207    StreamIdToPriorityMap::iterator sitpmi;
1208    sitpmi = stream_ids_.insert(
1209        pair<uint32, PriorityMapPointer>(mci.stream_id,
1210                                         PriorityMapPointer())).first;
1211    PriorityMapPointer& pmp = sitpmi->second;
1212
1213    BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci);
1214    VLOG(2) << "Server think time: " << think_time_in_s;
1215    epoll_server_->RegisterAlarmApproximateDelta(
1216        think_time_in_s * 1000000, boa);
1217  }
1218
1219  void SpliceToPriorityRing(PriorityRing::iterator pri) {
1220    MemCacheIter& mci = *pri;
1221    PriorityMap::iterator pmi = priority_map_.find(mci.priority);
1222    if (pmi == priority_map_.end()) {
1223      pmi = priority_map_.insert(
1224          pair<uint32, PriorityRing>(mci.priority, PriorityRing())).first;
1225    }
1226
1227    pmi->second.splice(pmi->second.end(),
1228                       first_data_senders_,
1229                       pri);
1230    StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id);
1231    sitpmi->second.ring = &(pmi->second);
1232  }
1233
1234  MemCacheIter* GetIter() {
1235    while (!first_data_senders_.empty()) {
1236      MemCacheIter& mci = first_data_senders_.front();
1237      if (mci.bytes_sent >= first_data_senders_threshold_) {
1238        SpliceToPriorityRing(first_data_senders_.begin());
1239      } else {
1240        first_data_senders_.splice(first_data_senders_.end(),
1241                                  first_data_senders_,
1242                                  first_data_senders_.begin());
1243        mci.max_segment_size = kInitialDataSendersThreshold;
1244        return &mci;
1245      }
1246    }
1247    while (!priority_map_.empty()) {
1248      PriorityRing& first_ring = priority_map_.begin()->second;
1249      if (first_ring.empty()) {
1250        priority_map_.erase(priority_map_.begin());
1251        continue;
1252      }
1253      MemCacheIter& mci = first_ring.front();
1254      first_ring.splice(first_ring.end(),
1255                        first_ring,
1256                        first_ring.begin());
1257      mci.max_segment_size = kNormalSegmentSize;
1258      return &mci;
1259    }
1260    return NULL;
1261  }
1262
1263  void RemoveStreamId(uint32 stream_id) {
1264    StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id);
1265    if (sitpmi == stream_ids_.end())
1266      return;
1267    PriorityMapPointer& pmp = sitpmi->second;
1268    if (pmp.alarm_enabled) {
1269      epoll_server_->UnregisterAlarm(pmp.alarm_token);
1270    } else {
1271      pmp.ring->erase(pmp.it);
1272    }
1273
1274    stream_ids_.erase(sitpmi);
1275  }
1276};
1277
1278////////////////////////////////////////////////////////////////////////////////
1279
1280class SpdySM : public SpdyFramerVisitorInterface, public SMInterface {
1281 private:
1282  uint64 seq_num_;
1283  SpdyFramer* framer_;
1284
1285  SMServerConnection* connection_;
1286  OutputList* output_list_;
1287  OutputOrdering output_ordering_;
1288  MemoryCache* memory_cache_;
1289  uint32 next_outgoing_stream_id_;
1290 public:
1291  explicit SpdySM(SMServerConnection* connection) :
1292      seq_num_(0),
1293      framer_(new SpdyFramer),
1294      connection_(connection),
1295      output_list_(connection->output_list()),
1296      output_ordering_(connection),
1297      memory_cache_(connection->memory_cache()),
1298      next_outgoing_stream_id_(2) {
1299    framer_->set_visitor(this);
1300  }
1301 private:
1302  virtual void OnError(SpdyFramer* framer) {
1303    /* do nothing with this right now */
1304  }
1305
1306  virtual void OnControl(const SpdyControlFrame* frame) {
1307    SpdyHeaderBlock headers;
1308    bool parsed_headers = false;
1309    switch (frame->type()) {
1310      case SYN_STREAM:
1311        {
1312        const SpdySynStreamControlFrame* syn_stream =
1313            reinterpret_cast<const SpdySynStreamControlFrame*>(frame);
1314        parsed_headers = framer_->ParseHeaderBlock(frame, &headers);
1315        VLOG(2) << "OnSyn(" << syn_stream->stream_id() << ")";
1316        VLOG(2) << "headers parsed?: " << (parsed_headers? "yes": "no");
1317        if (parsed_headers) {
1318          VLOG(2) << "# headers: " << headers.size();
1319        }
1320        for (SpdyHeaderBlock::iterator i = headers.begin();
1321             i != headers.end();
1322             ++i) {
1323          VLOG(2) << i->first << ": " << i->second;
1324        }
1325
1326        SpdyHeaderBlock::iterator method = headers.find("method");
1327        SpdyHeaderBlock::iterator url = headers.find("url");
1328        if (url == headers.end() || method == headers.end()) {
1329          VLOG(2) << "didn't find method or url or method. Not creating stream";
1330          break;
1331        }
1332
1333        SpdyHeaderBlock::iterator referer = headers.find("referer");
1334        if (referer != headers.end() && method->second == "GET") {
1335          memory_cache_->UpdateHeaders(referer->second, url->second);
1336        }
1337        string uri = UrlUtilities::GetUrlPath(url->second);
1338        string host = UrlUtilities::GetUrlHost(url->second);
1339
1340        string filename = EncodeURL(uri, host, method->second);
1341        NewStream(syn_stream->stream_id(),
1342                  reinterpret_cast<const SpdySynStreamControlFrame*>(frame)->
1343                    priority(),
1344                  filename);
1345        }
1346        break;
1347
1348      case SYN_REPLY:
1349        parsed_headers = framer_->ParseHeaderBlock(frame, &headers);
1350        VLOG(2) << "OnSynReply("
1351                << reinterpret_cast<const SpdySynReplyControlFrame*>(
1352                    frame)->stream_id() << ")";
1353        break;
1354      case RST_STREAM:
1355        {
1356        const SpdyRstStreamControlFrame* rst_stream =
1357            reinterpret_cast<const SpdyRstStreamControlFrame*>(frame);
1358        VLOG(2) << "OnRst(" << rst_stream->stream_id() << ")";
1359        output_ordering_.RemoveStreamId(rst_stream ->stream_id());
1360        }
1361        break;
1362
1363      default:
1364        LOG(DFATAL) << "Unknown control frame type";
1365    }
1366  }
1367  virtual void OnStreamFrameData(
1368    SpdyStreamId stream_id,
1369    const char* data, size_t len) {
1370    VLOG(2) << "StreamData(" << stream_id << ", [" << len << "])";
1371    /* do nothing with this right now */
1372  }
1373  virtual void OnLameDuck() {
1374    /* do nothing with this right now */
1375  }
1376
1377 public:
1378  ~SpdySM() {
1379    Reset();
1380  }
1381  size_t ProcessInput(const char* data, size_t len) {
1382    return framer_->ProcessInput(data, len);
1383  }
1384
1385  bool MessageFullyRead() const {
1386    return framer_->MessageFullyRead();
1387  }
1388
1389  bool Error() const {
1390    return framer_->HasError();
1391  }
1392
1393  const char* ErrorAsString() const {
1394    return SpdyFramer::ErrorCodeToString(framer_->error_code());
1395  }
1396
1397  void Reset() {}
1398  void ResetForNewConnection() {
1399    // seq_num is not cleared, intentionally.
1400    delete framer_;
1401    framer_ = new SpdyFramer;
1402    framer_->set_visitor(this);
1403    output_ordering_.Reset();
1404    next_outgoing_stream_id_ = 2;
1405  }
1406
1407  // Send a couple of NOOP packets to force opening of cwnd.
1408  void PostAcceptHook() {
1409    if (!FLAGS_use_cwnd_opener)
1410      return;
1411
1412    // We send 2 because that is the initial cwnd, and also because
1413    // we have to in order to get an ACK back from the client due to
1414    // delayed ACK.
1415    const int kPkts = 2;
1416
1417    LOG(ERROR) << "Sending NOP FRAMES";
1418
1419    scoped_ptr<SpdyControlFrame> frame(SpdyFramer::CreateNopFrame());
1420    for (int i = 0; i < kPkts; ++i) {
1421      char* bytes = frame->data();
1422      size_t size = SpdyFrame::size();
1423      ssize_t bytes_written = connection_->Send(bytes, size, MSG_DONTWAIT);
1424      if (static_cast<size_t>(bytes_written) != size) {
1425        LOG(ERROR) << "Trouble sending Nop packet! (" << errno << ")";
1426        if (errno == EAGAIN)
1427          break;
1428      }
1429    }
1430  }
1431
1432  void AddAssociatedContent(FileData* file_data) {
1433    for (unsigned int i = 0; i < file_data->related_files.size(); ++i) {
1434      pair<int, string>& related_file = file_data->related_files[i];
1435      MemCacheIter mci;
1436      string filename  = "GET_";
1437      filename += related_file.second;
1438      if (!memory_cache_->AssignFileData(filename, &mci)) {
1439        VLOG(1) << "Unable to find associated content for: " << filename;
1440        continue;
1441      }
1442      VLOG(1) << "Adding associated content: " << filename;
1443      mci.stream_id = next_outgoing_stream_id_;
1444      next_outgoing_stream_id_ += 2;
1445      mci.priority =  related_file.first;
1446      AddToOutputOrder(mci);
1447    }
1448  }
1449
1450  void NewStream(uint32 stream_id, uint32 priority, const string& filename) {
1451    MemCacheIter mci;
1452    mci.stream_id = stream_id;
1453    mci.priority = priority;
1454    if (!memory_cache_->AssignFileData(filename, &mci)) {
1455      // error creating new stream.
1456      VLOG(2) << "Sending ErrorNotFound";
1457      SendErrorNotFound(stream_id);
1458    } else {
1459      AddToOutputOrder(mci);
1460      if (FLAGS_use_xac) {
1461        AddAssociatedContent(mci.file_data);
1462      }
1463    }
1464  }
1465
1466  void AddToOutputOrder(const MemCacheIter& mci) {
1467    output_ordering_.AddToOutputOrder(mci);
1468  }
1469
1470  void SendEOF(uint32 stream_id) {
1471    SendEOFImpl(stream_id);
1472  }
1473
1474  void SendErrorNotFound(uint32 stream_id) {
1475    SendErrorNotFoundImpl(stream_id);
1476  }
1477
1478  void SendOKResponse(uint32 stream_id, string* output) {
1479    SendOKResponseImpl(stream_id, output);
1480  }
1481
1482  size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) {
1483    return SendSynStreamImpl(stream_id, headers);
1484  }
1485
1486  size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) {
1487    return SendSynReplyImpl(stream_id, headers);
1488  }
1489
1490  void SendDataFrame(uint32 stream_id, const char* data, int64 len,
1491                     uint32 flags, bool compress) {
1492    SpdyDataFlags spdy_flags = static_cast<SpdyDataFlags>(flags);
1493    SendDataFrameImpl(stream_id, data, len, spdy_flags, compress);
1494  }
1495
1496  SpdyFramer* spdy_framer() { return framer_; }
1497
1498 private:
1499  void SendEOFImpl(uint32 stream_id) {
1500    SendDataFrame(stream_id, NULL, 0, DATA_FLAG_FIN, false);
1501    VLOG(2) << "Sending EOF: " << stream_id;
1502    KillStream(stream_id);
1503  }
1504
1505  void SendErrorNotFoundImpl(uint32 stream_id) {
1506    BalsaHeaders my_headers;
1507    my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found");
1508    SendSynReplyImpl(stream_id, my_headers);
1509    SendDataFrame(stream_id, "wtf?", 4, DATA_FLAG_FIN, false);
1510    output_ordering_.RemoveStreamId(stream_id);
1511  }
1512
1513  void SendOKResponseImpl(uint32 stream_id, string* output) {
1514    BalsaHeaders my_headers;
1515    my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "200", "OK");
1516    SendSynReplyImpl(stream_id, my_headers);
1517    SendDataFrame(
1518        stream_id, output->c_str(), output->size(), DATA_FLAG_FIN, false);
1519    output_ordering_.RemoveStreamId(stream_id);
1520  }
1521
1522  void KillStream(uint32 stream_id) {
1523    output_ordering_.RemoveStreamId(stream_id);
1524  }
1525
1526  void CopyHeaders(SpdyHeaderBlock& dest, const BalsaHeaders& headers) {
1527    for (BalsaHeaders::const_header_lines_iterator hi =
1528         headers.header_lines_begin();
1529         hi != headers.header_lines_end();
1530         ++hi) {
1531      SpdyHeaderBlock::iterator fhi = dest.find(hi->first.as_string());
1532      if (fhi == dest.end()) {
1533        dest[hi->first.as_string()] = hi->second.as_string();
1534      } else {
1535        dest[hi->first.as_string()] = (
1536            string(fhi->second.data(), fhi->second.size()) + "," +
1537            string(hi->second.data(), hi->second.size()));
1538      }
1539    }
1540
1541    // These headers have no value
1542    dest.erase("X-Associated-Content");  // TODO(mbelshe): case-sensitive
1543    dest.erase("X-Original-Url");  // TODO(mbelshe): case-sensitive
1544  }
1545
1546  size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) {
1547    SpdyHeaderBlock block;
1548    block["method"] = headers.request_method().as_string();
1549    if (!headers.HasHeader("status"))
1550      block["status"] = headers.response_code().as_string();
1551    if (!headers.HasHeader("version"))
1552      block["version"] =headers.response_version().as_string();
1553    if (headers.HasHeader("X-Original-Url")) {
1554      string original_url = headers.GetHeader("X-Original-Url").as_string();
1555      block["path"] = UrlUtilities::GetUrlPath(original_url);
1556    } else {
1557      block["path"] = headers.request_uri().as_string();
1558    }
1559    CopyHeaders(block, headers);
1560
1561    SpdySynStreamControlFrame* fsrcf =
1562      framer_->CreateSynStream(stream_id, 0, 0, CONTROL_FLAG_NONE, true,
1563                               &block);
1564    DataFrame df;
1565    df.size = fsrcf->length() + SpdyFrame::size();
1566    size_t df_size = df.size;
1567    df.data = fsrcf->data();
1568    df.delete_when_done = true;
1569    EnqueueDataFrame(df);
1570
1571    VLOG(2) << "Sending SynStreamheader " << stream_id;
1572    return df_size;
1573  }
1574
1575  size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) {
1576    SpdyHeaderBlock block;
1577    CopyHeaders(block, headers);
1578    block["status"] = headers.response_code().as_string() + " " +
1579                      headers.response_reason_phrase().as_string();
1580    block["version"] = headers.response_version().as_string();
1581
1582    SpdySynReplyControlFrame* fsrcf =
1583      framer_->CreateSynReply(stream_id, CONTROL_FLAG_NONE, true, &block);
1584    DataFrame df;
1585    df.size = fsrcf->length() + SpdyFrame::size();
1586    size_t df_size = df.size;
1587    df.data = fsrcf->data();
1588    df.delete_when_done = true;
1589    EnqueueDataFrame(df);
1590
1591    VLOG(2) << "Sending SynReplyheader " << stream_id;
1592    return df_size;
1593  }
1594
1595  void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len,
1596                         SpdyDataFlags flags, bool compress) {
1597    // Force compression off if disabled via command line.
1598    if (!FLAGS_use_compression)
1599      flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_COMPRESSED);
1600
1601    // TODO(mbelshe):  We can't compress here - before going into the
1602    //                 priority queue.  Compression needs to be done
1603    //                 with late binding.
1604    SpdyDataFrame* fdf = framer_->CreateDataFrame(stream_id, data, len,
1605                                                  flags);
1606    DataFrame df;
1607    df.size = fdf->length() + SpdyFrame::size();
1608    df.data = fdf->data();
1609    df.delete_when_done = true;
1610    EnqueueDataFrame(df);
1611
1612    VLOG(2) << "Sending data frame" << stream_id << " [" << len << "]"
1613            << " shrunk to " << fdf->length();
1614  }
1615
1616  void EnqueueDataFrame(const DataFrame& df) {
1617    connection_->EnqueueDataFrame(df);
1618  }
1619
1620  void GetOutput() {
1621    while (output_list_->size() < 2) {
1622      MemCacheIter* mci = output_ordering_.GetIter();
1623      if (mci == NULL) {
1624        VLOG(2) << "GetOutput: nothing to output!?";
1625        return;
1626      }
1627      if (!mci->transformed_header) {
1628        mci->transformed_header = true;
1629        VLOG(2) << "GetOutput transformed header stream_id: ["
1630          << mci->stream_id << "]";
1631        if ((mci->stream_id % 2) == 0) {
1632          // this is a server initiated stream.
1633          // Ideally, we'd do a 'syn-push' here, instead of a syn-reply.
1634          BalsaHeaders headers;
1635          headers.CopyFrom(*(mci->file_data->headers));
1636          headers.ReplaceOrAppendHeader("status", "200");
1637          headers.ReplaceOrAppendHeader("version", "http/1.1");
1638          headers.SetRequestFirstlineFromStringPieces("PUSH",
1639                                                      mci->file_data->filename,
1640                                                      "");
1641          mci->bytes_sent = SendSynStream(mci->stream_id, headers);
1642        } else {
1643          BalsaHeaders headers;
1644          headers.CopyFrom(*(mci->file_data->headers));
1645          mci->bytes_sent = SendSynReply(mci->stream_id, headers);
1646        }
1647        return;
1648      }
1649      if (mci->body_bytes_consumed >= mci->file_data->body.size()) {
1650        VLOG(2) << "GetOutput remove_stream_id: [" << mci->stream_id << "]";
1651        SendEOF(mci->stream_id);
1652        return;
1653      }
1654      size_t num_to_write =
1655        mci->file_data->body.size() - mci->body_bytes_consumed;
1656      if (num_to_write > mci->max_segment_size)
1657        num_to_write = mci->max_segment_size;
1658
1659      bool should_compress = false;
1660      if (!mci->file_data->headers->HasHeader("content-encoding")) {
1661        if (mci->file_data->headers->HasHeader("content-type")) {
1662          string content_type =
1663              mci->file_data->headers->GetHeader("content-type").as_string();
1664          if (content_type.find("image") == content_type.npos)
1665            should_compress = true;
1666        }
1667      }
1668
1669      SendDataFrame(mci->stream_id,
1670                    mci->file_data->body.data() + mci->body_bytes_consumed,
1671                    num_to_write, 0, should_compress);
1672      VLOG(2) << "GetOutput SendDataFrame[" << mci->stream_id
1673        << "]: " << num_to_write;
1674      mci->body_bytes_consumed += num_to_write;
1675      mci->bytes_sent += num_to_write;
1676    }
1677  }
1678};
1679
1680////////////////////////////////////////////////////////////////////////////////
1681
1682class HTTPSM : public BalsaVisitorInterface, public SMInterface {
1683 private:
1684  uint64 seq_num_;
1685  BalsaFrame* framer_;
1686  BalsaHeaders headers_;
1687  uint32 stream_id_;
1688
1689  SMServerConnection* connection_;
1690  OutputList* output_list_;
1691  OutputOrdering output_ordering_;
1692  MemoryCache* memory_cache_;
1693 public:
1694  explicit HTTPSM(SMServerConnection* connection) :
1695      seq_num_(0),
1696      framer_(new BalsaFrame),
1697      stream_id_(1),
1698      connection_(connection),
1699      output_list_(connection->output_list()),
1700      output_ordering_(connection),
1701      memory_cache_(connection->memory_cache()) {
1702    framer_->set_balsa_visitor(this);
1703    framer_->set_balsa_headers(&headers_);
1704  }
1705 private:
1706  typedef map<string, uint32> ClientTokenMap;
1707 private:
1708    virtual void ProcessBodyInput(const char *input, size_t size) {
1709    }
1710    virtual void ProcessBodyData(const char *input, size_t size) {
1711      // ignoring this.
1712    }
1713    virtual void ProcessHeaderInput(const char *input, size_t size) {
1714    }
1715    virtual void ProcessTrailerInput(const char *input, size_t size) {}
1716    virtual void ProcessHeaders(const BalsaHeaders& headers) {
1717      VLOG(2) << "Got new request!";
1718      string host = UrlUtilities::GetUrlHost(
1719          headers.GetHeader("Host").as_string());
1720      string method = headers.request_method().as_string();
1721      string filename = EncodeURL(headers.request_uri().as_string(), host,
1722          method);
1723      NewStream(stream_id_, 0, filename);
1724      stream_id_ += 2;
1725    }
1726    virtual void ProcessRequestFirstLine(const char* line_input,
1727                                         size_t line_length,
1728                                         const char* method_input,
1729                                         size_t method_length,
1730                                         const char* request_uri_input,
1731                                         size_t request_uri_length,
1732                                         const char* version_input,
1733                                         size_t version_length) {}
1734    virtual void ProcessResponseFirstLine(const char *line_input,
1735                                          size_t line_length,
1736                                          const char *version_input,
1737                                          size_t version_length,
1738                                          const char *status_input,
1739                                          size_t status_length,
1740                                          const char *reason_input,
1741                                          size_t reason_length) {}
1742    virtual void ProcessChunkLength(size_t chunk_length) {}
1743    virtual void ProcessChunkExtensions(const char *input, size_t size) {}
1744    virtual void HeaderDone() {}
1745    virtual void MessageDone() {
1746      VLOG(2) << "MessageDone!";
1747    }
1748    virtual void HandleHeaderError(BalsaFrame* framer) {
1749      HandleError();
1750    }
1751    virtual void HandleHeaderWarning(BalsaFrame* framer) {}
1752    virtual void HandleChunkingError(BalsaFrame* framer) {
1753      HandleError();
1754    }
1755    virtual void HandleBodyError(BalsaFrame* framer) {
1756      HandleError();
1757    }
1758
1759    void HandleError() {
1760      VLOG(2) << "Error detected";
1761    }
1762
1763 public:
1764  ~HTTPSM() {
1765    Reset();
1766  }
1767  size_t ProcessInput(const char* data, size_t len) {
1768    return framer_->ProcessInput(data, len);
1769  }
1770
1771  bool MessageFullyRead() const {
1772    return framer_->MessageFullyRead();
1773  }
1774
1775  bool Error() const {
1776    return framer_->Error();
1777  }
1778
1779  const char* ErrorAsString() const {
1780    return BalsaFrameEnums::ErrorCodeToString(framer_->ErrorCode());
1781  }
1782
1783  void Reset() {
1784    framer_->Reset();
1785  }
1786
1787  void ResetForNewConnection() {
1788    seq_num_ = 0;
1789    output_ordering_.Reset();
1790    framer_->Reset();
1791  }
1792
1793  void PostAcceptHook() {
1794  }
1795
1796  void NewStream(uint32 stream_id, uint32 priority, const string& filename) {
1797    MemCacheIter mci;
1798    mci.stream_id = stream_id;
1799    mci.priority = priority;
1800    if (!memory_cache_->AssignFileData(filename, &mci)) {
1801      SendErrorNotFound(stream_id);
1802    } else {
1803      AddToOutputOrder(mci);
1804    }
1805  }
1806
1807  void AddToOutputOrder(const MemCacheIter& mci) {
1808    output_ordering_.AddToOutputOrder(mci);
1809  }
1810
1811  void SendEOF(uint32 stream_id) {
1812    SendEOFImpl(stream_id);
1813  }
1814
1815  void SendErrorNotFound(uint32 stream_id) {
1816    SendErrorNotFoundImpl(stream_id);
1817  }
1818
1819  void SendOKResponse(uint32 stream_id, string* output) {
1820    SendOKResponseImpl(stream_id, output);
1821  }
1822
1823  size_t SendSynStream(uint32 stream_id, const BalsaHeaders& headers) {
1824    return 0;
1825  }
1826
1827  size_t SendSynReply(uint32 stream_id, const BalsaHeaders& headers) {
1828    return SendSynReplyImpl(stream_id, headers);
1829  }
1830
1831  void SendDataFrame(uint32 stream_id, const char* data, int64 len,
1832                     uint32 flags, bool compress) {
1833    SendDataFrameImpl(stream_id, data, len, flags, compress);
1834  }
1835
1836  BalsaFrame* spdy_framer() { return framer_; }
1837
1838 private:
1839  void SendEOFImpl(uint32 stream_id) {
1840    DataFrame df;
1841    df.data = "0\r\n\r\n";
1842    df.size = 5;
1843    df.delete_when_done = false;
1844    EnqueueDataFrame(df);
1845  }
1846
1847  void SendErrorNotFoundImpl(uint32 stream_id) {
1848    BalsaHeaders my_headers;
1849    my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found");
1850    my_headers.RemoveAllOfHeader("content-length");
1851    my_headers.AppendHeader("transfer-encoding", "chunked");
1852    SendSynReplyImpl(stream_id, my_headers);
1853    SendDataFrame(stream_id, "wtf?", 4, 0, false);
1854    SendEOFImpl(stream_id);
1855    output_ordering_.RemoveStreamId(stream_id);
1856  }
1857
1858  void SendOKResponseImpl(uint32 stream_id, string* output) {
1859    BalsaHeaders my_headers;
1860    my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "200", "OK");
1861    my_headers.RemoveAllOfHeader("content-length");
1862    my_headers.AppendHeader("transfer-encoding", "chunked");
1863    SendSynReplyImpl(stream_id, my_headers);
1864    SendDataFrame(stream_id, output->c_str(), output->size(), 0, false);
1865    SendEOFImpl(stream_id);
1866    output_ordering_.RemoveStreamId(stream_id);
1867  }
1868
1869  size_t SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) {
1870    SimpleBuffer sb;
1871    headers.WriteHeaderAndEndingToBuffer(&sb);
1872    DataFrame df;
1873    df.size = sb.ReadableBytes();
1874    char* buffer = new char[df.size];
1875    df.data = buffer;
1876    df.delete_when_done = true;
1877    sb.Read(buffer, df.size);
1878    VLOG(2) << "******************Sending HTTP Reply header " << stream_id;
1879    size_t df_size = df.size;
1880    EnqueueDataFrame(df);
1881    return df_size;
1882  }
1883
1884  size_t SendSynStreamImpl(uint32 stream_id, const BalsaHeaders& headers) {
1885    SimpleBuffer sb;
1886    headers.WriteHeaderAndEndingToBuffer(&sb);
1887    DataFrame df;
1888    df.size = sb.ReadableBytes();
1889    char* buffer = new char[df.size];
1890    df.data = buffer;
1891    df.delete_when_done = true;
1892    sb.Read(buffer, df.size);
1893    VLOG(2) << "******************Sending HTTP Reply header " << stream_id;
1894    size_t df_size = df.size;
1895    EnqueueDataFrame(df);
1896    return df_size;
1897  }
1898
1899  void SendDataFrameImpl(uint32 stream_id, const char* data, int64 len,
1900                         uint32 flags, bool compress) {
1901    char chunk_buf[128];
1902    snprintf(chunk_buf, sizeof(chunk_buf), "%x\r\n", (unsigned int)len);
1903    string chunk_description(chunk_buf);
1904    DataFrame df;
1905    df.size = chunk_description.size() + len + 2;
1906    char* buffer = new char[df.size];
1907    df.data = buffer;
1908    df.delete_when_done = true;
1909    memcpy(buffer, chunk_description.data(), chunk_description.size());
1910    memcpy(buffer + chunk_description.size(), data, len);
1911    memcpy(buffer + chunk_description.size() + len, "\r\n", 2);
1912    EnqueueDataFrame(df);
1913  }
1914
1915  void EnqueueDataFrame(const DataFrame& df) {
1916    connection_->EnqueueDataFrame(df);
1917  }
1918
1919  void GetOutput() {
1920    MemCacheIter* mci = output_ordering_.GetIter();
1921    if (mci == NULL) {
1922      VLOG(2) << "GetOutput: nothing to output!?";
1923      return;
1924    }
1925    if (!mci->transformed_header) {
1926      mci->bytes_sent = SendSynReply(mci->stream_id,
1927                                     *(mci->file_data->headers));
1928      mci->transformed_header = true;
1929      VLOG(2) << "GetOutput transformed header stream_id: ["
1930        << mci->stream_id << "]";
1931      return;
1932    }
1933    if (mci->body_bytes_consumed >= mci->file_data->body.size()) {
1934      SendEOF(mci->stream_id);
1935      output_ordering_.RemoveStreamId(mci->stream_id);
1936      VLOG(2) << "GetOutput remove_stream_id: [" << mci->stream_id << "]";
1937      return;
1938    }
1939    size_t num_to_write =
1940      mci->file_data->body.size() - mci->body_bytes_consumed;
1941    if (num_to_write > mci->max_segment_size)
1942      num_to_write = mci->max_segment_size;
1943    SendDataFrame(mci->stream_id,
1944                  mci->file_data->body.data() + mci->body_bytes_consumed,
1945                  num_to_write, 0, true);
1946    VLOG(2) << "GetOutput SendDataFrame[" << mci->stream_id
1947      << "]: " << num_to_write;
1948    mci->body_bytes_consumed += num_to_write;
1949    mci->bytes_sent += num_to_write;
1950  }
1951};
1952
1953////////////////////////////////////////////////////////////////////////////////
1954
1955class Notification {
1956 public:
1957  explicit Notification(bool value) : value_(value) {}
1958
1959  void Notify() {
1960    AutoLock al(lock_);
1961    value_ = true;
1962  }
1963  bool HasBeenNotified() {
1964    AutoLock al(lock_);
1965    return value_;
1966  }
1967  bool value_;
1968  Lock lock_;
1969};
1970
1971////////////////////////////////////////////////////////////////////////////////
1972
1973class SMAcceptorThread : public SimpleThread,
1974                         public EpollCallbackInterface,
1975                         public SMServerConnectionPoolInterface {
1976  EpollServer epoll_server_;
1977  int listen_fd_;
1978  int accepts_per_wake_;
1979
1980  vector<SMServerConnection*> unused_server_connections_;
1981  vector<SMServerConnection*> tmp_unused_server_connections_;
1982  vector<SMServerConnection*> allocated_server_connections_;
1983  Notification quitting_;
1984  SMInterfaceFactory* sm_interface_factory_;
1985  MemoryCache* memory_cache_;
1986 public:
1987
1988  SMAcceptorThread(int listen_fd,
1989                   int accepts_per_wake,
1990                   SMInterfaceFactory* smif,
1991                   MemoryCache* memory_cache) :
1992      SimpleThread("SMAcceptorThread"),
1993      listen_fd_(listen_fd),
1994      accepts_per_wake_(accepts_per_wake),
1995      quitting_(false),
1996      sm_interface_factory_(smif),
1997      memory_cache_(memory_cache) {
1998  }
1999
2000  ~SMAcceptorThread() {
2001    for (vector<SMServerConnection*>::iterator i =
2002           allocated_server_connections_.begin();
2003         i != allocated_server_connections_.end();
2004         ++i) {
2005      delete *i;
2006    }
2007  }
2008
2009  SMServerConnection* NewConnection() {
2010    SMServerConnection* server =
2011      SMServerConnection::NewSMServerConnection(sm_interface_factory_,
2012                                                memory_cache_,
2013                                                &epoll_server_);
2014    allocated_server_connections_.push_back(server);
2015    VLOG(3) << "Making new server: " << server;
2016    return server;
2017  }
2018
2019  SMServerConnection* FindOrMakeNewSMServerConnection() {
2020    if (unused_server_connections_.empty()) {
2021      return NewConnection();
2022    }
2023    SMServerConnection* retval = unused_server_connections_.back();
2024    unused_server_connections_.pop_back();
2025    return retval;
2026  }
2027
2028
2029  void InitWorker() {
2030    epoll_server_.RegisterFD(listen_fd_, this, EPOLLIN | EPOLLET);
2031  }
2032
2033  void HandleConnection(int client_fd) {
2034    SMServerConnection* server_connection = FindOrMakeNewSMServerConnection();
2035    if (server_connection == NULL) {
2036      VLOG(2) << "Closing " << client_fd;
2037      close(client_fd);
2038      return;
2039    }
2040    server_connection->InitSMServerConnection(this,
2041                                            &epoll_server_,
2042                                            client_fd);
2043  }
2044
2045  void AcceptFromListenFD() {
2046    if (accepts_per_wake_ > 0) {
2047      for (int i = 0; i < accepts_per_wake_; ++i) {
2048        struct sockaddr address;
2049        socklen_t socklen = sizeof(address);
2050        int fd = accept(listen_fd_, &address, &socklen);
2051        if (fd == -1) {
2052          VLOG(2) << "accept fail(" << listen_fd_ << "): " << errno;
2053          break;
2054        }
2055        VLOG(2) << "********************Accepted fd: " << fd << "\n\n\n";
2056        HandleConnection(fd);
2057      }
2058    } else {
2059      while (true) {
2060        struct sockaddr address;
2061        socklen_t socklen = sizeof(address);
2062        int fd = accept(listen_fd_, &address, &socklen);
2063        if (fd == -1) {
2064          VLOG(2) << "accept fail(" << listen_fd_ << "): " << errno;
2065          break;
2066        }
2067        VLOG(2) << "********************Accepted fd: " << fd << "\n\n\n";
2068        HandleConnection(fd);
2069      }
2070    }
2071  }
2072
2073  // EpollCallbackInteface virtual functions.
2074  virtual void OnRegistration(EpollServer* eps, int fd, int event_mask) { }
2075  virtual void OnModification(int fd, int event_mask) { }
2076  virtual void OnEvent(int fd, EpollEvent* event) {
2077    if (event->in_events | EPOLLIN) {
2078      VLOG(2) << "Accepting based upon epoll events";
2079      AcceptFromListenFD();
2080    }
2081  }
2082  virtual void OnUnregistration(int fd, bool replaced) { }
2083  virtual void OnShutdown(EpollServer* eps, int fd) { }
2084
2085  void Quit() {
2086    quitting_.Notify();
2087  }
2088
2089  void Run() {
2090    while (!quitting_.HasBeenNotified()) {
2091      epoll_server_.set_timeout_in_us(10 * 1000);  // 10 ms
2092      epoll_server_.WaitForEventsAndExecuteCallbacks();
2093      unused_server_connections_.insert(unused_server_connections_.end(),
2094                                        tmp_unused_server_connections_.begin(),
2095                                        tmp_unused_server_connections_.end());
2096      tmp_unused_server_connections_.clear();
2097    }
2098  }
2099
2100  // SMServerConnections will use this:
2101  virtual void SMServerConnectionDone(SMServerConnection* sc) {
2102    VLOG(3) << "Done with server connection: " << sc;
2103    tmp_unused_server_connections_.push_back(sc);
2104  }
2105};
2106
2107////////////////////////////////////////////////////////////////////////////////
2108
2109SMInterface* NewSpdySM(SMServerConnection* connection) {
2110  return new SpdySM(connection);
2111}
2112
2113SMInterface* NewHTTPSM(SMServerConnection* connection) {
2114  return new HTTPSM(connection);
2115}
2116
2117////////////////////////////////////////////////////////////////////////////////
2118
2119int CreateListeningSocket(int port, int backlog_size,
2120                          bool reuseport, bool no_nagle) {
2121  int listening_socket = 0;
2122  char port_buf[256];
2123  snprintf(port_buf, sizeof(port_buf), "%d", port);
2124  cerr <<" Attempting to listen on port: " << port_buf << "\n";
2125  cerr <<" input port: " << port << "\n";
2126  net::CreateListeningSocket("",
2127                              port_buf,
2128                              true,
2129                              backlog_size,
2130                              &listening_socket,
2131                              true,
2132                              reuseport,
2133                              &cerr);
2134  SetNonBlocking(listening_socket);
2135  if (no_nagle) {
2136    // set SO_REUSEADDR on the listening socket.
2137    int on = 1;
2138    int rc;
2139    rc = setsockopt(listening_socket, IPPROTO_TCP,  TCP_NODELAY,
2140                    reinterpret_cast<char*>(&on), sizeof(on));
2141    if (rc < 0) {
2142      close(listening_socket);
2143      LOG(FATAL) << "setsockopt() failed fd=" << listening_socket << "\n";
2144    }
2145  }
2146  return listening_socket;
2147}
2148
2149////////////////////////////////////////////////////////////////////////////////
2150
2151bool GotQuitFromStdin() {
2152  // Make stdin nonblocking. Yes this is done each time. Oh well.
2153  fcntl(0, F_SETFL, O_NONBLOCK);
2154  char c;
2155  string maybequit;
2156  while (read(0, &c, 1) > 0) {
2157    maybequit += c;
2158  }
2159  if (maybequit.size()) {
2160    VLOG(2) << "scanning string: \"" << maybequit << "\"";
2161  }
2162  return (maybequit.size() > 1 &&
2163          (maybequit.c_str()[0] == 'q' ||
2164           maybequit.c_str()[0] == 'Q'));
2165}
2166
2167
2168////////////////////////////////////////////////////////////////////////////////
2169
// Returns the literal "true" or "false" matching |b|. The result points at
// a string literal and must not be freed.
const char* BoolToStr(bool b) {
  return b ? "true" : "false";
}
2175
2176////////////////////////////////////////////////////////////////////////////////
2177
2178int main(int argc, char**argv) {
2179  bool use_ssl = FLAGS_use_ssl;
2180  int response_count_until_close = FLAGS_response_count_until_close;
2181  int spdy_port = FLAGS_spdy_port;
2182  int port = FLAGS_port;
2183  int backlog_size = FLAGS_accept_backlog_size;
2184  bool reuseport = FLAGS_reuseport;
2185  bool no_nagle = FLAGS_no_nagle;
2186  double server_think_time_in_s = FLAGS_server_think_time_in_s;
2187  int accepts_per_wake = FLAGS_accepts_per_wake;
2188  int num_threads = 1;
2189
2190
2191  MemoryCache spdy_memory_cache;
2192  spdy_memory_cache.AddFiles();
2193
2194  MemoryCache http_memory_cache;
2195  http_memory_cache.CloneFrom(spdy_memory_cache);
2196
2197  LOG(INFO) <<
2198    "Starting up with the following state: \n"
2199    "                      use_ssl: " << use_ssl << "\n"
2200    "   response_count_until_close: " << response_count_until_close << "\n"
2201    "                         port: " << port << "\n"
2202    "                    spdy_port: " << spdy_port << "\n"
2203    "                 backlog_size: " << backlog_size << "\n"
2204    "                    reuseport: " << BoolToStr(reuseport) << "\n"
2205    "                     no_nagle: " << BoolToStr(no_nagle) << "\n"
2206    "       server_think_time_in_s: " << server_think_time_in_s << "\n"
2207    "             accepts_per_wake: " << accepts_per_wake << "\n"
2208    "                  num_threads: " << num_threads << "\n"
2209    "                     use_xsub: " << BoolToStr(FLAGS_use_xsub) << "\n"
2210    "                      use_xac: " << BoolToStr(FLAGS_use_xac) << "\n";
2211
2212  if (use_ssl) {
2213    global_ssl_state = new GlobalSSLState;
2214    spdy_init_ssl(global_ssl_state);
2215  } else {
2216    global_ssl_state = NULL;
2217  }
2218  EpollServer epoll_server;
2219  vector<SMAcceptorThread*> sm_worker_threads_;
2220
2221  {
2222    // spdy
2223    int listen_fd = -1;
2224
2225    if (reuseport || listen_fd == -1) {
2226      listen_fd = CreateListeningSocket(spdy_port, backlog_size,
2227                                        reuseport, no_nagle);
2228      if (listen_fd < 0) {
2229        LOG(FATAL) << "Unable to open listening socket on spdy_port: "
2230          << spdy_port;
2231      } else {
2232        LOG(INFO) << "Listening for spdy on port: " << spdy_port;
2233      }
2234    }
2235    sm_worker_threads_.push_back(
2236        new SMAcceptorThread(listen_fd,
2237                             accepts_per_wake,
2238                             &NewSpdySM,
2239                             &spdy_memory_cache));
2240    // Note that spdy_memory_cache is not threadsafe, it is merely
2241    // thread compatible. Thus, if ever we are to spawn multiple threads,
2242    // we either must make the MemoryCache threadsafe, or use
2243    // a separate MemoryCache for each thread.
2244    //
2245    // The latter is what is currently being done as we spawn
2246    // two threads (one for spdy, one for http).
2247    sm_worker_threads_.back()->InitWorker();
2248    sm_worker_threads_.back()->Start();
2249  }
2250
2251  {
2252    // http
2253    int listen_fd = -1;
2254    if (reuseport || listen_fd == -1) {
2255      listen_fd = CreateListeningSocket(port, backlog_size,
2256                                        reuseport, no_nagle);
2257      if (listen_fd < 0) {
2258        LOG(FATAL) << "Unable to open listening socket on port: " << port;
2259      } else {
2260        LOG(INFO) << "Listening for HTTP on port: " << port;
2261      }
2262    }
2263    sm_worker_threads_.push_back(
2264        new SMAcceptorThread(listen_fd,
2265                             accepts_per_wake,
2266                             &NewHTTPSM,
2267                             &http_memory_cache));
2268    // Note that spdy_memory_cache is not threadsafe, it is merely
2269    // thread compatible. Thus, if ever we are to spawn multiple threads,
2270    // we either must make the MemoryCache threadsafe, or use
2271    // a separate MemoryCache for each thread.
2272    //
2273    // The latter is what is currently being done as we spawn
2274    // two threads (one for spdy, one for http).
2275    sm_worker_threads_.back()->InitWorker();
2276    sm_worker_threads_.back()->Start();
2277  }
2278
2279  while (true) {
2280    if (GotQuitFromStdin()) {
2281      for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) {
2282        sm_worker_threads_[i]->Quit();
2283      }
2284      for (unsigned int i = 0; i < sm_worker_threads_.size(); ++i) {
2285        sm_worker_threads_[i]->Join();
2286      }
2287      return 0;
2288    }
2289    usleep(1000*10);  // 10 ms
2290  }
2291  return 0;
2292}
2293
2294