server_job.py revision b03ba64bfac2f4268198c84974663923ca456621
"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

import os, sys, re, time, select, subprocess, traceback
import test
from utils import *
from common.error import *

# this magic incantation should give us access to a client library
server_dir = os.path.dirname(__file__)
client_dir = os.path.join(server_dir, "..", "client", "bin")
sys.path.append(client_dir)
import fd_stack
sys.path.pop()


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    """Return the text of a named control segment, or '' if absent.

    Segments live in <server_dir>/control_segments. A missing segment
    is not an error: optional site-specific hooks (e.g. "site_verify")
    simply contribute an empty string.
    """
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        # use open() rather than the deprecated file() constructor,
        # and close the handle explicitly instead of leaking it
        segment = open(script_file)
        try:
            return segment.read()
        finally:
            segment.close()
    else:
        return ""


# Preamble prepended to every control segment exec'd by server_job; it
# runs in a namespace where 'job' and 'machines' are already bound.
preamble = """\
import os, sys

import hosts, autotest, kvm, git, standalone_profiler
import source_kernel, rpm_kernel, deb_kernel, git_kernel
from common.error import *
from common import barrier
from subcommand import *
from utils import run, get_tmp_dir, sh_escape

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier

if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

# Wrapper segment that pushes a client-side control file out to each
# machine and runs it there via autotest.
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    host = hosts.SSHHost(machine)
    at.run(control, host=host)

parallel_simple(run_client, machines)
"""

# Segment that collects crashdumps newer than test_start_time from
# every machine after a run.
crashdumps = """
def crashdumps(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.get_crashdumps(test_start_time)

parallel_simple(crashdumps, machines, log=False)
"""

# Segment that reboots every machine.
reboot_segment="""\
def reboot(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.reboot()

parallel_simple(reboot, machines, log=False)
"""

