job_executer.py revision f81680c018729fd4499e1e200d04b48c4b90127c
1#!/usr/bin/python2.6
2#
3# Copyright 2010 Google Inc. All Rights Reserved.
4#
5
6import logging
7import os.path
8import threading
9
10from automation.common import command as cmd
11from automation.common import job
12from automation.common import logger
13from automation.common.command_executer import LoggingCommandExecuter
14from automation.common.command_executer import CommandTerminator
15
16
class JobExecuter(threading.Thread):
  """Thread that carries a single job through setup, execution and teardown.

  While running, the thread moves job.status through the job.STATUS_* values.
  When it finishes, whether the job succeeded or not, it closes the command
  log and calls NotifyJobComplete(job) on every listener.

  Attributes:
    job: The job being executed.
    listeners: Iterable of objects exposing NotifyJobComplete(job).
    machines: Machines assigned to the job; stored on job.machines by run().
  """

  def __init__(self, job_to_execute, machines, listeners):
    """Prepares the executer; nothing is run until the thread is started.

    Args:
      job_to_execute: The job to run.
      machines: Non-empty collection of machines assigned to the job.
      listeners: Iterable of objects notified through NotifyJobComplete(job)
        once run() is done.
    """
    threading.Thread.__init__(self)

    assert machines

    self.job = job_to_execute
    self.listeners = listeners
    self.machines = machines

    # Set Thread name.
    self.name = "%s-%s" % (self.__class__.__name__, self.job.id)

    self._logger = logging.getLogger(self.__class__.__name__)
    self._executer = LoggingCommandExecuter(self.job.dry_run)
    self._terminator = CommandTerminator()

  def _RunRemotely(self, command, fail_msg, command_timeout=1*60*60):
    """Runs command through the executer against the job's primary machine.

    Args:
      command: Command to run.
      fail_msg: Message carried by the JobFailure raised on failure.
      command_timeout: Forwarded to RunCommand as command_timeout (presumably
        seconds, i.e. one hour by default -- confirm against the executer).

    Raises:
      job.JobFailure: If RunCommand returns a truthy (non-zero) exit code.
    """
    exit_code = self._executer.RunCommand(command,
                                          self.job.primary_machine.hostname,
                                          self.job.primary_machine.username,
                                          command_terminator=self._terminator,
                                          command_timeout=command_timeout)
    if exit_code:
      raise job.JobFailure(fail_msg, exit_code)

  def _RunLocally(self, command, fail_msg, command_timeout=1*60*60):
    """Runs command through the executer without naming a target machine.

    Args:
      command: Command to run.
      fail_msg: Message carried by the JobFailure raised on failure.
      command_timeout: Forwarded to RunCommand as command_timeout (presumably
        seconds, i.e. one hour by default -- confirm against the executer).

    Raises:
      job.JobFailure: If RunCommand returns a truthy (non-zero) exit code.
    """
    exit_code = self._executer.RunCommand(command,
                                          command_terminator=self._terminator,
                                          command_timeout=command_timeout)
    if exit_code:
      raise job.JobFailure(fail_msg, exit_code)

  def Kill(self):
    """Asks the shared command terminator to terminate.

    The same terminator is handed to every RunCommand call made by this
    executer; run() consults IsTerminated() to report that the job was killed.
    """
    self._terminator.Terminate()

  def CleanUpWorkDir(self):
    """Runs the cmd.RmTree command for job.work_dir on the primary machine.

    Raises:
      job.JobFailure: If the command exits with a non-zero code.
    """
    self._logger.debug('Cleaning up %r work directory.', self.job)
    self._RunRemotely(
        cmd.RmTree(self.job.work_dir), "Cleanup workdir failed.")

  def CleanUpHomeDir(self):
    """Runs the cmd.RmTree command for job.home_dir locally.

    Raises:
      job.JobFailure: If the command exits with a non-zero code.
    """
    self._logger.debug('Cleaning up %r home directory.', self.job)
    self._RunLocally(
        cmd.RmTree(self.job.home_dir), "Cleanup homedir failed.")

  def _PrepareRuntimeEnvironment(self):
    """Creates the job's work, logs and results directories and opens the log.

    Raises:
      job.JobFailure: If the directory creation command fails.
    """
    self._RunRemotely(
        cmd.MakeDir(self.job.work_dir, self.job.logs_dir, self.job.results_dir),
        "Creating new job directory failed.")

    # The log directory is ready, so we can prepare to log command's output.
    self._executer.OpenLog(
        os.path.join(self.job.logs_dir, self.job.log_filename_prefix))

  def _SatisfyFolderDependencies(self):
    """Makes every folder dependency available inside the job's work_dir.

    A read-only dependency whose producing job ran on this job's primary
    machine is symlinked; every other dependency is copied from the machine
    of the job that produced it.

    Raises:
      job.JobFailure: If a symlink or copy command fails.
    """
    for dependency in self.job.folder_dependencies:
      to_folder = os.path.join(self.job.work_dir, dependency.dest)
      from_folder = os.path.join(dependency.job.work_dir, dependency.src)
      from_machine = dependency.job.primary_machine

      if from_machine == self.job.primary_machine and dependency.read_only:
        # No need to make a copy, just symlink it
        self._RunRemotely(
            cmd.MakeSymlink(from_folder, to_folder),
            "Failed to create symlink to required directory.")
      else:
        self._RunRemotely(
            cmd.RemoteCopyFrom(from_machine.hostname, from_folder, to_folder,
                               username=from_machine.username),
            "Failed to copy required files.")

  def _LaunchJobCommand(self):
    """Runs the job's command on the primary machine, limited by job.timeout.

    The command is wrapped with cmd.Wrapper using cwd=job.work_dir and is
    preceded by sourcing ~/.bashrc (presumably so the command sees the remote
    user's environment -- confirm).

    Raises:
      job.JobFailure: If the command exits with a non-zero code.
    """
    command = self.job.GetCommand()

    self._RunRemotely("%s; %s" % ("PS1=. TERM=linux source ~/.bashrc",
                                  cmd.Wrapper(command, cwd=self.job.work_dir)),
                      "Command failed to execute: '%s'." % command,
                      self.job.timeout)

  def _CopyJobResults(self):
    """Copies job.results_dir from the primary machine into job.home_dir.

    Raises:
      job.JobFailure: If the copy command fails.
    """
    self._RunLocally(
        cmd.RemoteCopyFrom(self.job.primary_machine.hostname,
                           self.job.results_dir,
                           self.job.home_dir,
                           username=self.job.primary_machine.username),
        "Failed to copy results.")

  def run(self):
    """Executes the job and reports its completion to the listeners.

    job.status goes STATUS_SETUP -> STATUS_COPYING -> STATUS_RUNNING and ends
    as STATUS_SUCCEEDED or STATUS_FAILED. A job.JobFailure is logged and
    absorbed. Any other exception also marks the job as failed and is then
    re-raised.

    In every case the command log is closed and each listener receives
    NotifyJobComplete(job), so an unexpected error cannot leave the log open
    or leave listeners waiting for a job that will never complete.
    """
    self.job.status = job.STATUS_SETUP
    self.job.machines = self.machines
    self._logger.debug(
        "Executing %r on %r in directory %s.",
        self.job, self.job.primary_machine.hostname, self.job.work_dir)

    try:
      self.CleanUpWorkDir()
      self._PrepareRuntimeEnvironment()

      self.job.status = job.STATUS_COPYING
      self._SatisfyFolderDependencies()

      self.job.status = job.STATUS_RUNNING
      self._LaunchJobCommand()
      self._CopyJobResults()

      # If we get here, the job succeeded.
      self.job.status = job.STATUS_SUCCEEDED
    except job.JobFailure as ex:
      self._logger.error(
          "Job failed. Exit code %s. %s", ex.exit_code, ex)
      if self._terminator.IsTerminated():
        self._logger.info("%r was killed", self.job)

      self.job.status = job.STATUS_FAILED
    except Exception:
      # Not a regular job failure: make sure listeners do not observe a stale
      # SETUP/COPYING/RUNNING status, then let the error propagate as before.
      self._logger.exception(
          "Unexpected error while executing %r.", self.job)
      self.job.status = job.STATUS_FAILED
      raise
    finally:
      # Runs on success, on JobFailure and on unexpected errors alike.
      self._executer.CloseLog()

      for listener in self.listeners:
        listener.NotifyJobComplete(self.job)
141