1# Copyright (C) 2011 Google Inc. All rights reserved.
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are
5# met:
6#
7#    * Redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer.
9#    * Redistributions in binary form must reproduce the above
10# copyright notice, this list of conditions and the following disclaimer
11# in the documentation and/or other materials provided with the
12# distribution.
13#    * Neither the name of Google Inc. nor the names of its
14# contributors may be used to endorse or promote products derived from
15# this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29import sys
30import time
31import unittest
32
33from webkitpy.layout_tests.port.factory import PortFactory
34from webkitpy.layout_tests.port import server_process
35from webkitpy.common.system.systemhost import SystemHost
36from webkitpy.common.system.systemhost_mock import MockSystemHost
37from webkitpy.common.system.outputcapture import OutputCapture
38
39
40class TrivialMockPort(object):
41    def __init__(self):
42        self.host = MockSystemHost()
43        self.host.executive.kill_process = lambda x: None
44        self.host.executive.kill_process = lambda x: None
45
46    def results_directory(self):
47        return "/mock-results"
48
49    def process_kill_time(self):
50        return 1
51
52
53class MockFile(object):
54    def __init__(self, server_process):
55        self._server_process = server_process
56        self.closed = False
57
58    def fileno(self):
59        return 1
60
61    def write(self, line):
62        self._server_process.broken_pipes.append(self)
63        raise IOError
64
65    def close(self):
66        self.closed = True
67
68
69class MockProc(object):
70    def __init__(self, server_process):
71        self.stdin = MockFile(server_process)
72        self.stdout = MockFile(server_process)
73        self.stderr = MockFile(server_process)
74        self.pid = 1
75
76    def poll(self):
77        return 1
78
79    def wait(self):
80        return 0
81
82
83class FakeServerProcess(server_process.ServerProcess):
84    def _start(self):
85        self._proc = MockProc(self)
86        self.stdin = self._proc.stdin
87        self.stdout = self._proc.stdout
88        self.stderr = self._proc.stderr
89        self._pid = self._proc.pid
90        self.broken_pipes = []
91
92
93class TestServerProcess(unittest.TestCase):
94    def test_basic(self):
95        cmd = [sys.executable, '-c', 'import sys; import time; time.sleep(0.02); print "stdout"; sys.stdout.flush(); print >>sys.stderr, "stderr"']
96        host = SystemHost()
97        factory = PortFactory(host)
98        port = factory.get()
99        now = time.time()
100        proc = server_process.ServerProcess(port, 'python', cmd)
101        proc.write('')
102
103        self.assertEqual(proc.poll(), None)
104        self.assertFalse(proc.has_crashed())
105
106        # check that doing a read after an expired deadline returns
107        # nothing immediately.
108        line = proc.read_stdout_line(now - 1)
109        self.assertEqual(line, None)
110
111        # FIXME: This part appears to be flaky. line should always be non-None.
112        # FIXME: https://bugs.webkit.org/show_bug.cgi?id=88280
113        line = proc.read_stdout_line(now + 1.0)
114        if line:
115            self.assertEqual(line.strip(), "stdout")
116
117        line = proc.read_stderr_line(now + 1.0)
118        if line:
119            self.assertEqual(line.strip(), "stderr")
120
121        proc.stop(0)
122
123    def test_cleanup(self):
124        port_obj = TrivialMockPort()
125        server_process = FakeServerProcess(port_obj=port_obj, name="test", cmd=["test"])
126        server_process._start()
127        server_process.stop()
128        self.assertTrue(server_process.stdin.closed)
129        self.assertTrue(server_process.stdout.closed)
130        self.assertTrue(server_process.stderr.closed)
131
132    def test_broken_pipe(self):
133        port_obj = TrivialMockPort()
134
135        port_obj.host.platform.os_name = 'win'
136        server_process = FakeServerProcess(port_obj=port_obj, name="test", cmd=["test"])
137        server_process.write("should break")
138        self.assertTrue(server_process.has_crashed())
139        self.assertIsNotNone(server_process.pid())
140        self.assertIsNone(server_process._proc)
141        self.assertEqual(server_process.broken_pipes, [server_process.stdin])
142
143        port_obj.host.platform.os_name = 'mac'
144        server_process = FakeServerProcess(port_obj=port_obj, name="test", cmd=["test"])
145        server_process.write("should break")
146        self.assertTrue(server_process.has_crashed())
147        self.assertIsNone(server_process._proc)
148        self.assertEqual(server_process.broken_pipes, [server_process.stdin])
149
150
151class TestQuoteData(unittest.TestCase):
152    def test_plain(self):
153        qd = server_process.quote_data
154        self.assertEqual(qd("foo"), ["foo"])
155
156    def test_trailing_spaces(self):
157        qd = server_process.quote_data
158        self.assertEqual(qd("foo  "),
159                         ["foo\x20\x20"])
160
161    def test_newlines(self):
162        qd = server_process.quote_data
163        self.assertEqual(qd("foo \nbar\n"),
164                         ["foo\x20\\n", "bar\\n"])
165
166    def test_binary_data(self):
167        qd = server_process.quote_data
168        self.assertEqual(qd("\x00\x01ab"),
169                         ["\\x00\\x01ab"])
170