# Segment that reinstalls every machine.
install="""\
def install(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.machine_install()

parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
class server_job:
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        control
            the control file for this job
    """

    def __init__(self, control, args, resultdir, label, user, machines,
                 client = False):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        machines
            list of machine hostnames this job runs against
        client
            True if a client-side control file
        """
        # derive the autotest tree layout from where this module lives
        path = os.path.dirname(sys.modules['server_job'].__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        if control:
            # slurp the control file in, normalizing CRLF line endings
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # indentation prefix ('\t' per nesting level) for status records
        self.record_prefix = ''
        # file-like objects polled by _read_warnings for console warnings
        self.warning_loggers = set()

        # allow stdout/stderr to be redirected through fd stacks
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        # start with a clean per-job status file
        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = { 'label' : label, 'user' : user,
                     'hostname' : ','.join(machines) }
        # merge in any site-specific job data (stub returns {})
        job_data.update(get_site_job_data(self))
        write_keyval(self.resultdir, job_data)


    def verify(self):
        """Run the verify control segment against all machines.

        Records an ABORT in the status log and re-raises on failure.
        """
        if not self.machines:
            raise AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self}
            exec(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = 'Verify failed\n' + str(e) + '\n' + format_error()
            self.record('ABORT', None, None, msg)
            raise


    def repair(self):
        """Run the repair control segment, then reverify the machines.

        Any exception during repair is printed and swallowed so that
        verification is always attempted afterwards.
        """
        if not self.machines:
            raise AutoservError('No machines specified to repair')
        namespace = {'machines' : self.machines, 'job' : self}
        # no matter what happens during repair, go on to try to reverify
        try:
            exec(preamble + repair, namespace, namespace)
        except Exception, exc:
            print 'Exception occured during repair'
            traceback.print_exc()
        self.verify()


    def run(self, reboot = False, install_before = False,
            install_after = False, collect_crashdumps = True,
            namespace = {}):
        """Execute this job's control file.

        reboot
            reboot all machines after the run
        install_before / install_after
            reinstall machines before/after the run
        collect_crashdumps
            gather crashdumps newer than the run's start time
        namespace
            extra names to expose to the exec'd control file
            (NOTE: mutable default is safe here only because it is
            copied immediately below)
        """
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        # remembered so crashdump collection can skip older dumps
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        # NOTE(review): status_log is computed but never used below
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                exec(preamble + install, namespace, namespace)
            if self.client:
                # client-side control: write it out as 'control' and run
                # the generic client_wrapper as the server-side control
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            exec(preamble + server_control, namespace, namespace)

        finally:
            # post-run steps happen even if the control file raised
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                exec(preamble + crashdumps,
                     namespace, namespace)
            if reboot and machines:
                exec(preamble + reboot_segment,
                     namespace, namespace)
            if install_after and machines:
                exec(preamble + install, namespace, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
            tag to add to testname
        url
            url of the test to run

        Records GOOD on success, FAIL (with a formatted traceback) on
        any exception; exceptions are not propagated.
        """

        (group, testname) = test.testname(url)
        tag = None
        subdir = testname

        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                subdir += '.' + tag

        try:
            test.runtest(self, url, tag, args, dargs)
            self.record('GOOD', subdir, testname, 'completed successfully')
        except Exception, detail:
            self.record('FAIL', subdir, testname, format_error())


    def run_group(self, function, *args, **dargs):
        """\
        Run a function as a named, indented group in the status log.

        function:
            subroutine to run
        *args:
            arguments for the function

        An optional 'tag' keyword argument overrides the group name
        (which otherwise defaults to the function's __name__).
        """

        result = None
        name = function.__name__

        # Allow the tag for the group to be specified.
        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                name = tag

        # if tag:
        #     name += '.' + tag
        old_record_prefix = self.record_prefix
        try:
            try:
                self.record('START', None, name)
                # everything inside the group gets one more indent level
                self.record_prefix += '\t'
                result = function(*args, **dargs)
                self.record_prefix = old_record_prefix
                self.record('END GOOD', None, name)
            except:
                # NOTE(review): this handler does not re-raise, so
                # failures from 'function' are recorded but do not reach
                # the outer handlers below; those only fire if record()
                # itself raises — confirm this is intended
                self.record_prefix = old_record_prefix
                self.record('END FAIL', None, name, format_error())
        # We don't want to raise up an error higher if it's just
        # a TestError - we want to carry on to other tests. Hence
        # this outer try/except block.
        except TestError:
            pass
        except:
            raise TestError(name + ' failed\n' + format_error())

        return result


    def record(self, status_code, subdir, operation, status=''):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
                or START
                or END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self.__record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self.__record(status_code, subdir, operation, status)


    def _read_warnings(self):
        """Drain all pending warning lines from the warning loggers.

        Returns a timestamp-sorted list of (epoch_time, message)
        tuples; loggers whose pipes have closed are dropped from
        self.warning_loggers.
        """
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # each line is '<epoch timestamp>\t<message>'
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None):
        """
        Internal Function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
                        status_code):
            raise ValueError('Invalid status code supplied: %s' % status_code)
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        # tabs are field separators, so they may not appear in status
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        epoch_time_str = "timestamp=%d" % (epoch_time,)
        local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
                                       local_time)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in (status_code, substr, operation,
                                         epoch_time_str, local_time_str,
                                         status))
        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike __record, this does not write the message
        to standard output.
        """
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        need_reparse = False
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            status_log.write(line)
            # one reparse at the end is enough, however many lines hit
            if self.__need_reparse(line):
                need_reparse = True
        status_log.close()
        if need_reparse:
            self.__parse_status()


    def __record(self, status_code, subdir, operation, status='',
                 epoch_time=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time)


        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            # also mirror the record into the test's own status file
            test_dir = os.path.join(self.resultdir, subdir)
            if not os.path.exists(test_dir):
                os.mkdir(test_dir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)
        if self.__need_reparse(msg):
            self.__parse_status()


    def __need_reparse(self, line):
        """Return True if writing this status line warrants a reparse."""
        # the parser will not record results if lines have more than
        # one level of indentation
        indent = len(re.search(r"^(\t*)", line).group(1))
        if indent > 1:
            return False
        # we can also skip START lines, as they add nothing
        line = line.lstrip("\t")
        if line.startswith("START\t"):
            return False
        # otherwise, we should do a parse
        return True


    def __parse_status(self):
        """
        If a .parse.cmd file is present in the results directory,
        launch the tko parser.
        """
        cmdfile = os.path.join(self.resultdir, '.parse.cmd')
        if os.path.exists(cmdfile):
            cmd = open(cmdfile).read().strip()
            # fire-and-forget: the parser runs asynchronously
            subprocess.Popen(cmd, shell=True)
# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    # matches lines the client has quoted as status records:
    # 'AUTOTEST_STATUS:<dotted numeric tag>:<rendered status line>'
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    # captures the leading run of tabs (the record indent level)
    extract_indent = re.compile(r"^(\t*).*$")

    def __init__(self, job):
        # the server_job whose status logs we write into
        self.job = job
        # trailing partial line carried over between write() calls
        self.leftover = ""
        # most recent status line seen, used to indent warnings
        self.last_line = ""
        # nested dict of buffered status lines, keyed by tag parts,
        # with each level's own lines under the "logs" key
        self.logs = {}


    def _process_log_dict(self, log_dict):
        """Flatten a (possibly nested) log dict into one ordered list.

        Empties log_dict as a side effect; this level's own "logs"
        come first, then each numeric sub-tag in sorted order.
        """
        log_list = log_dict.pop("logs", [])
        # sorted() materializes the keys, so popping while we go is safe
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.log and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            # descend into self.logs along the dotted-tag path,
            # creating levels as needed, and buffer the line there
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        match = self.parser.search(line)
        if match:
            tag, line = match.groups()
            self._process_quoted_line(tag, line)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) warnings as indented WARN records."""
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        """Push warnings into the deepest active level(s) of log_dict,
        echoing each rendered warning to stdout as it is buffered."""
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        """Accept a chunk of client output, processing complete lines."""
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        """Flush any buffered partial line and all accumulated logs."""
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()