# monitor_db_unittest.py revision 5e2bb4aa28611aaacaa8798fd07943ede1df46c6
1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4import gc, time
5import common
6from autotest_lib.frontend import setup_django_environment
7from autotest_lib.frontend.afe import frontend_test_utils
8from autotest_lib.client.common_lib.test_utils import mock
9from autotest_lib.client.common_lib.test_utils import unittest
10from autotest_lib.database import database_connection
11from autotest_lib.frontend.afe import models
12from autotest_lib.scheduler import agent_task
13from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
14from autotest_lib.scheduler import pidfile_monitor
15from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler
16from autotest_lib.scheduler import monitor_db_functional_test
17from autotest_lib.scheduler import scheduler_models
18
19_DEBUG = False
20
21
class DummyAgentTask(object):
    """Minimal stand-in for an agent task, used by DummyAgent in tests."""
    owner_username = 'my_user'
    num_processes = 1

    def get_drone_hostnames_allowed(self):
        """The dummy task places no restriction on which drones may run it."""
        return None
29
class DummyAgent(object):
    """Fake scheduler Agent that only tracks started/done state for tests."""
    started = False
    _is_done = False
    host_ids = ()
    queue_entry_ids = ()

    def __init__(self):
        self.task = DummyAgentTask()


    def is_done(self):
        """Return whether set_done() has marked this agent finished."""
        return self._is_done


    def set_done(self, done):
        """Record the done flag that is_done() will report."""
        self._is_done = done


    def tick(self):
        # The dispatcher calls tick() once an agent is allowed to run.
        self.started = True
50
51
class IsRow(mock.argument_comparator):
    """Argument matcher: a database row whose first field equals row_id."""

    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        first_field = list(parameter)[0]
        return first_field == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id
63
64
class IsAgentWithTask(mock.argument_comparator):
    """Argument matcher: a monitor_db.Agent queuing exactly the given task."""

    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        queued = list(parameter.queue.queue)
        return len(queued) == 1 and queued[0] == self._task
77
78
79def _set_host_and_qe_ids(agent_or_task, id_list=None):
80    if id_list is None:
81        id_list = []
82    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
83
84
class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    """Shared scaffolding for scheduler tests.

    Builds an in-memory test database and stubs monitor_db /
    scheduler_models so a real Dispatcher can run against it.
    """
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        """Execute raw SQL against the test database."""
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        """Create the test database and point the scheduler modules at it.

        NOTE: the stubs must be installed before initialize_globals() is
        called at the bottom, so the modules pick up the stubbed state.
        """
        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=monitor_db_functional_test._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        # Route all scheduler DB access through the test database and
        # replace the pending-HQE query with our deterministic version.
        self.god.stub_with(monitor_db, '_db', self._database)
        self.god.stub_with(monitor_db.BaseDispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


    def _update_hqe(self, set, where=''):
        """UPDATE afe_host_queue_entries.

        @param set: body of the SET clause, e.g. 'active=1'.
        @param where: optional body of the WHERE clause.
        """
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


    def _get_pending_hqes(self):
        """Stub for BaseDispatcher._get_pending_queue_entries.

        Returns queued, inactive, incomplete HQEs in priority order.
        """
        query_string=('afe_jobs.priority DESC, '
                      'ifnull(nullif(host_id, NULL), host_id) DESC, '
                      'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                      'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=query_string))
