monitor_db_unittest.py revision a5288b4bb2b09aafe914d0b7d5aab79a7e433eaf
#!/usr/bin/python

import unittest, time, subprocess, os, StringIO, tempfile, datetime, shutil
import logging
import common
import MySQLdb
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.database import database_connection, migrate
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
from autotest_lib.scheduler import scheduler_config

_DEBUG = False


class DummyAgent(object):
    _is_running = False
    _is_done = False
    num_processes = 1
    host_ids = []
    queue_entry_ids = []

    def is_running(self):
        return self._is_running


    def tick(self):
        self._is_running = True


    def is_done(self):
        return self._is_done


    def set_done(self, done):
        self._is_done = done
        self._is_running = not done

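# DummyAgent mimics just enough of the monitor_db.Agent interface
# (tick/is_done/is_running, num_processes, host_ids, queue_entry_ids) for
# Dispatcher._handle_agents() to drive it.  A minimal sketch of the assumed
# interaction, with the test flipping the done flag by hand:
#
#     agent = DummyAgent()
#     agent.tick()              # the dispatcher polls the agent; it "starts"
#     assert agent.is_running()
#     agent.set_done(True)      # the test simulates completion
#     assert agent.is_done() and not agent.is_running()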

class IsRow(mock.argument_comparator):
    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


class IsAgentWithTask(mock.argument_comparator):
    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task

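# Comparators like IsRow and IsAgentWithTask plug into mock-god playback
# wherever an exact argument match is impractical.  A minimal sketch of the
# usage (this expectation appears verbatim in AgentTest.test_agent below):
#
#     self._dispatcher.add_agent.expect_call(IsAgentWithTask(task3))
#     ...  # the code under test eventually calls dispatcher.add_agent(agent)
#     # playback passes iff that agent's queue holds exactly [task3]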

def _set_host_and_qe_ids(agent_or_task, id_list=None):
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        # Clear the instance cache as this is a brand new database.
        monitor_db.DBObject._clear_instance_cache()

        self._database = (
            database_connection.DatabaseConnection.get_test_database(
                self._test_db_file))
        self._database.connect()
        self._database.debug = _DEBUG

        monitor_db._db = self._database
        monitor_db._drone_manager._results_dir = '/test/path'
        monitor_db._drone_manager._temporary_directory = '/test/path/tmp'


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


    def _update_hqe(self, set, where=''):
        query = 'UPDATE host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)
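
    # _update_hqe stitches raw SQL fragments together; callers pass SQL text,
    # not values.  A sketch of the usage (both calls appear verbatim in tests
    # below):
    #
    #     self._update_hqe(set='complete = 1', where='host_id=1')
    #     # -> UPDATE host_queue_entries SET complete = 1 WHERE host_id=1
    #     self._update_hqe(set='aborted=1')   # no WHERE clause: every row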


class DBObjectTest(BaseSchedulerTest):
    # It may seem odd to subclass BaseSchedulerTest for this, but it saves us
    # duplicating some setup work for what we want to test.


    def test_compare_fields_in_row(self):
        host = monitor_db.Host(id=1)
        fields = list(host._fields)
        row_data = [getattr(host, fieldname) for fieldname in fields]
        self.assertEqual({}, host._compare_fields_in_row(row_data))
        row_data[fields.index('hostname')] = 'spam'
        self.assertEqual({'hostname': ('host1', 'spam')},
                         host._compare_fields_in_row(row_data))
        row_data[fields.index('id')] = 23
        self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)},
                         host._compare_fields_in_row(row_data))


    def test_always_query(self):
        host_a = monitor_db.Host(id=2)
        self.assertEqual(host_a.hostname, 'host2')
        self._do_query('UPDATE hosts SET hostname="host2-updated" WHERE id=2')
        host_b = monitor_db.Host(id=2, always_query=True)
        self.assert_(host_a is host_b, 'Cached instance not returned.')
        self.assertEqual(host_a.hostname, 'host2-updated',
                         'Database was not re-queried')

        # If either of these is called, a query was made when it shouldn't be.
        host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')
        host_a._update_fields_from_row = host_a._compare_fields_in_row
        host_c = monitor_db.Host(id=2, always_query=False)
        self.assert_(host_a is host_c, 'Cached instance not returned')


    def test_delete(self):
        host = monitor_db.Host(id=3)
        host.delete()
        self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3,
                          always_query=False)
        self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3,
                          always_query=True)


    def test_save(self):
        # Dummy Job to avoid creating one in the HostQueueEntry __init__.
        class MockJob(object):
            def __init__(self, id):
                pass
            def tag(self):
                return 'MockJob'
        self.god.stub_with(monitor_db, 'Job', MockJob)
        hqe = monitor_db.HostQueueEntry(
                new_record=True,
                row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None])
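        # The row positions mirror HostQueueEntry._fields; judging from the
        # assertions below, the order is presumably (id, job_id, host_id,
        # status, meta_host, active, complete, deleted, execution_subdir,
        # atomic_group_id, aborted, started_on), with the placeholder id of 0
        # replaced by a real id on save().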
        hqe.save()
        new_id = hqe.id
        # Force a re-query and verify that the correct data was stored.
        monitor_db.DBObject._clear_instance_cache()
        hqe = monitor_db.HostQueueEntry(id=new_id)
        self.assertEqual(hqe.id, new_id)
        self.assertEqual(hqe.job_id, 1)
        self.assertEqual(hqe.host_id, 2)
        self.assertEqual(hqe.status, 'Queued')
        self.assertEqual(hqe.meta_host, None)
        self.assertEqual(hqe.active, False)
        self.assertEqual(hqe.complete, False)
        self.assertEqual(hqe.deleted, False)
        self.assertEqual(hqe.execution_subdir, '.')
        self.assertEqual(hqe.atomic_group_id, None)
        self.assertEqual(hqe.started_on, None)


class DispatcherSchedulingTest(BaseSchedulerTest):
    _jobs_scheduled = []


    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_run_pre_job_tasks_stub(queue_entry):
            """Return a test dummy.  Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')
            return DummyAgent()

        self.god.stub_with(monitor_db.HostQueueEntry, '_do_run_pre_job_tasks',
                           hqe__do_run_pre_job_tasks_stub)

        def hqe_queue_log_record_stub(self, log_line):
            """No-Op to avoid calls down to the _drone_manager during tests."""

        self.god.stub_with(monitor_db.HostQueueEntry, 'queue_log_record',
                           hqe_queue_log_record_stub)


    def _record_job_scheduled(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert the job was scheduled on exactly `number` of the hosts."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)
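
    # For job_ids (1, 2) the helper above issues, in one statement:
    #
    #     UPDATE host_queue_entries SET meta_host=host_id, host_id=NULL
    #     WHERE job_id IN (1,2)
    #
    # i.e. each entry's assigned host becomes its metahost instead.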


    def _lock_host(self, host_id):
        self._do_query('UPDATE hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _test_basic_scheduling_helper(self, use_metahosts):
        'Basic scheduling, with or without metahosts'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        'Test prioritization ordering'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE hosts SET status="Running" WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        'Only idle hosts get scheduled'
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def _setup_test_only_if_needed_labels(self):
        # apply only_if_needed label3 to host1
        models.Host.smart_get('host1').labels.add(self.label3)
        return self._create_job_simple([1], use_metahost=True)


    def test_only_if_needed_labels_avoids_host(self):
        job = self._setup_test_only_if_needed_labels()
        # if the job doesn't depend on label3, there should be no scheduling
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_schedules(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_via_metahost(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        # should also work if the metahost is the only_if_needed label
        self._do_query('DELETE FROM jobs_dependency_labels')
        self._create_job(metahosts=[3])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_metahosts_obey_blocks(self):
        """
        Metahosts can't get scheduled on hosts already scheduled for
        that job.
        """
        self._create_job(metahosts=[1], hosts=[1])
        # make the nonmetahost entry complete, so the metahost can try
        # to get scheduled
        self._update_hqe(set='complete = 1', where='host_id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    # TODO(gps): These should probably live in their own TestCase class
    # specific to testing HostScheduler methods directly.  It was convenient
    # to put it here for now to share existing test environment setup code.
    def test_HostScheduler_check_atomic_group_labels(self):
        normal_job = self._create_job(metahosts=[0])
        atomic_job = self._create_job(atomic_group=1)
        # Indirectly initialize the internal state of the host scheduler.
        self._dispatcher._refresh_pending_queue_entries()

        atomic_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' %
                                                     atomic_job.id).next()
        normal_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' %
                                                     normal_job.id).next()

        host_scheduler = self._dispatcher._host_scheduler
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id], atomic_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
                [self.label4.id], normal_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
                [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id, self.label6.id], atomic_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id, self.label5.id], atomic_hqe))


    def test_HostScheduler_get_host_atomic_group_id(self):
        job = self._create_job(metahosts=[self.label6.id])
        queue_entry = monitor_db.HostQueueEntry.fetch(
                where='job_id=%d' % job.id).next()
        # Indirectly initialize the internal state of the host scheduler.
        self._dispatcher._refresh_pending_queue_entries()

        # Test the host scheduler
        host_scheduler = self._dispatcher._host_scheduler

        # Two labels each in a different atomic group.  This should log an
        # error and continue.
        orig_logging_error = logging.error
        def mock_logging_error(message, *args):
            mock_logging_error._num_calls += 1
            # Still make the real logging call; we wrapped it only to count.
            orig_logging_error(message, *args)
        mock_logging_error._num_calls = 0
        self.god.stub_with(logging, 'error', mock_logging_error)
        self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id(
                [self.label4.id, self.label8.id], queue_entry))
        self.assertTrue(mock_logging_error._num_calls > 0)
        self.god.unstub(logging, 'error')

        # Two labels both in the same atomic group.  This should not raise
        # an error; it will merely cause the job to schedule on the
        # intersection.
        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
                [self.label4.id, self.label5.id]))

        self.assertEquals(None, host_scheduler._get_host_atomic_group_id([]))
        self.assertEquals(None, host_scheduler._get_host_atomic_group_id(
                [self.label3.id, self.label7.id, self.label6.id]))
        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
                [self.label4.id, self.label7.id, self.label6.id]))
        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
                [self.label7.id, self.label5.id]))


    def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
        # Create a job scheduled to run on label6.
        self._create_job(metahosts=[self.label6.id])
        self._dispatcher._schedule_new_jobs()
        # label6 only has hosts that are in atomic groups associated with it,
        # so there should be no scheduling.
        self._check_for_extra_schedulings()


    def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
        # Create a job scheduled to run on label5.  This is an atomic group
        # label but this job does not request atomic group scheduling.
        self._create_job(metahosts=[self.label5.id])
        self._dispatcher._schedule_new_jobs()
        # label5 only has hosts that are in atomic groups associated with it,
        # so there should be no scheduling.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_basics(self):
        # Create jobs scheduled to run on an atomic group.
        job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
                                 atomic_group=1)
        job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
                                 atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # atomic_group.max_number_of_machines was 2 so we should run on 2.
        self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
        self._assert_job_scheduled_on(job_b.id, 8)  # label5
        self._assert_job_scheduled_on(job_b.id, 9)  # label5
        self._check_for_extra_schedulings()

        # The three-host label4 atomic group still has one host available.
        # That means a job with a synch_count of 1 asking to be scheduled on
        # the atomic group can still use the final machine.
        #
        # This may seem like a somewhat odd use case.  It allows the use of an
        # atomic group as a set of machines to run smaller jobs within (a set
        # of hosts configured for use in network tests with each other,
        # perhaps?)
        onehost_job = self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
        self._check_for_extra_schedulings()

        # No more atomic groups have hosts available, so no more jobs should
        # be scheduled.
        self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_obeys_acls(self):
        # Request scheduling on a specific atomic label but be denied by ACLs.
        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id in (8,9)')
        job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_dependency_label_exclude(self):
        # A dependency label that matches no hosts in the atomic group.
        job_a = self._create_job(atomic_group=1)
        job_a.dependency_labels.add(self.label3)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
        # A metahost and dependency label that excludes too many hosts.
        job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
                                 atomic_group=1)
        job_b.dependency_labels.add(self.label7)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_dependency_label_match(self):
        # A dependency label that exists on enough atomic group hosts in only
        # one of the two atomic group labels.
        job_c = self._create_job(synchronous=True, atomic_group=1)
        job_c.dependency_labels.add(self.label7)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_no_metahost(self):
        # Force it to schedule on the other group for a reliable test.
        self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')
        # An atomic job without a metahost.
        job = self._create_job(synchronous=True, atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_partial_group(self):
        # Make one host in labels[3] unavailable so that there are only two
        # hosts left in the group.
        self._do_query('UPDATE hosts SET status="Repair Failed" WHERE id=5')
        job = self._create_job(synchronous=True, metahosts=[self.label4.id],
                               atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # Verify that it was scheduled on the 2 ready hosts in that group.
        self._assert_job_scheduled_on(job.id, 6)
        self._assert_job_scheduled_on(job.id, 7)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_not_enough_available(self):
        # Mark some hosts in each atomic group label as not usable.
        # One host running, another invalid in the first group label.
        self._do_query('UPDATE hosts SET status="Running" WHERE id=5')
        self._do_query('UPDATE hosts SET invalid=1 WHERE id=6')
        # One host invalid in the second group label.
        self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')
        # Nothing to schedule when no group label has enough (2) good hosts.
        self._create_job(atomic_group=1, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        # There are not enough hosts in either atomic group, so no more
        # scheduling should occur.
        self._check_for_extra_schedulings()

        # Now create an atomic job that has a synch count of 1.  It should
        # schedule on exactly one of the hosts.
        onehost_job = self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)


    def test_atomic_group_scheduling_no_valid_hosts(self):
        self._do_query('UPDATE hosts SET invalid=1 WHERE id in (8,9)')
        self._create_job(synchronous=True, metahosts=[self.label5.id],
                         atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # No hosts in the selected group and label are valid; no schedulings.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_metahost_works(self):
        # Test that atomic group scheduling also obeys metahosts.
        self._create_job(metahosts=[0], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # There are no atomic group hosts that also have that metahost.
        self._check_for_extra_schedulings()

        job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job_b.id, 8)
        self._assert_job_scheduled_on(job_b.id, 9)
        self._check_for_extra_schedulings()


    def test_atomic_group_skips_ineligible_hosts(self):
        # Test that hosts marked ineligible for this job are skipped.
        # (How would this ever happen in practice, anyway?)
        job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
        models.IneligibleHostQueue.objects.create(job=job, host_id=5)
        models.IneligibleHostQueue.objects.create(job=job, host_id=6)
        models.IneligibleHostQueue.objects.create(job=job, host_id=7)
        self._dispatcher._schedule_new_jobs()
        # No scheduling should occur as all desired hosts were ineligible.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_fail(self):
        # If synch_count is greater than the atomic group's max number of
        # machines, the job should be aborted immediately.
        model_job = self._create_job(synchronous=True, atomic_group=1)
        model_job.synch_count = 4
        model_job.save()
        job = monitor_db.Job(id=model_job.id)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()
        queue_entries = job.get_host_queue_entries()
        self.assertEqual(1, len(queue_entries))
        self.assertEqual(queue_entries[0].status,
                         models.HostQueueEntry.Status.ABORTED)


    def test_atomic_group_no_labels_no_scheduling(self):
        # Never schedule on atomic groups marked invalid.
        job = self._create_job(metahosts=[self.label5.id], synchronous=True,
                               atomic_group=1)
        # Deleting an atomic group via the frontend marks it invalid and
        # removes all label references to the group.  The job now references
        # an invalid atomic group with no labels associated with it.
        self.label5.atomic_group.invalid = True
        self.label5.atomic_group.save()
        self.label5.atomic_group = None
        self.label5.save()

        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_host_fail(self):
        # Scheduling a job directly on hosts in an atomic group must
        # fail to avoid users inadvertently holding up the use of an
        # entire atomic group by using the machines individually.
        job = self._create_job(hosts=[5])
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_host(self):
        # Scheduling a job directly on one host in an atomic group will
        # work when the atomic group is listed on the HQE in addition
        # to the host (assuming the sync count is 1).
        job = self._create_job(hosts=[5], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job.id, 5)
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_hosts_sync2(self):
        job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job.id, 5)
        self._assert_job_scheduled_on(job.id, 8)
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
        job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE hosts SET status="Repair Failed"')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
        scheduler_config.config.max_processes_started_per_cycle = (
            self._MAX_STARTED)

        def fake_max_runnable_processes(fake_self):
            running = sum(agent.num_processes
                          for agent in self._agents
                          if agent.is_running())
            return self._MAX_RUNNING - running
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)
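
        # Worked example of the stub above with _MAX_RUNNING = 3: two running
        # agents with num_processes == 1 give running == 2, so
        # max_runnable_processes() returns 3 - 2 = 1 and only one more
        # process may start; _MAX_STARTED == 2 separately caps how many
        # processes may be started in a single cycle.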


    def _setup_some_agents(self, num_agents):
        self._agents = [DummyAgent() for i in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_cycles(self):
        for i in xrange(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        for i in indexes:
            self.assert_(self._agents[i].is_running() == is_started,
                         'Agent %d %sstarted' %
                         (i, is_started and 'not ' or ''))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_per_cycle(self):
        self._setup_some_agents(3)
        self._dispatcher._handle_agents()
        self._assert_agents_started([0, 1])
        self._assert_agents_not_started([2])


    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_cycles()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].num_processes = 0
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


849
850class FindAbortTest(BaseSchedulerTest):
851    """
852    Test the dispatcher abort functionality.
853    """
854    def _check_host_agent(self, agent, host_id):
855        self.assert_(isinstance(agent, monitor_db.Agent))
856        tasks = list(agent.queue.queue)
857        self.assertEquals(len(tasks), 2)
858        cleanup, verify = tasks
859
860        self.assert_(isinstance(cleanup, monitor_db.CleanupTask))
861        self.assertEquals(cleanup.host.id, host_id)
862
863        self.assert_(isinstance(verify, monitor_db.VerifyTask))
864        self.assertEquals(verify.host.id, host_id)
865
866
867    def _check_agents(self, agents):
868        agents = list(agents)
869        self.assertEquals(len(agents), 3)
870        self.assertEquals(agents[0], self._agent)
871        self._check_host_agent(agents[1], 1)
872        self._check_host_agent(agents[2], 2)
873
874
875    def _common_setup(self):
876        self._create_job(hosts=[1, 2])
877        self._update_hqe(set='aborted=1')
878        self._agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent')
879        _set_host_and_qe_ids(self._agent, [1, 2])
880        self._agent.abort.expect_call()
881        self._agent.abort.expect_call() # gets called once for each HQE
882        self._dispatcher.add_agent(self._agent)
883

    def test_find_aborting(self):
        self._common_setup()
        self._dispatcher._find_aborting()
        self.god.check_playback()


    def test_find_aborting_verifying(self):
        self._common_setup()
        self._update_hqe(set='active=1, status="Verifying"')

        self._dispatcher._find_aborting()

        self._check_agents(self._dispatcher._agents)
        self.god.check_playback()


class JobTimeoutTest(BaseSchedulerTest):
    def _test_synch_start_timeout_helper(self, expect_abort,
                                         set_created_on=True, set_active=True,
                                         set_acl=True):
        scheduler_config.config.synch_job_start_timeout_minutes = 60
        job = self._create_job(hosts=[1, 2])
        if set_active:
            hqe = job.hostqueueentry_set.filter(host__id=1)[0]
            hqe.status = 'Pending'
            hqe.active = 1
            hqe.save()

        everyone_acl = models.AclGroup.smart_get('Everyone')
        host1 = models.Host.smart_get(1)
        if set_acl:
            everyone_acl.hosts.add(host1)
        else:
            everyone_acl.hosts.remove(host1)

        job.created_on = datetime.datetime.now()
        if set_created_on:
            job.created_on -= datetime.timedelta(minutes=100)
        job.save()

        cleanup = self._dispatcher._periodic_cleanup
        cleanup._abort_jobs_past_synch_start_timeout()

        for hqe in job.hostqueueentry_set.all():
            self.assertEquals(hqe.aborted, expect_abort)


    def test_synch_start_timeout_helper(self):
        # No abort if any one of the conditions isn't met.
        self._test_synch_start_timeout_helper(False, set_created_on=False)
        self._test_synch_start_timeout_helper(False, set_active=False)
        self._test_synch_start_timeout_helper(False, set_acl=False)
        # Abort only when all conditions are met.
        self._test_synch_start_timeout_helper(True)


class PidfileRunMonitorTest(unittest.TestCase):
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(monitor_db, '_drone_manager',
                           self.mock_drone_manager)
        self.god.stub_function(email_manager.manager, 'enqueue_notify_email')

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=monitor_db._AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))
        self.mock_drone_manager.register_pidfile.expect_call(self.pidfile_id)

        self.monitor = monitor_db.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)


    def tearDown(self):
        self.god.unstub_all()


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)
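
    # The helpers above model the pidfile lifecycle as autoserv presumably
    # writes it:
    #
    #   not yet run / empty:  no process, no exit_status
    #   running:              process (pid) only
    #   complete:             process, exit_status and num_tests_failed
    #
    # use_second_read mirrors the monitor re-reading the pidfile after a
    # process liveness check; see test_get_pidfile_info below.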


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        if expected_pid is None:
            self.assertEquals(self.monitor._state.process, None)
        else:
            self.assertEquals(self.monitor._state.process.pid, expected_pid)
        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
        self.assertEquals(self.monitor._state.num_tests_failed,
                          expected_num_tests_failed)

        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
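
    # The "exited during check" case above encodes the assumed race guard:
    # the first read shows a live pid, is_process_running() then says no, and
    # a second read (use_second_read=True) picks up an exit status written in
    # between, so the run is not misreported as a lost process.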


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self.monitor._start_time = time.time() - monitor_db.PIDFILE_TIMEOUT - 1
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        task = self.god.create_mock_class(monitor_db.AgentTask, name)
        _set_host_and_qe_ids(task)
        return task


    def _create_agent(self, tasks):
        agent = monitor_db.Agent(tasks)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        while not agent.is_done():
            agent.tick()


    def test_agent(self):
        task1 = self._create_mock_task('task1')
        task2 = self._create_mock_task('task2')
        task3 = self._create_mock_task('task3')
        task1.poll.expect_call()
        task1.is_done.expect_call().and_return(False)
        task1.poll.expect_call()
        task1.is_done.expect_call().and_return(True)
        task1.is_done.expect_call().and_return(True)
        task1.success = True

        task2.poll.expect_call()
        task2.is_done.expect_call().and_return(True)
        task2.is_done.expect_call().and_return(True)
        task2.success = False
        task2.failure_tasks = [task3]

        self._dispatcher.add_agent.expect_call(IsAgentWithTask(task3))

        agent = self._create_agent([task1, task2])
        self._finish_agent(agent)
        self.god.check_playback()
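
    # test_agent above encodes the failure-handling contract: when task2
    # finishes with success=False, the agent wraps task2.failure_tasks
    # ([task3]) in a new Agent and hands it to dispatcher.add_agent(), which
    # the IsAgentWithTask comparator verifies.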


    def _test_agent_abort_helper(self, ignore_abort=False):
        task1 = self._create_mock_task('task1')
        task2 = self._create_mock_task('task2')
        task1.poll.expect_call()
        task1.is_done.expect_call().and_return(False)
        task1.abort.expect_call()
        if ignore_abort:
            task1.aborted = False # task ignores abort; execution continues

            task1.poll.expect_call()
            task1.is_done.expect_call().and_return(True)
            task1.is_done.expect_call().and_return(True)
            task1.success = True

            task2.poll.expect_call()
            task2.is_done.expect_call().and_return(True)
            task2.is_done.expect_call().and_return(True)
            task2.success = True
        else:
            task1.aborted = True
            task2.abort.expect_call()
            task2.aborted = True

        agent = self._create_agent([task1, task2])
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort(self):
        self._test_agent_abort_helper()
        self._test_agent_abort_helper(True)


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent([task])
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


class DelayedCallTaskTest(unittest.TestCase):
    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_delayed_call(self):
        test_time = self.god.create_mock_function('time')
        test_time.expect_call().and_return(33)
        test_time.expect_call().and_return(34.01)
        test_time.expect_call().and_return(34.99)
        test_time.expect_call().and_return(35.01)
        def test_callback():
            test_callback.calls += 1
        test_callback.calls = 0
        delay_task = monitor_db.DelayedCallTask(
                delay_seconds=2, callback=test_callback,
                now_func=test_time)  # time 33
        self.assertEqual(35, delay_task.end_time)
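        # Timing arithmetic: end_time = now (33) + delay_seconds (2) = 35.
        # The mocked clock then reads 34.01 and 34.99 (both before 35, so no
        # callback) and finally 35.01, which fires the callback exactly once.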
        agent = monitor_db.Agent([delay_task], num_processes=0)
        self.assert_(not agent.active_task)
        agent.tick()  # activates the task and polls it once, time 34.01
        self.assertEqual(0, test_callback.calls, "callback called early")
        agent.tick()  # time 34.99
        self.assertEqual(0, test_callback.calls, "callback called early")
        agent.tick()  # time 35.01
        self.assertEqual(1, test_callback.calls)
        self.assert_(agent.is_done())
        self.assert_(delay_task.is_done())
        self.assert_(delay_task.success)
        self.assert_(not delay_task.aborted)
        self.god.check_playback()


    def test_delayed_call_abort(self):
        delay_task = monitor_db.DelayedCallTask(
                delay_seconds=987654, callback=lambda: None)
        agent = monitor_db.Agent([delay_task], num_processes=0)
        agent.abort()
        agent.tick()
        self.assert_(agent.is_done())
        self.assert_(delay_task.aborted)
        self.assert_(delay_task.is_done())
        self.assert_(not delay_task.success)
        self.god.check_playback()


class AgentTasksTest(BaseSchedulerTest):
    ABSPATH_BASE = '/abspath/'
    HOSTNAME = 'myhost'
    BASE_TASK_DIR = 'hosts/%s/' % HOSTNAME
    RESULTS_DIR = '/results/dir'
    DUMMY_PROCESS = object()
    HOST_PROTECTION = host_protections.default
    PIDFILE_ID = object()
    JOB_OWNER = 'test_owner'
    JOB_NAME = 'test_job_name'
    JOB_AUTOSERV_PARAMS = set(['-u', JOB_OWNER, '-l', JOB_NAME])

    def setUp(self):
        super(AgentTasksTest, self).setUp()
        self.god = mock.mock_god()
        self.god.stub_with(drone_manager.DroneManager, 'get_temporary_path',
                           mock.mock_function('get_temporary_path',
                                              default_return_val='tempdir'))
        self.god.stub_function(drone_manager.DroneManager,
                               'copy_results_on_drone')
        self.god.stub_function(drone_manager.DroneManager,
                               'copy_to_results_repository')
        self.god.stub_function(drone_manager.DroneManager,
                               'get_pidfile_id_from')

        def dummy_absolute_path(drone_manager_self, path):
            return self.ABSPATH_BASE + path
        self.god.stub_with(drone_manager.DroneManager, 'absolute_path',
                           dummy_absolute_path)

        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'run')
        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'exit_code')
        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'kill')
        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'get_process')
        def mock_has_process(unused):
            return True
        self.god.stub_with(monitor_db.PidfileRunMonitor, 'has_process',
                           mock_has_process)

        self.host = self.god.create_mock_class(monitor_db.Host, 'host')
        self.host.id = 1
        self.host.hostname = self.HOSTNAME
        self.host.protection = self.HOST_PROTECTION

        # For this (and other model creations), we must create the entry this
        # way; otherwise, if an entry matching the id already exists, Django 1.0
        # will raise an exception complaining about a duplicate primary key.
        # This way, Django performs an UPDATE query if an entry matching the id
        # already exists.
        host = models.Host(id=self.host.id, hostname=self.host.hostname,
                           protection=self.host.protection)
        host.save()
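
        # A sketch of the distinction the comment above relies on, assuming
        # Django 1.0-era semantics:
        #
        #     models.Host.objects.create(id=1, ...)  # forces an INSERT and
        #                                            # raises on duplicate pk
        #     models.Host(id=1, ...).save()          # INSERT or UPDATE as
        #                                            # appropriate for id=1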

        self.job = self.god.create_mock_class(monitor_db.Job, 'job')
        self.job.owner = self.JOB_OWNER
        self.job.name = self.JOB_NAME
        self.job.id = 1337
        self.job.tag = lambda: 'fake-job-tag'
        job = models.Job(id=self.job.id, owner=self.job.owner,
                         name=self.job.name, created_on=datetime.datetime.now())
        job.save()

        self.queue_entry = self.god.create_mock_class(
            monitor_db.HostQueueEntry, 'queue_entry')
        self.queue_entry.id = 1
        self.queue_entry.job = self.job
        self.queue_entry.host = self.host
        self.queue_entry.meta_host = None
        queue_entry = models.HostQueueEntry(id=self.queue_entry.id, job=job,
                                            host=host, meta_host=None)
        queue_entry.save()

        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        super(AgentTasksTest, self).tearDown()
        self.god.unstub_all()


    def run_task(self, task, success):
        """
        Do essentially what an Agent would do, but protect against
        infinite looping from test errors.
        """
        if not getattr(task, 'agent', None):
            task.agent = object()
        count = 0
        while not task.is_done():
            count += 1
            if count > 10:
                print 'Task failed to finish'
                # The playback may hold clues as to why the task failed.
                self.god.check_playback()
                self.fail()
            task.poll()
        self.assertEquals(task.success, success)


    def setup_run_monitor(self, exit_status, task_tag, copy_log_file=True,
                          aborted=False):
        monitor_db.PidfileRunMonitor.run.expect_call(
            mock.is_instance_comparator(list),
            self.BASE_TASK_DIR + task_tag,
            nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
            log_file=mock.anything_comparator(),
            pidfile_name=monitor_db._AUTOSERV_PID_FILE,
            paired_with_pidfile=None)
        monitor_db.PidfileRunMonitor.exit_code.expect_call()
        if aborted:
            monitor_db.PidfileRunMonitor.kill.expect_call()
        else:
            monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return(
                    exit_status)

        if copy_log_file:
            self._setup_move_logfile()


    def _setup_move_logfile(self, copy_on_drone=False,
                            include_destination=False):
        monitor_db.PidfileRunMonitor.get_process.expect_call().and_return(
            self.DUMMY_PROCESS)
        if copy_on_drone:
            self.queue_entry.execution_path.expect_call().and_return('tag')
            drone_manager.DroneManager.copy_results_on_drone.expect_call(
                self.DUMMY_PROCESS, source_path=mock.is_string_comparator(),
                destination_path=mock.is_string_comparator())
        elif include_destination:
            drone_manager.DroneManager.copy_to_results_repository.expect_call(
                self.DUMMY_PROCESS, mock.is_string_comparator(),
                destination_path=mock.is_string_comparator())
        else:
            drone_manager.DroneManager.copy_to_results_repository.expect_call(
                self.DUMMY_PROCESS, mock.is_string_comparator())


1413
1414    def _test_repair_task_helper(self, success, task_tag, queue_entry=None):
1415        self.host.set_status.expect_call('Repairing')
1416        if success:
1417            self.setup_run_monitor(0, task_tag)
1418            self.host.set_status.expect_call('Ready')
1419        else:
1420            self.setup_run_monitor(1, task_tag)
1421            self.host.set_status.expect_call('Repair Failed')
1422
1423        task = monitor_db.RepairTask(self.host, queue_entry=queue_entry)
1424        self.assertEquals(task.failure_tasks, [])
1425        self.run_task(task, success)
1426
1427        expected_protection = host_protections.Protection.get_string(
1428            host_protections.default)
1429        expected_protection = host_protections.Protection.get_attr_name(
1430            expected_protection)
1431
1432        self.assertTrue(set(task.cmd) >=
1433                        set([monitor_db._autoserv_path, '-p', '-R', '-m',
1434                             self.HOSTNAME, '-r',
1435                             drone_manager.WORKING_DIRECTORY,
1436                             '--host-protection', expected_protection]))
1437        self.god.check_playback()
1438
1439
1440    def test_repair_task(self):
1441        self._test_repair_task_helper(True, '1-repair')
1442        self._test_repair_task_helper(False, '2-repair')
1443
1444
1445    def test_repair_task_with_hqe_already_requeued(self):
1446        # During recovery, a RepairTask can be passed a queue entry that has
1447        # already been requeued.  Ensure it leaves the HQE alone in that case.
1448        self.queue_entry.meta_host = 1
1449        self.queue_entry.host = None
1450        self._test_repair_task_helper(False, '1-repair',
1451                                      queue_entry=self.queue_entry)
1452
1453
1454    def test_recovery_repair_task_working_directory(self):
1455        # ensure that a RepairTask recovering an existing SpecialTask picks up
1456        # the working directory immediately
1457        class MockSpecialTask(object):
1458            def execution_path(self):
1459                return '/my/path'
1460
1461        special_task = MockSpecialTask()
1462        task = monitor_db.RepairTask(self.host, task=special_task)
1463
1464        self.assertEquals(task._working_directory, '/my/path')
1465
1466
1467    def test_repair_task_aborted(self):
1468        self.host.set_status.expect_call('Repairing')
1469        self.setup_run_monitor(0, '1-repair', aborted=True)
1470
1471        task = monitor_db.RepairTask(self.host)
1472        task.agent = object()
1473        task.poll()
1474        task.abort()
1475
1476        self.assertTrue(task.done)
1477        self.assertTrue(task.aborted)
1478        self.assertTrue(task.task.is_complete)
1479        self.assertFalse(task.task.is_active)
1480        self.god.check_playback()
1481
1482
1483    def _test_repair_task_with_queue_entry_helper(self, parse_failed_repair,
1484                                                  task_tag):
1485        self.god.stub_class(monitor_db, 'FinalReparseTask')
1486        self.god.stub_class(monitor_db, 'Agent')
1487        self.god.stub_class_method(monitor_db.TaskWithJobKeyvals,
1488                                   '_write_keyval_after_job')
1489        agent = DummyAgent()
1490        agent.dispatcher = self._dispatcher
1491
1492        self.host.set_status.expect_call('Repairing')
1493        self.queue_entry.requeue.expect_call()
1494        self.setup_run_monitor(1, task_tag)
1495        self.host.set_status.expect_call('Repair Failed')
1496        self.queue_entry.update_from_database.expect_call()
1497        self.queue_entry.set_execution_subdir.expect_call()
1498        monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call(
1499            'job_queued', mock.is_instance_comparator(int))
1500        monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call(
1501            'job_finished', mock.is_instance_comparator(int))
1502        self._setup_move_logfile(copy_on_drone=True)
1503        self.queue_entry.execution_path.expect_call().and_return('tag')
1504        self._setup_move_logfile()
1505        self.job.parse_failed_repair = parse_failed_repair
1506        if parse_failed_repair:
1507            reparse_task = monitor_db.FinalReparseTask.expect_new(
1508                [self.queue_entry])
1509            reparse_agent = monitor_db.Agent.expect_new([reparse_task],
1510                                                        num_processes=0)
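            # num_processes=0 presumably keeps the reparse agent from
            # counting against the scheduler's process throttling.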
1511            self._dispatcher.add_agent.expect_call(reparse_agent)
1512        self.queue_entry.handle_host_failure.expect_call()
1513
1514        task = monitor_db.RepairTask(self.host, self.queue_entry)
1515        task.agent = agent
1516        self.queue_entry.status = 'Queued'
1517        self.job.created_on = datetime.datetime(2009, 1, 1)
1518        self.run_task(task, False)
1519        self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS)
1520        self.god.check_playback()
1521
1522
1523    def test_repair_task_with_queue_entry(self):
1524        self._test_repair_task_with_queue_entry_helper(True, '1-repair')
1525        self._test_repair_task_with_queue_entry_helper(False, '2-repair')
1526
1527
1528    def setup_verify_expects(self, success, use_queue_entry, task_tag):
1529        if use_queue_entry:
1530            self.queue_entry.set_status.expect_call('Verifying')
1531        self.host.set_status.expect_call('Verifying')
1532        if success:
1533            self.setup_run_monitor(0, task_tag)
1534            self.host.set_status.expect_call('Ready')
1535        else:
1536            self.setup_run_monitor(1, task_tag)
1537            if use_queue_entry and not self.queue_entry.meta_host:
1538                self.queue_entry.set_execution_subdir.expect_call()
1539                self.queue_entry.execution_path.expect_call().and_return('tag')
1540                self._setup_move_logfile(include_destination=True)
1541
1542
1543    def _check_verify_failure_tasks(self, verify_task):
1544        self.assertEquals(len(verify_task.failure_tasks), 1)
1545        repair_task = verify_task.failure_tasks[0]
1546        self.assert_(isinstance(repair_task, monitor_db.RepairTask))
1547        self.assertEquals(verify_task.host, repair_task.host)
1548        if verify_task.queue_entry:
1549            self.assertEquals(repair_task.queue_entry,
1550                              verify_task.queue_entry)
1551        else:
1552            self.assertEquals(repair_task.queue_entry, None)
1553
1554
1555    def _test_verify_task_helper(self, success, task_tag, use_queue_entry=False,
1556                                 use_meta_host=False):
1557        self.setup_verify_expects(success, use_queue_entry, task_tag)
1558
1559        if use_queue_entry:
1560            task = monitor_db.VerifyTask(queue_entry=self.queue_entry)
1561        else:
1562            task = monitor_db.VerifyTask(host=self.host)
1563        self._check_verify_failure_tasks(task)
1564        self.run_task(task, success)
1565        self.assertTrue(set(task.cmd) >=
1566                        set([monitor_db._autoserv_path, '-p', '-v', '-m',
1567                             self.HOSTNAME, '-r',
1568                             drone_manager.WORKING_DIRECTORY]))
1569        if use_queue_entry:
1570            self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS)
1571        self.god.check_playback()
1572
1573
1574    def test_verify_task_with_host(self):
1575        self._test_verify_task_helper(True, '1-verify')
1576        self._test_verify_task_helper(False, '2-verify')
1577
1578
1579    def test_verify_task_with_queue_entry(self):
1580        self._test_verify_task_helper(True, '1-verify', use_queue_entry=True)
1581        self._test_verify_task_helper(False, '2-verify', use_queue_entry=True)
1582
1583
1584    def test_verify_task_with_metahost(self):
1585        self.queue_entry.meta_host = 1
1586        self.test_verify_task_with_queue_entry()
1587
1588
1589    def test_specialtask_abort_before_prolog(self):
1590        task = monitor_db.RepairTask(self.host)
1591        task.abort()
1592        self.assertTrue(task.aborted)
1593
1594
1595    def _setup_post_job_task_expects(self, autoserv_success, hqe_status=None,
1596                                     hqe_aborted=False):
1597        self.queue_entry.execution_path.expect_call().and_return('tag')
1598        self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new()
1599        self.pidfile_monitor.pidfile_id = self.PIDFILE_ID
1600        self.pidfile_monitor.attach_to_existing_process.expect_call('tag')
1601        if autoserv_success:
1602            code = 0
1603        else:
1604            code = 1
1605        self.queue_entry.update_from_database.expect_call()
1606        self.queue_entry.aborted = hqe_aborted
1607        if not hqe_aborted:
1608            self.pidfile_monitor.exit_code.expect_call().and_return(code)
1609
1610        if hqe_status:
1611            self.queue_entry.set_status.expect_call(hqe_status)
1612
1613
1614    def _setup_pre_parse_expects(self, autoserv_success):
1615        self._setup_post_job_task_expects(autoserv_success, 'Parsing')
1616
1617
1618    def _setup_post_parse_expects(self, autoserv_success):
1619        if autoserv_success:
1620            status = 'Completed'
1621        else:
1622            status = 'Failed'
1623        self.queue_entry.set_status.expect_call(status)
1624
1625
1626    def _expect_execute_run_monitor(self):
1627        self.monitor.exit_code.expect_call()
1628        self.monitor.exit_code.expect_call().and_return(0)
1629        self._expect_copy_results()
1630
1631
1632    def _setup_post_job_run_monitor(self, pidfile_name):
1633        self.pidfile_monitor.has_process.expect_call().and_return(True)
1634        autoserv_pidfile_id = object()
1635        self.monitor = monitor_db.PidfileRunMonitor.expect_new()
1636        self.monitor.run.expect_call(
1637            mock.is_instance_comparator(list),
1638            'tag',
1639            nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
1640            log_file=mock.anything_comparator(),
1641            pidfile_name=pidfile_name,
1642            paired_with_pidfile=self.PIDFILE_ID)
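        # paired_with_pidfile presumably pins this follow-on process to the
        # drone that ran the autoserv process behind PIDFILE_ID.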
1643        self._expect_execute_run_monitor()
1644
1645
1646    def _expect_copy_results(self, monitor=None, queue_entry=None):
1647        if monitor is None:
1648            monitor = self.monitor
1649        monitor.has_process.expect_call().and_return(True)
1650        if queue_entry:
1651            queue_entry.execution_path.expect_call().and_return('tag')
1652        monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS)
1653        drone_manager.DroneManager.copy_to_results_repository.expect_call(
1654                self.DUMMY_PROCESS, mock.is_string_comparator())
1655
1656
1657    def _test_final_reparse_task_helper(self, autoserv_success=True):
1658        self._setup_pre_parse_expects(autoserv_success)
1659        self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE)
1660        self._setup_post_parse_expects(autoserv_success)
1661
1662        task = monitor_db.FinalReparseTask([self.queue_entry])
1663        self.run_task(task, True)
1664
1665        self.god.check_playback()
1666        cmd = [monitor_db._parser_path, '--write-pidfile', '-l', '2', '-r',
1667               '-o', '-P', '/abspath/tag']
1668        self.assertEquals(task.cmd, cmd)
1669
1670
1671    def test_final_reparse_task(self):
1672        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
1673        self._test_final_reparse_task_helper()
1674        self._test_final_reparse_task_helper(autoserv_success=False)
1675
1676
1677    def test_final_reparse_throttling(self):
1678        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
1679        self.god.stub_function(monitor_db.FinalReparseTask,
1680                               '_can_run_new_parse')
1681
1682        self._setup_pre_parse_expects(True)
1683        monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
1684            False)
1685        monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
1686            True)
1687        self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE)
1688        self._setup_post_parse_expects(True)
1689
1690        task = monitor_db.FinalReparseTask([self.queue_entry])
1691        self.run_task(task, True)
1692        self.god.check_playback()
1693
1694
1695    def test_final_reparse_recovery(self):
1696        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
1697        self.monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor,
1698                                                  'run_monitor')
1699        self._setup_post_job_task_expects(True)
1700        self._expect_execute_run_monitor()
1701        self._setup_post_parse_expects(True)
1702
1703        task = monitor_db.FinalReparseTask([self.queue_entry],
1704                                           recover_run_monitor=self.monitor)
1705        self.run_task(task, True)
1706        self.god.check_playback()
1707
1708
1709    def _setup_gather_logs_expects(self, autoserv_killed=True,
1710                                   hqe_aborted=False):
1711        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
1712        self.god.stub_class(monitor_db, 'FinalReparseTask')
1713        self._setup_post_job_task_expects(not autoserv_killed, 'Gathering',
1714                                          hqe_aborted)
1715        if hqe_aborted:
1716            exit_code = None
1717        elif autoserv_killed:
1718            exit_code = 271
1719        else:
1720            exit_code = 0
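        # 271 is the status used here to model a killed autoserv; anything
        # nonzero (or None, when the HQE was aborted) should trigger the
        # collect_crashinfo stage expected below.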
1721        self.pidfile_monitor.exit_code.expect_call().and_return(exit_code)
1722        if exit_code != 0:
1723            self._setup_post_job_run_monitor('.collect_crashinfo_execute')
1724        self.pidfile_monitor.has_process.expect_call().and_return(True)
1725        self._expect_copy_results(monitor=self.pidfile_monitor,
1726                                  queue_entry=self.queue_entry)
1727        parse_task = monitor_db.FinalReparseTask.expect_new([self.queue_entry])
1728        _set_host_and_qe_ids(parse_task)
1729        self._dispatcher.add_agent.expect_call(IsAgentWithTask(parse_task))
1730
1731        self.pidfile_monitor.num_tests_failed.expect_call().and_return(0)
1732
1733
1734    def _run_gather_logs_task(self):
1735        task = monitor_db.GatherLogsTask(self.job, [self.queue_entry])
1736        task.agent = DummyAgent()
1737        task.agent.dispatcher = self._dispatcher
1738        self.run_task(task, True)
1739        self.god.check_playback()
1740
1741
1742    def test_gather_logs_task(self):
1743        self._setup_gather_logs_expects()
1744        # no rebooting for this basic test
1745        self.job.reboot_after = models.RebootAfter.NEVER
1746        self.host.set_status.expect_call('Ready')
1747
1748        self._run_gather_logs_task()
1749
1750
1751    def test_gather_logs_task_successful_autoserv(self):
1752        # When Autoserv exits successfully, no collect_crashinfo stage runs
1753        self._setup_gather_logs_expects(autoserv_killed=False)
1754        self.job.reboot_after = models.RebootAfter.NEVER
1755        self.host.set_status.expect_call('Ready')
1756
1757        self._run_gather_logs_task()
1758
1759
1760    def _setup_gather_task_cleanup_expects(self):
1761        self.god.stub_class(monitor_db, 'CleanupTask')
1762        cleanup_task = monitor_db.CleanupTask.expect_new(host=self.host)
1763        _set_host_and_qe_ids(cleanup_task)
1764        self._dispatcher.add_agent.expect_call(IsAgentWithTask(cleanup_task))
1765
1766
1767    def test_gather_logs_reboot_hosts(self):
1768        self._setup_gather_logs_expects()
1769        self.job.reboot_after = models.RebootAfter.ALWAYS
1770        self._setup_gather_task_cleanup_expects()
1771
1772        self._run_gather_logs_task()
1773
1774
1775    def test_gather_logs_reboot_on_abort(self):
1776        self._setup_gather_logs_expects(hqe_aborted=True)
1777        self.job.reboot_after = models.RebootAfter.NEVER
1778        self._setup_gather_task_cleanup_expects()
1779
1780        self._run_gather_logs_task()
1781
1782
1783    def _test_cleanup_task_helper(self, success, task_tag,
1784                                  use_queue_entry=False):
1785        if use_queue_entry:
1786            self.queue_entry.get_host.expect_call().and_return(self.host)
1787        self.host.set_status.expect_call('Cleaning')
1788        if success:
1789            self.setup_run_monitor(0, task_tag)
1790            self.host.set_status.expect_call('Ready')
1791            self.host.update_field.expect_call('dirty', 0)
1792        else:
1793            self.setup_run_monitor(1, task_tag)
1794            if use_queue_entry and not self.queue_entry.meta_host:
1795                self.queue_entry.set_execution_subdir.expect_call()
1796                self.queue_entry.execution_path.expect_call().and_return('tag')
1797                self._setup_move_logfile(include_destination=True)
1798
1799        if use_queue_entry:
1800            task = monitor_db.CleanupTask(queue_entry=self.queue_entry)
1801        else:
1802            task = monitor_db.CleanupTask(host=self.host)
1803        self.assertEquals(len(task.failure_tasks), 1)
1804        repair_task = task.failure_tasks[0]
1805        self.assert_(isinstance(repair_task, monitor_db.RepairTask))
1806        if use_queue_entry:
1807            self.assertEquals(repair_task.queue_entry, self.queue_entry)
1808
1809        self.run_task(task, success)
1810
1811        self.god.check_playback()
1812        self.assert_(set(task.cmd) >=
1813                     set([monitor_db._autoserv_path, '-p', '--cleanup', '-m',
1814                          self.HOSTNAME, '-r',
1815                          drone_manager.WORKING_DIRECTORY]))
1816        if use_queue_entry:
1817            self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS)
1818

1819    def test_cleanup_task(self):
1820        self._test_cleanup_task_helper(True, '1-cleanup')
1821        self._test_cleanup_task_helper(False, '2-cleanup')
1822
1823
1824    def test_cleanup_task_with_queue_entry(self):
1825        self._test_cleanup_task_helper(False, '1-cleanup', True)
1826
1827
1828    def test_recovery_queue_task_aborted_early(self):
1829        # abort a recovery QueueTask right after it's created
1830        self.god.stub_class_method(monitor_db.QueueTask, '_log_abort')
1831        self.god.stub_class_method(monitor_db.QueueTask, '_finish_task')
1832        run_monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor,
1833                                                 'run_monitor')
1834
1835        self.queue_entry.execution_path.expect_call().and_return('tag')
1836        run_monitor.kill.expect_call()
1837        run_monitor.has_process.expect_call().and_return(True)
1838        monitor_db.QueueTask._log_abort.expect_call()
1839        monitor_db.QueueTask._finish_task.expect_call()
1840
1841        task = monitor_db.QueueTask(self.job, [self.queue_entry],
1842                                    recover_run_monitor=run_monitor)
1843        task.abort()
1844        self.assert_(task.aborted)
1845        self.god.check_playback()
1846
1847
1848class HostTest(BaseSchedulerTest):
1849    def test_cmp_for_sort(self):
1850        expected_order = [
1851                'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010',
1852                'host10', 'host11', 'yolkfolk']
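        # The expected order implies a case-insensitive, numeric-aware sort:
        # 'host09', 'HOST010' and 'host10' compare by numeric value, with the
        # more-padded form winning ties, as the assertions below spell out.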
1853        hostname_idx = list(monitor_db.Host._fields).index('hostname')
1854        row = [None] * len(monitor_db.Host._fields)
1855        hosts = []
1856        for hostname in expected_order:
1857            row[hostname_idx] = hostname
1858            hosts.append(monitor_db.Host(row=row, new_record=True))
1859
1860        host1 = hosts[expected_order.index('Host1')]
1861        host010 = hosts[expected_order.index('HOST010')]
1862        host10 = hosts[expected_order.index('host10')]
1863        host3 = hosts[expected_order.index('host3')]
1864        alice = hosts[expected_order.index('alice')]
1865        self.assertEqual(0, monitor_db.Host.cmp_for_sort(host10, host10))
1866        self.assertEqual(1, monitor_db.Host.cmp_for_sort(host10, host010))
1867        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host010, host10))
1868        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host10))
1869        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host010))
1870        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host10))
1871        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host010))
1872        self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, host1))
1873        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host3))
1874        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(alice, host3))
1875        self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, alice))
1876        self.assertEqual(0, monitor_db.Host.cmp_for_sort(alice, alice))
1877
1878        hosts.sort(cmp=monitor_db.Host.cmp_for_sort)
1879        self.assertEqual(expected_order, [h.hostname for h in hosts])
1880
1881        hosts.reverse()
1882        hosts.sort(cmp=monitor_db.Host.cmp_for_sort)
1883        self.assertEqual(expected_order, [h.hostname for h in hosts])
1884
1885
1886class HostQueueEntryTest(BaseSchedulerTest):
1887    def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
1888        job = self._create_job(**create_job_kwargs)
1889        for label in dependency_labels:
1890            job.dependency_labels.add(label)
1891        hqes = list(monitor_db.HostQueueEntry.fetch(where='job_id=%d' % job.id))
1892        self.assertEqual(1, len(hqes))
1893        return hqes[0]
1894
1895
1896    def _check_hqe_labels(self, hqe, expected_labels):
1897        expected_labels = set(expected_labels)
1898        label_names = set(label.name for label in hqe.get_labels())
1899        self.assertEqual(expected_labels, label_names)
1900
1901
1902    def test_get_labels_empty(self):
1903        hqe = self._create_hqe(hosts=[1])
1904        labels = list(hqe.get_labels())
1905        self.assertEqual([], labels)
1906
1907
1908    def test_get_labels_metahost(self):
1909        hqe = self._create_hqe(metahosts=[2])
1910        self._check_hqe_labels(hqe, ['label2'])
1911
1912
1913    def test_get_labels_dependencies(self):
1914        hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
1915                               metahosts=[1])
1916        self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])
1917
1918
1919class JobTest(BaseSchedulerTest):
1920    def setUp(self):
1921        super(JobTest, self).setUp()
1922        self.god.stub_with(
1923            drone_manager.DroneManager, 'attach_file_to_execution',
1924            mock.mock_function('attach_file_to_execution',
1925                               default_return_val='/test/path/tmp/foo'))
1926
1927
1928    def _test_pre_job_tasks_helper(self):
1929        """
1930        Calls HQE._do_run_pre_job_tasks() and returns the task list after
1931        confirming that the last task is a SetEntryPendingTask.
1932        """
1933        queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next()
1934        pre_job_agent = queue_entry._do_run_pre_job_tasks()
1935        self.assert_(isinstance(pre_job_agent, monitor_db.Agent))
1936        pre_job_tasks = list(pre_job_agent.queue.queue)
1937        self.assertTrue(isinstance(pre_job_tasks[-1],
1938                                   monitor_db.SetEntryPendingTask))
1939
1940        return pre_job_tasks
1941
1942
1943    def _test_run_helper(self, expect_agent=True, expect_starting=False,
1944                         expect_pending=False):
1945        if expect_starting:
1946            expected_status = models.HostQueueEntry.Status.STARTING
1947        elif expect_pending:
1948            expected_status = models.HostQueueEntry.Status.PENDING
1949        else:
1950            expected_status = models.HostQueueEntry.Status.VERIFYING
1951        job = monitor_db.Job.fetch('id = 1').next()
1952        queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next()
1953        assert queue_entry.job is job
1954        agent = job.run_if_ready(queue_entry)
1955
1956        self.god.check_playback()
1957        actual_status = models.HostQueueEntry.smart_get(1).status
1958        self.assertEquals(expected_status, actual_status)
1959
1960        if not expect_agent:
1961            self.assertEquals(agent, None)
1962            return
1963
1964        self.assert_(isinstance(agent, monitor_db.Agent))
1965        tasks = list(agent.queue.queue)
1966        return tasks
1967
1968
1969    def _check_verify_task(self, verify_task):
1970        self.assert_(isinstance(verify_task, monitor_db.VerifyTask))
1971        self.assertEquals(verify_task.queue_entry.id, 1)
1972
1973
1974    def _check_pending_task(self, pending_task):
1975        self.assert_(isinstance(pending_task, monitor_db.SetEntryPendingTask))
1976        self.assertEquals(pending_task._queue_entry.id, 1)
1977
1978
1979    def test_run_if_ready_delays(self):
1980        # Also tests Job.run_with_ready_delay() on atomic group jobs.
1981        django_job = self._create_job(hosts=[5, 6], atomic_group=1)
1982        job = monitor_db.Job(django_job.id)
1983        self.assertEqual(1, job.synch_count)
1984        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
1985        self.assertEqual(2, len(django_hqes))
1986        self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)
1987
1988        def set_hqe_status(django_hqe, status):
1989            django_hqe.status = status
1990            django_hqe.save()
1991            monitor_db.HostQueueEntry(django_hqe.id).host.set_status(status)
1992
1993        # Set up an initial state; our synch_count is 1.
1994        set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
1995        set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)
1996
1997        # So that we don't depend on the config file value during the test.
1998        self.assert_(scheduler_config.config
1999                     .secs_to_wait_for_atomic_group_hosts is not None)
2000        self.god.stub_with(scheduler_config.config,
2001                           'secs_to_wait_for_atomic_group_hosts', 123456)
2002
2003        # Get the pending one as a monitor_db.HostQueueEntry object.
2004        pending_hqe = monitor_db.HostQueueEntry(django_hqes[1].id)
2005        self.assert_(not job._delay_ready_task)
2006        self.assertTrue(job.is_ready())
2007
2008        # Ready with one pending, one verifying and an atomic group should
2009        # result in a DelayedCallTask to re-check if we're ready a while later.
2010        agent = job.run_if_ready(pending_hqe)
2011        self.assert_(job._delay_ready_task)
2012        self.assert_(isinstance(agent, monitor_db.Agent))
2013        tasks = list(agent.queue.queue)
2014        self.assertEqual(1, len(tasks))
2015        self.assert_(isinstance(tasks[0], monitor_db.DelayedCallTask))
2016        delay_task = tasks[0]
2017        self.assert_(not delay_task.is_done())
2018
2019        self.god.stub_function(job, 'run')
2020
2021        # Test that the DelayedCallTask's callback queued up above does the
2022        # correct thing and returns the Agent returned by job.run().
2023        job.run.expect_call(pending_hqe).and_return('Fake Agent')
2024        self.assertEqual('Fake Agent', delay_task._callback())
2025
2026        # A delay already exists; this must do nothing.
2027        self.assertEqual(None, job.run_with_ready_delay(pending_hqe))
2028
2029        # Adjust the delay deadline so that enough time has passed.
2030        job._delay_ready_task.end_time = time.time() - 111111
2031        job.run.expect_call(pending_hqe).and_return('Forty two')
2032        # ...the delay_expired condition should cause us to call run()
2033        self.assertEqual('Forty two', job.run_with_ready_delay(pending_hqe))
2034
2035        # Adjust the delay deadline back so that enough time has not passed.
2036        job._delay_ready_task.end_time = time.time() + 111111
2037        self.assertEqual(None, job.run_with_ready_delay(pending_hqe))
2038
2039        set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.PENDING)
2040        # Now max_number_of_machines HQEs are in pending state.  Remaining
2041        # delay will now be ignored.
2042        job.run.expect_call(pending_hqe).and_return('Watermelon')
2043        # ...the over_max_threshold test should cause us to call run()
2044        self.assertEqual('Watermelon', job.run_with_ready_delay(pending_hqe))
2045
2046        other_hqe = monitor_db.HostQueueEntry(django_hqes[0].id)
2047        self.assertTrue(pending_hqe.job is other_hqe.job)
2048        # DBObject classes should reuse instances so these should be the same.
2049        self.assertEqual(job, other_hqe.job)
2050        self.assertEqual(other_hqe.job, pending_hqe.job)
2051        # Be sure our delay was not lost during the other_hqe construction.
2052        self.assert_(job._delay_ready_task)
2053        self.assertFalse(job._delay_ready_task.is_done())
2054        self.assertFalse(job._delay_ready_task.aborted)
2055
2056        # We want the real run() to be called below.
2057        self.god.unstub(job, 'run')
2058
2059        # We pass in the other HQE this time the same way it would happen
2060        # for real when one host finishes verifying and enters pending.
2061        agent = job.run_if_ready(other_hqe)
2062
2063        # The delayed task must be aborted by the actual run() call above.
2064        self.assertTrue(job._delay_ready_task.aborted)
2065        self.assertFalse(job._delay_ready_task.success)
2066        self.assertTrue(job._delay_ready_task.is_done())
2067
2068        # Check that job run() and _finish_run() were called by the above:
2069        tasks = list(agent.queue.queue)
2070        self.assertEqual(1, len(tasks))
2071        self.assert_(isinstance(tasks[0], monitor_db.QueueTask))
2072        # Requery these hqes in order to verify the status from the DB.
2073        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
2074        for entry in django_hqes:
2075            self.assertEqual(models.HostQueueEntry.Status.STARTING,
2076                             entry.status)
2077
2078        # We're already running, but more calls to run_with_ready_delay can
2079        # continue to come in as straggler hosts enter Pending.  Make
2080        # sure we don't do anything.
2081        self.assertEqual(None, job.run_with_ready_delay(pending_hqe))
2082
2083
2084    def test__atomic_and_has_started__on_atomic(self):
2085        self._create_job(hosts=[5, 6], atomic_group=1)
2086        job = monitor_db.Job.fetch('id = 1').next()
2087        self.assertFalse(job._atomic_and_has_started())
2088
2089        self._update_hqe("status='Pending'")
2090        self.assertFalse(job._atomic_and_has_started())
2091        self._update_hqe("status='Verifying'")
2092        self.assertFalse(job._atomic_and_has_started())
2094        self._update_hqe("status='Failed'")
2095        self.assertFalse(job._atomic_and_has_started())
2096        self._update_hqe("status='Stopped'")
2097        self.assertFalse(job._atomic_and_has_started())
2098
2099        self._update_hqe("status='Starting'")
2100        self.assertTrue(job._atomic_and_has_started())
2101        self._update_hqe("status='Completed'")
2102        self.assertTrue(job._atomic_and_has_started())
2103        self._update_hqe("status='Aborted'")
2104
2105
2106    def test__atomic_and_has_started__not_atomic(self):
2107        self._create_job(hosts=[1, 2])
2108        job = monitor_db.Job.fetch('id = 1').next()
2109        self.assertFalse(job._atomic_and_has_started())
2110        self._update_hqe("status='Starting'")
2111        self.assertFalse(job._atomic_and_has_started())
2112
2113
2114    def test_run_asynchronous(self):
2115        self._create_job(hosts=[1, 2])
2116
2117        tasks = self._test_pre_job_tasks_helper()
2118
2119        self.assertEquals(len(tasks), 2)
2120        verify_task, pending_task = tasks
2121        self._check_verify_task(verify_task)
2122        self._check_pending_task(pending_task)
2123
2124
2125    def test_run_asynchronous_skip_verify(self):
2126        job = self._create_job(hosts=[1, 2])
2127        job.run_verify = False
2128        job.save()
2129
2130        tasks = self._test_pre_job_tasks_helper()
2131
2132        self.assertEquals(len(tasks), 1)
2133        pending_task = tasks[0]
2134        self._check_pending_task(pending_task)
2135
2136
2137    def test_run_synchronous_verify(self):
2138        self._create_job(hosts=[1, 2], synchronous=True)
2139
2140        tasks = self._test_pre_job_tasks_helper()
2141        self.assertEquals(len(tasks), 2)
2142        verify_task, pending_task = tasks
2143        self._check_verify_task(verify_task)
2144        self._check_pending_task(pending_task)
2145
2146
2147    def test_run_synchronous_skip_verify(self):
2148        job = self._create_job(hosts=[1, 2], synchronous=True)
2149        job.run_verify = False
2150        job.save()
2151
2152        tasks = self._test_pre_job_tasks_helper()
2153        self.assertEquals(len(tasks), 1)
2154        self._check_pending_task(tasks[0])
2155
2156
2157    def test_run_synchronous_ready(self):
2158        self._create_job(hosts=[1, 2], synchronous=True)
2159        self._update_hqe("status='Pending', execution_subdir=''")
2160
2161        tasks = self._test_run_helper(expect_starting=True)
2162        self.assertEquals(len(tasks), 1)
2163        queue_task = tasks[0]
2164
2165        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
2166        self.assertEquals(queue_task.job.id, 1)
2167        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
2168        self.assertEquals(hqe_ids, [1, 2])
2169
2170
2171    def test_run_atomic_group_already_started(self):
2172        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
2173        self._update_hqe("status='Starting', execution_subdir=''")
2174
2175        job = monitor_db.Job.fetch('id = 1').next()
2176        queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next()
2177        assert queue_entry.job is job
2178        self.assertEqual(None, job.run(queue_entry))
2179
2180        self.god.check_playback()
2181
2182
2183    def test_run_synchronous_atomic_group_ready(self):
2184        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
2185        self._update_hqe("status='Pending', execution_subdir=''")
2186
2187        tasks = self._test_run_helper(expect_starting=True)
2188        self.assertEquals(len(tasks), 1)
2189        queue_task = tasks[0]
2190
2191        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
2192        # Atomic group jobs that do not depend on a specific label in the
2193        # atomic group will use the atomic group name as their group name.
2194        self.assertEquals(queue_task.group_name, 'atomic1')
2195
2196
2197    def test_run_synchronous_atomic_group_with_label_ready(self):
2198        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
2199        job.dependency_labels.add(self.label4)
2200        self._update_hqe("status='Pending', execution_subdir=''")
2201
2202        tasks = self._test_run_helper(expect_starting=True)
2203        self.assertEquals(len(tasks), 1)
2204        queue_task = tasks[0]
2205
2206        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
2207        # Atomic group jobs that also specify a label in the atomic group
2208        # will use the label name as their group name.
2209        self.assertEquals(queue_task.group_name, 'label4')
2210
2211
2212    def test_reboot_before_always(self):
2213        job = self._create_job(hosts=[1])
2214        job.reboot_before = models.RebootBefore.ALWAYS
2215        job.save()
2216
2217        tasks = self._test_pre_job_tasks_helper()
2218        self.assertEquals(len(tasks), 3)
2219        cleanup_task = tasks[0]
2220        self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask))
2221        self.assertEquals(cleanup_task.host.id, 1)
2222
2223
2224    def _test_reboot_before_if_dirty_helper(self, expect_reboot):
2225        job = self._create_job(hosts=[1])
2226        job.reboot_before = models.RebootBefore.IF_DIRTY
2227        job.save()
2228
2229        tasks = self._test_pre_job_tasks_helper()
2230        self.assertEquals(len(tasks), expect_reboot and 3 or 2)
2231        if expect_reboot:
2232            cleanup_task = tasks[0]
2233            self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask))
2234            self.assertEquals(cleanup_task.host.id, 1)
2235
2236
2237    def test_reboot_before_if_dirty(self):
2238        models.Host.smart_get(1).update_object(dirty=True)
2239        self._test_reboot_before_if_dirty_helper(True)
2240
2241
2242    def test_reboot_before_not_dirty(self):
2243        models.Host.smart_get(1).update_object(dirty=False)
2244        self._test_reboot_before_if_dirty_helper(False)
2245
2246
2247    def test_next_group_name(self):
2248        django_job = self._create_job(metahosts=[1])
2249        job = monitor_db.Job(id=django_job.id)
2250        self.assertEqual('group0', job._next_group_name())
2251
2252        for hqe in django_job.hostqueueentry_set.filter():
2253            hqe.execution_subdir = 'my_rack.group0'
2254            hqe.save()
2255        self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))
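        # Evidently '/' in the requested group name is normalized to '_', and
        # the numeric suffix advances past the existing 'my_rack.group0'.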
2256
2257
2258class TopLevelFunctionsTest(unittest.TestCase):
2259    def setUp(self):
2260        self.god = mock.mock_god()
2261
2262
2263    def tearDown(self):
2264        self.god.unstub_all()
2265
2266
2267    def test_autoserv_command_line(self):
2268        machines = 'abcd12,efgh34'
2269        extra_args = ['-Z', 'hello']
2270        expected_command_line = [monitor_db._autoserv_path, '-p',
2271                                 '-m', machines, '-r',
2272                                 drone_manager.WORKING_DIRECTORY]
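        # i.e. roughly: autoserv -p -m abcd12,efgh34 -r <WORKING_DIRECTORY>;
        # '--verbose' and any extra arguments are asserted separately below.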
2273
2274        command_line = monitor_db._autoserv_command_line(machines, extra_args)
2275        self.assertEqual(expected_command_line + ['--verbose'] + extra_args,
2276                         command_line)
2277
2278        class FakeJob(object):
2279            owner = 'Bob'
2280            name = 'fake job name'
2281            id = 1337
2282
2283        class FakeHQE(object):
2284            job = FakeJob
2285
2286        command_line = monitor_db._autoserv_command_line(
2287                machines, extra_args=[], queue_entry=FakeHQE, verbose=False)
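        # FakeHQE is passed as a class rather than an instance; class
        # attribute lookup serves equally well for this test's purposes.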
2288        self.assertEqual(expected_command_line +
2289                         ['-u', FakeJob.owner, '-l', FakeJob.name],
2290                         command_line)
2291
2292
2293if __name__ == '__main__':
2294    unittest.main()
2295