1# Copyright 2017 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Monitor jobs and abort them as necessary.
6
7This daemon does a number of upkeep tasks:
8
9* When a process owning a job crashes, job_aborter will mark the job as
10  aborted in the database and clean up its lease files.
11
12* When a job is marked aborted in the database, job_aborter will signal
13  the process owning the job to abort.
14
15See also http://goto.google.com/monitor_db_per_job_refactor
16"""
17
18from __future__ import absolute_import
19from __future__ import division
20from __future__ import print_function
21
22import argparse
23import logging
24import sys
25import time
26
27from lucifer import autotest
28from lucifer import handoffs
29from lucifer import leasing
30from lucifer import loglib
31
# Module-level logger, named after this module so it participates in the
# standard logging hierarchy.
logger = logging.getLogger(__name__)
33
34
def main(args):
    """Main function

    Parse the command line, configure logging and ts_mon, then run the
    upkeep loop.  The loop never returns, so this function never returns
    normally.

    @param args: list of command line args
    @raises AssertionError: if the main loop ever returns.
    """

    parser = argparse.ArgumentParser(prog='job_aborter', description=__doc__)
    parser.add_argument('--jobdir', required=True)
    loglib.add_logging_options(parser)
    # Keep the raw argv list and the parsed namespace in separate names.
    options = parser.parse_args(args)
    loglib.configure_logging_with_args(parser, options)
    logger.info('Starting with args: %r', options)

    autotest.monkeypatch()
    ts_mon_config = autotest.chromite_load('ts_mon_config')
    with ts_mon_config.SetupTsMonGlobalState('job_aborter'):
        _main_loop(jobdir=options.jobdir)
    # An explicit raise instead of `assert False`: assert statements are
    # removed under `python -O`, which would let main() return silently.
    raise AssertionError('job_aborter main loop exited unexpectedly')
53
54
def _main_loop(jobdir):
    """Run the upkeep loop forever; this function does not return.

    Every iteration sends a tick metric, runs one upkeep pass, commits the
    current Django transaction and then sleeps for 20 seconds.

    @param jobdir: directory passed through to the lease helpers.
    """
    django_transaction = autotest.deps_load('django.db.transaction')

    @django_transaction.commit_manually
    def flush_transaction():
        """Flush transaction https://stackoverflow.com/questions/3346124/"""
        django_transaction.commit()

    loop_metrics = _Metrics()
    loop_metrics.send_starting()
    poll_interval_secs = 20
    while True:
        logger.debug('Tick')
        loop_metrics.send_tick()
        _main_loop_body(loop_metrics, jobdir)
        # Commit after each pass; presumably so the next pass sees rows
        # committed by other processes (see the linked question).
        flush_transaction()
        time.sleep(poll_interval_secs)
71
72
def _main_loop_body(metrics, jobdir):
    """Run a single upkeep pass.

    Snapshot the non-expired leases in jobdir, then use that snapshot to
    finalize expired handoffs and signal aborts, and finally remove the
    files of expired leases.

    @param metrics: _Metrics instance.
    @param jobdir: directory passed to leasing.leases_iter().
    """
    active_leases = {}
    for lease in leasing.leases_iter(jobdir):
        if lease.expired():
            continue
        active_leases[lease.id] = lease

    _mark_expired_jobs_failed(metrics, active_leases)
    _abort_timed_out_jobs(active_leases)
    _abort_jobs_marked_aborting(active_leases)
    _abort_special_tasks_marked_aborted()
    _clean_up_expired_leases(jobdir)
    # TODO(crbug.com/748234): abort_jobs_past_max_runtime goes into
    # lucifer_run_job
85
86
def _mark_expired_jobs_failed(metrics, active_leases):
    """Finalize handed-off jobs whose job_reporter has gone away.

    A job counts as expired when it has an incomplete JobHandoff but no
    entry in active_leases, meaning the job_reporter it was handed off to
    has crashed.  The expired job ids are passed to handoffs.clean_up()
    and then handoffs.mark_complete() so they are marked failed in the
    database, and their count is reported through metrics.

    NOTE(review): active_leases is a snapshot taken by the caller before
    handoffs.incomplete() is queried here.  If a handoff and its lease can
    both appear between those two points, the job would be treated as
    expired -- confirm the lease/handoff creation order rules this out.

    @param metrics: _Metrics instance.
    @param active_leases: dict mapping job ids to Leases.
    """
    logger.debug('Looking for expired jobs')
    expired_job_ids = []
    for handoff in handoffs.incomplete():
        logger.debug('Found handoff: %d', handoff.job_id)
        if handoff.job_id in active_leases:
            continue
        logger.debug('Handoff %d is missing active lease', handoff.job_id)
        expired_job_ids.append(handoff.job_id)
    handoffs.clean_up(expired_job_ids)
    handoffs.mark_complete(expired_job_ids)
    metrics.send_expired_jobs(len(expired_job_ids))
