#!/usr/bin/python
# monitor_db_unittest.py, revision f47a6bbb9971efd228eaa22425431b91fa9f69bf
#pylint: disable-msg=C0111

import gc, time
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import agent_task
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.scheduler import scheduler_config, gc_stats
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models

_DEBUG = False


class DummyAgentTask(object):
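    """Stand-in for an AgentTask, providing just the attributes and the
    get_drone_hostnames_allowed() hook that the dispatcher inspects."""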
    num_processes = 1
    owner_username = 'my_user'

    def get_drone_hostnames_allowed(self):
        return None


class DummyAgent(object):
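    """Fake Agent whose tick() simply records that it ran and whose
    completion is controlled through set_done()."""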
    started = False
    _is_done = False
    host_ids = ()
    queue_entry_ids = ()

    def __init__(self):
        self.task = DummyAgentTask()


    def tick(self):
        self.started = True


    def is_done(self):
        return self._is_done


    def set_done(self, done):
        self._is_done = done


class IsRow(mock.argument_comparator):
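    """Comparator matching a database row whose first column equals the
    given row id."""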
    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


class IsAgentWithTask(mock.argument_comparator):
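    """Comparator matching an Agent whose queue holds exactly the given
    task."""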
    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
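    """Common scaffolding for scheduler tests: builds a test database and
    stubs monitor_db, scheduler_models and the drone manager to use it."""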
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=scheduler_lib._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
        self.god.stub_with(connection_manager, 'db_connection', self._database)
        self.god.stub_with(monitor_db, '_db_manager', connection_manager)

        # These tests only make sense if hosts are acquired inline with the
        # rest of the tick.
        self.god.stub_with(monitor_db, '_inline_host_acquisition', True)
        self.god.stub_with(monitor_db.BaseDispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


    def _update_hqe(self, set, where=''):
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


    def _get_pending_hqes(self):
        query_string=('afe_jobs.priority DESC, '
                      'ifnull(nullif(host_id, NULL), host_id) DESC, '
                      'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                      'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=query_string))


class DispatcherSchedulingTest(BaseSchedulerTest):
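    """Tests of which hosts the dispatcher schedules jobs onto, with and
    without metahosts."""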
    _jobs_scheduled = []


    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)


    def _record_job_scheduled(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _run_scheduler(self):
        self._dispatcher._host_scheduler.tick()
        for _ in xrange(2): # metahost scheduling can take two cycles
            self._dispatcher._schedule_new_jobs()


    def _test_basic_scheduling_helper(self, use_metahosts):
        'Basic scheduling, with or without metahosts'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        'Test prioritization ordering'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        'Only idle hosts get scheduled'
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


#    TODO: Revive this test.
#    def test_HostScheduler_get_host_atomic_group_id(self):
#        job = self._create_job(metahosts=[self.label6.id])
#        queue_entry = scheduler_models.HostQueueEntry.fetch(
#                where='job_id=%d' % job.id)[0]
#        # Indirectly initialize the internal state of the host scheduler.
#        self._dispatcher._refresh_pending_queue_entries()
#
#        # Test the host scheduler
#        host_scheduler = self._dispatcher._host_scheduler
#
#
#        # Two labels each in a different atomic group.  This should log an
#        # error and continue.
#        orig_logging_error = logging.error
#        def mock_logging_error(message, *args):
#            mock_logging_error._num_calls += 1
#            # Test the logging call itself, we just wrapped it to count it.
#            orig_logging_error(message, *args)
#        mock_logging_error._num_calls = 0
#        self.god.stub_with(logging, 'error', mock_logging_error)
#        host_scheduler.refresh([])
#        self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label8.id], queue_entry))
#        self.assertTrue(mock_logging_error._num_calls > 0)
#        self.god.unstub(logging, 'error')
#
#        # Two labels both in the same atomic group, this should not raise an
#        # error, it will merely cause the job to schedule on the intersection.
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label5.id]))
#
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id([]))
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label3.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label7.id, self.label5.id]))

    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_garbage_collection(self):
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, so this second call
        # should do nothing.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
        scheduler_config.config.max_processes_started_per_cycle = (
            self._MAX_STARTED)

        def fake_max_runnable_processes(fake_self, username,
                                        drone_hostnames_allowed):
            running = sum(agent.task.num_processes
                          for agent in self._agents
                          if agent.started and not agent.is_done())
            return self._MAX_RUNNING - running
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)


    def _setup_some_agents(self, num_agents):
        self._agents = [DummyAgent() for i in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_cycles(self):
        for i in xrange(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        for i in indexes:
            self.assert_(self._agents[i].started == is_started,
                         'Agent %d %sstarted' %
                         (i, is_started and 'not ' or ''))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_per_cycle(self):
        self._setup_some_agents(3)
        self._dispatcher._handle_agents()
        self._assert_agents_started([0, 1])
        self._assert_agents_not_started([2])


    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_cycles()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


class PidfileRunMonitorTest(unittest.TestCase):
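    """Tests for pidfile_monitor.PidfileRunMonitor against a mocked
    DroneManager."""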
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(drone_manager, '_the_instance',
                           self.mock_drone_manager)
        self.god.stub_function(email_manager.manager, 'enqueue_notify_email')
        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))

        self.monitor = pidfile_monitor.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()


    def _mock_get_pidfile_timeout_secs(self):
        return 300


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        if expected_pid is None:
            self.assertEquals(self.monitor._state.process, None)
        else:
            self.assertEquals(self.monitor._state.process.pid, expected_pid)
        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
        self.assertEquals(self.monitor._state.num_tests_failed,
                          expected_num_tests_failed)


        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self.monitor._start_time = (time.time() -
                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
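    """Tests for monitor_db.Agent tick/abort behaviour using mock tasks."""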
    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        task = self.god.create_mock_class(agent_task.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        while not agent.is_done():
            agent.tick()


    def test_agent_abort(self):
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True

        agent = self._create_agent(task)
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent(task)
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


class JobSchedulingTest(BaseSchedulerTest):
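    """Tests that Job.run_if_ready() and the dispatcher move host queue
    entries through the expected statuses and create the right agents."""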
    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_delay_tasks()
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEquals(expected_status, actual_status)

        if not expect_agent:
            self.assertEquals(agent, None)
            return

        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        return agent.task


    def test_run_if_ready_delays(self):
        # Also tests Job.run_with_ready_delay() on atomic group jobs.
        django_job = self._create_job(hosts=[5, 6], atomic_group=1)
        job = scheduler_models.Job(django_job.id)
        self.assertEqual(1, job.synch_count)
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        self.assertEqual(2, len(django_hqes))
        self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)

        def set_hqe_status(django_hqe, status):
            django_hqe.status = status
            django_hqe.save()
            scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)

        # An initial state, our synch_count is 1
        set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
        set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)

        # So that we don't depend on the config file value during the test.
        self.assert_(scheduler_config.config
                     .secs_to_wait_for_atomic_group_hosts is not None)
        self.god.stub_with(scheduler_config.config,
                           'secs_to_wait_for_atomic_group_hosts', 123456)

        # Get the pending one as a scheduler_models.HostQueueEntry object.
        hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)
        self.assert_(not job._delay_ready_task)
        self.assertTrue(job.is_ready())

        # Ready with one pending, one verifying and an atomic group should
        # result in a DelayCallTask to re-check if we're ready a while later.
        job.run_if_ready(hqe)
        self.assertEquals('Waiting', hqe.status)
        self._dispatcher._schedule_delay_tasks()
        self.assertEquals('Pending', hqe.status)
        agent = self._dispatcher._agents[0]
        self.assert_(job._delay_ready_task)
        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        delay_task = agent.task
        self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))
        self.assert_(not delay_task.is_done())

        self.god.stub_function(delay_task, 'abort')

        self.god.stub_function(job, 'run')

        self.god.stub_function(job, '_pending_count')
        self.god.stub_with(job, 'synch_count', 9)
        self.god.stub_function(job, 'request_abort')

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and does not call run if there are not enough hosts
        # in pending after the delay.
        job._pending_count.expect_call().and_return(0)
        job.request_abort.expect_call()
        delay_task._callback()
        self.god.check_playback()

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and calls job.run() if there are still enough hosts
        # pending after the delay.
        job.synch_count = 4
        job._pending_count.expect_call().and_return(4)
        job.run.expect_call(hqe)
        delay_task._callback()
        self.god.check_playback()

        job._pending_count.expect_call().and_return(4)

        # Adjust the delay deadline so that enough time has passed.
        job._delay_ready_task.end_time = time.time() - 111111
        job.run.expect_call(hqe)
        # ...the delay_expired condition should cause us to call run()
        self._dispatcher._handle_agents()
        self.god.check_playback()
        delay_task.success = False

        # Adjust the delay deadline back so that enough time has not passed.
        job._delay_ready_task.end_time = time.time() + 111111
        self._dispatcher._handle_agents()
        self.god.check_playback()

        # Now max_number_of_machines HQEs are in pending state.  Remaining
        # delay will now be ignored.
        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.god.unstub(job, 'run')
        self.god.unstub(job, '_pending_count')
        self.god.unstub(job, 'synch_count')
        self.god.unstub(job, 'request_abort')
        # ...the over_max_threshold test should cause us to call run()
        delay_task.abort.expect_call()
        other_hqe.on_pending()
        self.assertEquals('Starting', other_hqe.status)
        self.assertEquals('Starting', hqe.status)
        self.god.stub_function(job, 'run')
        self.god.unstub(delay_task, 'abort')

        hqe.set_status('Pending')
        other_hqe.set_status('Pending')
        # Now we're not over the max for the atomic group.  But all assigned
        # hosts are in pending state.  over_max_threshold should make us run().
        hqe.atomic_group.max_number_of_machines += 1
        hqe.atomic_group.save()
        job.run.expect_call(hqe)
        hqe.on_pending()
        self.god.check_playback()
        hqe.atomic_group.max_number_of_machines -= 1
        hqe.atomic_group.save()

        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.assertTrue(hqe.job is other_hqe.job)
        # DBObject classes should reuse instances so these should be the same.
        self.assertEqual(job, other_hqe.job)
        self.assertEqual(other_hqe.job, hqe.job)
        # Be sure our delay was not lost during the other_hqe construction.
        self.assertEqual(job._delay_ready_task, delay_task)
        self.assert_(job._delay_ready_task)
        self.assertFalse(job._delay_ready_task.is_done())
        self.assertFalse(job._delay_ready_task.aborted)

        # We want the real run() to be called below.
        self.god.unstub(job, 'run')

        # We pass in the other HQE this time the same way it would happen
        # for real when one host finishes verifying and enters pending.
        job.run_if_ready(other_hqe)

        # The delayed task must be aborted by the actual run() call above.
        self.assertTrue(job._delay_ready_task.aborted)
        self.assertFalse(job._delay_ready_task.success)
        self.assertTrue(job._delay_ready_task.is_done())

        # Check that job run() and _finish_run() were called by the above:
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]
        self.assert_(agent.task)
        task = agent.task
        self.assert_(isinstance(task, monitor_db.QueueTask))
        # Requery these hqes in order to verify the status from the DB.
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        for entry in django_hqes:
            self.assertEqual(models.HostQueueEntry.Status.STARTING,
                             entry.status)

        # We're already running, but more calls to run_with_ready_delay can
        # continue to come in as straggler hosts enter Pending.  Make sure
        # we don't do anything.
        self.god.stub_function(job, 'run')
        job.run_with_ready_delay(hqe)
        self.god.check_playback()
        self.god.unstub(job, 'run')


    def test_run_synchronous_atomic_group_ready(self):
        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that do not depend on a specific label in the
        # atomic group will use the atomic group name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'atomic1')


    def test_run_synchronous_atomic_group_with_label_ready(self):
        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        job.dependency_labels.add(self.label4)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that also specify a label in the atomic group
        # will use the label name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'label4')


    def test_run_synchronous_ready(self):
        self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEquals(queue_task.job.id, 1)
        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
        self.assertEquals(hqe_ids, [1, 2])


    def test_schedule_running_host_queue_entries_fail(self):
        self._create_job(hosts=[2])
        self._update_hqe("status='%s', execution_subdir=''" %
                         models.HostQueueEntry.Status.PENDING)
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)
        self.assertEqual(queue_entry.status,
                         models.HostQueueEntry.Status.STARTING)
        self.assert_(queue_entry.execution_subdir)
        self.god.check_playback()

        class dummy_test_agent(object):
            task = 'dummy_test_agent'
        self._dispatcher._register_agent_for_ids(
                self._dispatcher._host_agents, [queue_entry.host.id],
                dummy_test_agent)

        # Attempted to schedule on a host that already has an agent.
        self.assertRaises(scheduler_lib.SchedulerError,
                          self._dispatcher._schedule_running_host_queue_entries)


    def test_schedule_hostless_job(self):
        job = self._create_job(hostless=True)
        self.assertEqual(1, job.hostqueueentry_set.count())
        hqe_query = scheduler_models.HostQueueEntry.fetch(
                'id = %s' % job.hostqueueentry_set.all()[0].id)
        self.assertEqual(1, len(hqe_query))
        hqe = hqe_query[0]

        self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
        self.assertEqual(0, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        # No change to the previously scheduled hostless job, and no new agent
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))


class TopLevelFunctionsTest(unittest.TestCase):
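    """Tests for module-level helpers such as _autoserv_command_line()."""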
    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_autoserv_command_line(self):
        machines = 'abcd12,efgh34'
        extra_args = ['-Z', 'hello']
        expected_command_line_base = set((monitor_db._autoserv_path, '-p',
                                          '-m', machines, '-r',
                                          drone_manager.WORKING_DIRECTORY))

        expected_command_line = expected_command_line_base.union(
                ['--verbose']).union(extra_args)
        command_line = set(
                monitor_db._autoserv_command_line(machines, extra_args))
        self.assertEqual(expected_command_line, command_line)

        class FakeJob(object):
            owner = 'Bob'
            name = 'fake job name'
            test_retry = 0
            id = 1337

        class FakeHQE(object):
            job = FakeJob

        expected_command_line = expected_command_line_base.union(
                ['-u', FakeJob.owner, '-l', FakeJob.name])
        command_line = set(monitor_db._autoserv_command_line(
                machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
        self.assertEqual(expected_command_line, command_line)


class AgentTaskTest(unittest.TestCase,
                    frontend_test_utils.FrontendTestMixin):
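    """Tests for agent_task.AgentTask drone-set handling and for aborting a
    HostlessQueueTask."""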
    def setUp(self):
        self._frontend_common_setup()


    def tearDown(self):
        self._frontend_common_teardown()


    def _setup_drones(self):
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)

        drones = []
        for x in xrange(4):
            drones.append(models.Drone.objects.create(hostname=str(x)))

        drone_set_1 = models.DroneSet.objects.create(name='1')
        drone_set_1.drones.add(*drones[0:2])
        drone_set_2 = models.DroneSet.objects.create(name='2')
        drone_set_2.drones.add(*drones[2:4])
        drone_set_3 = models.DroneSet.objects.create(name='3')

        job_1 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_1)
        job_2 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_2)
        job_3 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_3)

        job_4 = self._create_job_simple([self.hosts[0].id])
        job_4.drone_set = None
        job_4.save()

        hqe_1 = job_1.hostqueueentry_set.all()[0]
        hqe_2 = job_2.hostqueueentry_set.all()[0]
        hqe_3 = job_3.hostqueueentry_set.all()[0]
        hqe_4 = job_4.hostqueueentry_set.all()[0]

        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()


    def test_get_drone_hostnames_allowed_no_drones_in_set(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[2].id,)
        self.assertEqual(set(), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_drone_set(self):
        hqes, task = self._setup_drones()
        hqe = hqes[3]
        task.queue_entry_ids = (hqe.id,)

        result = object()

        self.god.stub_function(task, '_user_or_global_default_drone_set')
        task._user_or_global_default_drone_set.expect_call(
                hqe.job, hqe.job.user()).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_success(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id,)
        self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_multiple_jobs(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id, hqes[1].id)
        self.assertRaises(AssertionError,
                          task.get_drone_hostnames_allowed)
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_hqe(self):
        class MockSpecialTask(object):
            requested_by = object()

        class MockSpecialAgentTask(agent_task.SpecialAgentTask):
            task = MockSpecialTask()
            queue_entry_ids = []
            def __init__(self, *args, **kwargs):
                pass

        task = MockSpecialAgentTask()
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        self.god.stub_function(task, '_user_or_global_default_drone_set')

        result = object()
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
        task._user_or_global_default_drone_set.expect_call(
                task.task, MockSpecialTask.requested_by).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def _setup_test_user_or_global_default_drone_set(self):
        result = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return result

        self.god.stub_function(models.DroneSet, 'get_default')
        models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
        return result


    def test_user_or_global_default_drone_set(self):
        expected = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return expected
        class MockUser(object):
            drone_set = MockDroneSet()

        self._setup_test_user_or_global_default_drone_set()

        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user(self):
        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, None)

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user_drone_set(self):
        class MockUser(object):
            drone_set = None
            login = None

        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_abort_HostlessQueueTask(self):
        hqe = self.god.create_mock_class(scheduler_models.HostQueueEntry,
                                         'HostQueueEntry')
        # If hqe is still in STARTING status, aborting the task should finish
        # without changing hqe's status.
        hqe.status = models.HostQueueEntry.Status.STARTING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()

        # If hqe is in RUNNING status, aborting the task should change hqe's
        # status to Parsing, so FinalReparseTask can be scheduled.
        hqe.set_status.expect_call('Parsing')
        hqe.status = models.HostQueueEntry.Status.RUNNING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()


if __name__ == '__main__':
    unittest.main()