job.py revision f81680c018729fd4499e1e200d04b48c4b90127c
#!/usr/bin/python2.6
#
# Copyright 2010 Google Inc. All Rights Reserved.
#

"""A module for a job in the infrastructure."""


__author__ = 'raymes@google.com (Raymes Khoury)'


import os.path
import re

from automation.common import state_machine

# Job status values. The transitions allowed between them are listed in
# JobStateMachine.state_machine.
STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
STATUS_SETUP = 'SETUP'
STATUS_COPYING = 'COPYING'
STATUS_RUNNING = 'RUNNING'
STATUS_SUCCEEDED = 'SUCCEEDED'
STATUS_FAILED = 'FAILED'


class FolderDependency(object):
  """A folder provided by one job that another job depends on.

  Attributes:
    job: The Job providing the folder.
    src: Source location of the folder, as given by the caller.
    dest: Destination location; defaults to src when not given (or falsy).
  """

  def __init__(self, job, src, dest=None):
    if not dest:
      dest = src

    # TODO(kbaclawski): rename to producer
    self.job = job
    self.src = src
    self.dest = dest

  @property
  def read_only(self):
    """True when the destination is the same as the source."""
    # NOTE(review): presumably means the folder is used in place instead of
    # being copied somewhere else -- confirm against the code that consumes it.
    return self.dest == self.src


class JobStateMachine(state_machine.BasicStateMachine):
  """State machine tracking the status of a Job.

  state_machine maps each non-final status to the statuses it may move to
  next. A job may fail from SETUP, COPYING or RUNNING, but NOT_EXECUTED can
  only move to SETUP. Enforcement of these rules presumably happens in
  BasicStateMachine (defined elsewhere) -- confirm there.
  """

  state_machine = {
      STATUS_NOT_EXECUTED: [STATUS_SETUP],
      STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED],
      STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED],
      STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED]}

  final_states = [STATUS_SUCCEEDED, STATUS_FAILED]


class JobFailure(Exception):
  """Exception reporting that a job failed.

  Attributes:
    exit_code: Exit code supplied by whoever raised the failure.
  """

  def __init__(self, message, exit_code):
    Exception.__init__(self, message)
    self.exit_code = exit_code