144
145
class DispatcherSchedulingTest(BaseSchedulerTest):
    """Tests for Dispatcher._schedule_new_jobs().

    Covers plain host scheduling, metahosts and atomic groups.  The
    pre-job-task hook is stubbed to record (job_id, host_id) pairs, which
    the _assert_job_scheduled_* helpers then consume and verify.
    """
    # Recorded (job_id, host_id) pairs; reset per test in setUp.
    _jobs_scheduled = []


    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)


    def _record_job_scheduled(self, job_id, host_id):
        """Record that job_id ran on host_id, failing on a duplicate pair."""
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        """Assert the pair was scheduled, and consume its record."""
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        """Fail if any recorded scheduling was not explicitly asserted."""
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        """Rewrite the given jobs' host entries into metahost entries."""
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        """Mark the given host locked so it cannot be scheduled."""
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _run_scheduler(self):
        for _ in xrange(2): # metahost scheduling can take two cycles
            self._dispatcher._schedule_new_jobs()


    def _test_basic_scheduling_helper(self, use_metahosts):
        """Two single-host jobs each land on their own host."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        """Test prioritization ordering."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        # Invalid hosts still accept direct (non-metahost) entries; this is
        # how one-time hosts work.
        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        """Only idle hosts get scheduled."""
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        """A job cannot be scheduled on a host it has no ACL access to."""
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def _setup_test_only_if_needed_labels(self):
        # apply only_if_needed label3 to host1
        models.Host.smart_get('host1').labels.add(self.label3)
        return self._create_job_simple([1], use_metahost=True)


    def test_only_if_needed_labels_avoids_host(self):
        job = self._setup_test_only_if_needed_labels()
        # if the job doesn't depend on label3, there should be no scheduling
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_schedules(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_via_metahost(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        # should also work if the metahost is the only_if_needed label
        self._do_query('DELETE FROM afe_jobs_dependency_labels')
        self._create_job(metahosts=[3])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_metahosts_obey_blocks(self):
        """
        Metahosts can't get scheduled on hosts already scheduled for
        that job.
        """
        self._create_job(metahosts=[1], hosts=[1])
        # make the nonmetahost entry complete, so the metahost can try
        # to get scheduled
        self._update_hqe(set='complete = 1', where='host_id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    # TODO(gps): These should probably live in their own TestCase class
    # specific to testing HostScheduler methods directly.  It was convenient
    # to put it here for now to share existing test environment setup code.
    def test_HostScheduler_check_atomic_group_labels(self):
        normal_job = self._create_job(metahosts=[0])
        atomic_job = self._create_job(atomic_group=1)
        # Indirectly initialize the internal state of the host scheduler.
        self._dispatcher._refresh_pending_queue_entries()

        atomic_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
                                                     atomic_job.id)[0]
        normal_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
                                                     normal_job.id)[0]

        host_scheduler = self._dispatcher._host_scheduler
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id], atomic_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
                [self.label4.id], normal_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
                [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id, self.label6.id], atomic_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                        [self.label4.id, self.label5.id],
                        atomic_hqe))

