1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""A "Test Server Spawner" that handles killing/stopping per-test test servers.
6
7It's used to accept requests from the device to spawn and kill instances of the
8chrome test server on the host.
9"""
10
11import BaseHTTPServer
12import json
13import logging
14import os
15import select
16import struct
17import subprocess
18import sys
19import threading
20import time
21import urlparse
22
23import constants
24import ports
25
26from pylib.forwarder import Forwarder
27
# Paths that are needed to import the necessary modules when launching a
# testserver. They are appended, ':'-separated, to whatever PYTHONPATH the
# environment already provides so that the spawned scripts inherit them.
os.environ['PYTHONPATH'] = ':'.join(
    [os.environ.get('PYTHONPATH', '')] +
    list(os.path.join(constants.DIR_SOURCE_ROOT, *sub_path)
         for sub_path in (
             ('third_party',),
             ('third_party', 'tlslite'),
             ('third_party', 'pyftpdlib', 'src'),
             ('net', 'tools', 'testserver'),
             ('sync', 'tools', 'testserver'),
         )))


# Maps each supported server type to the command-line flag that selects it.
# An empty string means that no flag has to be passed for that type.
SERVER_TYPES = {
    'http': '',
    'ftp': '-f',
    # Sync uses its own script, and doesn't take a server type arg.
    'sync': '',
    'tcpecho': '--tcp-echo',
    'udpecho': '--udp-echo',
}


# How long (in seconds) to wait for the Python test server to start up.
TEST_SERVER_STARTUP_TIMEOUT = 10
49
50def _WaitUntil(predicate, max_attempts=5):
51  """Blocks until the provided predicate (function) is true.
52
53  Returns:
54    Whether the provided predicate was satisfied once (before the timeout).
55  """
56  sleep_time_sec = 0.025
57  for attempt in xrange(1, max_attempts):
58    if predicate():
59      return True
60    time.sleep(sleep_time_sec)
61    sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
62  return False
63
64
def _CheckPortStatus(port, expected_status):
  """Polls the host until |port| reaches the expected usage state.

  Args:
    port: the host port number to probe.
    expected_status: True to wait for the port to be in use, False to wait
        for it to be free.

  Returns:
    True if ports.IsHostPortUsed(port) matched |expected_status| before
    _WaitUntil ran out of attempts, False otherwise.
  """
  def _PortHasExpectedStatus():
    return ports.IsHostPortUsed(port) == expected_status

  return _WaitUntil(_PortHasExpectedStatus)
76
77
def _CheckDevicePortStatus(adb, port):
  """Polls the device until the provided port is reported as used.

  Args:
    adb: handle passed through to ports.IsDevicePortUsed.
    port: the device port number to probe.

  Returns:
    True if ports.IsDevicePortUsed(adb, port) became true before _WaitUntil
    ran out of attempts, False otherwise.
  """
  def _DevicePortIsUsed():
    return ports.IsDevicePortUsed(adb, port)

  return _WaitUntil(_DevicePortIsUsed)
81
82
def _GetServerTypeCommandLine(server_type):
  """Maps a server type name to the matching test server command-line flag.

  Args:
    server_type: the server type to be used (e.g. 'http').

  Returns:
    The flag string registered in SERVER_TYPES for |server_type|; it is an
    empty string for the types that need no flag.

  Raises:
    NotImplementedError: |server_type| is not a key of SERVER_TYPES.
    Exception: |server_type| is 'udpecho', which is rejected explicitly.
  """
  try:
    type_flag = SERVER_TYPES[server_type]
  except KeyError:
    raise NotImplementedError('Unknown server type: %s' % server_type)
  if server_type == 'udpecho':
    raise Exception('Please do not run UDP echo tests because we do not have '
                    'a UDP forwarder tool.')
  return type_flag
98
99
class TestServerThread(threading.Thread):
  """A thread to run the test server in a separate process.

  The thread launches the test server script, waits for it to listen on its
  host port, maps that port onto the device through Forwarder and then idles
  until Stop() is called, at which point it tears everything down again.
  """

  def __init__(self, ready_event, arguments, adb, tool, build_type):
    """Initialize TestServerThread with the following argument.

    Args:
      ready_event: event which will be set when the test server is ready.
      arguments: dictionary of arguments to run the test server.
      adb: instance of AndroidCommands.
      tool: instance of runtime error detection tool.
      build_type: 'Release' or 'Debug'.
    """
    threading.Thread.__init__(self)
    # Set by run() once the teardown has completed; Stop() blocks on it.
    self.wait_event = threading.Event()
    self.stop_flag = False
    self.ready_event = ready_event
    self.ready_event.clear()
    self.arguments = arguments
    self.adb = adb
    self.tool = tool
    # Popen handle of the test server. It is None until run() has launched
    # the process and again after the teardown, which lets Stop() be called
    # safely at any time.
    self.process = None
    # Not used by this class; kept because it has always been a public
    # attribute of the instances.
    self.test_server_process = None
    self.is_ready = False
    self.host_port = self.arguments['port']
    assert isinstance(self.host_port, int)
    # The forwarder device port now is dynamically allocated.
    self.forwarder_device_port = 0
    # Anonymous pipe in order to get port info from test server.
    self.pipe_in = None
    self.pipe_out = None
    self.command_line = []
    self.build_type = build_type

  def _WaitToStartAndGetPortFromTestServer(self):
    """Waits for the Python test server to start and gets the port it is using.

    The port information is passed by the Python test server with a pipe given
    by self.pipe_out. It is written as a result to |self.host_port|.

    Returns:
      Whether the port used by the test server was successfully fetched.
    """
    assert self.host_port == 0 and self.pipe_out and self.pipe_in
    (in_fds, _, _) = select.select([self.pipe_in], [], [],
                                   TEST_SERVER_STARTUP_TIMEOUT)
    if not in_fds:
      logging.error('Failed to wait to the Python test server to be started.')
      return False
    # First read the data length as an unsigned 4-byte value.  This
    # is _not_ using network byte ordering since the Python test server packs
    # size as native byte order and all Chromium platforms so far are
    # configured to use little-endian.
    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    # use a unified byte order (either big-endian or little-endian).
    header_size = struct.calcsize('=L')
    header = os.read(self.pipe_in, header_size)
    data_length = 0
    # A truncated header cannot be unpacked; treat it like a missing length
    # instead of letting struct.error kill the thread.
    if len(header) == header_size:
      (data_length,) = struct.unpack('=L', header)
    if not data_length:
      logging.error('Failed to get length of server data.')
      return False
    port_json = os.read(self.pipe_in, data_length)
    if not port_json:
      logging.error('Failed to get server data.')
      return False
    logging.info('Got port json data: %s', port_json)
    try:
      port_info = json.loads(port_json)
    except ValueError:
      # Malformed JSON is reported through the generic error path below.
      port_info = None
    if isinstance(port_info, dict) and isinstance(port_info.get('port'), int):
      self.host_port = port_info['port']
      return _CheckPortStatus(self.host_port, True)
    logging.error('Failed to get port information from the server data.')
    return False

  def _GenerateCommandLineArguments(self):
    """Generates the command line to run the test server.

    Note that all options are processed by following the definitions in
    testserver.py. The result is stored in |self.command_line|; nothing is
    done if that list has already been populated. When |self.host_port| is 0
    a pipe is created as a side effect and its write end is passed to the
    server through --startup-pipe.

    Raises:
      KeyError: a mandatory argument ('server-type', 'host' or 'data-dir') is
          missing from |self.arguments|.
    """
    if self.command_line:
      return
    args = self.arguments
    # The following arguments must exist.
    type_cmd = _GetServerTypeCommandLine(args['server-type'])
    if type_cmd:
      self.command_line.append(type_cmd)
    self.command_line.append('--port=%d' % self.host_port)
    # Use a pipe to get the port given by the instance of Python test server
    # if the test does not specify the port.
    if self.host_port == 0:
      (self.pipe_in, self.pipe_out) = os.pipe()
      self.command_line.append('--startup-pipe=%d' % self.pipe_out)
    self.command_line.append('--host=%s' % args['host'])
    data_dir = args['data-dir'] or 'chrome/test/data'
    if not os.path.isabs(data_dir):
      data_dir = os.path.join(constants.DIR_SOURCE_ROOT, data_dir)
    self.command_line.append('--data-dir=%s' % data_dir)
    # The following arguments are optional depending on the individual test.
    if 'log-to-console' in args:
      self.command_line.append('--log-to-console')
    if 'auth-token' in args:
      self.command_line.append('--auth-token=%s' % args['auth-token'])
    if 'https' in args:
      self.command_line.append('--https')
      # The remaining options are only honoured together with 'https'.
      if 'cert-and-key-file' in args:
        self.command_line.append('--cert-and-key-file=%s' % os.path.join(
            constants.DIR_SOURCE_ROOT, args['cert-and-key-file']))
      if 'ocsp' in args:
        self.command_line.append('--ocsp=%s' % args['ocsp'])
      if 'https-record-resume' in args:
        self.command_line.append('--https-record-resume')
      if 'ssl-client-auth' in args:
        self.command_line.append('--ssl-client-auth')
      if 'tls-intolerant' in args:
        self.command_line.append('--tls-intolerant=%s' %
                                 args['tls-intolerant'])
      if 'ssl-client-ca' in args:
        for ca in args['ssl-client-ca']:
          self.command_line.append('--ssl-client-ca=%s' %
                                   os.path.join(constants.DIR_SOURCE_ROOT, ca))
      if 'ssl-bulk-cipher' in args:
        for bulk_cipher in args['ssl-bulk-cipher']:
          self.command_line.append('--ssl-bulk-cipher=%s' % bulk_cipher)

  def _CloseUnnecessaryFDsForTestServerProcess(self):
    """Closes every descriptor below 1024 except |self.pipe_out|.

    Passed to subprocess.Popen as preexec_fn by this class.
    """
    # This is required to avoid subtle deadlocks that could be caused by the
    # test server child process inheriting undesirable file descriptors such as
    # file lock file descriptors.
    for fd in xrange(0, 1024):
      if fd != self.pipe_out:
        try:
          os.close(fd)
        except OSError:
          # The descriptor was not open; nothing to close.
          pass

  def _StartTestServerAndForwarder(self):
    """Launches the test server process and maps its port onto the device.

    On success |self.is_ready| is True and |self.forwarder_device_port| holds
    the device port that Forwarder reports for |self.host_port|. On failure
    |self.is_ready| is left False.
    """
    self._GenerateCommandLineArguments()
    if self.arguments['server-type'] == 'sync':
      script = os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools',
                            'testserver', 'sync_testserver.py')
    else:
      script = os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools',
                            'testserver', 'testserver.py')
    command = [script] + self.command_line
    logging.info('Running: %s', command)
    self.process = subprocess.Popen(
        command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess)
    if self.pipe_out:
      self.is_ready = self._WaitToStartAndGetPortFromTestServer()
    else:
      self.is_ready = _CheckPortStatus(self.host_port, True)
    if not self.is_ready:
      return
    Forwarder.Map([(0, self.host_port)], self.adb, self.build_type, self.tool)
    # Check whether the forwarder is ready on the device.
    self.is_ready = False
    device_port = Forwarder.DevicePortForHostPort(self.host_port)
    if device_port and _CheckDevicePortStatus(self.adb, device_port):
      self.is_ready = True
      self.forwarder_device_port = device_port

  def run(self):
    """Starts the test server, waits for Stop() and then tears it down.

    |self.ready_event| is always set once the start-up attempt has finished,
    whether it succeeded or not; |self.is_ready| tells the outcome.
    |self.wait_event| is always set once the teardown has finished.
    """
    logging.info('Start running the thread!')
    self.wait_event.clear()
    try:
      self._StartTestServerAndForwarder()
    except Exception:
      # A start-up failure must not kill the thread silently: whoever waits on
      # |ready_event| would otherwise block forever.
      logging.exception('Failed to start the test server.')
      self.is_ready = False
    finally:
      # Wake up the request handler thread.
      self.ready_event.set()
    try:
      # Without a process Stop() returns immediately and never raises
      # |stop_flag|, so there is nothing to wait for.
      if self.process:
        # Keep thread running until Stop() gets called.
        _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
        if self.process.poll() is None:
          self.process.kill()
          # Reap the killed child so that it does not linger as a zombie.
          self.process.wait()
      Forwarder.UnmapDevicePort(self.forwarder_device_port, self.adb)
    finally:
      self.process = None
      self.is_ready = False
      if self.pipe_out:
        os.close(self.pipe_in)
        os.close(self.pipe_out)
        self.pipe_in = None
        self.pipe_out = None
      logging.info('Test-server has died.')
      # Unblock Stop() even if the teardown above raised.
      self.wait_event.set()

  def Stop(self):
    """Blocks until the loop has finished.

    Returns immediately when no test server process is running, which is the
    case before run() has launched it and after run() has torn it down.

    Note that this must be called in another thread.
    """
    if not self.process:
      return
    self.stop_flag = True
    self.wait_event.wait()
287
288
class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  """A handler used to process http GET/POST request.

  POST /start spawns a test server, GET /kill stops it and GET /ping reports
  that the spawner itself is up. Any other path is answered with a 400.
  """

  def _SendResponse(self, response_code, response_reason, additional_headers,
                    contents):
    """Generates a response sent to the client from the provided parameters.

    Args:
      response_code: number of the response status.
      response_reason: string of reason description of the response.
      additional_headers: dict of additional headers. Each key is the name of
                          the header, each value is the content of the header.
      contents: string of the contents we want to send to client.
    """
    self.send_response(response_code, response_reason)
    self.send_header('Content-Type', 'text/html')
    # Specify the content-length as without it the http(s) response will not
    # be completed properly (and the browser keeps expecting data).
    self.send_header('Content-Length', len(contents))
    for header_name in additional_headers:
      self.send_header(header_name, additional_headers[header_name])
    self.end_headers()
    self.wfile.write(contents)
    self.wfile.flush()

  def _StartTestServer(self):
    """Starts the test server thread.

    The request body must be a JSON dictionary of test server arguments. The
    response is a 200 carrying a JSON object with the forwarder device port
    when the server came up, and a 500 otherwise.

    Raises:
      Exception: the content-type is not 'application/json' or the
          content-length header is not a number.
    """
    logging.info('Handling request to spawn a test server.')
    content_type = self.headers.getheader('content-type')
    if content_type != 'application/json':
      raise Exception('Bad content-type for start request.')
    content_length = self.headers.getheader('content-length')
    if not content_length:
      content_length = 0
    try:
      content_length = int(content_length)
    except ValueError:
      raise Exception('Bad content-length for start request.')
    logging.info(content_length)
    test_server_argument_json = self.rfile.read(content_length)
    logging.info(test_server_argument_json)
    # There should only ever be one test server at a time.
    assert not self.server.test_server_instance
    ready_event = threading.Event()
    self.server.test_server_instance = TestServerThread(
        ready_event,
        json.loads(test_server_argument_json),
        self.server.adb,
        self.server.tool,
        self.server.build_type)
    self.server.test_server_instance.setDaemon(True)
    self.server.test_server_instance.start()
    # Block until the thread reports the outcome of its start-up attempt.
    ready_event.wait()
    if self.server.test_server_instance.is_ready:
      self._SendResponse(200, 'OK', {}, json.dumps(
          {'port': self.server.test_server_instance.forwarder_device_port,
           'message': 'started'}))
      logging.info('Test server is running on port: %d.',
                   self.server.test_server_instance.host_port)
    else:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during starting a test server.')

  def _KillTestServer(self):
    """Stops the test server instance.

    Responds with a 200 and 'killed' once the host port has been released, a
    500 if the port is still in use, and a 400 if there is no test server to
    kill.
    """
    # There should only ever be one test server at a time. This may do the
    # wrong thing if we try and start multiple test servers.
    if not self.server.test_server_instance:
      # Always answer the request: returning silently would leave the client
      # with an empty reply instead of an explicit error.
      self._SendResponse(400, 'Invalid request.', {},
                         'no test server is running')
      logging.info('Received a kill request but no test server is running.')
      return
    port = self.server.test_server_instance.host_port
    logging.info('Handling request to kill a test server on port: %d.', port)
    self.server.test_server_instance.Stop()
    # Make sure the status of test server is correct before sending response.
    if _CheckPortStatus(port, False):
      self._SendResponse(200, 'OK', {}, 'killed')
      logging.info('Test server on port %d is killed', port)
    else:
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during killing a test server.')
    self.server.test_server_instance = None

  def do_POST(self):
    """Dispatches POST requests; only '/start' is supported."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    logging.info('Action for POST method is: %s.', action)
    if action == '/start':
      self._StartTestServer()
    else:
      self._SendResponse(400, 'Unknown request.', {}, '')
      logging.info('Encounter unknown request: %s.', action)

  def do_GET(self):
    """Dispatches GET requests; '/kill' and '/ping' are supported."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    # The query parameters are only logged; no action consumes them.
    for param in params:
      logging.info('%s=%s', param, params[param][0])
    if action == '/kill':
      self._KillTestServer()
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      logging.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      logging.info('Encounter unknown request: %s.', action)
399
400
class SpawningServer(object):
  """Owns the HTTP server through which test servers get spawned/killed."""

  def __init__(self, test_server_spawner_port, adb, tool, build_type):
    """Creates the HTTP server listening on |test_server_spawner_port|.

    |adb|, |tool| and |build_type| are stored on the HTTP server object, where
    the request handler reads them from.
    """
    logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    listen_address = ('', test_server_spawner_port)
    http_server = BaseHTTPServer.HTTPServer(listen_address,
                                            SpawningServerRequestHandler)
    http_server.adb = adb
    http_server.tool = tool
    http_server.build_type = build_type
    # No test server has been spawned yet.
    http_server.test_server_instance = None
    self.server = http_server

  def _Listen(self):
    """Serves requests until the HTTP server is shut down."""
    logging.info('Starting test server spawner')
    self.server.serve_forever()

  def Start(self):
    """Starts the test server spawner on a daemon thread."""
    spawner_thread = threading.Thread(target=self._Listen)
    spawner_thread.daemon = True
    spawner_thread.start()

  def Stop(self):
    """Stops the test server spawner.

    Also cleans the server state.
    """
    self.CleanupState()
    self.server.shutdown()

  def CleanupState(self):
    """Cleans up the spawning server state.

    This should be called if the test server spawner is reused,
    to avoid sharing the test server instance.
    """
    running_instance = self.server.test_server_instance
    if not running_instance:
      return
    running_instance.Stop()
    self.server.test_server_instance = None
440