#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for the cmd_helper module."""

import subprocess
import sys
import time
import unittest

from devil import devil_env
from devil.utils import cmd_helper

with devil_env.SysPath(devil_env.PYMOCK_PATH):
  import mock  # pylint: disable=import-error


class CmdHelperSingleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.SingleQuote."""

  def testSingleQuote_basic(self):
    self.assertEqual('hello',
                     cmd_helper.SingleQuote('hello'))

  def testSingleQuote_withSpaces(self):
    self.assertEqual("'hello world'",
                     cmd_helper.SingleQuote('hello world'))

  def testSingleQuote_withUnsafeChars(self):
    self.assertEqual("""'hello'"'"'; rm -rf /'""",
                     cmd_helper.SingleQuote("hello'; rm -rf /"))

  def testSingleQuote_dontExpand(self):
    # The quoted string is echoed through a real shell; $TEST_VAR is expected
    # to come back literally.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
    self.assertEqual(test_string,
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())


class CmdHelperDoubleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.DoubleQuote."""

  def testDoubleQuote_basic(self):
    self.assertEqual('hello',
                     cmd_helper.DoubleQuote('hello'))

  def testDoubleQuote_withSpaces(self):
    self.assertEqual('"hello world"',
                     cmd_helper.DoubleQuote('hello world'))

  def testDoubleQuote_withUnsafeChars(self):
    self.assertEqual('''"hello\\"; rm -rf /"''',
                     cmd_helper.DoubleQuote('hello"; rm -rf /'))

  # NOTE(review): despite its name this exercises DoubleQuote. The name is
  # kept as-is so existing test IDs do not change.
  def testSingleQuote_doExpand(self):
    # The quoted string is echoed through a real shell; $TEST_VAR is expected
    # to be expanded to 'world'.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
    self.assertEqual('hello world',
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())