#    TODO: Revive this test.
#    def test_HostScheduler_get_host_atomic_group_id(self):
#        job = self._create_job(metahosts=[self.label6.id])
#        queue_entry = scheduler_models.HostQueueEntry.fetch(
#                where='job_id=%d' % job.id)[0]
#        # Indirectly initialize the internal state of the host scheduler.
#        self._dispatcher._refresh_pending_queue_entries()
#
#        # Test the host scheduler
#        host_scheduler = self._dispatcher._host_scheduler
#
#
#        # Two labels each in a different atomic group.  This should log an
#        # error and continue.
#        orig_logging_error = logging.error
#        def mock_logging_error(message, *args):
#            mock_logging_error._num_calls += 1
#            # Test the logging call itself, we just wrapped it to count it.
#            orig_logging_error(message, *args)
#        mock_logging_error._num_calls = 0
#        self.god.stub_with(logging, 'error', mock_logging_error)
#        host_scheduler.refresh([])
#        self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label8.id], queue_entry))
#        self.assertTrue(mock_logging_error._num_calls > 0)
#        self.god.unstub(logging, 'error')
#
#        # Two labels both in the same atomic group, this should not raise an
#        # error, it will merely cause the job to schedule on the intersection.
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label5.id]))
#
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id([]))
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label3.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label7.id, self.label5.id]))


    def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
        # Create a job scheduled to run on label6.
        self._create_job(metahosts=[self.label6.id])
        self._run_scheduler()
        # label6 only has hosts that are in atomic groups associated with it,
        # there should be no scheduling.
        self._check_for_extra_schedulings()


    def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
        # Create a job scheduled to run on label5.  This is an atomic group
        # label but this job does not request atomic group scheduling.
        self._create_job(metahosts=[self.label5.id])
        self._run_scheduler()
        # label5 only has hosts that are in atomic groups associated with it,
        # there should be no scheduling.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_basics(self):
        # Create jobs scheduled to run on an atomic group.
        job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
                         atomic_group=1)
        job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
                         atomic_group=1)
        self._run_scheduler()
        # atomic_group.max_number_of_machines was 2 so we should run on 2.
        self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
        self._assert_job_scheduled_on(job_b.id, 8)  # label5
        self._assert_job_scheduled_on(job_b.id, 9)  # label5
        self._check_for_extra_schedulings()

        # The three host label4 atomic group still has one host available.
        # That means a job with a synch_count of 1 asking to be scheduled on
        # the atomic group can still use the final machine.
        #
        # This may seem like a somewhat odd use case.  It allows the use of an
        # atomic group as a set of machines to run smaller jobs within (a set
        # of hosts configured for use in network tests with eachother perhaps?)
        onehost_job = self._create_job(atomic_group=1)
        self._run_scheduler()
        self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
        self._check_for_extra_schedulings()

        # No more atomic groups have hosts available, no more jobs should
        # be scheduled.
        self._create_job(atomic_group=1)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_obeys_acls(self):
        # Request scheduling on a specific atomic label but be denied by ACLs.
        self._do_query('DELETE FROM afe_acl_groups_hosts '
                       'WHERE host_id in (8,9)')
        job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_dependency_label_exclude(self):
        # A dependency label that matches no hosts in the atomic group.
        job_a = self._create_job(atomic_group=1)
        job_a.dependency_labels.add(self.label3)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
        # A metahost and dependency label that excludes too many hosts.
        job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
                                 atomic_group=1)
        job_b.dependency_labels.add(self.label7)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_dependency_label_match(self):
        # A dependency label that exists on enough atomic group hosts in only
        # one of the two atomic group labels.
        job_c = self._create_job(synchronous=True, atomic_group=1)
        job_c.dependency_labels.add(self.label7)
        self._run_scheduler()
        self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_no_metahost(self):
        # Force it to schedule on the other group for a reliable test.
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
        # An atomic job without a metahost.
        job = self._create_job(synchronous=True, atomic_group=1)
        self._run_scheduler()
        self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_partial_group(self):
        # Make one host in labels[3] unavailable so that there are only two
        # hosts left in the group.
        self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
        job = self._create_job(synchronous=True, metahosts=[self.label4.id],
                         atomic_group=1)
        self._run_scheduler()
        # Verify that it was scheduled on the 2 ready hosts in that group.
        self._assert_job_scheduled_on(job.id, 6)
        self._assert_job_scheduled_on(job.id, 7)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_not_enough_available(self):
        # Mark some hosts in each atomic group label as not usable.
        # One host running, another invalid in the first group label.
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6')
        # One host invalid in the second group label.
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
        # Nothing to schedule when no group label has enough (2) good hosts..
        self._create_job(atomic_group=1, synchronous=True)
        self._run_scheduler()
        # There are not enough hosts in either atomic group,
        # No more scheduling should occur.
        self._check_for_extra_schedulings()

        # Now create an atomic job that has a synch count of 1.  It should
        # schedule on exactly one of the hosts.
        onehost_job = self._create_job(atomic_group=1)
        self._run_scheduler()
        self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)


    def test_atomic_group_scheduling_no_valid_hosts(self):
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
        self._create_job(synchronous=True, metahosts=[self.label5.id],
                         atomic_group=1)
        self._run_scheduler()
        # no hosts in the selected group and label are valid.  no schedulings.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_metahost_works(self):
        # Test that atomic group scheduling also obeys metahosts.
        self._create_job(metahosts=[0], atomic_group=1)
        self._run_scheduler()
        # There are no atomic group hosts that also have that metahost.
        self._check_for_extra_schedulings()

        job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(job_b.id, 8)
        self._assert_job_scheduled_on(job_b.id, 9)
        self._check_for_extra_schedulings()


    def test_atomic_group_skips_ineligible_hosts(self):
        # Test hosts marked ineligible for this job are not eligible.
        # How would this ever happen anyways?
        job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
        models.IneligibleHostQueue.objects.create(job=job, host_id=5)
        models.IneligibleHostQueue.objects.create(job=job, host_id=6)
        models.IneligibleHostQueue.objects.create(job=job, host_id=7)
        self._run_scheduler()
        # No scheduling should occur as all desired hosts were ineligible.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_fail(self):
        # If synch_count is > the atomic group number of machines, the job
        # should be aborted immediately.
        model_job = self._create_job(synchronous=True, atomic_group=1)
        model_job.synch_count = 4
        model_job.save()
        job = scheduler_models.Job(id=model_job.id)
        self._run_scheduler()
        self._check_for_extra_schedulings()
        queue_entries = job.get_host_queue_entries()
        self.assertEqual(1, len(queue_entries))
        self.assertEqual(queue_entries[0].status,
                         models.HostQueueEntry.Status.ABORTED)


    def test_atomic_group_no_labels_no_scheduling(self):
        # Never schedule on atomic groups marked invalid.
        job = self._create_job(metahosts=[self.label5.id], synchronous=True,
                               atomic_group=1)
        # Deleting an atomic group via the frontend marks it invalid and
        # removes all label references to the group.  The job now references
        # an invalid atomic group with no labels associated with it.
        self.label5.atomic_group.invalid = True
        self.label5.atomic_group.save()
        self.label5.atomic_group = None
        self.label5.save()

        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_host_fail(self):
        # Scheduling a job directly on hosts in an atomic group must
        # fail to avoid users inadvertently holding up the use of an
        # entire atomic group by using the machines individually.
        job = self._create_job(hosts=[5])
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_host(self):
        # Scheduling a job directly on one host in an atomic group will
        # work when the atomic group is listed on the HQE in addition
        # to the host (assuming the sync count is 1).
        job = self._create_job(hosts=[5], atomic_group=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(job.id, 5)
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_hosts_sync2(self):
        job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
        self._run_scheduler()
        self._assert_job_scheduled_on(job.id, 5)
        self._assert_job_scheduled_on(job.id, 8)
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
        # Hosts 5 and 8 are not members of atomic group 2, so nothing runs.
        job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_garbage_collection(self):
        # Make the periodic interval huge so only the forced run triggers.
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, it won't do anything
        # the second time.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()
731
732
733
class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Verify the dispatcher's process throttling:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        # Configure the scheduler limits under test.
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
        scheduler_config.config.max_processes_started_per_cycle = (
            self._MAX_STARTED)

        def fake_max_runnable_processes(fake_self, username,
                                        drone_hostnames_allowed):
            # Remaining capacity = limit minus the processes of all agents
            # that have started but are not yet finished.
            in_flight = sum(agent.task.num_processes
                            for agent in self._agents
                            if agent.started and not agent.is_done())
            return self._MAX_RUNNING - in_flight
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)


    def _setup_some_agents(self, num_agents):
        """Create num_agents DummyAgents and hand a copy to the dispatcher."""
        self._agents = [DummyAgent() for _ in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_cycles(self):
        """Run several dispatcher cycles so throttling decisions settle."""
        for _ in xrange(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        """Assert each agent at the given indexes matches the started state."""
        for index in indexes:
            suffix = 'not ' if is_started else ''
            self.assert_(self._agents[index].started == is_started,
                         'Agent %d %sstarted' % (index, suffix))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        # Four single-process agents against a total limit of three.
        self._setup_some_agents(4)
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_per_cycle(self):
        # Only _MAX_STARTED agents may start within one cycle.
        self._setup_some_agents(3)
        self._dispatcher._handle_agents()
        self._assert_agents_started([0, 1])
        self._assert_agents_not_started([2])


    def test_throttle_with_synchronous(self):
        # A three-process agent consumes the entire capacity by itself.
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        # Once capacity frees up, the large agent runs before agent 2.
        self._agents[0].set_done(True)
        self._run_a_few_cycles()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        # Agents that need no processes are never throttled.
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])
824
825
class PidfileRunMonitorTest(unittest.TestCase):
    """Unit tests for pidfile_monitor.PidfileRunMonitor.

    The drone manager and email manager are mocked out, so each test
    scripts the exact sequence of pidfile reads and process checks it
    expects the monitor to make, then verifies it with check_playback().
    """
    # Fixture constants shared by the helpers below.
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        """Create a monitor attached to an existing (mocked) process."""
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(pidfile_monitor, '_drone_manager',
                           self.mock_drone_manager)
        self.god.stub_function(email_manager.manager, 'enqueue_notify_email')
        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        # attach_to_existing_process() below looks the pidfile id up from
        # the execution tag.
        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))

        self.monitor = pidfile_monitor.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()


    def _mock_get_pidfile_timeout_secs(self):
        # Large fixed timeout; tests control expiry by backdating _start_time.
        return 300


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        """Queue one mocked pidfile read returning the given contents."""
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        # Pidfile not written yet: no pid, no exit status.
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        # Pidfile with a pid but no exit status yet.
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        # Pidfile with pid, exit status and test-failure count.
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        """Assert the monitor's parsed state and verify mock playback."""
        if expected_pid is None:
            self.assertEquals(self.monitor._state.process, None)
        else:
            self.assertEquals(self.monitor._state.process.pid, expected_pid)
        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
        self.assertEquals(self.monitor._state.num_tests_failed,
                          expected_num_tests_failed)


        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        """Drive one _read_pidfile() call and check the resulting state."""
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        # No exit status implies the failure count hasn't been written either.
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        """_read_pidfile() parses each stage of the pidfile lifecycle."""
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        """An invalid pidfile makes _read_pidfile raise _PidfileException."""
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        """Queue one mocked is_process_running() check."""
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        """Drive one _get_pidfile_info() call and check the resulting state."""
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        # The monitor should flag the lost process and send a warning email.
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        """A pidfile missing past the timeout is treated as a lost process."""
        self.set_not_yet_run()
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        # Backdate the start time so the pidfile timeout has expired.
        self.monitor._start_time = (time.time() -
                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)
