job_group_manager.py revision f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbe
1# Copyright 2010 Google Inc. All Rights Reserved.
2#
3
4import copy
5import logging
6import threading
7
8from automation.common import command as cmd
9from automation.common import logger
10from automation.common.command_executer import CommandExecuter
11from automation.common import job
12from automation.common import job_group
13from automation.server.job_manager import IdProducerPolicy
14
15
class JobGroupManager(object):
  """Tracks job groups and derives their status from job completion events.

  The manager subscribes itself to the job manager it is given (AddListener)
  and receives NotifyJobComplete callbacks. The list of groups and every group
  status change are guarded by a single non-reentrant lock. The same lock backs
  the condition variable that KillJobGroup waits on.
  """

  def __init__(self, job_manager):
    """Creates the manager and subscribes it to job_manager events.

    Args:
      job_manager: Object used to submit, kill and clean up jobs. It must
        provide AddListener, AddJob, KillJob and CleanUpJob.
    """
    self.all_job_groups = []

    self.job_manager = job_manager

    self._lock = threading.Lock()
    # Notified by NotifyJobComplete; KillJobGroup waits on it.
    self._job_group_finished = threading.Condition(self._lock)

    # Presumably seeds the id sequence from existing 'job-group-<id>'
    # entries under HOMEDIR_PREFIX -- confirm in IdProducerPolicy.
    # Raw string so '\d' is not parsed as an (invalid) string escape.
    self._id_producer = IdProducerPolicy()
    self._id_producer.Initialize(job_group.JobGroup.HOMEDIR_PREFIX,
                                 r'job-group-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

    # Register last. Once registered, callbacks may use self._lock and
    # self._logger, so both must already exist.
    self.job_manager.AddListener(self)

  def GetJobGroup(self, group_id):
    """Returns the tracked group whose id equals group_id, or None."""
    with self._lock:
      for group in self.all_job_groups:
        if group.id == group_id:
          return group

      return None

  def GetAllJobGroups(self):
    """Returns a deep copy of all tracked groups, taken under the lock."""
    with self._lock:
      return copy.deepcopy(self.all_job_groups)

  def AddJobGroup(self, group):
    """Registers a group, recreates its home directory and queues its jobs.

    Args:
      group: The job group to add. Its id is overwritten with a newly
        produced one.

    Returns:
      The id assigned to the group.
    """
    with self._lock:
      group.id = self._id_producer.GetNextId()

    self._logger.debug('Creating runtime environment for %r.', group)

    # Start from an empty home directory. The command runs outside the lock,
    # so it does not block other users of the manager.
    CommandExecuter().RunCommand(cmd.Chain(
        cmd.RmTree(group.home_dir), cmd.MakeDir(group.home_dir)))

    with self._lock:
      self.all_job_groups.append(group)

      # Jobs are submitted while holding the lock. A NotifyJobComplete call
      # from another thread (which takes the same lock) therefore cannot
      # handle any of them before the group is marked as executing.
      for group_job in group.jobs:
        self.job_manager.AddJob(group_job)

      group.status = job_group.STATUS_EXECUTING

    self._logger.info('Added %r to queue.', group)

    return group.id

  def KillJobGroup(self, group):
    """Kills every job of group and blocks until the group has finished.

    Returns only after the group status is STATUS_SUCCEEDED or STATUS_FAILED.

    Args:
      group: The job group to kill.
    """
    with self._lock:
      self._logger.debug('Killing all jobs that belong to %r.', group)

      for group_job in group.jobs:
        self.job_manager.KillJob(group_job)

      self._logger.debug('Waiting for jobs to quit.')

      # Block until the group reaches a terminal state, so callers know it is
      # complete when we return. wait() releases the lock while sleeping,
      # which lets NotifyJobComplete update the status and wake us.
      # NOTE(review): self._lock is not reentrant. If KillJob ever calls
      # NotifyJobComplete synchronously on this thread, this deadlocks.
      while group.status not in (job_group.STATUS_SUCCEEDED,
                                 job_group.STATUS_FAILED):
        self._job_group_finished.wait()

  def NotifyJobComplete(self, job_):
    """Updates the status of job_'s group after job_ has finished.

    This is a listener callback, presumably invoked by the job manager once
    per finished job. The outcome depends on the job's result:

    - A failed job fails the whole group. If the group has
      cleanup_on_failure set, all of its jobs are also killed and cleaned up.
    - Once every job of the group has succeeded, the group is marked
      succeeded. If cleanup_on_completion is set, its jobs are cleaned up.

    Threads waiting in KillJobGroup are woken afterwards. Events for a group
    that has already failed are ignored.

    Args:
      job_: The completed job. Its status is expected to be
        job.STATUS_FAILED or job.STATUS_SUCCEEDED.
    """
    self._logger.debug('Handling %r completion event.', job_)

    group = job_.group

    with self._lock:
      # Act only if the group hasn't already failed.
      if group.status == job_group.STATUS_FAILED:
        return

      if job_.status == job.STATUS_FAILED:
        # A failed job aborts the whole job group.
        group.status = job_group.STATUS_FAILED
        if group.cleanup_on_failure:
          for group_job in group.jobs:
            # TODO(bjanakiraman): We should probably only kill dependent jobs
            # instead of the whole job group.
            self.job_manager.KillJob(group_job)
            self.job_manager.CleanUpJob(group_job)
      else:
        # The job succeeded -- check whether the whole group is done.
        assert job_.status == job.STATUS_SUCCEEDED
        finished = True
        for other_job in group.jobs:
          # NOTE(review): this assumes a sibling's failure is always handled
          # before a later success is examined. Confirm that the job manager
          # guarantees this ordering.
          assert other_job.status != job.STATUS_FAILED
          if other_job.status != job.STATUS_SUCCEEDED:
            finished = False
            break

        if finished and group.status != job_group.STATUS_SUCCEEDED:
          # TODO(kbaclawski): Without the check performed above, the following
          # code could be called more than once. That would crash the
          # StateMachine, because it cannot transition from STATUS_SUCCEEDED
          # to STATUS_SUCCEEDED. Need to address that bug in the near future.
          group.status = job_group.STATUS_SUCCEEDED
          if group.cleanup_on_completion:
            for group_job in group.jobs:
              self.job_manager.CleanUpJob(group_job)

      self._job_group_finished.notify_all()
119