# NOTE(review): "Shink" looks like a typo for "Shrink". The class name is kept
# as-is so existing test IDs do not change.
class CmdHelperShinkToSnippetTest(unittest.TestCase):
  """Tests for cmd_helper.ShrinkToSnippet."""

  def testShrinkToSnippet_noArgs(self):
    self.assertEqual('foo',
                     cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
    self.assertEqual("'foo foo'",
                     cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
    self.assertEqual('"$a"\' bar\'',
                     cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
    self.assertEqual('\'foo \'"$a"',
                     cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
    self.assertEqual('foo"$a"',
                     cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))

  def testShrinkToSnippet_singleArg(self):
    self.assertEqual("foo ''",
                     cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
    self.assertEqual("foo foo",
                     cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
    self.assertEqual('"$a" "$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
    self.assertEqual('foo "$a""$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
    self.assertEqual('foo "$a"\' \'"$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
    self.assertEqual('foo "$a""$a"\' \'',
                     cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
    self.assertEqual('foo \' \'"$a""$a"\' \'',
                     cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))


# Sentinel meaning "derive this field automatically" in _ProcessOutputEvent.
_DEFAULT = 'DEFAULT'


class _ProcessOutputEvent(object):
  """One step of scripted process output consumed by _MockProcess.

  Attributes:
    select_fds: The readable-fd list the mocked select.select returns for this
      step. _DEFAULT lets _MockProcess pick it based on read_contents.
    read_contents: What the mocked os.read returns for this step, or None for
      a step with no output.
    ts: The value the mocked time.time returns during this step. _DEFAULT is
      replaced with the real time.time() when the _MockProcess is built.
  """

  def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
    self.select_fds = select_fds
    self.read_contents = read_contents
    self.ts = ts


class _MockProcess(object):
  """Context manager yielding a mock subprocess.Popen with scripted output.

  While the context is active, os.read, select.select and time.time (and
  fcntl.fcntl on non-Windows platforms) are patched so that they replay the
  supplied output sequence.
  """

  def __init__(self, output_sequence=None, return_value=0):
    """Builds the mock process and its (not yet started) patches.

    Args:
      output_sequence: An optional list of _ProcessOutputEvent. The list and
        its events are copied and are not modified.
      return_value: The returncode the mock process reports from poll() once
        the last event of the sequence has been reached.
    """

    # Arbitrary.
    fake_stdout_fileno = 25

    self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
    self.mock_proc.stdout = mock.MagicMock()
    self.mock_proc.stdout.fileno = mock.MagicMock(
        return_value=fake_stdout_fileno)
    self.mock_proc.returncode = None

    self._return_value = return_value

    # This links the behavior of os.read, select.select, time.time, and
    # <process>.poll. The output sequence can be thought of as a list of
    # return values for select.select with corresponding return values for
    # the other calls at any time between that select call and the following
    # one. We iterate through the sequence only on calls to select.select.
    #
    # os.read is a special case, though, where we only return a given chunk
    # of data *once* after a given call to select.

    # Work on private copies: the caller's list may be shared between tests
    # (e.g. a class-level constant), and both the list and its events are
    # modified below.
    output_sequence = [
        _ProcessOutputEvent(o.select_fds, o.read_contents, o.ts)
        for o in (output_sequence or [])]

    # Use a leading element to make the iteration logic work.
    initial_seq_element = _ProcessOutputEvent(
        _DEFAULT, '',
        output_sequence[0].ts if output_sequence else _DEFAULT)
    output_sequence.insert(0, initial_seq_element)

    # Resolve the _DEFAULT placeholders.
    for o in output_sequence:
      if o.select_fds == _DEFAULT:
        if o.read_contents is None:
          o.select_fds = []
        else:
          o.select_fds = [fake_stdout_fileno]
      if o.ts == _DEFAULT:
        o.ts = time.time()
    self._output_sequence = output_sequence

    self._output_seq_index = 0
    self._read_flags = [False] * len(output_sequence)

    def read_side_effect(*_args, **_kwargs):
      # Each event's contents are handed out once; later reads during the
      # same event return None.
      if self._read_flags[self._output_seq_index]:
        return None
      self._read_flags[self._output_seq_index] = True
      return self._output_sequence[self._output_seq_index].read_contents

    def select_side_effect(*_args, **_kwargs):
      # Every select call advances to the next event.
      self._output_seq_index += 1
      if self._output_seq_index < len(self._output_sequence):
        return (self._output_sequence[self._output_seq_index].select_fds,
                None, None)
      return ([], None, None)

    def time_side_effect(*_args, **_kwargs):
      return self._output_sequence[self._output_seq_index].ts

    def poll_side_effect(*_args, **_kwargs):
      # The process only reports its return code once the last event has
      # been reached; until then returncode stays None.
      if self._output_seq_index >= len(self._output_sequence) - 1:
        self.mock_proc.returncode = self._return_value
      return self.mock_proc.returncode

    mock_read = mock.MagicMock(side_effect=read_side_effect)
    mock_select = mock.MagicMock(side_effect=select_side_effect)
    mock_time = mock.MagicMock(side_effect=time_side_effect)
    self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)

    # Set up but *do not start* the mocks.
    self._mocks = [
        mock.patch('os.read', new=mock_read),
        mock.patch('select.select', new=mock_select),
        mock.patch('time.time', new=mock_time),
    ]
    if sys.platform != 'win32':
      self._mocks.append(mock.patch('fcntl.fcntl'))

  def __enter__(self):
    """Starts all patches and returns the mock process."""
    for m in self._mocks:
      m.__enter__()
    return self.mock_proc

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Stops the patches in the reverse of the order they were started."""
    for m in reversed(self._mocks):
      m.__exit__(exc_type, exc_val, exc_tb)


class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
  """Test IterCmdOutputLines against a mocked process."""

  # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
  # can mock the process.
  # pylint: disable=protected-access

  _SIMPLE_OUTPUT_SEQUENCE = [
      _ProcessOutputEvent(read_contents='1\n2\n'),
  ]

  def testIterCmdOutputLines_success(self):
    with _MockProcess(
        output_sequence=self._SIMPLE_OUTPUT_SEQUENCE) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusFail(self):
    with self.assertRaises(subprocess.CalledProcessError):
      with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
                        return_value=1) as mock_proc:
        for num, line in enumerate(
            cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
          self.assertEqual(num, int(line))
        # after reading all the output we get an exit status of 1

  def testIterCmdOutputLines_exitStatusIgnored(self):
    with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
                      return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(
              mock_proc, 'mock_proc', check_status=False),
          1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusSkipped(self):
    with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
                      return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))
        # no exception will be raised because we don't attempt to read past
        # the end of the output and, thus, the status never gets checked
        if num == 2:
          break

  def testIterCmdOutputLines_delay(self):
    output_sequence = [
        _ProcessOutputEvent(read_contents='1\n2\n', ts=1),
        _ProcessOutputEvent(read_contents=None, ts=2),
        _ProcessOutputEvent(read_contents='Awake', ts=10),
    ]
    with _MockProcess(output_sequence=output_sequence) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc',
                                         iter_timeout=5), 1):
        if num <= 2:
          self.assertEqual(num, int(line))
        elif num == 3:
          self.assertEqual(None, line)
        elif num == 4:
          self.assertEqual('Awake', line)
        else:
          self.fail()


if __name__ == '__main__':
  unittest.main()