job_executer.py revision f81680c018729fd4499e1e200d04b48c4b90127c
1#!/usr/bin/python2.6 2# 3# Copyright 2010 Google Inc. All Rights Reserved. 4# 5 6import logging 7import os.path 8import threading 9 10from automation.common import command as cmd 11from automation.common import job 12from automation.common import logger 13from automation.common.command_executer import LoggingCommandExecuter 14from automation.common.command_executer import CommandTerminator 15 16 17class JobExecuter(threading.Thread): 18 def __init__(self, job_to_execute, machines, listeners): 19 threading.Thread.__init__(self) 20 21 assert machines 22 23 self.job = job_to_execute 24 self.listeners = listeners 25 self.machines = machines 26 27 # Set Thread name. 28 self.name = "%s-%s" % (self.__class__.__name__, self.job.id) 29 30 self._logger = logging.getLogger(self.__class__.__name__) 31 self._executer = LoggingCommandExecuter(self.job.dry_run) 32 self._terminator = CommandTerminator() 33 34 def _RunRemotely(self, command, fail_msg, command_timeout=1*60*60): 35 exit_code = self._executer.RunCommand(command, 36 self.job.primary_machine.hostname, 37 self.job.primary_machine.username, 38 command_terminator=self._terminator, 39 command_timeout=command_timeout) 40 if exit_code: 41 raise job.JobFailure(fail_msg, exit_code) 42 43 def _RunLocally(self, command, fail_msg, command_timeout=1*60*60): 44 exit_code = self._executer.RunCommand(command, 45 command_terminator=self._terminator, 46 command_timeout=command_timeout) 47 if exit_code: 48 raise job.JobFailure(fail_msg, exit_code) 49 50 def Kill(self): 51 self._terminator.Terminate() 52 53 def CleanUpWorkDir(self): 54 self._logger.debug('Cleaning up %r work directory.', self.job) 55 self._RunRemotely( 56 cmd.RmTree(self.job.work_dir), "Cleanup workdir failed.") 57 58 def CleanUpHomeDir(self): 59 self._logger.debug('Cleaning up %r home directory.', self.job) 60 self._RunLocally( 61 cmd.RmTree(self.job.home_dir), "Cleanup homedir failed.") 62 63 def _PrepareRuntimeEnvironment(self): 64 self._RunRemotely( 65 cmd.MakeDir(self.job.work_dir, self.job.logs_dir, self.job.results_dir), 66 "Creating new job directory failed.") 67 68 # The log directory is ready, so we can prepare to log command's output. 69 self._executer.OpenLog( 70 os.path.join(self.job.logs_dir, self.job.log_filename_prefix)) 71 72 def _SatisfyFolderDependencies(self): 73 for dependency in self.job.folder_dependencies: 74 to_folder = os.path.join(self.job.work_dir, dependency.dest) 75 from_folder = os.path.join(dependency.job.work_dir, dependency.src) 76 from_machine = dependency.job.primary_machine 77 78 if from_machine == self.job.primary_machine and dependency.read_only: 79 # No need to make a copy, just symlink it 80 self._RunRemotely( 81 cmd.MakeSymlink(from_folder, to_folder), 82 "Failed to create symlink to required directory.") 83 else: 84 self._RunRemotely( 85 cmd.RemoteCopyFrom(from_machine.hostname, from_folder, to_folder, 86 username=from_machine.username), 87 "Failed to copy required files.") 88 89 def _LaunchJobCommand(self): 90 command = self.job.GetCommand() 91 92 self._RunRemotely("%s; %s" % ("PS1=. TERM=linux source ~/.bashrc", 93 cmd.Wrapper(command, cwd=self.job.work_dir)), 94 "Command failed to execute: '%s'." % command, 95 self.job.timeout) 96 97 def _CopyJobResults(self): 98 """Copy test results back to directory.""" 99 self._RunLocally( 100 cmd.RemoteCopyFrom(self.job.primary_machine.hostname, 101 self.job.results_dir, 102 self.job.home_dir, 103 username=self.job.primary_machine.username), 104 "Failed to copy results.") 105 106 def run(self): 107 self.job.status = job.STATUS_SETUP 108 self.job.machines = self.machines 109 self._logger.debug( 110 "Executing %r on %r in directory %s.", 111 self.job, self.job.primary_machine.hostname, self.job.work_dir) 112 113 try: 114 self.CleanUpWorkDir() 115 116 self._PrepareRuntimeEnvironment() 117 118 self.job.status = job.STATUS_COPYING 119 120 self._SatisfyFolderDependencies() 121 122 self.job.status = job.STATUS_RUNNING 123 124 self._LaunchJobCommand() 125 self._CopyJobResults() 126 127 # If we get here, the job succeeded. 128 self.job.status = job.STATUS_SUCCEEDED 129 except job.JobFailure as ex: 130 self._logger.error( 131 "Job failed. Exit code %s. %s", ex.exit_code, ex) 132 if self._terminator.IsTerminated(): 133 self._logger.info("%r was killed", self.job) 134 135 self.job.status = job.STATUS_FAILED 136 137 self._executer.CloseLog() 138 139 for listener in self.listeners: 140 listener.NotifyJobComplete(self.job) 141