job.py revision f81680c018729fd4499e1e200d04b48c4b90127c
1#!/usr/bin/python2.6
2#
3# Copyright 2010 Google Inc. All Rights Reserved.
4#
5
6"""A module for a job in the infrastructure."""
7
8
9__author__ = 'raymes@google.com (Raymes Khoury)'
10
11
12import os.path
13
14from automation.common import state_machine
15
# Job lifecycle statuses. A Job starts out as STATUS_NOT_EXECUTED; the
# JobStateMachine class in this module lists the transitions between these
# values and treats SUCCEEDED / FAILED as its final states.
STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
STATUS_SETUP = 'SETUP'
STATUS_COPYING = 'COPYING'
STATUS_RUNNING = 'RUNNING'
STATUS_SUCCEEDED = 'SUCCEEDED'
STATUS_FAILED = 'FAILED'
22
23
class FolderDependency(object):
  """A folder that a consuming job needs from the job that produced it.

  Job.DependsOnFolder() stores one of these and makes `job` a predecessor of
  the consuming job.

  Attributes:
    job: the producing job.
    src: source path of the folder.
    dest: destination path; falls back to `src` when no (or an empty) value
      was given.
  """

  def __init__(self, job, src, dest=None):
    # TODO(kbaclawski): rename to producer
    self.job = job
    self.src = src
    # Any falsy dest (None, '') falls back to src.
    self.dest = dest or src

  @property
  def read_only(self):
    """True when the folder keeps its source path (dest equals src)."""
    return self.dest == self.src
37
38
class JobStateMachine(state_machine.BasicStateMachine):
  """State machine tracking the status of a Job.

  `state_machine` maps each status to the list of statuses it may change to:
  the happy path is NOT_EXECUTED -> SETUP -> COPYING -> RUNNING -> SUCCEEDED,
  and SETUP, COPYING and RUNNING may each also move to FAILED. Enforcement
  of this table is assumed to happen in state_machine.BasicStateMachine
  (not visible here -- confirm there).
  """

  state_machine = {
      STATUS_NOT_EXECUTED: [STATUS_SETUP],
      STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED],
      STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED],
      STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED],
  }

  # Statuses with no outgoing transitions.
  final_states = [STATUS_SUCCEEDED, STATUS_FAILED]
47
48
class JobFailure(Exception):
  """Exception describing a failed job together with its exit code.

  Args:
    message: human-readable description; becomes str(exception).
    exit_code: exit code to report, stored as the `exit_code` attribute.
  """

  def __init__(self, message, exit_code):
    super(JobFailure, self).__init__(message)
    self.exit_code = exit_code
