job_manager.py revision f81680c018729fd4499e1e200d04b48c4b90127c
#!/usr/bin/python2.6
#
# Copyright 2010 Google Inc. All Rights Reserved.
#
"""Job scheduling: tracks jobs, dependency readiness, and executer threads."""

import logging
import os
import re
import threading

from automation.common import job
from automation.common import logger
from automation.server.job_executer import JobExecuter


class IdProducerPolicy(object):
  """Produces series of unique integer IDs.

  Example:
      id_producer = IdProducerPolicy()
      id_a = id_producer.GetNextId()
      id_b = id_producer.GetNextId()
      assert id_a != id_b
  """

  def __init__(self):
    # Next ID to hand out; starts at 1 unless Initialize() finds prior IDs.
    self._counter = 1

  def Initialize(self, home_prefix, home_pattern):
    r"""Find first available ID based on a directory listing.

    Scans home_prefix for subdirectories matching home_pattern and resumes
    numbering one past the highest ID found, so IDs stay unique across
    server restarts.

    Args:
      home_prefix: A directory to be traversed.
      home_pattern: A regexp describing all files/directories that will be
        considered. The regexp must contain exactly one match group with
        name "id", which must match an integer number.

    Example:
        id_producer.Initialize(JOBDIR_PREFIX, 'job-(?P<id>\d+)')
    """
    harvested_ids = []

    if os.path.isdir(home_prefix):
      for filename in os.listdir(home_prefix):
        path = os.path.join(home_prefix, filename)

        # Only directories count as previously-used work dirs.
        if os.path.isdir(path):
          match = re.match(home_pattern, filename)

          if match:
            harvested_ids.append(int(match.group('id')))

    # max() over [0] keeps the default of 1 when nothing was harvested.
    self._counter = max(harvested_ids or [0]) + 1

  def GetNextId(self):
    """Calculates another ID considered to be unique."""
    new_id = self._counter
    self._counter += 1
    return new_id


