job_manager.py revision f81680c018729fd4499e1e200d04b48c4b90127c
1#!/usr/bin/python2.6
2#
3# Copyright 2010 Google Inc. All Rights Reserved.
4#
5
6import logging
7import os
8import re
9import threading
10
11from automation.common import job
12from automation.common import logger
13from automation.server.job_executer import JobExecuter
14
15
class IdProducerPolicy(object):
  """Produces series of unique integer IDs.

  Example:
      id_producer = IdProducerPolicy()
      id_a = id_producer.GetNextId()
      id_b = id_producer.GetNextId()
      assert id_a != id_b
  """

  def __init__(self):
    # Next ID that GetNextId() will hand out.
    self._counter = 1

  def Initialize(self, home_prefix, home_pattern):
    """Seed the counter from IDs already present on disk.

    Scans the directories directly under home_prefix, extracts the
    integer "id" group from every directory name matching home_pattern,
    and positions the counter just past the largest ID found (or at 1
    when nothing matches or the prefix does not exist).

    Args:
      home_prefix: A directory to be traversed.
      home_pattern: A regexp describing all files/directories that will be
        considered. The regexp must contain exactly one match group with name
        "id", which must match an integer number.

    Example:
      id_producer.Initialize(JOBDIR_PREFIX, r'job-(?P<id>\d+)')
    """
    largest_seen = 0

    if os.path.isdir(home_prefix):
      for entry in os.listdir(home_prefix):
        # Plain files are ignored; only directories can hold an ID.
        if not os.path.isdir(os.path.join(home_prefix, entry)):
          continue

        found = re.match(home_pattern, entry)

        if found:
          largest_seen = max(largest_seen, int(found.group('id')))

    self._counter = largest_seen + 1

  def GetNextId(self):
    """Calculates another ID considered to be unique."""
    current, self._counter = self._counter, self._counter + 1
    return current
60
61
class JobManager(threading.Thread):
  """Schedules jobs onto machines and runs them via JobExecuter threads.

  A single scheduler thread (run()) takes jobs from the ready queue,
  reserves the machines they require from the MachineManager and starts
  a JobExecuter for each.  All shared state is protected by self._lock;
  self._jobs_available is signalled whenever the ready queue or the
  machine pool may have changed.
  """

  def __init__(self, machine_manager):
    threading.Thread.__init__(self, name=self.__class__.__name__)
    # Every job ever submitted (currently never pruned; see CleanUpJob).
    self.all_jobs = []
    # Jobs whose dependencies are satisfied, waiting for machines.
    self.ready_jobs = []
    # Job id -> JobExecuter thread, for jobs currently executing.
    self.job_executer_mapping = {}

    self.machine_manager = machine_manager

    self._lock = threading.Lock()
    self._jobs_available = threading.Condition(self._lock)
    self._exit_request = False

    # Listeners receive job lifecycle callbacks from JobExecuter; the
    # manager registers itself so NotifyJobComplete() gets invoked.
    self.listeners = []
    self.listeners.append(self)

    # Resume ID numbering past the highest job-<id> work dir on disk.
    self._id_producer = IdProducerPolicy()
    # Raw string: the \d escape belongs to the regex, not to Python.
    self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, r'job-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def StartJobManager(self):
    """Starts the scheduler thread."""
    self._logger.info("Starting...")

    with self._lock:
      self.start()
      self._jobs_available.notifyAll()

  def StopJobManager(self):
    """Kills all jobs, signals the scheduler to exit and joins executers."""
    self._logger.info("Shutdown request received.")

    with self._lock:
      for job_ in self.all_jobs:
        self._KillJob(job_.id)

      # Signal to die
      self._exit_request = True
      self._jobs_available.notifyAll()

    # Wait for all job threads to finish
    for executer in self.job_executer_mapping.values():
      executer.join()

  def KillJob(self, job_id):
    """Kill a job by id.

    Does not block until the job is completed.
    """
    with self._lock:
      self._KillJob(job_id)

  def GetJob(self, job_id):
    """Returns the job with the given id, or None if it is unknown.

    NOTE(review): reads all_jobs without holding self._lock; appears to
    rely on list append/iteration being atomic in CPython -- confirm.
    """
    for job_ in self.all_jobs:
      if job_.id == job_id:
        return job_
    return None

  def _KillJob(self, job_id):
    """Kills a single job; the caller must hold self._lock."""
    self._logger.info("Killing [Job: %d].", job_id)

    # A job may be executing, still queued, or neither; handle both
    # possible places it could live.
    if job_id in self.job_executer_mapping:
      self.job_executer_mapping[job_id].Kill()
    for job_ in self.ready_jobs:
      if job_.id == job_id:
        self.ready_jobs.remove(job_)
        break

  def AddJob(self, job_):
    """Assigns a fresh id to job_, queues it, and returns the new id."""
    with self._lock:
      job_.id = self._id_producer.GetNextId()

      self.all_jobs.append(job_)
      # Only queue a job as ready if it has no dependencies
      if job_.is_ready:
        self.ready_jobs.append(job_)

      self._jobs_available.notifyAll()

    return job_.id

  def CleanUpJob(self, job_):
    """Removes the work directory of a finished job and drops its executer."""
    with self._lock:
      if job_.id in self.job_executer_mapping:
        self.job_executer_mapping[job_.id].CleanUpWorkDir()
        del self.job_executer_mapping[job_.id]
      # TODO(raymes): remove job from self.all_jobs

  def NotifyJobComplete(self, job_):
    """Listener callback fired by a JobExecuter when a job finishes."""
    # Return the machines before taking the lock so the scheduler can
    # reuse them as soon as it wakes up.
    self.machine_manager.ReturnMachines(job_.machines)

    with self._lock:
      self._logger.debug('Handling %r completion event.', job_)

      if job_.status == job.STATUS_SUCCEEDED:
        # Successors may have become runnable now that this job is done.
        for succ in job_.successors:
          if succ.is_ready:
            if succ not in self.ready_jobs:
              self.ready_jobs.append(succ)

      self._jobs_available.notifyAll()

  def AddListener(self, listener):
    """Registers an additional job lifecycle listener."""
    self.listeners.append(listener)

  @logger.HandleUncaughtExceptions
  def run(self):
    """Scheduler loop: waits for ready jobs and dispatches them."""
    self._logger.info("Started.")

    while not self._exit_request:
      with self._lock:
        # Wait for work.  The predicate loop (rather than a bare
        # wait()) is required: a job queued before this thread reached
        # wait() would otherwise have its notification lost and sit in
        # ready_jobs until some unrelated notify arrived.  It also
        # guards against spurious wakeups.
        while not self.ready_jobs and not self._exit_request:
          self._jobs_available.wait()

        while self.ready_jobs and not self._exit_request:
          ready_job = self.ready_jobs.pop()

          # Prefer machines that already hosted this job's predecessors.
          required_machines = ready_job.machine_dependencies
          for pred in ready_job.predecessors:
            required_machines[0].AddPreferredMachine(
                pred.primary_machine.hostname)

          machines = self.machine_manager.GetMachines(required_machines)
          if not machines:
            # If we can't get the necessary machines right now, requeue
            # the job and sleep until a completing job returns machines
            # (NotifyJobComplete signals the condition).
            self.ready_jobs.insert(0, ready_job)
            self._jobs_available.wait()
          else:
            # Mark as executing
            executer = JobExecuter(ready_job, machines, self.listeners)
            executer.start()
            self.job_executer_mapping[ready_job.id] = executer

    self._logger.info("Stopped.")
196