53
54
class Job(object):
  """A class representing a job whose commands will be executed.

  A job holds a shell command, the machines it runs on and its place in a
  dependency graph: predecessor jobs (DependsOn / is_ready), folders produced
  by other jobs (DependsOnFolder) and machine specifications
  (DependsOnMachine).
  """

  WORKDIR_PREFIX = '/usr/local/google/tmp/automation'

  def __init__(self, label, command, timeout=4*60*60):
    """Creates a job whose status starts at STATUS_NOT_EXECUTED.

    Args:
      label: label stored on the job as-is.
      command: command to run; GetCommand() expands the placeholders found
        in str(command).
      timeout: stored as-is. The default 4*60*60 equals four hours if the
        value is in seconds (unit not established here -- confirm with the
        code that reads it).
    """
    self._state = JobStateMachine(STATUS_NOT_EXECUTED)
    self.predecessors = set()  # Jobs that must succeed before this one.
    self.successors = set()  # Jobs that declared a dependency on this one.
    self.machine_dependencies = []  # Machine specs; the primary one first.
    self.folder_dependencies = []  # Entries added via DependsOnFolder().
    self.id = 0  # Presumably reassigned when the job is registered; confirm.
    self.machines = []  # machines[0] is the primary machine.
    self.command = command
    self._has_primary_machine_spec = False
    self.group = None  # Must be set before home_dir / GetCommand() is used.
    self.dry_run = None
    self.label = label
    self.timeout = timeout

  def _StateGet(self):
    """Returns the job's JobStateMachine (not a bare status string)."""
    return self._state

  def _StateSet(self, new_state):
    """Moves the job to new_state through the state machine's Change()."""
    self._state.Change(new_state)

  # Reading job.status yields the state machine object; assigning to it
  # delegates to the state machine's Change().
  status = property(_StateGet, _StateSet)

  @property
  def timeline(self):
    """The state machine's timeline of status transitions."""
    return self._state.timeline

  def __repr__(self):
    return '{%s: %s}' % (self.__class__.__name__, self.id)

  def __str__(self):
    """Returns a multi-line, human-readable report of the job.

    The report lists the id, predecessor and successor ids, machines, the
    pretty-formatted command, the status and the transition report. It goes
    through GetCommand(), so it requires `machines` and `group` to be set.
    """
    res = ['%d' % self.id, 'Predecessors:']
    res.extend('%d' % pred.id for pred in self.predecessors)
    res.append('Successors:')
    res.extend('%d' % succ.id for succ in self.successors)
    res.append('Machines:')
    res.extend('%s' % machine for machine in self.machines)
    res.append(self.PrettyFormatCommand())
    res.append('%s' % self.status)
    res.append(self.timeline.GetTransitionEventReport())
    return '\n'.join(res)

  @staticmethod
  def _FormatCommand(cmd, substitutions):
    """Applies literal (non-regex) substitutions to cmd, in order.

    Args:
      cmd: the string to rewrite.
      substitutions: iterable of (pattern, replacement) pairs. Each pattern
        is replaced everywhere with str.replace, i.e. matched literally.

    Returns:
      The rewritten string.
    """
    for pattern, replacement in substitutions:
      cmd = cmd.replace(pattern, replacement)

    return cmd

  def GetCommand(self):
    """Returns str(self.command) with its $-placeholders expanded.

    Placeholders: $JOB_ID (id), $JOB_TMP (work_dir), $JOB_HOME (home_dir),
    $PRIMARY_MACHINE (hostname of machines[0]) and $SECONDARY_MACHINES[i]
    (hostname of machines[i + 1]).

    Raises:
      IndexError: if no machines are assigned.
      AttributeError: if `group` has not been set.
    """
    substitutions = [
        ('$JOB_ID', str(self.id)),
        ('$JOB_TMP', self.work_dir),
        ('$JOB_HOME', self.home_dir),
        ('$PRIMARY_MACHINE', self.primary_machine.hostname)]

    # machines[1:] is empty for a single machine, so no guard is needed.
    for num, machine in enumerate(self.machines[1:]):
      substitutions.append(
          ('$SECONDARY_MACHINES[%d]' % num, machine.hostname))

    return self._FormatCommand(str(self.command), substitutions)

  def PrettyFormatCommand(self):
    """Returns GetCommand() reformatted with one sub-command per line.

    The "{ " and "; }" that open and close command groups are dropped, a
    "} " becomes a line break, and every "&&" (including the whitespace
    around it) becomes a line break.
    """
    # TODO(kbaclawski): This method doesn't belong here, but rather to
    # non existing Command class. If one is created then PrettyFormatCommand
    # shall become its method.
    import re
    # These rules are regular expressions, so they are applied with re.sub.
    # _FormatCommand matches literally (str.replace) and would search for the
    # backslashes themselves, so it never rewrote a real command. Raw strings
    # also avoid invalid "\{" / "\s" escape-sequence warnings.
    rules = (
        (r'\{ ', ''),
        (r'; \}', ''),
        (r'\} ', '\n'),
        (r'\s*&&\s*', '\n'))
    cmd = self.GetCommand()
    for pattern, replacement in rules:
      cmd = re.sub(pattern, replacement, cmd)
    return cmd

  def DependsOnFolder(self, dependency):
    """Records a folder dependency and makes its producer a predecessor.

    Args:
      dependency: object with a `job` attribute naming the producing job
        (e.g. a FolderDependency).
    """
    self.folder_dependencies.append(dependency)
    self.DependsOn(dependency.job)

  @property
  def results_dir(self):
    """`results` subdirectory of work_dir."""
    return os.path.join(self.work_dir, 'results')

  @property
  def logs_dir(self):
    """`logs` subdirectory of home_dir."""
    return os.path.join(self.home_dir, 'logs')

  @property
  def log_filename_prefix(self):
    """Log file name for this job, e.g. 'job-7.log'."""
    return 'job-%d.log' % self.id

  @property
  def work_dir(self):
    """Per-job directory under WORKDIR_PREFIX, e.g. '<prefix>/job-7'."""
    return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id)

  @property
  def home_dir(self):
    """Per-job directory under the group's home_dir (needs `group` set)."""
    return os.path.join(self.group.home_dir, 'job-%d' % self.id)

  @property
  def primary_machine(self):
    """The first assigned machine; raises IndexError if there is none."""
    return self.machines[0]

  def DependsOn(self, job):
    """Specifies Jobs to be finished before this job can be launched.

    Links both directions: `job` becomes a predecessor of this job and this
    job becomes a successor of `job`.
    """
    self.predecessors.add(job)
    job.successors.add(self)

  @property
  def is_ready(self):
    """Check that all our dependencies have been executed.

    True when every predecessor's status equals STATUS_SUCCEEDED, and
    vacuously True when there are no predecessors.
    """
    # NOTE(review): pred.status is the JobStateMachine object, so this relies
    # on state_machine.BasicStateMachine comparing equal to its current status
    # string -- confirm in that module.
    return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors)

  def DependsOnMachine(self, machine_spec, primary=True):
    """Adds a machine specification the job needs to run on.

    Job will run on arbitrarily chosen machine specified by
    MachineSpecification class instances passed to this method.

    Args:
      machine_spec: the machine specification to record.
      primary: if True, the spec is placed first in machine_dependencies;
        otherwise it is appended.

    Raises:
      RuntimeError: if a primary specification was already registered.
    """
    if primary:
      if self._has_primary_machine_spec:
        raise RuntimeError('Only one primary machine specification allowed.')
      self._has_primary_machine_spec = True
      self.machine_dependencies.insert(0, machine_spec)
    else:
      self.machine_dependencies.append(machine_spec)
180