1# Copyright (C) 2010 Google Inc. All rights reserved.
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are
5# met:
6#
7#     * Redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer.
9#     * Redistributions in binary form must reproduce the above
10# copyright notice, this list of conditions and the following disclaimer
11# in the documentation and/or other materials provided with the
12# distribution.
13#     * Neither the Google name nor the names of its
14# contributors may be used to endorse or promote products derived from
15# this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29"""Package that implements the ServerProcess wrapper class"""
30
31import errno
32import logging
33import re
34import signal
35import sys
36import time
37
38# Note that although win32 python does provide an implementation of
39# the win32 select API, it only works on sockets, and not on the named pipes
40# used by subprocess, so we have to use the native APIs directly.
41_quote_cmd = None
42
43if sys.platform == 'win32':
44    import msvcrt
45    import win32pipe
46    import win32file
47    import subprocess
48    _quote_cmd = subprocess.list2cmdline
49else:
50    import fcntl
51    import os
52    import pipes
53    import select
54    _quote_cmd = lambda cmdline: ' '.join(pipes.quote(arg) for arg in cmdline)
55
56from webkitpy.common.system.executive import ScriptError
57
58
59_log = logging.getLogger(__name__)
60
61
# Matches a line that ends in one or more spaces: group(1) is everything up
# to and including the last non-space character (None when the line consists
# solely of spaces), group(2) is the run of trailing spaces.
_trailing_spaces_re = re.compile(r'(.*[^ ])?( +)$')


def quote_data(data):
    r"""Return the repr()-escaped form of |data| as a list of lines for logging.

    The value is escaped with repr() and the surrounding quote characters
    are dropped. A line break is inserted after every escaped newline (the
    two-character text \n), so each returned line shows at most one
    newline of the original data. Spaces at the very end of a returned line
    are rewritten as the visible text \x20 so that trailing whitespace can
    be seen in the log.

    Returns a list of strings; an empty value yields an empty list.
    """
    # [1:-1] strips the opening and closing quote that repr() puts around a
    # plain string.
    txt = repr(data).replace('\\n', '\\n\n')[1:-1]
    lines = []
    for line in txt.splitlines():
        m = _trailing_spaces_re.match(line)
        if m:
            # group(1) is None for an all-space line, so fall back to ''.
            # The replacement is the four literal characters \x20 (note the
            # escaped backslash); '\x20' would just be a space again.
            line = (m.group(1) or '') + m.group(2).replace(' ', '\\x20')
        lines.append(line)
    return lines