class JobManager(threading.Thread):
  """Background thread that launches ready jobs on available machines.

  Producers call AddJob(); the manager thread (run()) hands ready jobs to
  JobExecuter threads as machines become available, and is re-notified on
  job completion so dependent jobs can be scheduled.
  """

  def __init__(self, machine_manager):
    threading.Thread.__init__(self, name=self.__class__.__name__)
    # All jobs ever added (never pruned here; see TODO in CleanUpJob).
    self.all_jobs = []
    # Jobs whose dependencies are satisfied, awaiting machines.
    self.ready_jobs = []
    # job id -> JobExecuter thread for jobs currently executing.
    self.job_executer_mapping = {}

    self.machine_manager = machine_manager

    # One lock guards all three collections; the condition signals "state
    # changed" (job added, job completed, or shutdown requested).
    self._lock = threading.Lock()
    self._jobs_available = threading.Condition(self._lock)
    self._exit_request = False

    # The manager listens to its own executers (NotifyJobComplete).
    self.listeners = []
    self.listeners.append(self)

    # Resume job numbering from existing work directories on disk.
    self._id_producer = IdProducerPolicy()
    self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, 'job-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def StartJobManager(self):
    """Starts the scheduler thread and wakes it for any pre-queued jobs."""
    self._logger.info("Starting...")

    with self._lock:
      self.start()
      self._jobs_available.notifyAll()

  def StopJobManager(self):
    """Kills all jobs, signals the scheduler to exit, and joins executers."""
    self._logger.info("Shutdown request received.")

    with self._lock:
      for job_ in self.all_jobs:
        self._KillJob(job_.id)

      # Signal to die
      self._exit_request = True
      self._jobs_available.notifyAll()

    # Wait for all job threads to finish.  Joined outside the lock:
    # executer completion callbacks (NotifyJobComplete) need the lock,
    # so joining while holding it would deadlock.
    for executer in self.job_executer_mapping.values():
      executer.join()

  def KillJob(self, job_id):
    """Kill a job by id.

    Does not block until the job is completed.
    """
    with self._lock:
      self._KillJob(job_id)

  def GetJob(self, job_id):
    """Returns the job with the given id, or None if unknown.

    NOTE(review): reads all_jobs without taking self._lock; relies on the
    list only ever growing -- confirm callers tolerate a stale view.
    """
    for job_ in self.all_jobs:
      if job_.id == job_id:
        return job_
    return None

  def _KillJob(self, job_id):
    """Kills a running executer and/or dequeues the job.  Lock held."""
    self._logger.info("Killing [Job: %d].", job_id)

    if job_id in self.job_executer_mapping:
      self.job_executer_mapping[job_id].Kill()
    for job_ in self.ready_jobs:
      if job_.id == job_id:
        self.ready_jobs.remove(job_)
        break

  def AddJob(self, job_):
    """Registers a job, queues it if ready, and returns its new id."""
    with self._lock:
      job_.id = self._id_producer.GetNextId()

      self.all_jobs.append(job_)
      # Only queue a job as ready if it has no dependencies
      if job_.is_ready:
        self.ready_jobs.append(job_)

      self._jobs_available.notifyAll()

    return job_.id

  def CleanUpJob(self, job_):
    """Removes a finished job's work dir and drops its executer entry."""
    with self._lock:
      if job_.id in self.job_executer_mapping:
        self.job_executer_mapping[job_.id].CleanUpWorkDir()
        del self.job_executer_mapping[job_.id]
      # TODO(raymes): remove job from self.all_jobs

  def NotifyJobComplete(self, job_):
    """Listener callback invoked by an executer when its job finishes.

    Returns the job's machines to the pool, promotes now-ready successors,
    and wakes the scheduler (machines may have freed up even on failure).
    """
    self.machine_manager.ReturnMachines(job_.machines)

    with self._lock:
      self._logger.debug('Handling %r completion event.', job_)

      if job_.status == job.STATUS_SUCCEEDED:
        for succ in job_.successors:
          if succ.is_ready:
            if succ not in self.ready_jobs:
              self.ready_jobs.append(succ)

      self._jobs_available.notifyAll()

  def AddListener(self, listener):
    """Adds an object whose NotifyJobComplete is called on job completion."""
    self.listeners.append(listener)

  @logger.HandleUncaughtExceptions
  def run(self):
    """Scheduler main loop: launch ready jobs, then sleep until notified."""
    self._logger.info("Started.")

    while not self._exit_request:
      with self._lock:
        # Launch anything already runnable *before* waiting.  The previous
        # bare wait() ran first, so a notification sent before this thread
        # reached the wait (e.g. jobs added before StartJobManager -- whose
        # own notify fires while this thread cannot yet be waiting) was
        # lost, stalling those jobs until the next unrelated event.
        self._StartReadyJobs()

        if self._exit_request:
          break

        # Block until AddJob / NotifyJobComplete / StopJobManager notifies.
        self._jobs_available.wait()

    self._logger.info("Stopped.")

  def _StartReadyJobs(self):
    """Hands ready jobs to executers while machines last.  Lock held."""
    while self.ready_jobs:
      ready_job = self.ready_jobs.pop()

      required_machines = ready_job.machine_dependencies
      for pred in ready_job.predecessors:
        # Prefer machines the predecessors ran on.
        # NOTE(review): this mutates the job's own dependency object, so a
        # job re-queued below adds the same preferences again on retry --
        # presumably AddPreferredMachine tolerates duplicates; verify.
        required_machines[0].AddPreferredMachine(
            pred.primary_machine.hostname)

      machines = self.machine_manager.GetMachines(required_machines)
      if not machines:
        # If we can't get the necessary machines right now, simply wait
        # for some jobs to complete; re-queue at the front to keep order.
        self.ready_jobs.insert(0, ready_job)
        break

      # Mark as executing
      executer = JobExecuter(ready_job, machines, self.listeners)
      executer.start()
      self.job_executer_mapping[ready_job.id] = executer