class Job(object):
  """A class representing a job whose commands will be executed.

  Jobs are linked into a dependency graph via DependsOn/DependsOnFolder
  (predecessors/successors). Their status is tracked by a JobStateMachine.
  The command string may contain placeholders that GetCommand expands; see
  its docstring.
  """

  # Root directory under which every job's work_dir is created.
  WORKDIR_PREFIX = '/usr/local/google/tmp/automation'

  def __init__(self, label, command, timeout=4*60*60):
    """Creates a job in the NOT_EXECUTED state.

    Args:
      label: Label for the job (stored as-is).
      command: Command to run; converted with str() by GetCommand.
      timeout: Stored as-is. Presumably seconds, which would make the default
        4 hours -- TODO confirm with the executor that reads it.
    """
    self._state = JobStateMachine(STATUS_NOT_EXECUTED)
    # Jobs that must succeed before this one is ready (see is_ready), and the
    # jobs that depend on this one.
    self.predecessors = set()
    self.successors = set()
    # Machine specifications; the primary one (if any) is kept at index 0.
    self.machine_dependencies = []
    self.folder_dependencies = []
    self.id = 0
    # Machines actually assigned to the job; machines[0] is the primary one.
    # Filled in elsewhere (presumably by the scheduler).
    self.machines = []
    self.command = command
    self._has_primary_machine_spec = False
    # Must be set before home_dir (and therefore GetCommand) is used.
    self.group = None
    self.dry_run = None
    self.label = label
    self.timeout = timeout

  def _StateGet(self):
    return self._state

  def _StateSet(self, new_state):
    # Delegates to the state machine, which presumably rejects transitions not
    # listed in JobStateMachine.state_machine.
    self._state.Change(new_state)

  # Reading yields the JobStateMachine object itself (not a plain string);
  # assigning a status string requests a transition through Change().
  status = property(_StateGet, _StateSet)

  @property
  def timeline(self):
    """The state machine's timeline of status transitions."""
    return self._state.timeline

  def __repr__(self):
    return '{%s: %s}' % (self.__class__.__name__, self.id)

  def __str__(self):
    # NOTE(review): this calls GetCommand, so it raises when no machines are
    # assigned or group is unset.
    res = []
    res.append('%d' % self.id)
    res.append('Predecessors:')
    res.extend(['%d' % pred.id for pred in self.predecessors])
    res.append('Successors:')
    res.extend(['%d' % succ.id for succ in self.successors])
    res.append('Machines:')
    res.extend(['%s' % machine for machine in self.machines])
    res.append(self.PrettyFormatCommand())
    res.append('%s' % self.status)
    res.append(self.timeline.GetTransitionEventReport())
    return '\n'.join(res)

  @staticmethod
  def _FormatCommand(cmd, substitutions):
    """Applies literal (non-regex) (pattern, replacement) pairs in order."""
    for pattern, replacement in substitutions:
      cmd = cmd.replace(pattern, replacement)

    return cmd

  def GetCommand(self):
    """Returns str(self.command) with its placeholders expanded.

    Placeholders (replaced literally, in this order):
      $JOB_ID                -- self.id
      $JOB_TMP               -- self.work_dir
      $JOB_HOME              -- self.home_dir
      $PRIMARY_MACHINE       -- hostname of self.machines[0]
      $SECONDARY_MACHINES[i] -- hostname of self.machines[i + 1], i from 0

    Raises:
      AttributeError: if self.group is None (needed for $JOB_HOME).
      IndexError: if no machines have been assigned.
    """
    substitutions = [
        ('$JOB_ID', str(self.id)),
        ('$JOB_TMP', self.work_dir),
        ('$JOB_HOME', self.home_dir),
        ('$PRIMARY_MACHINE', self.primary_machine.hostname)]

    # Secondary machines are numbered from 0, skipping the primary machine.
    for num, machine in enumerate(self.machines[1:]):
      substitutions.append(
          ('$SECONDARY_MACHINES[%d]' % num, machine.hostname))

    return self._FormatCommand(str(self.command), substitutions)

  def PrettyFormatCommand(self):
    """Returns the expanded command reformatted for human reading.

    Strips "{ " openers and "; }" closers. Turns a remaining "} " and any
    "&&" (with surrounding whitespace) into line breaks, so chained steps
    appear one per line.
    """
    # TODO(kbaclawski): This method doesn't belong here, but rather to
    # non existing Command class. If one is created then PrettyFormatCommand
    # shall become its method.
    # These are regular expressions, so they must go through re.sub;
    # _FormatCommand's literal str.replace never matched them, which made
    # this method a no-op. Order matters: "; }" must be removed before
    # "} " is turned into a newline.
    rules = [
        (r'\{ ', ''),
        (r'; \}', ''),
        (r'\} ', '\n'),
        (r'\s*&&\s*', '\n')]

    cmd = self.GetCommand()
    for pattern, replacement in rules:
      cmd = re.sub(pattern, replacement, cmd)

    return cmd

  def DependsOnFolder(self, dependency):
    """Records a FolderDependency and makes its job a predecessor of this one."""
    self.folder_dependencies.append(dependency)
    self.DependsOn(dependency.job)

  @property
  def results_dir(self):
    """The 'results' directory inside work_dir."""
    return os.path.join(self.work_dir, 'results')

  @property
  def logs_dir(self):
    """The 'logs' directory inside home_dir (requires group to be set)."""
    return os.path.join(self.home_dir, 'logs')

  @property
  def log_filename_prefix(self):
    """Log file name of the form 'job-<id>.log'."""
    return 'job-%d.log' % self.id

  @property
  def work_dir(self):
    """Per-job directory under WORKDIR_PREFIX, named 'job-<id>'."""
    return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id)

  @property
  def home_dir(self):
    """Per-job directory under the group's home_dir, named 'job-<id>'.

    Raises AttributeError while self.group is None.
    """
    return os.path.join(self.group.home_dir, 'job-%d' % self.id)

  @property
  def primary_machine(self):
    """The first assigned machine; raises IndexError if none is assigned."""
    return self.machines[0]

  def DependsOn(self, job):
    """Specifies Jobs to be finished before this job can be launched."""
    self.predecessors.add(job)
    job.successors.add(self)

  @property
  def is_ready(self):
    """Check that all our dependencies have been executed."""
    # True only when every predecessor's status compares equal to SUCCEEDED
    # (vacuously True with no predecessors).
    return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors)

  def DependsOnMachine(self, machine_spec, primary=True):
    """Adds a machine specification the job must run on.

    Args:
      machine_spec: A MachineSpecification-like object (defined elsewhere).
      primary: If True, the spec is placed first in machine_dependencies.
        At most one primary spec is allowed.

    Raises:
      RuntimeError: if a second primary specification is added.
    """
    # Job will run on arbitrarily chosen machine specified by
    # MachineSpecification class instances passed to this method.
    if primary:
      if self._has_primary_machine_spec:
        raise RuntimeError('Only one primary machine specification allowed.')
      self._has_primary_machine_spec = True
      self.machine_dependencies.insert(0, machine_spec)
    else:
      self.machine_dependencies.append(machine_spec)