108
109
def _abort_timed_out_jobs(active_leases):
    """Ask the owners of timed-out jobs to abort.

    Only jobs present in active_leases can be signalled; the rest are
    skipped.

    @param active_leases: dict mapping job ids to Leases.
    """
    leases_to_abort = (
            active_leases[job.id]
            for job in _timed_out_jobs_queryset()
            if job.id in active_leases
    )
    for lease in leases_to_abort:
        lease.maybe_abort()
118
119
def _abort_jobs_marked_aborting(active_leases):
    """Ask the owners of jobs marked aborting in the Autotest DB to abort.

    Only jobs present in active_leases can be signalled; the rest are
    skipped.

    @param active_leases: dict mapping job ids to Leases.
    """
    leases_to_abort = (
            active_leases[job.id]
            for job in _aborting_jobs_queryset()
            if job.id in active_leases
    )
    for lease in leases_to_abort:
        lease.maybe_abort()
128
129
def _abort_special_tasks_marked_aborted():
    """Abort special tasks marked aborted -- currently a no-op.

    TODO(crbug.com/748234): Special tasks not implemented yet.  This
    would abort jobs running on the behalf of special tasks and thus
    need to check a different database table.
    """
135
136
def _clean_up_expired_leases(jobdir):
    """Remove the files left behind by expired leases in jobdir.

    Only active leases matter to job_aborter, so the files of expired
    leases are stale and can be removed.

    @param jobdir: directory passed to leasing.leases_iter().
    """
    stale_leases = (lease for lease in leasing.leases_iter(jobdir)
                    if lease.expired())
    for stale in stale_leases:
        stale.cleanup()
146
147
def _timed_out_jobs_queryset():
    """Return a QuerySet of Jobs that have exceeded their timeout.

    A Job qualifies when it has an incomplete HostQueueEntry and
    created_on plus timeout_mins minutes lies before the database's NOW().

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    incomplete_jobs = models.Job.objects.filter(hostqueueentry__complete=False)
    # Per-row interval arithmetic is expressed as raw SQL via extra().
    timed_out_jobs = incomplete_jobs.extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
    # The join on hostqueueentry can repeat a Job once per entry.
    return timed_out_jobs.distinct()
160
161
def _aborting_jobs_queryset():
    """Return a QuerySet of aborting Jobs.

    A Job is aborting when at least one of its HostQueueEntries is marked
    aborted and has not completed yet.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    # Both conditions go in ONE filter() call so Django applies them to the
    # same HostQueueEntry row.  Chaining two filter() calls across a
    # multi-valued relation creates two independent joins, which would also
    # match a job whose aborted entry already completed while a different,
    # non-aborted entry is still running.
    return (
            models.Job.objects
            .filter(hostqueueentry__aborted=True,
                    hostqueueentry__complete=False)
            .distinct()
    )
174
175
class _Metrics(object):

    """Thin wrapper around the job_aborter chromite metrics counters."""

    def __init__(self):
        metrics_lib = autotest.chromite_load('metrics')

        def make_counter(suffix):
            """Return a Counter named under the job_aborter prefix."""
            return metrics_lib.Counter('chromeos/lucifer/job_aborter' + suffix)

        self._starting_m = make_counter('/start')
        self._tick_m = make_counter('/tick')
        self._expired_m = make_counter('/expired_jobs')

    def send_starting(self):
        """Count one daemon start."""
        self._starting_m.increment()

    def send_tick(self):
        """Count one main loop iteration."""
        self._tick_m.increment()

    def send_expired_jobs(self, count):
        """Add count to the expired jobs counter."""
        self._expired_m.increment_by(count)
198
199
if __name__ == '__main__':
    # Hand main() the command line without the program name; main() does
    # its own argparse parsing.
    main(sys.argv[1:])
202