job_unittest.py revision aee19fc0e6f543920ef5223bf2cf01beb16fb9ea
1#!/usr/bin/python
2
3import os, unittest, pickle, shutil, sys, time
4import common
5
6from autotest_lib.client.bin import job, boottool, config, sysinfo, harness
7from autotest_lib.client.bin import test, xen, kernel, autotest_utils, cpuset
8from autotest_lib.client.common_lib import packages, utils, error, logging
9from autotest_lib.client.common_lib.test_utils import mock
10
11
12class first_line_comparator(mock.argument_comparator):
13    def __init__(self, first_line):
14        self.first_line = first_line
15
16
17    def is_satisfied_by(self, parameter):
18        return self.first_line == parameter.splitlines()[0]
19
20
21class TestBaseJob(unittest.TestCase):
22    def setUp(self):
23        # make god
24        self.god = mock.mock_god()
25
26        # need to set some environ variables
27        self.autodir = "autodir"
28        os.environ['AUTODIR'] = self.autodir
29
30        # set up some variables
31        self.control = "control"
32        self.jobtag = "jobtag"
33
34        # stub out some stuff
35        self.god.stub_function(os.path, 'exists')
36        self.god.stub_function(os.path, 'isdir')
37        self.god.stub_function(os, 'mkdir')
38        self.god.stub_function(shutil, 'copyfile')
39        self.god.stub_function(job, 'open')
40        self.god.stub_function(utils, 'system')
41        self.god.stub_function(harness, 'select')
42        self.god.stub_function(sysinfo, 'log_per_reboot_data')
43        self.god.stub_function(pickle, 'load')
44        self.god.stub_function(sysinfo, 'log_per_reboot_data')
45
46        self.god.stub_class(config, 'config')
47        self.god.stub_class(boottool, 'boottool')
48
49
50    def tearDown(self):
51        self.god.unstub_all()
52
53
54    def construct_job(self, cont):
55        # will construct class instance using __new__
56        self.job = job.base_job.__new__(job.base_job)
57
58        # now some specific stubs
59        self.god.stub_function(self.job, '_load_state')
60        self.god.stub_function(self.job, '_init_group_level')
61        self.god.stub_function(self.job, 'config_get')
62        self.god.stub_function(self.job, 'record')
63        self.god.stub_function(self.job, '_increment_group_level')
64        self.god.stub_function(self.job, '_decrement_group_level')
65        self.god.stub_function(self.job, 'get_state')
66
67        # other setup
68        tmpdir = os.path.join(self.autodir, 'tmp')
69        results = os.path.join(self.autodir, 'results')
70        download = os.path.join(self.autodir, 'tests', 'download')
71        resultdir = os.path.join(self.autodir, 'results', self.jobtag)
72        sysinfodir = os.path.join(resultdir, 'sysinfo')
73        pkgdir = os.path.join(self.autodir, 'packages')
74
75        # record
76        self.job._load_state.expect_call()
77        if not cont:
78            os.path.exists.expect_call(tmpdir).and_return(False)
79            os.mkdir.expect_call(tmpdir)
80            os.path.exists.expect_call(pkgdir).and_return(False)
81            os.mkdir.expect_call(pkgdir)
82            os.path.exists.expect_call(results).and_return(False)
83            os.mkdir.expect_call(results)
84            os.path.exists.expect_call(download).and_return(False)
85            os.mkdir.expect_call(download)
86            os.path.exists.expect_call(resultdir).and_return(True)
87            utils.system.expect_call('rm -rf ' + resultdir)
88            os.mkdir.expect_call(resultdir)
89            os.mkdir.expect_call(sysinfodir)
90            os.mkdir.expect_call(os.path.join(resultdir, 'debug'))
91            os.mkdir.expect_call(os.path.join(resultdir,
92                                              'analysis'))
93            shutil.copyfile.expect_call(mock.is_string_comparator(),
94                                 os.path.join(resultdir, 'control'))
95
96        self.job._init_group_level.expect_call()
97        self.config = config.config.expect_new(self.job)
98        my_harness = self.god.create_mock_class(harness.harness,
99                                                'my_harness')
100        harness.select.expect_call(None,
101                                   self.job).and_return(my_harness)
102        self.job.config_get.expect_call(
103                'boottool.executable').and_return(None)
104        bootloader = boottool.boottool.expect_new(None)
105        sysinfo.log_per_reboot_data.expect_call(sysinfodir)
106        if not cont:
107            self.job.record.expect_call('START', None, None)
108            self.job._increment_group_level.expect_call()
109
110        my_harness.run_start.expect_call()
111        self.job.get_state.expect_call('__monitor_disk',
112                                       default=0.0).and_return(0.0)
113
114        # finish constructor
115        self.job.__init__(self.control, self.jobtag, cont)
116
117        # check
118        self.god.check_playback()
119
120
121    def test_constructor(self):
122        self.construct_job(False)
123
124
125    def test_monitor_disk_usage(self):
126        self.construct_job(True)
127
128        # setup
129        self.god.stub_function(self.job, 'set_state')
130
131        # record
132        max_rate = 10.0
133        self.job.set_state.expect_call('__monitor_disk', max_rate)
134
135        # test
136        self.job.monitor_disk_usage(max_rate)
137        self.god.check_playback()
138
139
140    def test_relative_path(self):
141        self.construct_job(True)
142        dummy = "asdf"
143        ret = self.job.relative_path(os.path.join(self.job.resultdir, dummy))
144        self.assertEquals(ret, dummy)
145
146
147    def test_control_functions(self):
148        self.construct_job(True)
149        control_file = "blah"
150        self.job.control_set(control_file)
151        self.assertEquals(self.job.control_get(), os.path.abspath(control_file))
152
153
154    def test_harness_select(self):
155        self.construct_job(True)
156
157        # record
158        which = "which"
159        harness.select.expect_call(which, self.job).and_return(None)
160
161        # run and test
162        self.job.harness_select(which)
163        self.god.check_playback()
164
165
166    def test_config_set(self):
167        self.construct_job(True)
168
169        # record
170        name = "foo"
171        val = 10
172        self.config.set.expect_call(name, val)
173
174        # run and test
175        self.job.config_set(name, val)
176        self.god.check_playback()
177
178
179    def test_config_get(self):
180        self.construct_job(True)
181
182        # unstub config_get
183        self.god.unstub(self.job, 'config_get')
184        # record
185        name = "foo"
186        val = 10
187        self.config.get.expect_call(name).and_return(val)
188
189        # run and test
190        self.job.config_get(name)
191        self.god.check_playback()
192
193
194    def test_setup_dirs_raise(self):
195        self.construct_job(True)
196
197        # setup
198        results_dir = 'foo'
199        tmp_dir = 'bar'
200
201        # record
202        os.path.exists.expect_call(tmp_dir).and_return(True)
203        os.path.isdir.expect_call(tmp_dir).and_return(False)
204
205        # test
206        self.assertRaises(ValueError, self.job.setup_dirs, results_dir, tmp_dir)
207        self.god.check_playback()
208
209
210    def test_setup_dirs(self):
211        self.construct_job(True)
212
213        # setup
214        results_dir1 = os.path.join(self.job.resultdir, 'build')
215        results_dir2 = os.path.join(self.job.resultdir, 'build.2')
216        results_dir3 = os.path.join(self.job.resultdir, 'build.3')
217        tmp_dir = 'bar'
218
219        # record
220        os.path.exists.expect_call(tmp_dir).and_return(False)
221        os.mkdir.expect_call(tmp_dir)
222        os.path.isdir.expect_call(tmp_dir).and_return(True)
223        os.path.exists.expect_call(results_dir1).and_return(True)
224        os.path.exists.expect_call(results_dir2).and_return(True)
225        os.path.exists.expect_call(results_dir3).and_return(False)
226        os.path.exists.expect_call(results_dir3).and_return(False)
227        os.mkdir.expect_call(results_dir3)
228
229        # test
230        self.assertEqual(self.job.setup_dirs(None, tmp_dir),
231                         (results_dir3, tmp_dir))
232        self.god.check_playback()
233
234
235    def test_xen(self):
236        self.construct_job(True)
237
238        # setup
239        self.god.stub_function(self.job, "setup_dirs")
240        self.god.stub_class(xen, "xen")
241        results = 'results_dir'
242        tmp = 'tmp'
243        build = 'xen'
244        base_tree = object()
245
246        # record
247        self.job.setup_dirs.expect_call(results,
248                                        tmp).and_return((results, tmp))
249        myxen = xen.xen.expect_new(self.job, base_tree, results, tmp, build,
250                                   False, None)
251
252        # run job and check
253        axen = self.job.xen(base_tree, results, tmp)
254        self.god.check_playback()
255        self.assertEquals(myxen, axen)
256
257
258    def test_kernel_rpm(self):
259        self.construct_job(True)
260
261        # setup
262        self.god.stub_function(self.job, "setup_dirs")
263        self.god.stub_class(kernel, "rpm_kernel")
264        self.god.stub_function(kernel, "preprocess_path")
265        self.god.stub_function(self.job.pkgmgr, "fetch_pkg")
266        results = 'results_dir'
267        tmp = 'tmp'
268        build = 'xen'
269        path = "somepath.rpm"
270        packages_dir = os.path.join("autodir/packages", path)
271
272        # record
273        self.job.setup_dirs.expect_call(results,
274                                        tmp).and_return((results, tmp))
275        kernel.preprocess_path.expect_call(path).and_return(path)
276        self.job.pkgmgr.fetch_pkg.expect_call(path, packages_dir, repo_url='')
277        mykernel = kernel.rpm_kernel.expect_new(self.job, packages_dir,
278                                                results)
279
280        # check
281        akernel = self.job.kernel(path, results, tmp)
282        self.god.check_playback()
283        self.assertEquals(mykernel, akernel)
284
285
286    def test_kernel(self):
287        self.construct_job(True)
288
289        # setup
290        self.god.stub_function(self.job, "setup_dirs")
291        self.god.stub_class(kernel, "kernel")
292        self.god.stub_function(kernel, "preprocess_path")
293        results = 'results_dir'
294        tmp = 'tmp'
295        build = 'linux'
296        path = "somepath.deb"
297
298        # record
299        self.job.setup_dirs.expect_call(results,
300                                        tmp).and_return((results, tmp))
301        kernel.preprocess_path.expect_call(path).and_return(path)
302        mykernel = kernel.kernel.expect_new(self.job, path, results, tmp,
303                                            build, False)
304
305        # check
306        akernel = self.job.kernel(path, results, tmp)
307        self.god.check_playback()
308        self.assertEquals(mykernel, akernel)
309
310
311    def test_run_test_logs_test_error_from_unhandled_error(self):
312        self.construct_job(True)
313
314        # set up stubs
315        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
316        self.god.stub_function(self.job, "_runtest")
317
318        # create an unhandled error object
319        class MyError(error.TestError):
320            pass
321        real_error = MyError("this is the real error message")
322        unhandled_error = error.UnhandledTestError(real_error)
323
324        # set up the recording
325        testname = "error_test"
326        outputdir = os.path.join(self.job.resultdir, testname)
327        self.job.pkgmgr.get_package_name.expect_call(
328            testname, 'test').and_return(("", testname))
329        os.path.exists.expect_call(outputdir).and_return(False)
330        os.mkdir.expect_call(outputdir)
331        self.job.record.expect_call("START", testname, testname)
332        self.job._increment_group_level.expect_call()
333        self.job._runtest.expect_call(testname, "", (), {}).and_raises(
334            unhandled_error)
335        self.job.record.expect_call("ERROR", testname, testname,
336                                    first_line_comparator(str(real_error)))
337        self.job._decrement_group_level.expect_call()
338        self.job.record.expect_call("END ERROR", testname, testname,
339                                    first_line_comparator(str(real_error)))
340
341        # run and check
342        self.job.run_test(testname)
343        self.god.check_playback()
344
345
346    def test_run_test_logs_non_test_error_from_unhandled_error(self):
347        self.construct_job(True)
348
349        # set up stubs
350        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
351        self.god.stub_function(self.job, "_runtest")
352
353        # create an unhandled error object
354        class MyError(Exception):
355            pass
356        real_error = MyError("this is the real error message")
357        unhandled_error = error.UnhandledTestError(real_error)
358        reason = first_line_comparator("Unhandled MyError: %s" % real_error)
359
360        # set up the recording
361        testname = "error_test"
362        outputdir = os.path.join(self.job.resultdir, testname)
363        self.job.pkgmgr.get_package_name.expect_call(
364            testname, 'test').and_return(("", testname))
365        os.path.exists.expect_call(outputdir).and_return(False)
366        os.mkdir.expect_call(outputdir)
367        self.job.record.expect_call("START", testname, testname)
368        self.job._increment_group_level.expect_call()
369        self.job._runtest.expect_call(testname, "", (), {}).and_raises(
370            unhandled_error)
371        self.job.record.expect_call("ERROR", testname, testname, reason)
372        self.job._decrement_group_level.expect_call()
373        self.job.record.expect_call("END ERROR", testname, testname, reason)
374
375        # run and check
376        self.job.run_test(testname)
377        self.god.check_playback()
378
379
380    def new_container(self):
381        self.construct_job(True)
382
383        # set up stubs
384        self.god.stub_function(autotest_utils, "grep")
385        self.god.stub_function(os, "getpid")
386        self.god.stub_class(cpuset, "cpuset")
387        pid = 100
388        name = 'test%d' % pid
389
390        # record
391        autotest_utils.grep.expect_call('cpuset',
392            '/proc/filesystems').and_return(True)
393        os.getpid.expect_call().and_return(pid)
394
395        container = cpuset.cpuset.expect_new(name, job_size=None, job_pid=pid,
396                                             cpus=None, root=None, disk=None,
397                                             network=None, kswapd_merge=False)
398
399        # run test
400        self.job.new_container()
401
402        self.god.check_playback()
403        return container
404
405
406    def test_new_container(self):
407        container = self.new_container()
408        self.assertEquals(self.job.container, container)
409
410
411    def test_release_container(self):
412        self.new_container()
413
414        # record
415        self.job.container.release.expect_call()
416
417        # run
418        self.job.release_container()
419        self.god.check_playback()
420        self.assertEquals(self.job.container, None)
421
422
423    def test_record(self):
424        self.construct_job(True)
425
426        # steup
427        self.job.group_level = 1
428        status = ''
429        status_code = "PASS"
430        subdir = "subdir"
431        operation = "super_fun"
432        mytime = "1234"
433        msg_tag = ""
434        if "." in self.job.log_filename:
435            msg_tag = self.job.log_filename.split(".", 1)[1]
436        epoch_time = int(mytime)
437        local_time = time.localtime(epoch_time)
438        optional_fields = {}
439        optional_fields["timestamp"] = str(epoch_time)
440        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
441                                                     local_time)
442        fields = [status_code, subdir, operation]
443        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
444        fields.append(status)
445        msg = '\t'.join(str(x) for x in fields)
446        msg = '\t' * self.job.group_level + msg
447
448        self.god.stub_function(logging, "is_valid_status")
449        self.god.stub_function(time, "time")
450        self.god.stub_function(self.job, "open")
451
452
453        # record
454        logging.is_valid_status.expect_call(status_code).and_return(True)
455        time.time.expect_call().and_return(mytime)
456        self.job.harness.test_status_detail.expect_call(status_code, subdir,
457                                                        operation, '', msg_tag)
458        self.job.harness.test_status.expect_call(msg, msg_tag)
459        myfile = self.god.create_mock_class(file, "file")
460        status_file = os.path.join(self.job.resultdir, self.job.log_filename)
461        self.job.open.expect_call(status_file, "a").and_return(myfile)
462        myfile.write.expect_call(msg + "\n")
463
464        dir = os.path.join(self.job.resultdir, subdir)
465        status_file = os.path.join(dir, self.job.DEFAULT_LOG_FILENAME)
466        self.job.open.expect_call(status_file, "a").and_return(myfile)
467        myfile.write.expect_call(msg + "\n")
468
469
470        # run test
471        self.god.unstub(self.job, "record")
472        self.job.record(status_code, subdir, operation)
473        self.god.check_playback()
474
475
476    def test_kernel_check_ident_success(self):
477        self.construct_job(True)
478
479        # set up the job class
480        self.job.group_level = 2
481
482        self.god.stub_function(autotest_utils, "running_os_ident")
483        autotest_utils.running_os_ident.expect_call().and_return("2.6.15-smp")
484
485        self.god.stub_function(utils, "read_one_line")
486        utils.read_one_line.expect_call("/proc/cmdline").and_return(
487            "blah more-blah root=lala IDENT=81234567 blah-again")
488
489        self.job.record.expect_call("GOOD", "sub", "reboot.verify",
490                                    "2.6.15-smp")
491        self.job._decrement_group_level.expect_call()
492        self.job.record.expect_call("END GOOD", "sub", "reboot",
493                                    optional_fields={"kernel": "2.6.15-smp"})
494
495        # run test
496        self.job.kernel_check_ident(81234567, "2.6.15-smp", "sub")
497        self.god.check_playback()
498
499
500    def test_kernel_check_ident_failure(self):
501        self.construct_job(True)
502
503        # set up the job class
504        self.job.group_level = 2
505
506        self.god.stub_function(autotest_utils, "running_os_ident")
507        autotest_utils.running_os_ident.expect_call().and_return("2.6.15-smp")
508
509        self.god.stub_function(utils, "read_one_line")
510        utils.read_one_line.expect_call("/proc/cmdline").and_return(
511            "blah more-blah root=lala IDENT=81234567 blah-again")
512
513        self.job.record.expect_call("ABORT", "sub", "reboot.verify",
514                                    "boot failure")
515        self.job._decrement_group_level.expect_call()
516        self.job.record.expect_call("END ABORT", "sub", "reboot",
517                                    optional_fields={"kernel": "2.6.15-smp"})
518
519        # run test
520        self.assertRaises(error.JobError, self.job.kernel_check_ident,
521                          91234567, "2.6.16-smp", "sub")
522        self.god.check_playback()
523
524
525if __name__ == "__main__":
526    unittest.main()
527