1#!/usr/bin/python
2
3import errno
4import logging
5import os
6import select
7import shutil
8import socket
9import StringIO
10import subprocess
11import time
12import unittest
13import urllib2
14
15import common
16from autotest_lib.client.common_lib import base_utils, autotemp
17from autotest_lib.client.common_lib.test_utils import mock
18
19
20class test_read_one_line(unittest.TestCase):
21    def setUp(self):
22        self.god = mock.mock_god(ut=self)
23        self.god.stub_function(base_utils, "open")
24
25
26    def tearDown(self):
27        self.god.unstub_all()
28
29
30    def test_ip_to_long(self):
31        self.assertEqual(base_utils.ip_to_long('0.0.0.0'), 0)
32        self.assertEqual(base_utils.ip_to_long('255.255.255.255'), 4294967295)
33        self.assertEqual(base_utils.ip_to_long('192.168.0.1'), 3232235521)
34        self.assertEqual(base_utils.ip_to_long('1.2.4.8'), 16909320)
35
36
37    def test_long_to_ip(self):
38        self.assertEqual(base_utils.long_to_ip(0), '0.0.0.0')
39        self.assertEqual(base_utils.long_to_ip(4294967295), '255.255.255.255')
40        self.assertEqual(base_utils.long_to_ip(3232235521), '192.168.0.1')
41        self.assertEqual(base_utils.long_to_ip(16909320), '1.2.4.8')
42
43
44    def test_create_subnet_mask(self):
45        self.assertEqual(base_utils.create_subnet_mask(0), 0)
46        self.assertEqual(base_utils.create_subnet_mask(32), 4294967295)
47        self.assertEqual(base_utils.create_subnet_mask(25), 4294967168)
48
49
50    def test_format_ip_with_mask(self):
51        self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 0),
52                         '0.0.0.0/0')
53        self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 32),
54                         '192.168.0.1/32')
55        self.assertEqual(base_utils.format_ip_with_mask('192.168.0.1', 26),
56                         '192.168.0.0/26')
57        self.assertEqual(base_utils.format_ip_with_mask('192.168.0.255', 26),
58                         '192.168.0.192/26')
59
60
61    def create_test_file(self, contents):
62        test_file = StringIO.StringIO(contents)
63        base_utils.open.expect_call("filename", "r").and_return(test_file)
64
65
66    def test_reads_one_line_file(self):
67        self.create_test_file("abc\n")
68        self.assertEqual("abc", base_utils.read_one_line("filename"))
69        self.god.check_playback()
70
71
72    def test_strips_read_lines(self):
73        self.create_test_file("abc   \n")
74        self.assertEqual("abc   ", base_utils.read_one_line("filename"))
75        self.god.check_playback()
76
77
78    def test_drops_extra_lines(self):
79        self.create_test_file("line 1\nline 2\nline 3\n")
80        self.assertEqual("line 1", base_utils.read_one_line("filename"))
81        self.god.check_playback()
82
83
84    def test_works_on_empty_file(self):
85        self.create_test_file("")
86        self.assertEqual("", base_utils.read_one_line("filename"))
87        self.god.check_playback()
88
89
90    def test_works_on_file_with_no_newlines(self):
91        self.create_test_file("line but no newline")
92        self.assertEqual("line but no newline",
93                         base_utils.read_one_line("filename"))
94        self.god.check_playback()
95
96
97    def test_preserves_leading_whitespace(self):
98        self.create_test_file("   has leading whitespace")
99        self.assertEqual("   has leading whitespace",
100                         base_utils.read_one_line("filename"))
101
102
103class test_write_one_line(unittest.TestCase):
104    def setUp(self):
105        self.god = mock.mock_god(ut=self)
106        self.god.stub_function(base_utils, "open")
107
108
109    def tearDown(self):
110        self.god.unstub_all()
111
112
113    def get_write_one_line_output(self, content):
114        test_file = mock.SaveDataAfterCloseStringIO()
115        base_utils.open.expect_call("filename", "w").and_return(test_file)
116        base_utils.write_one_line("filename", content)
117        self.god.check_playback()
118        return test_file.final_data
119
120
121    def test_writes_one_line_file(self):
122        self.assertEqual("abc\n", self.get_write_one_line_output("abc"))
123
124
125    def test_preserves_existing_newline(self):
126        self.assertEqual("abc\n", self.get_write_one_line_output("abc\n"))
127
128
129    def test_preserves_leading_whitespace(self):
130        self.assertEqual("   abc\n", self.get_write_one_line_output("   abc"))
131
132
133    def test_preserves_trailing_whitespace(self):
134        self.assertEqual("abc   \n", self.get_write_one_line_output("abc   "))
135
136
137    def test_handles_empty_input(self):
138        self.assertEqual("\n", self.get_write_one_line_output(""))
139
140
141class test_open_write_close(unittest.TestCase):
142    def setUp(self):
143        self.god = mock.mock_god(ut=self)
144        self.god.stub_function(base_utils, "open")
145
146
147    def tearDown(self):
148        self.god.unstub_all()
149
150
151    def test_simple_functionality(self):
152        data = "\n\nwhee\n"
153        test_file = mock.SaveDataAfterCloseStringIO()
154        base_utils.open.expect_call("filename", "w").and_return(test_file)
155        base_utils.open_write_close("filename", data)
156        self.god.check_playback()
157        self.assertEqual(data, test_file.final_data)
158
159
160class test_read_keyval(unittest.TestCase):
161    def setUp(self):
162        self.god = mock.mock_god(ut=self)
163        self.god.stub_function(base_utils, "open")
164        self.god.stub_function(os.path, "isdir")
165        self.god.stub_function(os.path, "exists")
166
167
168    def tearDown(self):
169        self.god.unstub_all()
170
171
172    def create_test_file(self, filename, contents):
173        test_file = StringIO.StringIO(contents)
174        os.path.exists.expect_call(filename).and_return(True)
175        base_utils.open.expect_call(filename).and_return(test_file)
176
177
178    def read_keyval(self, contents):
179        os.path.isdir.expect_call("file").and_return(False)
180        self.create_test_file("file", contents)
181        keyval = base_utils.read_keyval("file")
182        self.god.check_playback()
183        return keyval
184
185
186    def test_returns_empty_when_file_doesnt_exist(self):
187        os.path.isdir.expect_call("file").and_return(False)
188        os.path.exists.expect_call("file").and_return(False)
189        self.assertEqual({}, base_utils.read_keyval("file"))
190        self.god.check_playback()
191
192
193    def test_accesses_files_directly(self):
194        os.path.isdir.expect_call("file").and_return(False)
195        self.create_test_file("file", "")
196        base_utils.read_keyval("file")
197        self.god.check_playback()
198
199
200    def test_accesses_directories_through_keyval_file(self):
201        os.path.isdir.expect_call("dir").and_return(True)
202        self.create_test_file("dir/keyval", "")
203        base_utils.read_keyval("dir")
204        self.god.check_playback()
205
206
207    def test_values_are_rstripped(self):
208        keyval = self.read_keyval("a=b   \n")
209        self.assertEquals(keyval, {"a": "b"})
210
211
212    def test_comments_are_ignored(self):
213        keyval = self.read_keyval("a=b # a comment\n")
214        self.assertEquals(keyval, {"a": "b"})
215
216
217    def test_integers_become_ints(self):
218        keyval = self.read_keyval("a=1\n")
219        self.assertEquals(keyval, {"a": 1})
220        self.assertEquals(int, type(keyval["a"]))
221
222
223    def test_float_values_become_floats(self):
224        keyval = self.read_keyval("a=1.5\n")
225        self.assertEquals(keyval, {"a": 1.5})
226        self.assertEquals(float, type(keyval["a"]))
227
228
229    def test_multiple_lines(self):
230        keyval = self.read_keyval("a=one\nb=two\n")
231        self.assertEquals(keyval, {"a": "one", "b": "two"})
232
233
234    def test_the_last_duplicate_line_is_used(self):
235        keyval = self.read_keyval("a=one\nb=two\na=three\n")
236        self.assertEquals(keyval, {"a": "three", "b": "two"})
237
238
239    def test_extra_equals_are_included_in_values(self):
240        keyval = self.read_keyval("a=b=c\n")
241        self.assertEquals(keyval, {"a": "b=c"})
242
243
244    def test_non_alphanumeric_keynames_are_rejected(self):
245        self.assertRaises(ValueError, self.read_keyval, "a$=one\n")
246
247
248    def test_underscores_are_allowed_in_key_names(self):
249        keyval = self.read_keyval("a_b=value\n")
250        self.assertEquals(keyval, {"a_b": "value"})
251
252
253    def test_dashes_are_allowed_in_key_names(self):
254        keyval = self.read_keyval("a-b=value\n")
255        self.assertEquals(keyval, {"a-b": "value"})
256
257    def test_empty_value_is_allowed(self):
258        keyval = self.read_keyval("a=\n")
259        self.assertEquals(keyval, {"a": ""})
260
261
262class test_write_keyval(unittest.TestCase):
263    def setUp(self):
264        self.god = mock.mock_god(ut=self)
265        self.god.stub_function(base_utils, "open")
266        self.god.stub_function(os.path, "isdir")
267
268
269    def tearDown(self):
270        self.god.unstub_all()
271
272
273    def assertHasLines(self, value, lines):
274        vlines = value.splitlines()
275        vlines.sort()
276        self.assertEquals(vlines, sorted(lines))
277
278
279    def write_keyval(self, filename, dictionary, expected_filename=None,
280                     type_tag=None):
281        if expected_filename is None:
282            expected_filename = filename
283        test_file = StringIO.StringIO()
284        self.god.stub_function(test_file, "close")
285        base_utils.open.expect_call(expected_filename, "a").and_return(test_file)
286        test_file.close.expect_call()
287        if type_tag is None:
288            base_utils.write_keyval(filename, dictionary)
289        else:
290            base_utils.write_keyval(filename, dictionary, type_tag)
291        return test_file.getvalue()
292
293
294    def write_keyval_file(self, dictionary, type_tag=None):
295        os.path.isdir.expect_call("file").and_return(False)
296        return self.write_keyval("file", dictionary, type_tag=type_tag)
297
298
299    def test_accesses_files_directly(self):
300        os.path.isdir.expect_call("file").and_return(False)
301        result = self.write_keyval("file", {"a": "1"})
302        self.assertEquals(result, "a=1\n")
303
304
305    def test_accesses_directories_through_keyval_file(self):
306        os.path.isdir.expect_call("dir").and_return(True)
307        result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
308        self.assertEquals(result, "b=2\n")
309
310
311    def test_numbers_are_stringified(self):
312        result = self.write_keyval_file({"c": 3})
313        self.assertEquals(result, "c=3\n")
314
315
316    def test_type_tags_are_excluded_by_default(self):
317        result = self.write_keyval_file({"d": "a string"})
318        self.assertEquals(result, "d=a string\n")
319        self.assertRaises(ValueError, self.write_keyval_file,
320                          {"d{perf}": "a string"})
321
322
323    def test_perf_tags_are_allowed(self):
324        result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
325                                        type_tag="perf")
326        self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
327        self.assertRaises(ValueError, self.write_keyval_file,
328                          {"a": 1, "b": 2}, type_tag="perf")
329
330
331    def test_non_alphanumeric_keynames_are_rejected(self):
332        self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})
333
334
335    def test_underscores_are_allowed_in_key_names(self):
336        result = self.write_keyval_file({"a_b": "value"})
337        self.assertEquals(result, "a_b=value\n")
338
339
340    def test_dashes_are_allowed_in_key_names(self):
341        result = self.write_keyval_file({"a-b": "value"})
342        self.assertEquals(result, "a-b=value\n")
343
344
345class test_is_url(unittest.TestCase):
346    def test_accepts_http(self):
347        self.assertTrue(base_utils.is_url("http://example.com"))
348
349
350    def test_accepts_ftp(self):
351        self.assertTrue(base_utils.is_url("ftp://ftp.example.com"))
352
353
354    def test_rejects_local_path(self):
355        self.assertFalse(base_utils.is_url("/home/username/file"))
356
357
358    def test_rejects_local_filename(self):
359        self.assertFalse(base_utils.is_url("filename"))
360
361
362    def test_rejects_relative_local_path(self):
363        self.assertFalse(base_utils.is_url("somedir/somesubdir/file"))
364
365
366    def test_rejects_local_path_containing_url(self):
367        self.assertFalse(base_utils.is_url("somedir/http://path/file"))
368
369
370class test_urlopen(unittest.TestCase):
371    def setUp(self):
372        self.god = mock.mock_god(ut=self)
373
374
375    def tearDown(self):
376        self.god.unstub_all()
377
378
379    def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
380                                             *expected_args):
381        expected_args += (None,) * (2 - len(expected_args))
382        def urlopen(url, data=None):
383            self.assertEquals(expected_args, (url,data))
384            test_func(socket.getdefaulttimeout())
385            return expected_return
386        self.god.stub_with(urllib2, "urlopen", urlopen)
387
388
389    def stub_urlopen_with_timeout_check(self, expected_timeout,
390                                        expected_return, *expected_args):
391        def test_func(timeout):
392            self.assertEquals(timeout, expected_timeout)
393        self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
394                                                  *expected_args)
395
396
397    def test_timeout_set_during_call(self):
398        self.stub_urlopen_with_timeout_check(30, "retval", "url")
399        retval = base_utils.urlopen("url", timeout=30)
400        self.assertEquals(retval, "retval")
401
402
403    def test_timeout_reset_after_call(self):
404        old_timeout = socket.getdefaulttimeout()
405        self.stub_urlopen_with_timeout_check(30, None, "url")
406        try:
407            socket.setdefaulttimeout(1234)
408            base_utils.urlopen("url", timeout=30)
409            self.assertEquals(1234, socket.getdefaulttimeout())
410        finally:
411            socket.setdefaulttimeout(old_timeout)
412
413
414    def test_timeout_set_by_default(self):
415        def test_func(timeout):
416            self.assertTrue(timeout is not None)
417        self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
418        base_utils.urlopen("url")
419
420
421    def test_args_are_untouched(self):
422        self.stub_urlopen_with_timeout_check(30, None, "http://url",
423                                             "POST data")
424        base_utils.urlopen("http://url", timeout=30, data="POST data")
425
426
427class test_urlretrieve(unittest.TestCase):
428    def setUp(self):
429        self.god = mock.mock_god(ut=self)
430
431
432    def tearDown(self):
433        self.god.unstub_all()
434
435
436    def test_urlopen_passed_arguments(self):
437        self.god.stub_function(base_utils, "urlopen")
438        self.god.stub_function(base_utils.shutil, "copyfileobj")
439        self.god.stub_function(base_utils, "open")
440
441        url = "url"
442        dest = "somefile"
443        data = object()
444        timeout = 10
445
446        src_file = self.god.create_mock_class(file, "file")
447        dest_file = self.god.create_mock_class(file, "file")
448
449        (base_utils.urlopen.expect_call(url, data=data, timeout=timeout)
450                .and_return(src_file))
451        base_utils.open.expect_call(dest, "wb").and_return(dest_file)
452        base_utils.shutil.copyfileobj.expect_call(src_file, dest_file)
453        dest_file.close.expect_call()
454        src_file.close.expect_call()
455
456        base_utils.urlretrieve(url, dest, data=data, timeout=timeout)
457        self.god.check_playback()
458
459
460class test_merge_trees(unittest.TestCase):
461    # a some path-handling helper functions
462    def src(self, *path_segments):
463        return os.path.join(self.src_tree.name, *path_segments)
464
465
466    def dest(self, *path_segments):
467        return os.path.join(self.dest_tree.name, *path_segments)
468
469
470    def paths(self, *path_segments):
471        return self.src(*path_segments), self.dest(*path_segments)
472
473
474    def assertFileEqual(self, *path_segments):
475        src, dest = self.paths(*path_segments)
476        self.assertEqual(True, os.path.isfile(src))
477        self.assertEqual(True, os.path.isfile(dest))
478        self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
479        self.assertEqual(open(src).read(), open(dest).read())
480
481
482    def assertFileContents(self, contents, *path_segments):
483        dest = self.dest(*path_segments)
484        self.assertEqual(True, os.path.isfile(dest))
485        self.assertEqual(os.path.getsize(dest), len(contents))
486        self.assertEqual(contents, open(dest).read())
487
488
489    def setUp(self):
490        self.src_tree = autotemp.tempdir(unique_id='utilsrc')
491        self.dest_tree = autotemp.tempdir(unique_id='utilsdest')
492
493        # empty subdirs
494        os.mkdir(self.src("empty"))
495        os.mkdir(self.dest("empty"))
496
497
498    def tearDown(self):
499        self.src_tree.clean()
500        self.dest_tree.clean()
501
502
503    def test_both_dont_exist(self):
504        base_utils.merge_trees(*self.paths("empty"))
505
506
507    def test_file_only_at_src(self):
508        print >> open(self.src("src_only"), "w"), "line 1"
509        base_utils.merge_trees(*self.paths("src_only"))
510        self.assertFileEqual("src_only")
511
512
513    def test_file_only_at_dest(self):
514        print >> open(self.dest("dest_only"), "w"), "line 1"
515        base_utils.merge_trees(*self.paths("dest_only"))
516        self.assertEqual(False, os.path.exists(self.src("dest_only")))
517        self.assertFileContents("line 1\n", "dest_only")
518
519
520    def test_file_at_both(self):
521        print >> open(self.dest("in_both"), "w"), "line 1"
522        print >> open(self.src("in_both"), "w"), "line 2"
523        base_utils.merge_trees(*self.paths("in_both"))
524        self.assertFileContents("line 1\nline 2\n", "in_both")
525
526
527    def test_directory_with_files_in_both(self):
528        print >> open(self.dest("in_both"), "w"), "line 1"
529        print >> open(self.src("in_both"), "w"), "line 3"
530        base_utils.merge_trees(*self.paths())
531        self.assertFileContents("line 1\nline 3\n", "in_both")
532
533
534    def test_directory_with_mix_of_files(self):
535        print >> open(self.dest("in_dest"), "w"), "dest line"
536        print >> open(self.src("in_src"), "w"), "src line"
537        base_utils.merge_trees(*self.paths())
538        self.assertFileContents("dest line\n", "in_dest")
539        self.assertFileContents("src line\n", "in_src")
540
541
542    def test_directory_with_subdirectories(self):
543        os.mkdir(self.src("src_subdir"))
544        print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line"
545        os.mkdir(self.src("both_subdir"))
546        os.mkdir(self.dest("both_subdir"))
547        print >> open(self.src("both_subdir", "subfile"), "w"), "src line"
548        print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line"
549        base_utils.merge_trees(*self.paths())
550        self.assertFileContents("subdir line\n", "src_subdir", "subfile")
551        self.assertFileContents("dest line\nsrc line\n", "both_subdir",
552                                "subfile")
553
554
555class test_get_relative_path(unittest.TestCase):
556    def test_not_absolute(self):
557        self.assertRaises(AssertionError, base_utils.get_relative_path, "a", "b")
558
559    def test_same_dir(self):
560        self.assertEqual(base_utils.get_relative_path("/a/b/c", "/a/b"), "c")
561
562    def test_forward_dir(self):
563        self.assertEqual(base_utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d")
564
565    def test_previous_dir(self):
566        self.assertEqual(base_utils.get_relative_path("/a/b", "/a/b/c/d"), "../..")
567
568    def test_parallel_dir(self):
569        self.assertEqual(base_utils.get_relative_path("/a/c/d", "/a/b/c/d"),
570                         "../../../c/d")
571
572
573class test_sh_escape(unittest.TestCase):
574    def _test_in_shell(self, text):
575        escaped_text = base_utils.sh_escape(text)
576        proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True,
577                                stdin=open(os.devnull, 'r'),
578                                stdout=subprocess.PIPE,
579                                stderr=open(os.devnull, 'w'))
580        stdout, _ = proc.communicate()
581        self.assertEqual(proc.returncode, 0)
582        self.assertEqual(stdout[:-1], text)
583
584
585    def test_normal_string(self):
586        self._test_in_shell('abcd')
587
588
589    def test_spaced_string(self):
590        self._test_in_shell('abcd efgh')
591
592
593    def test_dollar(self):
594        self._test_in_shell('$')
595
596
597    def test_single_quote(self):
598        self._test_in_shell('\'')
599
600
601    def test_single_quoted_string(self):
602        self._test_in_shell('\'efgh\'')
603
604
605    def test_string_with_single_quote(self):
606        self._test_in_shell("a'b")
607
608
609    def test_string_with_escaped_single_quote(self):
610        self._test_in_shell(r"a\'b")
611
612
613    def test_double_quote(self):
614        self._test_in_shell('"')
615
616
617    def test_double_quoted_string(self):
618        self._test_in_shell('"abcd"')
619
620
621    def test_backtick(self):
622        self._test_in_shell('`')
623
624
625    def test_backticked_string(self):
626        self._test_in_shell('`jklm`')
627
628
629    def test_backslash(self):
630        self._test_in_shell('\\')
631
632
633    def test_backslashed_special_characters(self):
634        self._test_in_shell('\\$')
635        self._test_in_shell('\\"')
636        self._test_in_shell('\\\'')
637        self._test_in_shell('\\`')
638
639
640    def test_backslash_codes(self):
641        self._test_in_shell('\\n')
642        self._test_in_shell('\\r')
643        self._test_in_shell('\\t')
644        self._test_in_shell('\\v')
645        self._test_in_shell('\\b')
646        self._test_in_shell('\\a')
647        self._test_in_shell('\\000')
648
649    def test_real_newline(self):
650        self._test_in_shell('\n')
651        self._test_in_shell('\\\n')
652
653
654class test_sh_quote_word(test_sh_escape):
655    """Run tests on sh_quote_word.
656
657    Inherit from test_sh_escape to get the same tests to run on both.
658    """
659
660    def _test_in_shell(self, text):
661        quoted_word = base_utils.sh_quote_word(text)
662        echoed_value = subprocess.check_output('echo %s' % quoted_word,
663                                               shell=True)
664        self.assertEqual(echoed_value, text + '\n')
665
666
667class test_nested_sh_quote_word(test_sh_quote_word):
668    """Run nested tests on sh_quote_word.
669
670    Inherit from test_sh_quote_word to get the same tests to run on both.
671    """
672
673    def _test_in_shell(self, text):
674        command = 'echo ' + base_utils.sh_quote_word(text)
675        nested_command = 'echo ' + base_utils.sh_quote_word(command)
676        produced_command = subprocess.check_output(nested_command, shell=True)
677        echoed_value = subprocess.check_output(produced_command, shell=True)
678        self.assertEqual(echoed_value, text + '\n')
679
680
681class test_run(unittest.TestCase):
682    """
683    Test the base_utils.run() function.
684
685    Note: This test runs simple external commands to test the base_utils.run()
686    API without assuming implementation details.
687    """
688    def setUp(self):
689        self.god = mock.mock_god(ut=self)
690        self.god.stub_function(base_utils.logging, 'warning')
691        self.god.stub_function(base_utils.logging, 'debug')
692
693
694    def tearDown(self):
695        self.god.unstub_all()
696
697
698    def __check_result(self, result, command, exit_status=0, stdout='',
699                       stderr=''):
700        self.assertEquals(result.command, command)
701        self.assertEquals(result.exit_status, exit_status)
702        self.assertEquals(result.stdout, stdout)
703        self.assertEquals(result.stderr, stderr)
704
705
706    def test_default_simple(self):
707        cmd = 'echo "hello world"'
708        # expect some king of logging.debug() call but don't care about args
709        base_utils.logging.debug.expect_any_call()
710        self.__check_result(base_utils.run(cmd), cmd, stdout='hello world\n')
711
712
713    def test_default_failure(self):
714        cmd = 'exit 11'
715        try:
716            base_utils.run(cmd, verbose=False)
717        except base_utils.error.CmdError, err:
718            self.__check_result(err.result_obj, cmd, exit_status=11)
719
720
721    def test_ignore_status(self):
722        cmd = 'echo error >&2 && exit 11'
723        self.__check_result(base_utils.run(cmd, ignore_status=True, verbose=False),
724                            cmd, exit_status=11, stderr='error\n')
725
726
727    def test_timeout(self):
728        # we expect a logging.warning() message, don't care about the contents
729        base_utils.logging.warning.expect_any_call()
730        try:
731            base_utils.run('echo -n output && sleep 10', timeout=1, verbose=False)
732        except base_utils.error.CmdError, err:
733            self.assertEquals(err.result_obj.stdout, 'output')
734
735
736    def test_stdout_stderr_tee(self):
737        cmd = 'echo output && echo error >&2'
738        stdout_tee = StringIO.StringIO()
739        stderr_tee = StringIO.StringIO()
740
741        self.__check_result(base_utils.run(
742                cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee,
743                verbose=False), cmd, stdout='output\n', stderr='error\n')
744        self.assertEqual(stdout_tee.getvalue(), 'output\n')
745        self.assertEqual(stderr_tee.getvalue(), 'error\n')
746
747
748    def test_stdin_string(self):
749        cmd = 'cat'
750        self.__check_result(base_utils.run(cmd, verbose=False, stdin='hi!\n'),
751                            cmd, stdout='hi!\n')
752
753
754    def test_safe_args(self):
755        # NOTE: The string in expected_quoted_cmd depends on the internal
756        # implementation of shell quoting which is used by base_utils.run(),
757        # in this case, sh_quote_word().
758        expected_quoted_cmd = "echo 'hello \"world' again"
759        self.__check_result(base_utils.run(
760                'echo', verbose=False, args=('hello "world', 'again')),
761                expected_quoted_cmd, stdout='hello "world again\n')
762
763
764    def test_safe_args_given_string(self):
765        self.assertRaises(TypeError, base_utils.run, 'echo', args='hello')
766
767
768    def test_wait_interrupt(self):
769        """Test that we actually select twice if the first one returns EINTR."""
770        base_utils.logging.debug.expect_any_call()
771
772        bg_job = base_utils.BgJob('echo "hello world"')
773        bg_job.result.exit_status = 0
774        self.god.stub_function(base_utils.select, 'select')
775
776        base_utils.select.select.expect_any_call().and_raises(
777                select.error(errno.EINTR, 'Select interrupted'))
778        base_utils.logging.warning.expect_any_call()
779
780        base_utils.select.select.expect_any_call().and_return(
781                ([bg_job.sp.stdout, bg_job.sp.stderr], [], None))
782        base_utils.logging.warning.expect_any_call()
783
784        self.assertFalse(
785                base_utils._wait_for_commands([bg_job], time.time(), None))
786
787
788class test_compare_versions(unittest.TestCase):
789    def test_zerofill(self):
790        self.assertEqual(base_utils.compare_versions('1.7', '1.10'), -1)
791        self.assertEqual(base_utils.compare_versions('1.222', '1.3'), 1)
792        self.assertEqual(base_utils.compare_versions('1.03', '1.3'), 0)
793
794
795    def test_unequal_len(self):
796        self.assertEqual(base_utils.compare_versions('1.3', '1.3.4'), -1)
797        self.assertEqual(base_utils.compare_versions('1.3.1', '1.3'), 1)
798
799
800    def test_dash_delimited(self):
801        self.assertEqual(base_utils.compare_versions('1-2-3', '1-5-1'), -1)
802        self.assertEqual(base_utils.compare_versions('1-2-1', '1-1-1'), 1)
803        self.assertEqual(base_utils.compare_versions('1-2-4', '1-2-4'), 0)
804
805
806    def test_alphabets(self):
807        self.assertEqual(base_utils.compare_versions('m.l.b', 'n.b.a'), -1)
808        self.assertEqual(base_utils.compare_versions('n.b.a', 'm.l.b'), 1)
809        self.assertEqual(base_utils.compare_versions('abc.e', 'abc.e'), 0)
810
811
812    def test_mix_symbols(self):
813        self.assertEqual(base_utils.compare_versions('k-320.1', 'k-320.3'), -1)
814        self.assertEqual(base_utils.compare_versions('k-231.5', 'k-231.1'), 1)
815        self.assertEqual(base_utils.compare_versions('k-231.1', 'k-231.1'), 0)
816
817        self.assertEqual(base_utils.compare_versions('k.320-1', 'k.320-3'), -1)
818        self.assertEqual(base_utils.compare_versions('k.231-5', 'k.231-1'), 1)
819        self.assertEqual(base_utils.compare_versions('k.231-1', 'k.231-1'), 0)
820
821
822class test_args_to_dict(unittest.TestCase):
823    def test_no_args(self):
824        result = base_utils.args_to_dict([])
825        self.assertEqual({}, result)
826
827
828    def test_matches(self):
829        result = base_utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:',
830                                     'F__o0O=', 'B8r:=:=', '_bAZ_=:=:'])
831        self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'',
832                                  'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'})
833
834
835    def test_unmatches(self):
836        # Temporarily shut warning messages from args_to_dict() when an argument
837        # doesn't match its pattern.
838        logger = logging.getLogger()
839        saved_level = logger.level
840        logger.setLevel(logging.ERROR)
841
842        try:
843            result = base_utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b',
844                                         ':VAL', '=VVV', 'WORD'])
845            self.assertEqual({}, result)
846        finally:
847            # Restore level.
848            logger.setLevel(saved_level)
849
850
851class test_get_random_port(unittest.TestCase):
852    def do_bind(self, port, socket_type, socket_proto):
853        s = socket.socket(socket.AF_INET, socket_type, socket_proto)
854        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
855        s.bind(('', port))
856        return s
857
858
859    def test_get_port(self):
860        for _ in xrange(100):
861            p = base_utils.get_unused_port()
862            s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP)
863            self.assert_(s.getsockname())
864            s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
865            self.assert_(s.getsockname())
866
867
868if __name__ == "__main__":
869    unittest.main()
870