1003
1004
class AgentTest(unittest.TestCase):
    """Tests for monitor_db.Agent: ticking, finishing and aborting tasks."""

    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        """Build a mock AgentTask with host/queue-entry ids attached."""
        mock_task = self.god.create_mock_class(agent_task.AgentTask, name)
        mock_task.num_processes = 1
        _set_host_and_qe_ids(mock_task)
        return mock_task

    def _create_agent(self, task):
        """Wrap task in an Agent bound to the mock dispatcher."""
        new_agent = monitor_db.Agent(task)
        new_agent.dispatcher = self._dispatcher
        return new_agent


    def _finish_agent(self, agent):
        """Tick the agent until it reports completion."""
        while True:
            if agent.is_done():
                break
            agent.tick()


    def test_agent_abort(self):
        """Aborting a started agent aborts its task and lets it finish."""
        mock_task = self._create_mock_task('task')
        mock_task.poll.expect_call()
        mock_task.is_done.expect_call().and_return(False)
        mock_task.abort.expect_call()
        mock_task.aborted = True

        running_agent = self._create_agent(mock_task)
        running_agent.tick()
        running_agent.abort()
        self._finish_agent(running_agent)
        self.god.check_playback()


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        """Abort before the first tick; the task may honour or ignore it."""
        mock_task = self._create_mock_task('task')
        mock_task.abort.expect_call()
        if ignore_abort:
            # Task refuses the abort; agent must still run it to completion.
            mock_task.aborted = False
            mock_task.poll.expect_call()
            mock_task.is_done.expect_call().and_return(True)
            mock_task.success = True
        else:
            mock_task.aborted = True

        fresh_agent = self._create_agent(mock_task)
        fresh_agent.abort()
        self._finish_agent(fresh_agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        for ignore in (False, True):
            self._test_agent_abort_before_started_helper(ignore)
1067
1068
class JobSchedulingTest(BaseSchedulerTest):
    """End-to-end tests of Job scheduling through the dispatcher."""

    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        """Run job 1 / HQE 1 through the dispatcher and check its status.

        Returns the scheduled agent's task when an agent is expected,
        otherwise returns None after asserting no agent was created.
        """
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_delay_tasks()
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEquals(expected_status, actual_status)

        if not expect_agent:
            self.assertEquals(agent, None)
            return

        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        return agent.task


    def test_run_if_ready_delays(self):
        """Exercise the ready-delay path for atomic group jobs."""
        # Also tests Job.run_with_ready_delay() on atomic group jobs.
        django_job = self._create_job(hosts=[5, 6], atomic_group=1)
        job = scheduler_models.Job(django_job.id)
        self.assertEqual(1, job.synch_count)
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        self.assertEqual(2, len(django_hqes))
        self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)

        def set_hqe_status(django_hqe, status):
            # Keep the HQE row and its host's status in sync.
            django_hqe.status = status
            django_hqe.save()
            scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)

        # An initial state, our synch_count is 1
        set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
        set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)

        # So that we don't depend on the config file value during the test.
        self.assert_(scheduler_config.config
                     .secs_to_wait_for_atomic_group_hosts is not None)
        self.god.stub_with(scheduler_config.config,
                           'secs_to_wait_for_atomic_group_hosts', 123456)

        # Get the pending one as a scheduler_models.HostQueueEntry object.
        hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)
        self.assert_(not job._delay_ready_task)
        self.assertTrue(job.is_ready())

        # Ready with one pending, one verifying and an atomic group should
        # result in a DelayCallTask to re-check if we're ready a while later.
        job.run_if_ready(hqe)
        self.assertEquals('Waiting', hqe.status)
        self._dispatcher._schedule_delay_tasks()
        self.assertEquals('Pending', hqe.status)
        agent = self._dispatcher._agents[0]
        self.assert_(job._delay_ready_task)
        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        delay_task = agent.task
        self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))
        self.assert_(not delay_task.is_done())

        self.god.stub_function(delay_task, 'abort')

        self.god.stub_function(job, 'run')

        self.god.stub_function(job, '_pending_count')
        self.god.stub_with(job, 'synch_count', 9)
        self.god.stub_function(job, 'request_abort')

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and does not call run if there are not enough hosts
        # in pending after the delay.
        job._pending_count.expect_call().and_return(0)
        job.request_abort.expect_call()
        delay_task._callback()
        self.god.check_playback()

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and returns the Agent returned by job.run() if
        # there are still enough hosts pending after the delay.
        job.synch_count = 4
        job._pending_count.expect_call().and_return(4)
        job.run.expect_call(hqe)
        delay_task._callback()
        self.god.check_playback()

        job._pending_count.expect_call().and_return(4)

        # Adjust the delay deadline so that enough time has passed.
        job._delay_ready_task.end_time = time.time() - 111111
        job.run.expect_call(hqe)
        # ...the delay_expired condition should cause us to call run()
        self._dispatcher._handle_agents()
        self.god.check_playback()
        delay_task.success = False

        # Adjust the delay deadline back so that enough time has not passed.
        job._delay_ready_task.end_time = time.time() + 111111
        self._dispatcher._handle_agents()
        self.god.check_playback()

        # Now max_number_of_machines HQEs are in pending state.  Remaining
        # delay will now be ignored.
        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.god.unstub(job, 'run')
        self.god.unstub(job, '_pending_count')
        self.god.unstub(job, 'synch_count')
        self.god.unstub(job, 'request_abort')
        # ...the over_max_threshold test should cause us to call run()
        delay_task.abort.expect_call()
        other_hqe.on_pending()
        self.assertEquals('Starting', other_hqe.status)
        self.assertEquals('Starting', hqe.status)
        self.god.stub_function(job, 'run')
        self.god.unstub(delay_task, 'abort')

        hqe.set_status('Pending')
        other_hqe.set_status('Pending')
        # Now we're not over the max for the atomic group.  But all assigned
        # hosts are in pending state.  over_max_threshold should make us run().
        hqe.atomic_group.max_number_of_machines += 1
        hqe.atomic_group.save()
        job.run.expect_call(hqe)
        hqe.on_pending()
        self.god.check_playback()
        hqe.atomic_group.max_number_of_machines -= 1
        hqe.atomic_group.save()

        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.assertTrue(hqe.job is other_hqe.job)
        # DBObject classes should reuse instances so these should be the same.
        self.assertEqual(job, other_hqe.job)
        self.assertEqual(other_hqe.job, hqe.job)
        # Be sure our delay was not lost during the other_hqe construction.
        self.assertEqual(job._delay_ready_task, delay_task)
        self.assert_(job._delay_ready_task)
        self.assertFalse(job._delay_ready_task.is_done())
        self.assertFalse(job._delay_ready_task.aborted)

        # We want the real run() to be called below.
        self.god.unstub(job, 'run')

        # We pass in the other HQE this time the same way it would happen
        # for real when one host finishes verifying and enters pending.
        job.run_if_ready(other_hqe)

        # The delayed task must be aborted by the actual run() call above.
        self.assertTrue(job._delay_ready_task.aborted)
        self.assertFalse(job._delay_ready_task.success)
        self.assertTrue(job._delay_ready_task.is_done())

        # Check that job run() and _finish_run() were called by the above:
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]
        self.assert_(agent.task)
        task = agent.task
        self.assert_(isinstance(task, monitor_db.QueueTask))
        # Requery these hqes in order to verify the status from the DB.
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        for entry in django_hqes:
            self.assertEqual(models.HostQueueEntry.Status.STARTING,
                             entry.status)

        # We're already running, but more calls to run_with_ready_delay can
        # continue to come in due to straggler hosts enter Pending.  Make
        # sure we don't do anything.
        self.god.stub_function(job, 'run')
        job.run_with_ready_delay(hqe)
        self.god.check_playback()
        self.god.unstub(job, 'run')


    def test_run_synchronous_atomic_group_ready(self):
        """A ready atomic group job schedules a QueueTask named 'atomic1'."""
        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that do not depend on a specific label in the
        # atomic group will use the atomic group name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'atomic1')


    def test_run_synchronous_atomic_group_with_label_ready(self):
        """An atomic group job with a label uses the label as group name."""
        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        job.dependency_labels.add(self.label4)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that also specify a label in the atomic group
        # will use the label name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'label4')


    def test_run_synchronous_ready(self):
        """A pending synchronous job starts with all its queue entries."""
        self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEquals(queue_task.job.id, 1)
        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
        self.assertEquals(hqe_ids, [1, 2])


    def test_schedule_running_host_queue_entries_fail(self):
        """Scheduling onto a host that already has an agent must fail."""
        self._create_job(hosts=[2])
        self._update_hqe("status='%s', execution_subdir=''" %
                         models.HostQueueEntry.Status.PENDING)
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)
        self.assertEqual(queue_entry.status,
                         models.HostQueueEntry.Status.STARTING)
        self.assert_(queue_entry.execution_subdir)
        self.god.check_playback()

        class dummy_test_agent(object):
            task = 'dummy_test_agent'
        self._dispatcher._register_agent_for_ids(
                self._dispatcher._host_agents, [queue_entry.host.id],
                dummy_test_agent)

        # Attempted to schedule on a host that already has an agent.
        self.assertRaises(host_scheduler.SchedulerError,
                          self._dispatcher._schedule_running_host_queue_entries)


    def test_schedule_hostless_job(self):
        """A hostless job starts once and is not rescheduled."""
        job = self._create_job(hostless=True)
        self.assertEqual(1, job.hostqueueentry_set.count())
        hqe_query = scheduler_models.HostQueueEntry.fetch(
                'id = %s' % job.hostqueueentry_set.all()[0].id)
        self.assertEqual(1, len(hqe_query))
        hqe = hqe_query[0]

        self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
        self.assertEqual(0, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        # No change to previously schedule hostless job, and no additional agent
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))
1339
1340
class TopLevelFunctionsTest(unittest.TestCase):
    """Tests for module-level helper functions in monitor_db."""

    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_autoserv_command_line(self):
        """_autoserv_command_line should produce the expected flag set."""
        machines = 'abcd12,efgh34'
        extra_args = ['-Z', 'hello']
        base_flags = set((monitor_db._autoserv_path, '-p',
                          '-m', machines, '-r',
                          drone_manager.WORKING_DIRECTORY))

        # Default invocation: verbose mode plus caller-supplied extras.
        expected = base_flags | set(['--verbose']) | set(extra_args)
        actual = set(monitor_db._autoserv_command_line(machines, extra_args))
        self.assertEqual(expected, actual)

        class FakeJob(object):
            owner = 'Bob'
            name = 'fake job name'
            test_retry = 0
            id = 1337

        class FakeHQE(object):
            job = FakeJob

        # With a queue entry and verbose off: owner/name flags, no extras.
        expected = base_flags | set(['-u', FakeJob.owner, '-l', FakeJob.name])
        actual = set(monitor_db._autoserv_command_line(
                machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
        self.assertEqual(expected, actual)
1377
1378
class AgentTaskTest(unittest.TestCase,
                    frontend_test_utils.FrontendTestMixin):
    """Tests for agent_task.AgentTask drone-set handling."""

    def setUp(self):
        self._frontend_common_setup()


    def tearDown(self):
        self._frontend_common_teardown()


    def _setup_drones(self):
        """Create drones, drone sets and jobs bound to them.

        Returns a tuple ((hqe_1, hqe_2, hqe_3, hqe_4), task) where:
          hqe_1 -> job on drone_set_1 (drones '0', '1')
          hqe_2 -> job on drone_set_2 (drones '2', '3')
          hqe_3 -> job on drone_set_3 (no drones)
          hqe_4 -> job with no drone set at all
        """
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)

        drones = []
        for x in xrange(4):
            drones.append(models.Drone.objects.create(hostname=str(x)))

        drone_set_1 = models.DroneSet.objects.create(name='1')
        drone_set_1.drones.add(*drones[0:2])
        drone_set_2 = models.DroneSet.objects.create(name='2')
        drone_set_2.drones.add(*drones[2:4])
        drone_set_3 = models.DroneSet.objects.create(name='3')

        job_1 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_1)
        job_2 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_2)
        job_3 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_3)

        # job_4 deliberately has no drone set.
        job_4 = self._create_job_simple([self.hosts[0].id])
        job_4.drone_set = None
        job_4.save()

        hqe_1 = job_1.hostqueueentry_set.all()[0]
        hqe_2 = job_2.hostqueueentry_set.all()[0]
        hqe_3 = job_3.hostqueueentry_set.all()[0]
        hqe_4 = job_4.hostqueueentry_set.all()[0]

        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()


    def test_get_drone_hostnames_allowed_no_drones_in_set(self):
        """An empty drone set yields an empty set of allowed hostnames."""
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[2].id,)
        self.assertEqual(set(), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_drone_set(self):
        """Without a job drone set, fall back to the user/global default."""
        hqes, task = self._setup_drones()
        hqe = hqes[3]
        task.queue_entry_ids = (hqe.id,)

        result = object()

        self.god.stub_function(task, '_user_or_global_default_drone_set')
        task._user_or_global_default_drone_set.expect_call(
                hqe.job, hqe.job.user()).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_success(self):
        """A populated drone set yields its drones' hostnames."""
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id,)
        self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_multiple_jobs(self):
        """Queue entries spanning multiple jobs must be rejected."""
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id, hqes[1].id)
        self.assertRaises(AssertionError,
                          task.get_drone_hostnames_allowed)
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_hqe(self):
        """A special task with no queue entries uses its requesting user."""
        class MockSpecialTask(object):
            requested_by = object()

        class MockSpecialAgentTask(agent_task.SpecialAgentTask):
            task = MockSpecialTask()
            queue_entry_ids = []
            def __init__(self, *args, **kwargs):
                # Skip SpecialAgentTask.__init__; only the class attributes
                # above are needed for this test.
                pass

        task = MockSpecialAgentTask()
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        self.god.stub_function(task, '_user_or_global_default_drone_set')

        result = object()
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
        task._user_or_global_default_drone_set.expect_call(
                task.task, MockSpecialTask.requested_by).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def _setup_test_user_or_global_default_drone_set(self):
        """Stub DroneSet.get_default; return the sentinel it will yield."""
        result = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return result

        self.god.stub_function(models.DroneSet, 'get_default')
        models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
        return result


    def test_user_or_global_default_drone_set(self):
        """A user with a drone set takes precedence over the global default."""
        expected = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return expected
        class MockUser(object):
            drone_set = MockDroneSet()

        self._setup_test_user_or_global_default_drone_set()

        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user(self):
        """With no user, the global default drone set is used."""
        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, None)

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user_drone_set(self):
        """A user without a drone set falls back to the global default."""
        class MockUser(object):
            drone_set = None
            login = None

        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()
1530
1531
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()
1534