monitor_db_unittest.py revision b6d1662e18d756483d5fd81f4057cae4ef62152c
#!/usr/bin/python

import unittest, time, subprocess, os, StringIO, tempfile, datetime, shutil
import common
import MySQLdb
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.database import database_connection, migrate
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
from autotest_lib.scheduler import scheduler_config

_DEBUG = False


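# A bare-bones Agent stand-in: tick() marks it running, set_done() marks it
# finished.  Used wherever these tests need an agent without real tasks.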
class DummyAgent(object):
    _is_running = False
    _is_done = False
    num_processes = 1
    host_ids = []
    queue_entry_ids = []

    def is_running(self):
        return self._is_running


    def tick(self):
        self._is_running = True


    def is_done(self):
        return self._is_done


    def set_done(self, done):
        self._is_done = done
        self._is_running = not done


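# Argument comparator matching any database row whose first column (the id)
# equals row_id.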
class IsRow(mock.argument_comparator):
    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


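# Argument comparator matching an Agent whose queue holds exactly the given
# task.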
class IsAgentWithTask(mock.argument_comparator):
    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list


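# Shared scaffolding: builds a fresh test database via FrontendTestMixin,
# points monitor_db._db at it, stubs the drone manager's result paths, and
# creates a Dispatcher for each test.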
class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        # Clear the instance cache as this is a brand new database.
        monitor_db.DBObject._clear_instance_cache()

        self._database = (
            database_connection.DatabaseConnection.get_test_database(
                self._test_db_file))
        self._database.connect()
        self._database.debug = _DEBUG

        monitor_db._db = self._database
        monitor_db._drone_manager._results_dir = '/test/path'
        monitor_db._drone_manager._temporary_directory = '/test/path/tmp'


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


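    # Helper to tweak host_queue_entries rows directly with SQL.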
    def _update_hqe(self, set, where=''):
        query = 'UPDATE host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


class DBObjectTest(BaseSchedulerTest):
    # It may seem odd to subclass BaseSchedulerTest for this, but it saves us
    # duplicating some setup work for what we want to test.


    def test_compare_fields_in_row(self):
        host = monitor_db.Host(id=1)
        fields = list(host._fields)
        row_data = [getattr(host, fieldname) for fieldname in fields]
        self.assertEqual({}, host._compare_fields_in_row(row_data))
        row_data[fields.index('hostname')] = 'spam'
        self.assertEqual({'hostname': ('host1', 'spam')},
                         host._compare_fields_in_row(row_data))
        row_data[fields.index('id')] = 23
        self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)},
                         host._compare_fields_in_row(row_data))


    def test_always_query(self):
        host_a = monitor_db.Host(id=2)
        self.assertEqual(host_a.hostname, 'host2')
        self._do_query('UPDATE hosts SET hostname="host2-updated" WHERE id=2')
        host_b = monitor_db.Host(id=2, always_query=True)
        self.assert_(host_a is host_b, 'Cached instance not returned.')
        self.assertEqual(host_a.hostname, 'host2-updated',
                         'Database was not re-queried')

        # If either of these is called, a query was made when it shouldn't be.
        host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')
        host_a._update_fields_from_row = host_a._compare_fields_in_row
        host_c = monitor_db.Host(id=2, always_query=False)
        self.assert_(host_a is host_c, 'Cached instance not returned')


    def test_delete(self):
        host = monitor_db.Host(id=3)
        host.delete()
        self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3,
                          always_query=False)
        self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3,
                          always_query=True)


    def test_save(self):
        # Dummy Job to avoid creating one in the HostQueueEntry __init__.
        class MockJob(object):
            def __init__(self, id):
                pass
            def tag(self):
                return 'MockJob'
        self.god.stub_with(monitor_db, 'Job', MockJob)
        hqe = monitor_db.HostQueueEntry(
                new_record=True,
                row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None])
        hqe.save()
        new_id = hqe.id
        # Force a re-query and verify that the correct data was stored.
        monitor_db.DBObject._clear_instance_cache()
        hqe = monitor_db.HostQueueEntry(id=new_id)
        self.assertEqual(hqe.id, new_id)
        self.assertEqual(hqe.job_id, 1)
        self.assertEqual(hqe.host_id, 2)
        self.assertEqual(hqe.status, 'Queued')
        self.assertEqual(hqe.meta_host, None)
        self.assertEqual(hqe.active, False)
        self.assertEqual(hqe.complete, False)
        self.assertEqual(hqe.deleted, False)
        self.assertEqual(hqe.execution_subdir, '.')
        self.assertEqual(hqe.atomic_group_id, None)
        self.assertEqual(hqe.started_on, None)


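# End-to-end tests of Dispatcher._schedule_new_jobs(): Job.run is stubbed to
# record (job_id, host_id) assignments, which the assertions below verify.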
class DispatcherSchedulingTest(BaseSchedulerTest):
    _jobs_scheduled = []


    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def job_run_stub(job_self, queue_entry):
            """Return a dummy for testing.  Called by HostQueueEntry.run()."""
            self._record_job_scheduled(job_self.id, queue_entry.host.id)
            queue_entry.set_status('Starting')
            return DummyAgent()

        self.god.stub_with(monitor_db.Job, 'run', job_run_stub)

        def hqe_queue_log_record_stub(self, log_line):
            """No-Op to avoid calls down to the _drone_manager during tests."""

        self.god.stub_with(monitor_db.HostQueueEntry, 'queue_log_record',
                           hqe_queue_log_record_stub)


    def _record_job_scheduled(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        self._do_query('UPDATE hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _test_basic_scheduling_helper(self, use_metahosts):
        'Basic scheduling, with or without metahosts'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        'Test prioritization ordering'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE hosts SET status="Running" WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        'Only idle hosts get scheduled'
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def _setup_test_only_if_needed_labels(self):
        # apply only_if_needed label3 to host1
        models.Host.smart_get('host1').labels.add(self.label3)
        return self._create_job_simple([1], use_metahost=True)


    def test_only_if_needed_labels_avoids_host(self):
        job = self._setup_test_only_if_needed_labels()
        # if the job doesn't depend on label3, there should be no scheduling
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_schedules(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_via_metahost(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        # should also work if the metahost is the only_if_needed label
        self._do_query('DELETE FROM jobs_dependency_labels')
        self._create_job(metahosts=[3])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_metahosts_obey_blocks(self):
        """
        Metahosts can't get scheduled on hosts already scheduled for
        that job.
        """
        self._create_job(metahosts=[1], hosts=[1])
        # make the nonmetahost entry complete, so the metahost can try
        # to get scheduled
        self._update_hqe(set='complete = 1', where='host_id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    # TODO(gps): These should probably live in their own TestCase class
    # specific to testing HostScheduler methods directly.  It was convenient
    # to put them here for now to share existing test environment setup code.
    def test_HostScheduler_check_atomic_group_labels(self):
        normal_job = self._create_job(metahosts=[0])
        atomic_job = self._create_job(atomic_group=1)
        # Indirectly initialize the internal state of the host scheduler.
        self._dispatcher._refresh_pending_queue_entries()

        atomic_hqe = monitor_db.HostQueueEntry(id=atomic_job.id)
        normal_hqe = monitor_db.HostQueueEntry(id=normal_job.id)

        host_scheduler = self._dispatcher._host_scheduler
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id], atomic_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
                [self.label4.id], normal_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
                [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id, self.label6.id], atomic_hqe))
        self.assertRaises(monitor_db.SchedulerError,
                          host_scheduler._check_atomic_group_labels,
                          [self.label4.id, self.label5.id],
                          atomic_hqe)


    def test_HostScheduler_get_host_atomic_group_id(self):
        self._create_job(metahosts=[self.label6.id])
        # Indirectly initialize the internal state of the host scheduler.
        self._dispatcher._refresh_pending_queue_entries()

        # Test the host scheduler
        host_scheduler = self._dispatcher._host_scheduler
        self.assertRaises(monitor_db.SchedulerError,
                          host_scheduler._get_host_atomic_group_id,
                          [self.label4.id, self.label5.id])
        self.assertEqual(None, host_scheduler._get_host_atomic_group_id([]))
        self.assertEqual(None, host_scheduler._get_host_atomic_group_id(
                [self.label3.id, self.label7.id, self.label6.id]))
        self.assertEqual(1, host_scheduler._get_host_atomic_group_id(
                [self.label4.id, self.label7.id, self.label6.id]))
        self.assertEqual(1, host_scheduler._get_host_atomic_group_id(
                [self.label7.id, self.label5.id]))


    def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
        # Create a job scheduled to run on label6.
        self._create_job(metahosts=[self.label6.id])
        self._dispatcher._schedule_new_jobs()
        # label6 only has hosts that are in atomic groups associated with it,
        # so there should be no scheduling.
        self._check_for_extra_schedulings()


    def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
        # Create a job scheduled to run on label5.  This is an atomic group
        # label but this job does not request atomic group scheduling.
        self._create_job(metahosts=[self.label5.id])
        self._dispatcher._schedule_new_jobs()
        # label5 only has hosts that are in atomic groups associated with it,
        # so there should be no scheduling.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_basics(self):
        # Create jobs scheduled to run on an atomic group.
        job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
                                 atomic_group=1)
        job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
                                 atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # atomic_group.max_number_of_machines was 2 so we should run on 2.
        self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
        self._assert_job_scheduled_on(job_b.id, 8)  # label5
        self._assert_job_scheduled_on(job_b.id, 9)  # label5
        self._check_for_extra_schedulings()

        # The three-host label4 atomic group still has one host available.
        # That means a job with a synch_count of 1 asking to be scheduled on
        # the atomic group can still use the final machine.
        #
        # This may seem like a somewhat odd use case.  It allows the use of an
        # atomic group as a set of machines to run smaller jobs within (a set
        # of hosts configured for use in network tests with each other,
        # perhaps?)
        onehost_job = self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
        self._check_for_extra_schedulings()

        # No more atomic groups have hosts available, no more jobs should
        # be scheduled.
        self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_obeys_acls(self):
        # Request scheduling on a specific atomic label but be denied by ACLs.
        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id in (8,9)')
        job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_dependency_label_exclude(self):
        # A dependency label that matches no hosts in the atomic group.
        job_a = self._create_job(atomic_group=1)
        job_a.dependency_labels.add(self.label3)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
        # A metahost and dependency label that excludes too many hosts.
        job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
                                 atomic_group=1)
        job_b.dependency_labels.add(self.label7)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_dependency_label_match(self):
        # A dependency label that exists on enough atomic group hosts in only
        # one of the two atomic group labels.
        job_c = self._create_job(synchronous=True, atomic_group=1)
        job_c.dependency_labels.add(self.label7)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_no_metahost(self):
        # Force it to schedule on the other group for a reliable test.
        self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')
        # An atomic job without a metahost.
        job = self._create_job(synchronous=True, atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_partial_group(self):
        # Make one host in label4 unavailable so that there are only two
        # hosts left in the group.
        self._do_query('UPDATE hosts SET status="Repair Failed" WHERE id=5')
        job = self._create_job(synchronous=True, metahosts=[self.label4.id],
                               atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # Verify that it was scheduled on the 2 ready hosts in that group.
        self._assert_job_scheduled_on(job.id, 6)
        self._assert_job_scheduled_on(job.id, 7)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_not_enough_available(self):
        # Mark some hosts in each atomic group label as not usable.
        # One host running, another invalid in the first group label.
        self._do_query('UPDATE hosts SET status="Running" WHERE id=5')
        self._do_query('UPDATE hosts SET invalid=1 WHERE id=6')
        # One host invalid in the second group label.
        self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')
        # Nothing to schedule when no group label has enough (2) good hosts.
        self._create_job(atomic_group=1, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        # There are not enough hosts in either atomic group,
        # so no scheduling should occur.
        self._check_for_extra_schedulings()

        # Now create an atomic job that has a synch count of 1.  It should
        # schedule on exactly one of the hosts.
        onehost_job = self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)


    def test_atomic_group_scheduling_no_valid_hosts(self):
        self._do_query('UPDATE hosts SET invalid=1 WHERE id in (8,9)')
        self._create_job(synchronous=True, metahosts=[self.label5.id],
                         atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # No hosts in the selected group and label are valid; no schedulings.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_metahost_works(self):
        # Test that atomic group scheduling also obeys metahosts.
        self._create_job(metahosts=[0], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # There are no atomic group hosts that also have that metahost.
        self._check_for_extra_schedulings()

        job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job_b.id, 8)
        self._assert_job_scheduled_on(job_b.id, 9)
        self._check_for_extra_schedulings()


    def test_atomic_group_skips_ineligible_hosts(self):
        # Test that hosts marked ineligible for this job are skipped.
        # How would this ever happen anyway?
        job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
        models.IneligibleHostQueue.objects.create(job=job, host_id=5)
        models.IneligibleHostQueue.objects.create(job=job, host_id=6)
        models.IneligibleHostQueue.objects.create(job=job, host_id=7)
        self._dispatcher._schedule_new_jobs()
        # No scheduling should occur as all desired hosts were ineligible.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_fail(self):
        # If synch_count is greater than the atomic group's
        # max_number_of_machines, the job should be aborted immediately.
        model_job = self._create_job(synchronous=True, atomic_group=1)
        model_job.synch_count = 4
        model_job.save()
        job = monitor_db.Job(id=model_job.id)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()
        queue_entries = job.get_host_queue_entries()
        self.assertEqual(1, len(queue_entries))
        self.assertEqual(queue_entries[0].status,
                         models.HostQueueEntry.Status.ABORTED)


    def test_atomic_group_no_labels_no_scheduling(self):
        # Never schedule on atomic groups marked invalid.
        job = self._create_job(metahosts=[self.label5.id], synchronous=True,
                               atomic_group=1)
        # Deleting an atomic group via the frontend marks it invalid and
        # removes all label references to the group.  The job now references
        # an invalid atomic group with no labels associated with it.
        self.label5.atomic_group.invalid = True
        self.label5.atomic_group.save()
        self.label5.atomic_group = None
        self.label5.save()

        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_host_fail(self):
        # Scheduling a job directly on hosts in an atomic group must
        # fail to avoid users inadvertently holding up the use of an
        # entire atomic group by using the machines individually.
        job = self._create_job(hosts=[5])
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_host(self):
        # Scheduling a job directly on one host in an atomic group will
        # work when the atomic group is listed on the HQE in addition
        # to the host (assuming the sync count is 1).
        job = self._create_job(hosts=[5], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job.id, 5)
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_hosts_sync2(self):
        job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job.id, 5)
        self._assert_job_scheduled_on(job.id, 8)
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
        job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE hosts SET status="Repair Failed"')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
        scheduler_config.config.max_processes_started_per_cycle = (
            self._MAX_STARTED)

        def fake_max_runnable_processes(fake_self):
            running = sum(agent.num_processes
                          for agent in self._agents
                          if agent.is_running())
            return self._MAX_RUNNING - running
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)


    def _setup_some_agents(self, num_agents):
        self._agents = [DummyAgent() for i in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_cycles(self):
        for i in xrange(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        for i in indexes:
            self.assert_(self._agents[i].is_running() == is_started,
                         'Agent %d %sstarted' %
                         (i, is_started and 'not ' or ''))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_per_cycle(self):
        self._setup_some_agents(3)
        self._dispatcher._handle_agents()
        self._assert_agents_started([0, 1])
        self._assert_agents_not_started([2])


    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_cycles()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].num_processes = 0
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


class FindAbortTest(BaseSchedulerTest):
    """
    Test the dispatcher abort functionality.
    """
    def _check_host_agent(self, agent, host_id):
        self.assert_(isinstance(agent, monitor_db.Agent))
        tasks = list(agent.queue.queue)
        self.assertEquals(len(tasks), 2)
        cleanup, verify = tasks

        self.assert_(isinstance(cleanup, monitor_db.CleanupTask))
        self.assertEquals(cleanup.host.id, host_id)

        self.assert_(isinstance(verify, monitor_db.VerifyTask))
        self.assertEquals(verify.host.id, host_id)


    def _check_agents(self, agents):
        agents = list(agents)
        self.assertEquals(len(agents), 3)
        self.assertEquals(agents[0], self._agent)
        self._check_host_agent(agents[1], 1)
        self._check_host_agent(agents[2], 2)


    def _common_setup(self):
        self._create_job(hosts=[1, 2])
        self._update_hqe(set='aborted=1')
        self._agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent')
        _set_host_and_qe_ids(self._agent, [1, 2])
        self._agent.abort.expect_call()
        self._agent.abort.expect_call() # gets called once for each HQE
        self._dispatcher.add_agent(self._agent)


    def test_find_aborting(self):
        self._common_setup()
        self._dispatcher._find_aborting()
        self.god.check_playback()


    def test_find_aborting_verifying(self):
        self._common_setup()
        self._update_hqe(set='active=1, status="Verifying"')

        self._dispatcher._find_aborting()

        self._check_agents(self._dispatcher._agents)
        self.god.check_playback()


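# Tests for the periodic cleanup that aborts synchronous jobs stuck waiting
# to start past synch_job_start_timeout_minutes.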
class JobTimeoutTest(BaseSchedulerTest):
    def _test_synch_start_timeout_helper(self, expect_abort,
                                         set_created_on=True, set_active=True,
                                         set_acl=True):
        scheduler_config.config.synch_job_start_timeout_minutes = 60
        job = self._create_job(hosts=[1, 2])
        if set_active:
            hqe = job.hostqueueentry_set.filter(host__id=1)[0]
            hqe.status = 'Pending'
            hqe.active = 1
            hqe.save()

        everyone_acl = models.AclGroup.smart_get('Everyone')
        host1 = models.Host.smart_get(1)
        if set_acl:
            everyone_acl.hosts.add(host1)
        else:
            everyone_acl.hosts.remove(host1)

        job.created_on = datetime.datetime.now()
        if set_created_on:
            job.created_on -= datetime.timedelta(minutes=100)
        job.save()

        cleanup = self._dispatcher._periodic_cleanup
        cleanup._abort_jobs_past_synch_start_timeout()

        for hqe in job.hostqueueentry_set.all():
            self.assertEquals(hqe.aborted, expect_abort)


    def test_synch_start_timeout_helper(self):
        # no abort if any one of the conditions isn't met
        self._test_synch_start_timeout_helper(False, set_created_on=False)
        self._test_synch_start_timeout_helper(False, set_active=False)
        self._test_synch_start_timeout_helper(False, set_acl=False)
        # abort if all conditions are met
        self._test_synch_start_timeout_helper(True)


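# Tests PidfileRunMonitor's pidfile parsing and process-liveness logic
# against a fully mocked DroneManager.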
class PidfileRunMonitorTest(unittest.TestCase):
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(monitor_db, '_drone_manager',
                           self.mock_drone_manager)
        self.god.stub_function(email_manager.manager, 'enqueue_notify_email')

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=monitor_db._AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))
        self.mock_drone_manager.register_pidfile.expect_call(self.pidfile_id)

        self.monitor = monitor_db.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)


    def tearDown(self):
        self.god.unstub_all()


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        if expected_pid is None:
            self.assertEquals(self.monitor._state.process, None)
        else:
            self.assertEquals(self.monitor._state.process.pid, expected_pid)
        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
        self.assertEquals(self.monitor._state.num_tests_failed,
                          expected_num_tests_failed)

        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self.monitor._start_time = time.time() - monitor_db.PIDFILE_TIMEOUT - 1
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


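# Tests Agent's task queue handling: sequencing, failure_tasks, and abort
# behavior, using mocked AgentTasks.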
class AgentTest(unittest.TestCase):
    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        task = self.god.create_mock_class(monitor_db.AgentTask, name)
        _set_host_and_qe_ids(task)
        return task


    def _create_agent(self, tasks):
        agent = monitor_db.Agent(tasks)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        while not agent.is_done():
            agent.tick()


    def test_agent(self):
        task1 = self._create_mock_task('task1')
        task2 = self._create_mock_task('task2')
        task3 = self._create_mock_task('task3')
        task1.poll.expect_call()
        task1.is_done.expect_call().and_return(False)
        task1.poll.expect_call()
        task1.is_done.expect_call().and_return(True)
        task1.is_done.expect_call().and_return(True)
        task1.success = True

        task2.poll.expect_call()
        task2.is_done.expect_call().and_return(True)
        task2.is_done.expect_call().and_return(True)
        task2.success = False
        task2.failure_tasks = [task3]

        self._dispatcher.add_agent.expect_call(IsAgentWithTask(task3))

        agent = self._create_agent([task1, task2])
        self._finish_agent(agent)
        self.god.check_playback()


    def _test_agent_abort_helper(self, ignore_abort=False):
        task1 = self._create_mock_task('task1')
        task2 = self._create_mock_task('task2')
        task1.poll.expect_call()
        task1.is_done.expect_call().and_return(False)
        task1.abort.expect_call()
        if ignore_abort:
            task1.aborted = False # task ignores abort; execution continues

            task1.poll.expect_call()
            task1.is_done.expect_call().and_return(True)
            task1.is_done.expect_call().and_return(True)
            task1.success = True

            task2.poll.expect_call()
            task2.is_done.expect_call().and_return(True)
            task2.is_done.expect_call().and_return(True)
            task2.success = True
        else:
            task1.aborted = True
            task2.abort.expect_call()
            task2.aborted = True

        agent = self._create_agent([task1, task2])
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort(self):
        self._test_agent_abort_helper()
        self._test_agent_abort_helper(True)


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent([task])
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


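# Tests for concrete AgentTask subclasses (RepairTask, VerifyTask, post-job
# tasks) with PidfileRunMonitor and DroneManager calls mocked out.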
1193class AgentTasksTest(unittest.TestCase):
1194    TEMP_DIR = '/abspath/tempdir'
1195    RESULTS_DIR = '/results/dir'
1196    HOSTNAME = 'myhost'
1197    DUMMY_PROCESS = object()
1198    HOST_PROTECTION = host_protections.default
1199    PIDFILE_ID = object()
1200    JOB_OWNER = 'test_owner'
1201    JOB_NAME = 'test_job_name'
1202    JOB_AUTOSERV_PARAMS = set(['-u', JOB_OWNER, '-l', JOB_NAME])
1203
1204    def setUp(self):
1205        self.god = mock.mock_god()
1206        self.god.stub_with(drone_manager.DroneManager, 'get_temporary_path',
1207                           mock.mock_function('get_temporary_path',
1208                                              default_return_val='tempdir'))
1209        self.god.stub_function(drone_manager.DroneManager,
1210                               'copy_results_on_drone')
1211        self.god.stub_function(drone_manager.DroneManager,
1212                               'copy_to_results_repository')
1213        self.god.stub_function(drone_manager.DroneManager,
1214                               'get_pidfile_id_from')
1215
1216        def dummy_absolute_path(self, path):
1217            return '/abspath/' + path
1218        self.god.stub_with(drone_manager.DroneManager, 'absolute_path',
1219                           dummy_absolute_path)
1220
1221        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'run')
1222        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'exit_code')
1223        self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'get_process')
1224        def mock_has_process(unused):
1225            return True
1226        self.god.stub_with(monitor_db.PidfileRunMonitor, 'has_process',
1227                           mock_has_process)
1228        self.host = self.god.create_mock_class(monitor_db.Host, 'host')
1229        self.host.id = 1
1230        self.host.hostname = self.HOSTNAME
1231        self.host.protection = self.HOST_PROTECTION
1232        self.queue_entry = self.god.create_mock_class(
1233            monitor_db.HostQueueEntry, 'queue_entry')
1234        self.job = self.god.create_mock_class(monitor_db.Job, 'job')
1235        self.job.owner = self.JOB_OWNER
1236        self.job.name = self.JOB_NAME
1237        self.queue_entry.id = 1
1238        self.queue_entry.job = self.job
1239        self.queue_entry.host = self.host
1240        self.queue_entry.meta_host = None
1241        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
1242                                                      'dispatcher')
1243
1244
1245    def tearDown(self):
1246        self.god.unstub_all()
1247
1248
1249    def run_task(self, task, success):
1250        """
1251        Do essentially what an Agent would do, but protect againt
1252        infinite looping from test errors.
1253        """
1254        if not getattr(task, 'agent', None):
1255            task.agent = object()
1256        count = 0
1257        while not task.is_done():
1258            count += 1
1259            if count > 10:
1260                print 'Task failed to finish'
1261                # in case the playback has clues to why it
1262                # failed
1263                self.god.check_playback()
1264                self.fail()
1265            task.poll()
1266        self.assertEquals(task.success, success)
1267
1268
1269    def setup_run_monitor(self, exit_status, copy_log_file=True):
1270        monitor_db.PidfileRunMonitor.run.expect_call(
1271            mock.is_instance_comparator(list),
1272            'tempdir',
1273            nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
1274            log_file=mock.anything_comparator(),
1275            pidfile_name=monitor_db._AUTOSERV_PID_FILE,
1276            paired_with_pidfile=None)
1277        monitor_db.PidfileRunMonitor.exit_code.expect_call()
1278        monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return(
1279            exit_status)
1280
1281        if copy_log_file:
1282            self._setup_move_logfile()
1283
1284
1285    def _setup_move_logfile(self, copy_on_drone=False,
1286                            include_destination=False):
1287        monitor_db.PidfileRunMonitor.get_process.expect_call().and_return(
1288            self.DUMMY_PROCESS)
1289        if copy_on_drone:
1290            self.queue_entry.execution_tag.expect_call().and_return('tag')
1291            drone_manager.DroneManager.copy_results_on_drone.expect_call(
1292                self.DUMMY_PROCESS, source_path=mock.is_string_comparator(),
1293                destination_path=mock.is_string_comparator())
1294        elif include_destination:
1295            drone_manager.DroneManager.copy_to_results_repository.expect_call(
1296                self.DUMMY_PROCESS, mock.is_string_comparator(),
1297                destination_path=mock.is_string_comparator())
1298        else:
1299            drone_manager.DroneManager.copy_to_results_repository.expect_call(
1300                self.DUMMY_PROCESS, mock.is_string_comparator())
1301
1302
1303    def _test_repair_task_helper(self, success):
1304        self.host.set_status.expect_call('Repairing')
1305        if success:
1306            self.setup_run_monitor(0)
1307            self.host.set_status.expect_call('Ready')
1308        else:
1309            self.setup_run_monitor(1)
1310            self.host.set_status.expect_call('Repair Failed')
1311
1312        task = monitor_db.RepairTask(self.host)
1313        self.assertEquals(task.failure_tasks, [])
1314        self.run_task(task, success)
1315
1316        expected_protection = host_protections.Protection.get_string(
1317            host_protections.default)
1318        expected_protection = host_protections.Protection.get_attr_name(
1319            expected_protection)
1320
1321        self.assertTrue(set(task.cmd) >=
1322                        set([monitor_db._autoserv_path, '-p', '-R', '-m',
1323                             self.HOSTNAME, '-r', self.TEMP_DIR,
1324                             '--host-protection', expected_protection]))
1325        self.god.check_playback()
1326
1327
1328    def test_repair_task(self):
1329        self._test_repair_task_helper(True)
1330        self._test_repair_task_helper(False)
1331
1332
1333    def _test_repair_task_with_queue_entry_helper(self, parse_failed_repair):
1334        self.god.stub_class(monitor_db, 'FinalReparseTask')
1335        self.god.stub_class(monitor_db, 'Agent')
1336        self.god.stub_class_method(monitor_db.TaskWithJobKeyvals,
1337                                   '_write_keyval_after_job')
1338        agent = DummyAgent()
1339        agent.dispatcher = self._dispatcher
1340
1341        self.host.set_status.expect_call('Repairing')
1342        self.queue_entry.requeue.expect_call()
1343        self.setup_run_monitor(1)
1344        self.host.set_status.expect_call('Repair Failed')
1345        self.queue_entry.update_from_database.expect_call()
1346        self.queue_entry.set_execution_subdir.expect_call()
1347        monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call(
1348            'job_queued', mock.is_instance_comparator(int))
1349        monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call(
1350            'job_finished', mock.is_instance_comparator(int))
1351        self._setup_move_logfile(copy_on_drone=True)
1352        self.queue_entry.execution_tag.expect_call().and_return('tag')
1353        self._setup_move_logfile()
1354        self.job.parse_failed_repair = parse_failed_repair
1355        if parse_failed_repair:
1356            reparse_task = monitor_db.FinalReparseTask.expect_new(
1357                [self.queue_entry])
1358            reparse_agent = monitor_db.Agent.expect_new([reparse_task],
1359                                                        num_processes=0)
1360            self._dispatcher.add_agent.expect_call(reparse_agent)
1361        self.queue_entry.handle_host_failure.expect_call()
1362
1363        task = monitor_db.RepairTask(self.host, self.queue_entry)
1364        task.agent = agent
1365        self.queue_entry.status = 'Queued'
1366        self.job.created_on = datetime.datetime(2009, 1, 1)
1367        self.run_task(task, False)
1368        self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS)
1369        self.god.check_playback()
1370
1371
1372    def test_repair_task_with_queue_entry(self):
1373        self._test_repair_task_with_queue_entry_helper(True)
1374        self._test_repair_task_with_queue_entry_helper(False)
1375
1376
    def setup_verify_expects(self, success, use_queue_entry):
        if use_queue_entry:
            self.queue_entry.set_status.expect_call('Verifying')
        self.host.set_status.expect_call('Verifying')
        if success:
            self.setup_run_monitor(0)
            self.host.set_status.expect_call('Ready')
        else:
            self.setup_run_monitor(1)
            if use_queue_entry and not self.queue_entry.meta_host:
                self.queue_entry.set_execution_subdir.expect_call()
                self.queue_entry.execution_tag.expect_call().and_return('tag')
                self._setup_move_logfile(include_destination=True)


    def _check_verify_failure_tasks(self, verify_task):
        self.assertEquals(len(verify_task.failure_tasks), 1)
        repair_task = verify_task.failure_tasks[0]
        self.assert_(isinstance(repair_task, monitor_db.RepairTask))
        self.assertEquals(verify_task.host, repair_task.host)
        if verify_task.queue_entry:
            self.assertEquals(repair_task.queue_entry_to_fail,
                              verify_task.queue_entry)
        else:
            self.assertEquals(repair_task.queue_entry_to_fail, None)


    def _test_verify_task_helper(self, success, use_queue_entry=False,
                                 use_meta_host=False):
        self.setup_verify_expects(success, use_queue_entry)

        if use_queue_entry:
            task = monitor_db.VerifyTask(queue_entry=self.queue_entry)
        else:
            task = monitor_db.VerifyTask(host=self.host)
        self._check_verify_failure_tasks(task)
        self.run_task(task, success)
        self.assertTrue(set(task.cmd) >=
                        set([monitor_db._autoserv_path, '-p', '-v', '-m',
                             self.HOSTNAME, '-r', self.TEMP_DIR]))
        if use_queue_entry:
            self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS)
        self.god.check_playback()


    def test_verify_task_with_host(self):
        self._test_verify_task_helper(True)
        self._test_verify_task_helper(False)


    def test_verify_task_with_queue_entry(self):
        self._test_verify_task_helper(True, use_queue_entry=True)
        self._test_verify_task_helper(False, use_queue_entry=True)


    def test_verify_task_with_metahost(self):
        self.queue_entry.meta_host = 1
        self.test_verify_task_with_queue_entry()


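    # Expectations common to the post-job tasks (final reparse and log
    # gathering): the task attaches to the existing autoserv pidfile and
    # reads its exit code, unless the queue entry was aborted.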
    def _setup_post_job_task_expects(self, autoserv_success, hqe_status=None,
                                     hqe_aborted=False):
        self.queue_entry.execution_tag.expect_call().and_return('tag')
        self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new()
        self.pidfile_monitor.pidfile_id = self.PIDFILE_ID
        self.pidfile_monitor.attach_to_existing_process.expect_call('tag')
        if autoserv_success:
            code = 0
        else:
            code = 1
        self.queue_entry.update_from_database.expect_call()
        self.queue_entry.aborted = hqe_aborted
        if not hqe_aborted:
            self.pidfile_monitor.exit_code.expect_call().and_return(code)

        if hqe_status:
            self.queue_entry.set_status.expect_call(hqe_status)


    def _setup_pre_parse_expects(self, autoserv_success):
        self._setup_post_job_task_expects(autoserv_success, 'Parsing')


    def _setup_post_parse_expects(self, autoserv_success):
        if autoserv_success:
            status = 'Completed'
        else:
            status = 'Failed'
        self.queue_entry.set_status.expect_call(status)


    def _expect_execute_run_monitor(self):
        self.monitor.exit_code.expect_call()
        self.monitor.exit_code.expect_call().and_return(0)
        self._expect_copy_results()


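    # Post-job tasks run their own process through a second
    # PidfileRunMonitor, launched with paired_with_pidfile pointing at the
    # original autoserv pidfile so the new run is tied to the original one.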
    def _setup_post_job_run_monitor(self, pidfile_name):
        self.pidfile_monitor.has_process.expect_call().and_return(True)
        autoserv_pidfile_id = object()
        self.monitor = monitor_db.PidfileRunMonitor.expect_new()
        self.monitor.run.expect_call(
            mock.is_instance_comparator(list),
            'tag',
            nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
            log_file=mock.anything_comparator(),
            pidfile_name=pidfile_name,
            paired_with_pidfile=self.PIDFILE_ID)
        self._expect_execute_run_monitor()


    def _expect_copy_results(self, monitor=None, queue_entry=None):
        if monitor is None:
            monitor = self.monitor
        monitor.has_process.expect_call().and_return(True)
        if queue_entry:
            queue_entry.execution_tag.expect_call().and_return('tag')
        monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS)
        drone_manager.DroneManager.copy_to_results_repository.expect_call(
                self.DUMMY_PROCESS, mock.is_string_comparator())


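    # FinalReparseTask should launch the results parser; the helper below
    # also checks the exact command line it builds.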
    def _test_final_reparse_task_helper(self, autoserv_success=True):
        self._setup_pre_parse_expects(autoserv_success)
        self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE)
        self._setup_post_parse_expects(autoserv_success)

        task = monitor_db.FinalReparseTask([self.queue_entry])
        self.run_task(task, True)

        self.god.check_playback()
        cmd = [monitor_db._parser_path, '--write-pidfile', '-l', '2', '-r',
               '-o', '-P', '/abspath/tag']
        self.assertEquals(task.cmd, cmd)


    def test_final_reparse_task(self):
        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
        self._test_final_reparse_task_helper()
        self._test_final_reparse_task_helper(autoserv_success=False)


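    # Parse throttling: the task polls _can_run_new_parse() and should only
    # launch the parser once it returns True.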
    def test_final_reparse_throttling(self):
        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
        self.god.stub_function(monitor_db.FinalReparseTask,
                               '_can_run_new_parse')

        self._setup_pre_parse_expects(True)
        monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
            False)
        monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
            True)
        self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE)
        self._setup_post_parse_expects(True)

        task = monitor_db.FinalReparseTask([self.queue_entry])
        self.run_task(task, True)
        self.god.check_playback()


    def test_final_reparse_recovery(self):
        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
        self.monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor,
                                                  'run_monitor')
        self._setup_post_job_task_expects(True)
        self._expect_execute_run_monitor()
        self._setup_post_parse_expects(True)

        task = monitor_db.FinalReparseTask([self.queue_entry],
                                           run_monitor=self.monitor)
        self.run_task(task, True)
        self.god.check_playback()


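    # GatherLogsTask expectations.  An exit status of 271 stands in for
    # autoserv having been killed; any non-zero (or missing) status should
    # trigger a collect_crashinfo run before results are copied and a
    # FinalReparseTask is scheduled.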
    def _setup_gather_logs_expects(self, autoserv_killed=True,
                                   hqe_aborted=False):
        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
        self.god.stub_class(monitor_db, 'FinalReparseTask')
        self._setup_post_job_task_expects(not autoserv_killed, 'Gathering',
                                          hqe_aborted)
        if hqe_aborted:
            exit_code = None
        elif autoserv_killed:
            exit_code = 271
        else:
            exit_code = 0
        self.pidfile_monitor.exit_code.expect_call().and_return(exit_code)
        if exit_code != 0:
            self._setup_post_job_run_monitor('.collect_crashinfo_execute')
        self.pidfile_monitor.has_process.expect_call().and_return(True)
        self._expect_copy_results(monitor=self.pidfile_monitor,
                                  queue_entry=self.queue_entry)
        parse_task = monitor_db.FinalReparseTask.expect_new([self.queue_entry])
        _set_host_and_qe_ids(parse_task)
        self._dispatcher.add_agent.expect_call(IsAgentWithTask(parse_task))


    def _run_gather_logs_task(self):
        task = monitor_db.GatherLogsTask(self.job, [self.queue_entry])
        task.agent = DummyAgent()
        task.agent.dispatcher = self._dispatcher
        self.run_task(task, True)
        self.god.check_playback()


    def test_gather_logs_task(self):
        self._setup_gather_logs_expects()
        # no rebooting for this basic test
        self.job.reboot_after = models.RebootAfter.NEVER
        self.host.set_status.expect_call('Ready')

        self._run_gather_logs_task()


    def test_gather_logs_task_successful_autoserv(self):
        # When Autoserv exits successfully, no collect_crashinfo stage runs
        self._setup_gather_logs_expects(autoserv_killed=False)
        self.job.reboot_after = models.RebootAfter.NEVER
        self.host.set_status.expect_call('Ready')

        self._run_gather_logs_task()


    def _setup_gather_task_cleanup_expects(self):
        self.god.stub_class(monitor_db, 'CleanupTask')
        cleanup_task = monitor_db.CleanupTask.expect_new(host=self.host)
        _set_host_and_qe_ids(cleanup_task)
        self._dispatcher.add_agent.expect_call(IsAgentWithTask(cleanup_task))


    def test_gather_logs_reboot_hosts(self):
        self._setup_gather_logs_expects()
        self.job.reboot_after = models.RebootAfter.ALWAYS
        self._setup_gather_task_cleanup_expects()

        self._run_gather_logs_task()


    def test_gather_logs_reboot_on_abort(self):
        self._setup_gather_logs_expects(hqe_aborted=True)
        self.job.reboot_after = models.RebootAfter.NEVER
        self._setup_gather_task_cleanup_expects()

        self._run_gather_logs_task()


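    # CleanupTask mirrors VerifyTask: the host moves through 'Cleaning' to
    # 'Ready' (with its dirty flag cleared) on success, and a RepairTask is
    # attached as the failure task.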
    def _test_cleanup_task_helper(self, success, use_queue_entry=False):
        if use_queue_entry:
            self.queue_entry.get_host.expect_call().and_return(self.host)
        self.host.set_status.expect_call('Cleaning')
        if success:
            self.setup_run_monitor(0)
            self.host.set_status.expect_call('Ready')
            self.host.update_field.expect_call('dirty', 0)
        else:
            self.setup_run_monitor(1)
            if use_queue_entry and not self.queue_entry.meta_host:
                self.queue_entry.set_execution_subdir.expect_call()
                self.queue_entry.execution_tag.expect_call().and_return('tag')
                self._setup_move_logfile(include_destination=True)

        if use_queue_entry:
            task = monitor_db.CleanupTask(queue_entry=self.queue_entry)
        else:
            task = monitor_db.CleanupTask(host=self.host)
        self.assertEquals(len(task.failure_tasks), 1)
        repair_task = task.failure_tasks[0]
        self.assert_(isinstance(repair_task, monitor_db.RepairTask))
        if use_queue_entry:
            self.assertEquals(repair_task.queue_entry_to_fail, self.queue_entry)

        self.run_task(task, success)

        self.god.check_playback()
        self.assertTrue(set(task.cmd) >=
                        set([monitor_db._autoserv_path, '-p', '--cleanup', '-m',
                             self.HOSTNAME, '-r', self.TEMP_DIR]))
        if use_queue_entry:
            self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS)


    def test_cleanup_task(self):
        self._test_cleanup_task_helper(True)
        self._test_cleanup_task_helper(False)


    def test_cleanup_task_with_queue_entry(self):
        self._test_cleanup_task_helper(False, True)


    def test_recovery_queue_task_aborted_early(self):
        # abort a RecoveryQueueTask right after it's created
        self.god.stub_class_method(monitor_db.QueueTask, '_log_abort')
        self.god.stub_class_method(monitor_db.QueueTask, '_finish_task')
        run_monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor,
                                                 'run_monitor')

        self.queue_entry.execution_tag.expect_call().and_return('tag')
        run_monitor.kill.expect_call()
        run_monitor.has_process.expect_call().and_return(True)
        monitor_db.QueueTask._log_abort.expect_call()
        monitor_db.QueueTask._finish_task.expect_call()

        task = monitor_db.RecoveryQueueTask(self.job, [self.queue_entry],
                                            run_monitor)
        task.abort()
        self.assert_(task.aborted)
        self.god.check_playback()


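# Host.cmp_for_sort is expected to order hostnames naturally: embedded digit
# runs compare numerically ('host2' < 'host09' < 'host10') and case is
# ignored ('alice' < 'Host1' < 'host2'), as the expected_order list below
# spells out.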
class HostTest(BaseSchedulerTest):
    def test_cmp_for_sort(self):
        expected_order = [
                'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010',
                'host10', 'host11', 'yolkfolk']
        hostname_idx = list(monitor_db.Host._fields).index('hostname')
        row = [None] * len(monitor_db.Host._fields)
        hosts = []
        for hostname in expected_order:
            row[hostname_idx] = hostname
            hosts.append(monitor_db.Host(row=row, new_record=True))

        host1 = hosts[expected_order.index('Host1')]
        host010 = hosts[expected_order.index('HOST010')]
        host10 = hosts[expected_order.index('host10')]
        host3 = hosts[expected_order.index('host3')]
        alice = hosts[expected_order.index('alice')]
        self.assertEqual(0, monitor_db.Host.cmp_for_sort(host10, host10))
        self.assertEqual(1, monitor_db.Host.cmp_for_sort(host10, host010))
        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host010, host10))
        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host10))
        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host010))
        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host10))
        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host010))
        self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, host1))
        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host3))
        self.assertEqual(-1, monitor_db.Host.cmp_for_sort(alice, host3))
        self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, alice))
        self.assertEqual(0, monitor_db.Host.cmp_for_sort(alice, alice))

        hosts.sort(cmp=monitor_db.Host.cmp_for_sort)
        self.assertEqual(expected_order, [h.hostname for h in hosts])

        hosts.reverse()
        hosts.sort(cmp=monitor_db.Host.cmp_for_sort)
        self.assertEqual(expected_order, [h.hostname for h in hosts])


class HostQueueEntryTest(BaseSchedulerTest):
    def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
        job = self._create_job(**create_job_kwargs)
        for label in dependency_labels:
            job.dependency_labels.add(label)
        hqes = list(monitor_db.HostQueueEntry.fetch(where='job_id=%d' % job.id))
        self.assertEqual(1, len(hqes))
        return hqes[0]


    def _check_hqe_labels(self, hqe, expected_labels):
        expected_labels = set(expected_labels)
        label_names = set(label.name for label in hqe.get_labels())
        self.assertEqual(expected_labels, label_names)


    def test_get_labels_empty(self):
        hqe = self._create_hqe(hosts=[1])
        labels = list(hqe.get_labels())
        self.assertEqual([], labels)


    def test_get_labels_metahost(self):
        hqe = self._create_hqe(metahosts=[2])
        self._check_hqe_labels(hqe, ['label2'])


    def test_get_labels_dependencies(self):
        hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
                               metahosts=[1])
        self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])


class JobTest(BaseSchedulerTest):
    def setUp(self):
        super(JobTest, self).setUp()
        self.god.stub_with(
            drone_manager.DroneManager, 'attach_file_to_execution',
            mock.mock_function('attach_file_to_execution',
                               default_return_val='/test/path/tmp/foo'))


    def _setup_directory_expects(self, execution_subdir):
        # XXX(gps): this helper currently does nothing -- the computed paths
        # are never used and no expectations are set.
        job_path = os.path.join('.', '1-my_user')
        results_dir = os.path.join(job_path, execution_subdir)


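    # Runs Job.run() for job and queue entry 1, checks that the queue entry
    # ends up in the expected status, and returns the tasks queued on the
    # resulting agent (or asserts that no agent was created).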
    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = monitor_db.Job.fetch('id = 1').next()
        queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next()
        agent = job.run(queue_entry)

        self.god.check_playback()
        self.assertEquals(models.HostQueueEntry.smart_get(1).status,
                          expected_status)

        if not expect_agent:
            self.assertEquals(agent, None)
            return

        self.assert_(isinstance(agent, monitor_db.Agent))
        tasks = list(agent.queue.queue)
        return tasks


    def _check_verify_task(self, verify_task):
        self.assert_(isinstance(verify_task, monitor_db.VerifyTask))
        self.assertEquals(verify_task.queue_entry.id, 1)


    def _check_pending_task(self, pending_task):
        self.assert_(isinstance(pending_task, monitor_db.SetEntryPendingTask))
        self.assertEquals(pending_task._queue_entry.id, 1)


    def test_run_asynchronous(self):
        self._create_job(hosts=[1, 2])

        tasks = self._test_run_helper()

        self.assertEquals(len(tasks), 2)
        verify_task, pending_task = tasks
        self._check_verify_task(verify_task)
        self._check_pending_task(pending_task)


    def test_run_asynchronous_skip_verify(self):
        job = self._create_job(hosts=[1, 2])
        job.run_verify = False
        job.save()
        self._setup_directory_expects('host1')

        tasks = self._test_run_helper()

        self.assertEquals(len(tasks), 1)
        pending_task = tasks[0]
        self._check_pending_task(pending_task)


    def test_run_synchronous_verify(self):
        self._create_job(hosts=[1, 2], synchronous=True)

        tasks = self._test_run_helper()
        self.assertEquals(len(tasks), 2)
        verify_task, pending_task = tasks
        self._check_verify_task(verify_task)
        self._check_pending_task(pending_task)


    def test_run_synchronous_skip_verify(self):
        job = self._create_job(hosts=[1, 2], synchronous=True)
        job.run_verify = False
        job.save()

        tasks = self._test_run_helper()
        self.assertEquals(len(tasks), 1)
        self._check_pending_task(tasks[0])


    def test_run_synchronous_ready(self):
        self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")
        self._setup_directory_expects('group0')

        tasks = self._test_run_helper(expect_starting=True)
        self.assertEquals(len(tasks), 1)
        queue_task = tasks[0]

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEquals(queue_task.job.id, 1)
        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
        self.assertEquals(hqe_ids, [1, 2])


    def test_run_synchronous_atomic_group_ready(self):
        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        tasks = self._test_run_helper(expect_starting=True)
        self.assertEquals(len(tasks), 1)
        queue_task = tasks[0]

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that do not specify a label within the atomic
        # group will use the atomic group name as their group name.
        self.assertEquals(queue_task.group_name, 'atomic1')


    def test_run_synchronous_atomic_group_with_label_ready(self):
        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        job.dependency_labels.add(self.label4)
        self._update_hqe("status='Pending', execution_subdir=''")

        tasks = self._test_run_helper(expect_starting=True)
        self.assertEquals(len(tasks), 1)
        queue_task = tasks[0]

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that also specify a label in the atomic group
        # will use the label name as their group name.
        self.assertEquals(queue_task.group_name, 'label4')


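    # reboot_before handling: RebootBefore.ALWAYS (and RebootBefore.IF_DIRTY
    # with a dirty host) should prepend a CleanupTask to the usual
    # verify/pending task list.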
    def test_reboot_before_always(self):
        job = self._create_job(hosts=[1])
        job.reboot_before = models.RebootBefore.ALWAYS
        job.save()

        tasks = self._test_run_helper()
        self.assertEquals(len(tasks), 3)
        cleanup_task = tasks[0]
        self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask))
        self.assertEquals(cleanup_task.host.id, 1)


    def _test_reboot_before_if_dirty_helper(self, expect_reboot):
        job = self._create_job(hosts=[1])
        job.reboot_before = models.RebootBefore.IF_DIRTY
        job.save()

        tasks = self._test_run_helper()
        self.assertEquals(len(tasks), expect_reboot and 3 or 2)
        if expect_reboot:
            cleanup_task = tasks[0]
            self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask))
            self.assertEquals(cleanup_task.host.id, 1)


    def test_reboot_before_if_dirty(self):
        models.Host.smart_get(1).update_object(dirty=True)
        self._test_reboot_before_if_dirty_helper(True)


    def test_reboot_before_not_dirty(self):
        models.Host.smart_get(1).update_object(dirty=False)
        self._test_reboot_before_if_dirty_helper(False)


    def test_next_group_name(self):
        django_job = self._create_job(metahosts=[1])
        job = monitor_db.Job(id=django_job.id)
        self.assertEqual('group0', job._next_group_name())

        for hqe in django_job.hostqueueentry_set.filter():
            hqe.execution_subdir = 'my_rack.group0'
            hqe.save()
        self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))


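# _autoserv_command_line() builds the base autoserv invocation, appending any
# extra arguments; passing a job adds its owner and name via the -u and -l
# flags.  For example, per the assertions below:
#     _autoserv_command_line('abcd12,efgh34', '/fake/path', ['-Z', 'hello'])
# yields [_autoserv_path, '-p', '-m', 'abcd12,efgh34', '-r', '/fake/path',
#         '-Z', 'hello'].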
class TopLevelFunctionsTest(unittest.TestCase):
    def test_autoserv_command_line(self):
        machines = 'abcd12,efgh34'
        results_dir = '/fake/path'
        extra_args = ['-Z', 'hello']
        expected_command_line = [monitor_db._autoserv_path, '-p',
                                 '-m', machines, '-r', results_dir]

        command_line = monitor_db._autoserv_command_line(
                machines, results_dir, extra_args)
        self.assertEqual(expected_command_line + extra_args, command_line)

        class FakeJob(object):
            owner = 'Bob'
            name = 'fake job name'

        command_line = monitor_db._autoserv_command_line(
                machines, results_dir, extra_args=[], job=FakeJob())
        self.assertEqual(expected_command_line +
                         ['-u', FakeJob.owner, '-l', FakeJob.name],
                         command_line)


if __name__ == '__main__':
    unittest.main()