74
class ServerProcess(object):
    """This class provides a wrapper around a subprocess that
    implements a simple request/response usage model. The primary benefit
    is that reading responses takes a deadline, so that we don't ever block
    indefinitely. The class also handles transparently restarting processes
    as necessary to keep issuing commands.

    Data read from the subprocess is accumulated in two internal buffers
    (one for stdout, one for stderr); the read_* methods pop data off those
    buffers and only wait on the pipes when the buffers cannot satisfy the
    request. The public attribute `timed_out` is set to True when a read
    passes its deadline and is cleared whenever the wrapper is reset."""

    def __init__(self, port_obj, name, cmd, env=None, universal_newlines=False, treat_no_data_as_crash=False,
                 logging=False):
        """Remember how to launch the subprocess; nothing is started here.

        Args:
            port_obj: port object. Its `host` attribute supplies the
                executive and platform objects used to launch and kill the
                process, and its sample_process() is called on read timeout.
            name: the command name (e.g. content_shell, image_diff).
            cmd: the command line handed to popen() (and quoted for logging).
            env: optional environment mapping handed to popen().
            universal_newlines: passed through to popen(); see the comment
                on the attribute below.
            treat_no_data_as_crash: if True, an empty read from a pipe that
                select() reported as readable marks the process as crashed.
            logging: if True, log the command line and all data written to
                and read from the process.
        """
        self._port = port_obj
        self._name = name  # Should be the command name (e.g. content_shell, image_diff)
        self._cmd = cmd
        self._env = env
        # Set if the process outputs non-standard newlines like '\r\n' or '\r'.
        # Don't set if there will be binary data or the data must be ASCII encoded.
        self._universal_newlines = universal_newlines
        self._treat_no_data_as_crash = treat_no_data_as_crash
        self._logging = logging
        self._host = self._port.host
        self._pid = None
        self._reset()

        # See comment in imports for why we need the win32 APIs and can't just use select.
        # FIXME: there should be a way to get win32 vs. cygwin from platforminfo.
        self._use_win32_apis = sys.platform == 'win32'

    def name(self):
        """Return the command name given to the constructor."""
        return self._name

    def pid(self):
        """Return the pid of the most recently started process, or None if
        no process has been started yet."""
        return self._pid

    def _reset(self):
        """Close any pipes still open on the current process object and
        return the wrapper to its initial "not running" state (empty
        buffers, not crashed, not timed out). The recorded pid is kept."""
        # __init__ calls this before self._proc exists, hence getattr().
        proc = getattr(self, '_proc', None)
        if proc:
            for stream_name in ('stdin', 'stdout', 'stderr'):
                stream = getattr(proc, stream_name)
                if stream:
                    stream.close()
                    setattr(proc, stream_name, None)

        self._proc = None
        self._output = ''  # bytearray() once we require Python 2.6
        self._error = ''  # bytearray() once we require Python 2.6
        self._crashed = False
        self.timed_out = False

    def process_name(self):
        """Return the command name given to the constructor (same value as
        name())."""
        return self._name

    def _start(self):
        """Launch the subprocess with stdin, stdout and stderr as pipes.

        Raises:
            ValueError: if a process is already running.
        """
        if self._proc:
            raise ValueError("%s already running" % self._name)
        self._reset()
        # close_fds is a workaround for http://bugs.python.org/issue2320
        close_fds = not self._host.platform.is_win()
        if self._logging:
            env_str = ''
            if self._env:
                env_str += '\n'.join("%s=%s" % (k, v) for k, v in self._env.items()) + '\n'
            _log.info('CMD: \n%s%s\n', env_str, _quote_cmd(self._cmd))
        executive = self._host.executive
        self._proc = executive.popen(self._cmd, stdin=executive.PIPE,
            stdout=executive.PIPE,
            stderr=executive.PIPE,
            close_fds=close_fds,
            env=self._env,
            universal_newlines=self._universal_newlines)
        self._pid = self._proc.pid
        if not self._use_win32_apis:
            # Put both output pipes into non-blocking mode so that read()
            # does not block waiting for EOF.
            for pipe in (self._proc.stdout, self._proc.stderr):
                fd = pipe.fileno()
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def _handle_possible_interrupt(self):
        """This routine checks to see if the process crashed or exited
        because of a keyboard interrupt and raises KeyboardInterrupt
        accordingly."""
        # Linux and Mac set the returncode to -signal.SIGINT if a
        # subprocess is killed with a ctrl^C. -1073741510 is 0xC000013A
        # read as a signed 32-bit integer, which is the Windows NTSTATUS
        # code STATUS_CONTROL_C_EXIT.
        # FIXME: Figure out whether cygwin reports the same value.
        if self._proc.returncode in (-1073741510, -signal.SIGINT):
            raise KeyboardInterrupt

    def poll(self):
        """Check to see if the underlying process is running; returns None
        if it still is (wrapper around subprocess.poll). Also returns None
        when no process has been started."""
        if self._proc:
            return self._proc.poll()
        return None

    def write(self, bytes):
        """Write a request to the subprocess. The subprocess is (re-)start()'ed
        if it is not already running.

        An IOError while writing is not propagated: the process is stopped
        and the wrapper is marked as crashed instead, so callers should
        check has_crashed()."""
        if not self._proc:
            self._start()
        try:
            self._log_data(' IN', bytes)
            self._proc.stdin.write(bytes)
        except IOError:
            self.stop(0.0)
            # stop() calls _reset(), so we have to set crashed to True after calling stop().
            self._crashed = True

    def _pop_stdout_line_if_ready(self):
        """Pop and return the first buffered stdout line (including its
        '\\n'), or return None if no complete line is buffered."""
        index_after_newline = self._output.find('\n') + 1
        if index_after_newline > 0:
            return self._pop_output_bytes(index_after_newline)
        return None

    def _pop_stderr_line_if_ready(self):
        """Pop and return the first buffered stderr line (including its
        '\\n'), or return None if no complete line is buffered."""
        index_after_newline = self._error.find('\n') + 1
        if index_after_newline > 0:
            return self._pop_error_bytes(index_after_newline)
        return None

    def pop_all_buffered_stderr(self):
        """Return everything currently in the stderr buffer and empty it."""
        return self._pop_error_bytes(len(self._error))

    def read_stdout_line(self, deadline):
        """Return the next stdout line, waiting until |deadline| (a
        time.time() value) if necessary. Returns None on crash or timeout."""
        return self._read(deadline, self._pop_stdout_line_if_ready)

    def read_stderr_line(self, deadline):
        """Return the next stderr line, waiting until |deadline| (a
        time.time() value) if necessary. Returns None on crash or timeout."""
        return self._read(deadline, self._pop_stderr_line_if_ready)

    def read_either_stdout_or_stderr_line(self, deadline):
        """Return a (stdout_line, stderr_line) tuple in which exactly one
        element is a line and the other is None; stdout is preferred when
        both streams have a line ready. Returns (None, None) on crash or
        timeout."""
        def retrieve_bytes_from_buffers():
            stdout_line = self._pop_stdout_line_if_ready()
            if stdout_line:
                return stdout_line, None
            stderr_line = self._pop_stderr_line_if_ready()
            if stderr_line:
                return None, stderr_line
            return None  # Instructs the caller to keep waiting.

        return_value = self._read(deadline, retrieve_bytes_from_buffers)
        # FIXME: This is a bit of a hack around the fact that _read normally only returns one value, but this caller wants it to return two.
        if return_value is None:
            return None, None
        return return_value

    def read_stdout(self, deadline, size):
        """Return exactly |size| bytes of stdout, waiting until |deadline|
        (a time.time() value) if necessary. Returns None on crash or timeout.

        Raises:
            ValueError: if |size| is not positive.
        """
        if size <= 0:
            raise ValueError('ServerProcess.read() called with a non-positive size: %d ' % size)

        def retrieve_bytes_from_stdout_buffer():
            if len(self._output) >= size:
                return self._pop_output_bytes(size)
            return None

        return self._read(deadline, retrieve_bytes_from_stdout_buffer)

    def _log(self, message):
        """Log |message| at info level, preceded by a blank line."""
        # This is a bit of a hack, but we first log a blank line to avoid
        # messing up the master process's output.
        _log.info('')
        _log.info(message)

    def _log_data(self, prefix, data):
        """If logging was enabled in the constructor and |data| is
        non-empty, log each quoted line of |data| tagged with |prefix|."""
        if self._logging and data:
            for line in quote_data(data):
                _log.info('%s: %s', prefix, line)

    def _handle_timeout(self):
        """Record that a read timed out and ask the port to sample the
        process."""
        self.timed_out = True
        self._port.sample_process(self._name, self._proc.pid)

    def _split_string_after_index(self, string, index):
        """Return (string[:index], string[index:])."""
        return string[:index], string[index:]

    def _pop_output_bytes(self, bytes_count):
        """Remove and return the first |bytes_count| bytes of the stdout
        buffer."""
        output, self._output = self._split_string_after_index(self._output, bytes_count)
        return output

    def _pop_error_bytes(self, bytes_count):
        """Remove and return the first |bytes_count| bytes of the stderr
        buffer."""
        output, self._error = self._split_string_after_index(self._error, bytes_count)
        return output

    def _read_pipe_and_check_for_crash(self, pipe, log_prefix, stopping):
        """Read what is available from |pipe|, log it under |log_prefix| and
        return it. An empty read while not |stopping| marks the process as
        crashed if treat_no_data_as_crash was requested or the process has
        exited with a non-zero status."""
        data = pipe.read()
        if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
            self._crashed = True
        self._log_data(log_prefix, data)
        return data

    def _wait_for_data_and_update_buffers_using_select(self, deadline, stopping=False):
        """Wait with select() until stdout or stderr is readable or
        |deadline| passes, then append whatever was read to the buffers.

        |stopping| suppresses crash detection on empty reads; stop() passes
        True when draining the pipes of a process that has already exited."""
        if self._proc.stdout.closed or self._proc.stderr.closed:
            # If the process crashed and is using FIFOs, like Chromium Android, the
            # stdout and stderr pipes will be closed.
            return

        out_fd = self._proc.stdout.fileno()
        err_fd = self._proc.stderr.fileno()
        select_fds = (out_fd, err_fd)
        try:
            read_fds, _, _ = select.select(select_fds, [], select_fds, max(deadline - time.time(), 0))
        except select.error as e:
            # We can ignore EINVAL since it's likely the process just crashed and we'll
            # figure that out the next time through the loop in _read().
            if e.args[0] == errno.EINVAL:
                return
            raise

        try:
            # Note that we may get no data during read() even though
            # select says we got something; see the select() man page
            # on linux. I don't know if this happens on Mac OS and
            # other Unixen as well, but we don't bother special-casing
            # Linux because it's relatively harmless either way.
            if out_fd in read_fds:
                data = self._read_pipe_and_check_for_crash(self._proc.stdout, 'OUT', stopping)
                self._output += data

            if err_fd in read_fds:
                data = self._read_pipe_and_check_for_crash(self._proc.stderr, 'ERR', stopping)
                self._error += data
        except IOError:
            # We can ignore the IOErrors because we will detect if the subprocess crashed
            # the next time through the loop in _read()
            pass

    def _wait_for_data_and_update_buffers_using_win32_apis(self, deadline):
        """Poll both pipes every 10ms until either yields data, the process
        exits, or |deadline| passes; append any data read to the buffers."""
        # See http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
        # and http://docs.activestate.com/activepython/2.6/pywin32/modules.html
        # for documentation on all of these win32-specific modules.
        now = time.time()
        out_fh = msvcrt.get_osfhandle(self._proc.stdout.fileno())
        err_fh = msvcrt.get_osfhandle(self._proc.stderr.fileno())
        while (self._proc.poll() is None) and (now < deadline):
            output = self._non_blocking_read_win32(out_fh)
            self._log_data('OUT', output)
            error = self._non_blocking_read_win32(err_fh)
            self._log_data('ERR', error)
            if output or error:
                if output:
                    self._output += output
                if error:
                    self._error += error
                return
            time.sleep(0.01)
            now = time.time()

    def _non_blocking_read_win32(self, handle):
        """Return the bytes currently available on the win32 pipe |handle|,
        or None if there are none or the pipe is broken/shut down. Any other
        error is re-raised."""
        try:
            _, avail, _ = win32pipe.PeekNamedPipe(handle, 0)
            if avail > 0:
                _, buf = win32file.ReadFile(handle, avail, None)
                return buf
        except Exception as e:
            # Use e.args (as the select() path does) rather than indexing the
            # exception, and tolerate exceptions raised without arguments so
            # that they propagate instead of being masked by an IndexError.
            error_code = e.args[0] if e.args else None
            if error_code not in (109, errno.ESHUTDOWN):  # 109 == win32 ERROR_BROKEN_PIPE
                raise
        return None

    def has_crashed(self):
        """Return True if the process has been marked as crashed or has
        exited with a non-zero status.

        Raises:
            KeyboardInterrupt: if the exit status indicates a ctrl-C.
        """
        if not self._crashed and self.poll():
            self._crashed = True
            self._handle_possible_interrupt()
        return self._crashed

    # This read function is a bit oddly-designed, as it polls both stdout and stderr, yet
    # only reads/returns from one of them (buffering both in local self._output/self._error).
    # It might be cleaner to pass in the file descriptor to poll instead.
    def _read(self, deadline, fetch_bytes_from_buffers_callback):
        """Loop until |fetch_bytes_from_buffers_callback| returns something
        other than None and return that value. Returns None instead if the
        process has crashed or |deadline| (a time.time() value) has passed;
        in the timeout case `timed_out` is set as well."""
        while True:
            if self.has_crashed():
                return None

            if time.time() > deadline:
                self._handle_timeout()
                return None

            data = fetch_bytes_from_buffers_callback()
            if data is not None:
                return data

            if self._use_win32_apis:
                self._wait_for_data_and_update_buffers_using_win32_apis(deadline)
            else:
                self._wait_for_data_and_update_buffers_using_select(deadline)

    def start(self):
        """Start the subprocess if it is not already running."""
        if not self._proc:
            self._start()

    def stop(self, timeout_secs=0.0):
        """Shut the subprocess down and reset the wrapper.

        stdin is closed first; the process then gets up to |timeout_secs|
        seconds to exit on its own and is killed if it is still running
        after that. Output remaining on the pipes is collected only when
        the process was not killed.

        Returns:
            A (stdout, stderr) tuple holding the buffered output that had
            not been consumed, or (None, None) if no process was running.
        """
        if not self._proc:
            return (None, None)

        now = time.time()
        if self._proc.stdin:
            if self._logging:
                _log.info(' IN: ^D')
            self._proc.stdin.close()
            self._proc.stdin = None
        killed = False
        if timeout_secs:
            deadline = now + timeout_secs
            while self._proc.poll() is None and time.time() < deadline:
                time.sleep(0.01)
            if self._proc.poll() is None:
                _log.warning('stopping %s(pid %d) timed out, killing it', self._name, self._proc.pid)

        if self._proc.poll() is None:
            self._kill()
            killed = True
            _log.debug('killed pid %d', self._proc.pid)

        # read any remaining data on the pipes and return it.
        if not killed:
            # |now| is already in the past, so these calls do not wait.
            if self._use_win32_apis:
                self._wait_for_data_and_update_buffers_using_win32_apis(now)
            else:
                self._wait_for_data_and_update_buffers_using_select(now, stopping=True)
        out, err = self._output, self._error
        self._reset()
        return (out, err)

    def kill(self):
        """Stop the process without giving it time to exit on its own."""
        self.stop(0.0)

    def _kill(self):
        """Ask the executive to kill the process."""
        self._host.executive.kill_process(self._proc.pid)
        # NOTE(review): wait() is only reached when poll() already reports an
        # exit status. If the intent was to reap a process that is still
        # running, this condition looks inverted -- confirm before changing.
        if self._proc.poll() is not None:
            self._proc.wait()

    def replace_outputs(self, stdout, stderr):
        """Close the running process's stdout and/or stderr pipe and install
        the given file object(s) in their place; a falsy argument leaves
        that stream untouched. A process must be running."""
        assert self._proc
        if stdout:
            self._proc.stdout.close()
            self._proc.stdout = stdout
        if stderr:
            self._proc.stderr.close()
            self._proc.stderr = stderr
415