1#!/usr/bin/env python
2# Copyright 2013 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Tests for the cmd_helper module."""
7
8import unittest
9import subprocess
10import sys
11import time
12
13from devil import devil_env
14from devil.utils import cmd_helper
15
16with devil_env.SysPath(devil_env.PYMOCK_PATH):
17  import mock  # pylint: disable=import-error
18
19
class CmdHelperSingleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.SingleQuote."""

  def testSingleQuote_basic(self):
    # A plain word is expected to come back without any quoting.
    self.assertEqual('hello',
                     cmd_helper.SingleQuote('hello'))

  def testSingleQuote_withSpaces(self):
    # Whitespace forces the whole string to be wrapped in single quotes.
    self.assertEqual("'hello world'",
                     cmd_helper.SingleQuote('hello world'))

  def testSingleQuote_withUnsafeChars(self):
    # An embedded single quote is expected to be emitted as '"'"' (close the
    # quote, add a double-quoted single quote, reopen the quote).
    self.assertEqual("""'hello'"'"'; rm -rf /'""",
                     cmd_helper.SingleQuote("hello'; rm -rf /"))

  def testSingleQuote_dontExpand(self):
    # Round-trips the quoted string through a real shell: the echoed text
    # must still contain the literal '$TEST_VAR', i.e. no expansion happened.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
    self.assertEqual(test_string,
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
39
40
class CmdHelperDoubleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.DoubleQuote."""

  def testDoubleQuote_basic(self):
    # A plain word is expected to come back without any quoting.
    self.assertEqual('hello',
                     cmd_helper.DoubleQuote('hello'))

  def testDoubleQuote_withSpaces(self):
    # Whitespace forces the whole string to be wrapped in double quotes.
    self.assertEqual('"hello world"',
                     cmd_helper.DoubleQuote('hello world'))

  def testDoubleQuote_withUnsafeChars(self):
    # An embedded double quote is expected to be backslash-escaped.
    self.assertEqual('''"hello\\"; rm -rf /"''',
                     cmd_helper.DoubleQuote('hello"; rm -rf /'))

  # NOTE: despite its name this exercises DoubleQuote; the name is kept as-is
  # so that existing test filters/IDs keep working.
  def testSingleQuote_doExpand(self):
    # Round-trips the quoted string through a real shell: inside double
    # quotes '$TEST_VAR' must be expanded to its value.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
    self.assertEqual('hello world',
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
60
61
class CmdHelperShinkToSnippetTest(unittest.TestCase):
  """Tests for cmd_helper.ShrinkToSnippet.

  As exercised below, every occurrence of the third argument inside the
  command parts is expected to be replaced by a double-quoted reference to
  the shell variable named by the second argument (here always "$a"), with
  the remaining text quoted as needed.
  """

  def testShrinkToSnippet_noArgs(self):
    # Commands consisting of a single part.
    self.assertEqual('foo',
        cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
    self.assertEqual("'foo foo'",
        cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
    self.assertEqual('"$a"\' bar\'',
        cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
    self.assertEqual('\'foo \'"$a"',
        cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
    self.assertEqual('foo"$a"',
        cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))

  def testShrinkToSnippet_singleArg(self):
    # Commands consisting of two parts; the parts are joined by a space.
    self.assertEqual("foo ''",
        cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
    self.assertEqual("foo foo",
        cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
    self.assertEqual('"$a" "$a"',
        cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
    self.assertEqual('foo "$a""$a"',
        cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
    self.assertEqual('foo "$a"\' \'"$a"',
        cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
    self.assertEqual('foo "$a""$a"\' \'',
        cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
    self.assertEqual('foo \' \'"$a""$a"\' \'',
        cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
91
92
# Sentinel meaning "not specified" for _ProcessOutputEvent fields.
# _MockProcess replaces it with a concrete value when it builds its sequence.
_DEFAULT = 'DEFAULT'
94
95
class _ProcessOutputEvent(object):
  """One step of scripted process output consumed by _MockProcess.

  Attributes:
    select_fds: What the mocked select.select reports as readable for this
        step. When left as _DEFAULT, _MockProcess derives it from
        read_contents (nothing readable if read_contents is None).
    read_contents: What the mocked os.read hands back for this step.
    ts: What the mocked time.time returns during this step. When left as
        _DEFAULT, _MockProcess fills in the current time.
  """

  def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
    # Plain value holder: store the three fields exactly as given.
    self.select_fds, self.read_contents, self.ts = (
        select_fds, read_contents, ts)
102
103
class _MockProcess(object):
  """Context manager that fakes a subprocess and the calls used to read it.

  While the context is active, os.read, select.select and time.time (and
  fcntl.fcntl on non-Windows platforms) are patched so that they replay a
  scripted sequence of _ProcessOutputEvent objects. Entering the context
  returns a MagicMock standing in for a subprocess.Popen instance.
  """

  def __init__(self, output_sequence=None, return_value=0):
    """Prepares the mock process and the patches, without starting them.

    Args:
      output_sequence: Optional sequence of _ProcessOutputEvent describing,
          step by step, what select/read/time should report. It is copied;
          neither the sequence nor its events are modified.
      return_value: The returncode the mock process reports once the
          sequence has been exhausted.
    """

    # Arbitrary.
    fake_stdout_fileno = 25

    self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
    self.mock_proc.stdout = mock.MagicMock()
    self.mock_proc.stdout.fileno = mock.MagicMock(
        return_value=fake_stdout_fileno)
    self.mock_proc.returncode = None

    self._return_value = return_value

    # This links the behavior of os.read, select.select, time.time, and
    # <process>.poll. The output sequence can be thought of as a list of
    # return values for select.select with corresponding return values for
    # the other calls at any time between that select call and the following
    # one. We iterate through the sequence only on calls to select.select.
    #
    # os.read is a special case, though, where we only return a given chunk
    # of data *once* after a given call to select.

    # Work on private copies. The defaults below are resolved in place and a
    # leading element is inserted, so operating on the caller's objects
    # would corrupt sequences that are shared between tests.
    events = [
        _ProcessOutputEvent(select_fds=e.select_fds,
                            read_contents=e.read_contents,
                            ts=e.ts)
        for e in (output_sequence or [])]

    # Use a leading element to make the iteration logic work: it supplies the
    # values seen before the first call to select.select.
    events.insert(0, _ProcessOutputEvent(
        _DEFAULT, '', events[0].ts if events else _DEFAULT))

    for event in events:
      if event.select_fds == _DEFAULT:
        # Only report the fake stdout as readable if there is data to read.
        if event.read_contents is None:
          event.select_fds = []
        else:
          event.select_fds = [fake_stdout_fileno]
      if event.ts == _DEFAULT:
        event.ts = time.time()
    self._output_sequence = events

    self._output_seq_index = 0
    self._read_flags = [False] * len(events)

    def read_side_effect(*_args, **_kwargs):
      index = self._output_seq_index
      # Nothing to read past the end of the sequence, or if this step's data
      # has already been handed out.
      if index >= len(self._read_flags) or self._read_flags[index]:
        return None
      self._read_flags[index] = True
      return self._output_sequence[index].read_contents

    def select_side_effect(*_args, **_kwargs):
      # Each select call advances to the next scripted step.
      self._output_seq_index += 1
      if self._output_seq_index < len(self._output_sequence):
        return (self._output_sequence[self._output_seq_index].select_fds,
                None, None)
      return ([], None, None)

    def time_side_effect(*_args, **_kwargs):
      # Past the end of the sequence, keep reporting the last timestamp.
      index = min(self._output_seq_index, len(self._output_sequence) - 1)
      return self._output_sequence[index].ts

    def poll_side_effect(*_args, **_kwargs):
      # The process counts as finished once the last step has been reached.
      if self._output_seq_index >= len(self._output_sequence) - 1:
        self.mock_proc.returncode = self._return_value
      return self.mock_proc.returncode

    mock_read = mock.MagicMock(side_effect=read_side_effect)
    mock_select = mock.MagicMock(side_effect=select_side_effect)
    mock_time = mock.MagicMock(side_effect=time_side_effect)
    self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)

    # Set up but *do not start* the mocks.
    self._mocks = [
      mock.patch('os.read', new=mock_read),
      mock.patch('select.select', new=mock_select),
      mock.patch('time.time', new=mock_time),
    ]
    if sys.platform != 'win32':
      self._mocks.append(mock.patch('fcntl.fcntl'))

  def __enter__(self):
    """Starts all patches and returns the mock process."""
    started = []
    succeeded = False
    try:
      for patcher in self._mocks:
        patcher.__enter__()
        started.append(patcher)
      succeeded = True
    finally:
      # If a patch failed to start, undo the ones already active so that
      # e.g. time.time is not left patched for the rest of the test run.
      if not succeeded:
        for patcher in reversed(started):
          patcher.__exit__(None, None, None)
    return self.mock_proc

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Stops the patches in the reverse of the order they were started."""
    for m in reversed(self._mocks):
      m.__exit__(exc_type, exc_val, exc_tb)
197
198
class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
  """Test _IterCmdOutputLines against a mocked process (_MockProcess)."""

  # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
  # can mock the process.
  # pylint: disable=protected-access

  # Template only; tests must use _SimpleOutputSequence() to get their own
  # copy rather than passing this shared list around.
  _SIMPLE_OUTPUT_SEQUENCE = [
    _ProcessOutputEvent(read_contents='1\n2\n'),
  ]

  @classmethod
  def _SimpleOutputSequence(cls):
    """Returns a fresh list of fresh events equal to _SIMPLE_OUTPUT_SEQUENCE.

    A per-test copy keeps tests independent of each other even if the
    consumer modifies the list or the events it is handed.
    """
    return [
        _ProcessOutputEvent(select_fds=e.select_fds,
                            read_contents=e.read_contents,
                            ts=e.ts)
        for e in cls._SIMPLE_OUTPUT_SEQUENCE]

  def testIterCmdOutputLines_success(self):
    with _MockProcess(
        output_sequence=self._SimpleOutputSequence()) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusFail(self):
    with self.assertRaises(subprocess.CalledProcessError):
      with _MockProcess(output_sequence=self._SimpleOutputSequence(),
                        return_value=1) as mock_proc:
        for num, line in enumerate(
            cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
          self.assertEqual(num, int(line))
        # after reading all the output we get an exit status of 1

  def testIterCmdOutputLines_exitStatusIgnored(self):
    # Same failing process as above, but check_status=False means the
    # non-zero exit status must not raise.
    with _MockProcess(output_sequence=self._SimpleOutputSequence(),
                      return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(
              mock_proc, 'mock_proc', check_status=False),
          1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusSkipped(self):
    with _MockProcess(output_sequence=self._SimpleOutputSequence(),
                      return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))
        # no exception will be raised because we don't attempt to read past
        # the end of the output and, thus, the status never gets checked
        if num == 2:
          break

  def testIterCmdOutputLines_delay(self):
    # Two lines arrive at t=1, nothing is readable at t=2, and the final
    # chunk only arrives at t=10, i.e. after the 5 second iter_timeout. A
    # None is expected in place of a line when that timeout elapses.
    output_sequence = [
      _ProcessOutputEvent(read_contents='1\n2\n', ts=1),
      _ProcessOutputEvent(read_contents=None, ts=2),
      _ProcessOutputEvent(read_contents='Awake', ts=10),
    ]
    with _MockProcess(output_sequence=output_sequence) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc',
                                         iter_timeout=5), 1):
        if num <= 2:
          self.assertEqual(num, int(line))
        elif num == 3:
          self.assertIsNone(line)
        elif num == 4:
          self.assertEqual('Awake', line)
        else:
          self.fail()
264
265
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
268