job_executer.py revision f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbe
1# Copyright 2010 Google Inc. All Rights Reserved.
2#
3
4import logging
5import os.path
6import threading
7
8from automation.common import command as cmd
9from automation.common import job
10from automation.common import logger
11from automation.common.command_executer import LoggingCommandExecuter
12from automation.common.command_executer import CommandTerminator
13
14
class JobExecuter(threading.Thread):
  """Thread that drives one job through setup, execution and teardown.

  The thread cleans and recreates the job's work directory on the job's
  primary machine, satisfies folder dependencies, launches the job command,
  copies the results back and finally notifies every registered listener.
  """

  def __init__(self, job_to_execute, machines, listeners):
    """Prepare the thread; nothing is executed until the thread is started.

    Args:
      job_to_execute: The job object to run. Its `id` is used in the thread
        name and its `dry_run` flag is passed to the command executer.
      machines: Non-empty collection of machines; assigned to the job's
        `machines` attribute when the thread runs.
      listeners: Iterable of objects exposing `NotifyJobComplete(job)`.
    """
    threading.Thread.__init__(self)

    # NOTE(review): this check disappears under `python -O`; it is kept as an
    # assert so that callers keep seeing the same exception type.
    assert machines

    self.job = job_to_execute
    self.listeners = listeners
    self.machines = machines

    # Set Thread name.
    self.name = '%s-%s' % (self.__class__.__name__, self.job.id)

    self._logger = logging.getLogger(self.__class__.__name__)
    self._executer = LoggingCommandExecuter(self.job.dry_run)
    # Shared with every command we launch so that Kill() can reach them.
    self._terminator = CommandTerminator()

  def _RunRemotely(self, command, fail_msg, command_timeout=1 * 60 * 60):
    """Run `command` as the primary machine's user on the primary machine.

    Args:
      command: Command to hand to the executer.
      fail_msg: Message attached to the JobFailure raised on failure.
      command_timeout: Timeout forwarded to the executer (default 1 * 60 * 60).

    Raises:
      job.JobFailure: If the executer reports a non-zero exit code.
    """
    exit_code = self._executer.RunCommand(command,
                                          self.job.primary_machine.hostname,
                                          self.job.primary_machine.username,
                                          command_terminator=self._terminator,
                                          command_timeout=command_timeout)
    if exit_code:
      raise job.JobFailure(fail_msg, exit_code)

  def _RunLocally(self, command, fail_msg, command_timeout=1 * 60 * 60):
    """Run `command` through the executer without a target host or user.

    Args:
      command: Command to hand to the executer.
      fail_msg: Message attached to the JobFailure raised on failure.
      command_timeout: Timeout forwarded to the executer (default 1 * 60 * 60).

    Raises:
      job.JobFailure: If the executer reports a non-zero exit code.
    """
    exit_code = self._executer.RunCommand(command,
                                          command_terminator=self._terminator,
                                          command_timeout=command_timeout)
    if exit_code:
      raise job.JobFailure(fail_msg, exit_code)

  def Kill(self):
    """Ask the shared command terminator to terminate."""
    self._terminator.Terminate()

  def CleanUpWorkDir(self):
    """Remove the job's work directory on the primary machine.

    Raises:
      job.JobFailure: If the removal command fails.
    """
    self._logger.debug('Cleaning up %r work directory.', self.job)
    self._RunRemotely(cmd.RmTree(self.job.work_dir), 'Cleanup workdir failed.')

  def CleanUpHomeDir(self):
    """Remove the job's home directory using a locally run command.

    Raises:
      job.JobFailure: If the removal command fails.
    """
    self._logger.debug('Cleaning up %r home directory.', self.job)
    self._RunLocally(cmd.RmTree(self.job.home_dir), 'Cleanup homedir failed.')

  def _PrepareRuntimeEnvironment(self):
    """Create the work, logs and results directories, then open the job log.

    Raises:
      job.JobFailure: If the directories cannot be created.
    """
    self._RunRemotely(
        cmd.MakeDir(self.job.work_dir, self.job.logs_dir, self.job.results_dir),
        'Creating new job directory failed.')

    # The log directory is ready, so we can prepare to log command's output.
    self._executer.OpenLog(os.path.join(self.job.logs_dir,
                                        self.job.log_filename_prefix))

  def _SatisfyFolderDependencies(self):
    """Make every folder this job depends on available in its work directory.

    A read-only dependency whose producing job ran on the same primary machine
    is symlinked; every other dependency is copied from the producing job's
    primary machine.

    Raises:
      job.JobFailure: If a symlink or copy command fails.
    """
    for dependency in self.job.folder_dependencies:
      to_folder = os.path.join(self.job.work_dir, dependency.dest)
      from_folder = os.path.join(dependency.job.work_dir, dependency.src)
      from_machine = dependency.job.primary_machine

      if from_machine == self.job.primary_machine and dependency.read_only:
        # No need to make a copy, just symlink it
        self._RunRemotely(
            cmd.MakeSymlink(from_folder, to_folder),
            'Failed to create symlink to required directory.')
      else:
        self._RunRemotely(
            cmd.RemoteCopyFrom(from_machine.hostname,
                               from_folder,
                               to_folder,
                               username=from_machine.username),
            'Failed to copy required files.')

  def _LaunchJobCommand(self):
    """Run the job's command on the primary machine from its work directory.

    The command is preceded by sourcing ~/.bashrc and is limited by the job's
    own timeout rather than the default one.

    Raises:
      job.JobFailure: If the command exits with a non-zero code.
    """
    command = self.job.GetCommand()

    # NOTE(review): PS1/TERM are presumably set so that ~/.bashrc does not bail
    # out early in a non-interactive shell -- confirm before changing.
    self._RunRemotely('%s; %s' % ('PS1=. TERM=linux source ~/.bashrc',
                                  cmd.Wrapper(command,
                                              cwd=self.job.work_dir)),
                      "Command failed to execute: '%s'." % command,
                      self.job.timeout)

  def _CopyJobResults(self):
    """Copy the results directory from the primary machine to the home dir.

    Raises:
      job.JobFailure: If the copy command fails.
    """
    self._RunLocally(
        cmd.RemoteCopyFrom(self.job.primary_machine.hostname,
                           self.job.results_dir,
                           self.job.home_dir,
                           username=self.job.primary_machine.username),
        'Failed to copy results.')

  def run(self):
    """Execute the job and always report its completion to the listeners.

    The job status moves through SETUP, COPYING and RUNNING and ends as either
    SUCCEEDED or FAILED. Whatever happens, the executer's log is closed and
    every listener receives NotifyJobComplete(job), so a crash in this thread
    cannot leave the job looking as if it were still in progress.

    Raises:
      Exception: Any error other than job.JobFailure is logged, recorded as a
        failed job and then re-raised after the teardown has run.
    """
    self.job.status = job.STATUS_SETUP
    self.job.machines = self.machines

    try:
      self._logger.debug('Executing %r on %r in directory %s.', self.job,
                         self.job.primary_machine.hostname, self.job.work_dir)

      self.CleanUpWorkDir()

      self._PrepareRuntimeEnvironment()

      self.job.status = job.STATUS_COPYING

      self._SatisfyFolderDependencies()

      self.job.status = job.STATUS_RUNNING

      self._LaunchJobCommand()
      self._CopyJobResults()

      # If we get here, the job succeeded.
      self.job.status = job.STATUS_SUCCEEDED
    except job.JobFailure as ex:
      self._logger.error('Job failed. Exit code %s. %s', ex.exit_code, ex)
      if self._terminator.IsTerminated():
        self._logger.info('%r was killed', self.job)

      self.job.status = job.STATUS_FAILED
    except Exception:
      # Anything that is not a JobFailure is a bug or an infrastructure
      # problem. Record the failure, then let the error propagate as before.
      self._logger.exception('%r aborted by an unexpected error.', self.job)
      self.job.status = job.STATUS_FAILED
      raise
    finally:
      # Teardown must run on every path; the nested finally guarantees that
      # listeners are notified even when closing the log itself fails.
      try:
        self._executer.CloseLog()
      finally:
        for listener in self.listeners:
          listener.NotifyJobComplete(self.job)
139