monitor_db_unittest.py revision f66d51b5caa96995b91e7c155ff4378cdef4baaf
#!/usr/bin/python
#pylint: disable-msg=C0111

import gc, time
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import agent_task
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.scheduler import scheduler_config, gc_stats
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import monitor_db_functional_test
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models

_DEBUG = False


class DummyAgentTask(object):
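    """Minimal stand-in for an agent task; exposes only the attributes the
    dispatcher consults (process count, owner, allowed drone hostnames)."""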
    num_processes = 1
    owner_username = 'my_user'

    def get_drone_hostnames_allowed(self):
        return None


class DummyAgent(object):
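    """Minimal stand-in for an Agent; records whether tick() was called and
    reports a settable done state."""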
    started = False
    _is_done = False
    host_ids = ()
    queue_entry_ids = ()

    def __init__(self):
        self.task = DummyAgentTask()


    def tick(self):
        self.started = True


    def is_done(self):
        return self._is_done


    def set_done(self, done):
        self._is_done = done


class IsRow(mock.argument_comparator):
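    """Comparator matching a database row whose first column equals row_id."""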
    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


class IsAgentWithTask(mock.argument_comparator):
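    """Comparator matching an Agent whose queue holds exactly the given task."""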
    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
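    """Point host_ids and queue_entry_ids of agent_or_task at the same list."""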
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
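    """Shared fixture for scheduler tests: sets up a translating test database
    and stubs monitor_db, scheduler_models and the drone manager against it."""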
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=monitor_db_functional_test._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
        self.god.stub_with(connection_manager, 'db_connection', self._database)
        self.god.stub_with(monitor_db, '_db_manager', connection_manager)

        # These tests only make sense if hosts are acquired inline with the
        # rest of the tick.
        self.god.stub_with(monitor_db, '_inline_host_acquisition', True)
        self.god.stub_with(monitor_db.BaseDispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


    def _update_hqe(self, set, where=''):
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


    def _get_pending_hqes(self):
        query_string=('afe_jobs.priority DESC, '
                      'ifnull(nullif(host_id, NULL), host_id) DESC, '
                      'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                      'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=query_string))


class DispatcherSchedulingTest(BaseSchedulerTest):
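    """Tests of the dispatcher's job-to-host scheduling decisions."""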
    _jobs_scheduled = []


    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)


    def _record_job_scheduled(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _run_scheduler(self):
        self._dispatcher._host_scheduler.tick()
        for _ in xrange(2): # metahost scheduling can take two cycles
            self._dispatcher._schedule_new_jobs()


    def _test_basic_scheduling_helper(self, use_metahosts):
        'Basic nonmetahost scheduling'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        'Test prioritization ordering'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        'Only idle hosts get scheduled'
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


#    TODO: Revive this test.
#    def test_HostScheduler_get_host_atomic_group_id(self):
#        job = self._create_job(metahosts=[self.label6.id])
#        queue_entry = scheduler_models.HostQueueEntry.fetch(
#                where='job_id=%d' % job.id)[0]
#        # Indirectly initialize the internal state of the host scheduler.
#        self._dispatcher._refresh_pending_queue_entries()
#
#        # Test the host scheduler
#        host_scheduler = self._dispatcher._host_scheduler
#
#
#        # Two labels each in a different atomic group.  This should log an
#        # error and continue.
#        orig_logging_error = logging.error
#        def mock_logging_error(message, *args):
#            mock_logging_error._num_calls += 1
#            # Test the logging call itself, we just wrapped it to count it.
#            orig_logging_error(message, *args)
#        mock_logging_error._num_calls = 0
#        self.god.stub_with(logging, 'error', mock_logging_error)
#        host_scheduler.refresh([])
#        self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label8.id], queue_entry))
#        self.assertTrue(mock_logging_error._num_calls > 0)
#        self.god.unstub(logging, 'error')
#
#        # Two labels both in the same atomic group, this should not raise an
#        # error, it will merely cause the job to schedule on the intersection.
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label5.id]))
#
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id([]))
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label3.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label7.id, self.label5.id]))

    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_garbage_collection(self):
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, so it won't do anything
        # the second time.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()


    def test_overlapping_jobs(self):
        """Test that we can detect overlapping jobs."""
        self._create_job_simple([1], True)
        self._run_scheduler()
        self._do_query('UPDATE afe_hosts SET leased=0 where id=1')
        self._create_job_simple([1], True)
        self._run_scheduler()
        jobs = monitor_db_cleanup.UserCleanup.get_overlapping_jobs()
        self.assertTrue(jobs[0]['job_id'] == 1 and jobs[0]['host_id'] == 1 and
                        jobs[1]['job_id'] == 2 and jobs[1]['host_id'] == 1)


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
        scheduler_config.config.max_processes_started_per_cycle = (
            self._MAX_STARTED)

        def fake_max_runnable_processes(fake_self, username,
                                        drone_hostnames_allowed):
            running = sum(agent.task.num_processes
                          for agent in self._agents
                          if agent.started and not agent.is_done())
            return self._MAX_RUNNING - running
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)


    def _setup_some_agents(self, num_agents):
        self._agents = [DummyAgent() for i in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_cycles(self):
        for i in xrange(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        for i in indexes:
            self.assert_(self._agents[i].started == is_started,
                         'Agent %d %sstarted' %
                         (i, is_started and 'not ' or ''))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_per_cycle(self):
        self._setup_some_agents(3)
        self._dispatcher._handle_agents()
        self._assert_agents_started([0, 1])
        self._assert_agents_not_started([2])


    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_cycles()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


class PidfileRunMonitorTest(unittest.TestCase):
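    """Tests for PidfileRunMonitor's handling of autoserv pidfile contents."""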
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(pidfile_monitor, '_drone_manager',
                           self.mock_drone_manager)
        self.god.stub_function(email_manager.manager, 'enqueue_notify_email')
        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))

        self.monitor = pidfile_monitor.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()


    def _mock_get_pidfile_timeout_secs(self):
        return 300


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        if expected_pid is None:
            self.assertEquals(self.monitor._state.process, None)
        else:
            self.assertEquals(self.monitor._state.process.pid, expected_pid)
        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
        self.assertEquals(self.monitor._state.num_tests_failed,
                          expected_num_tests_failed)


        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self.monitor._start_time = (time.time() -
                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
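    """Tests for Agent tick/abort behavior using a mocked AgentTask."""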
    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        task = self.god.create_mock_class(agent_task.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        while not agent.is_done():
            agent.tick()


    def test_agent_abort(self):
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True

        agent = self._create_agent(task)
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent(task)
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


class JobSchedulingTest(BaseSchedulerTest):
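    """Tests covering Job.run_if_ready() and scheduling of ready queue entries."""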
    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_delay_tasks()
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEquals(expected_status, actual_status)

        if not expect_agent:
            self.assertEquals(agent, None)
            return

        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        return agent.task


    def test_run_if_ready_delays(self):
        # Also tests Job.run_with_ready_delay() on atomic group jobs.
        django_job = self._create_job(hosts=[5, 6], atomic_group=1)
        job = scheduler_models.Job(django_job.id)
        self.assertEqual(1, job.synch_count)
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        self.assertEqual(2, len(django_hqes))
        self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)

        def set_hqe_status(django_hqe, status):
            django_hqe.status = status
            django_hqe.save()
            scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)

        # An initial state, our synch_count is 1
        set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
        set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)

        # So that we don't depend on the config file value during the test.
        self.assert_(scheduler_config.config
                     .secs_to_wait_for_atomic_group_hosts is not None)
        self.god.stub_with(scheduler_config.config,
                           'secs_to_wait_for_atomic_group_hosts', 123456)

        # Get the pending one as a scheduler_models.HostQueueEntry object.
        hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)
        self.assert_(not job._delay_ready_task)
        self.assertTrue(job.is_ready())

        # Ready with one pending, one verifying and an atomic group should
        # result in a DelayedCallTask to re-check if we're ready a while later.
        job.run_if_ready(hqe)
        self.assertEquals('Waiting', hqe.status)
        self._dispatcher._schedule_delay_tasks()
        self.assertEquals('Pending', hqe.status)
        agent = self._dispatcher._agents[0]
        self.assert_(job._delay_ready_task)
        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        delay_task = agent.task
        self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))
        self.assert_(not delay_task.is_done())

        self.god.stub_function(delay_task, 'abort')

        self.god.stub_function(job, 'run')

        self.god.stub_function(job, '_pending_count')
        self.god.stub_with(job, 'synch_count', 9)
        self.god.stub_function(job, 'request_abort')

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and does not call run if there are not enough hosts
        # in pending after the delay.
        job._pending_count.expect_call().and_return(0)
        job.request_abort.expect_call()
        delay_task._callback()
        self.god.check_playback()

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and returns the Agent returned by job.run() if
        # there are still enough hosts pending after the delay.
        job.synch_count = 4
        job._pending_count.expect_call().and_return(4)
        job.run.expect_call(hqe)
        delay_task._callback()
        self.god.check_playback()

        job._pending_count.expect_call().and_return(4)

        # Adjust the delay deadline so that enough time has passed.
        job._delay_ready_task.end_time = time.time() - 111111
        job.run.expect_call(hqe)
        # ...the delay_expired condition should cause us to call run()
        self._dispatcher._handle_agents()
        self.god.check_playback()
        delay_task.success = False

        # Adjust the delay deadline back so that enough time has not passed.
        job._delay_ready_task.end_time = time.time() + 111111
        self._dispatcher._handle_agents()
        self.god.check_playback()

        # Now max_number_of_machines HQEs are in pending state.  Remaining
        # delay will now be ignored.
        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.god.unstub(job, 'run')
        self.god.unstub(job, '_pending_count')
        self.god.unstub(job, 'synch_count')
        self.god.unstub(job, 'request_abort')
        # ...the over_max_threshold test should cause us to call run()
        delay_task.abort.expect_call()
        other_hqe.on_pending()
        self.assertEquals('Starting', other_hqe.status)
        self.assertEquals('Starting', hqe.status)
        self.god.stub_function(job, 'run')
        self.god.unstub(delay_task, 'abort')

        hqe.set_status('Pending')
        other_hqe.set_status('Pending')
        # Now we're not over the max for the atomic group.  But all assigned
        # hosts are in pending state.  over_max_threshold should make us run().
        hqe.atomic_group.max_number_of_machines += 1
        hqe.atomic_group.save()
        job.run.expect_call(hqe)
        hqe.on_pending()
        self.god.check_playback()
        hqe.atomic_group.max_number_of_machines -= 1
        hqe.atomic_group.save()

        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.assertTrue(hqe.job is other_hqe.job)
        # DBObject classes should reuse instances so these should be the same.
        self.assertEqual(job, other_hqe.job)
        self.assertEqual(other_hqe.job, hqe.job)
        # Be sure our delay was not lost during the other_hqe construction.
        self.assertEqual(job._delay_ready_task, delay_task)
        self.assert_(job._delay_ready_task)
        self.assertFalse(job._delay_ready_task.is_done())
        self.assertFalse(job._delay_ready_task.aborted)

        # We want the real run() to be called below.
        self.god.unstub(job, 'run')

        # We pass in the other HQE this time the same way it would happen
        # for real when one host finishes verifying and enters pending.
        job.run_if_ready(other_hqe)

        # The delayed task must be aborted by the actual run() call above.
        self.assertTrue(job._delay_ready_task.aborted)
        self.assertFalse(job._delay_ready_task.success)
        self.assertTrue(job._delay_ready_task.is_done())

        # Check that job run() and _finish_run() were called by the above:
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]
        self.assert_(agent.task)
        task = agent.task
        self.assert_(isinstance(task, monitor_db.QueueTask))
        # Requery these hqes in order to verify the status from the DB.
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        for entry in django_hqes:
            self.assertEqual(models.HostQueueEntry.Status.STARTING,
                             entry.status)

        # We're already running, but more calls to run_with_ready_delay can
        # continue to come in as straggler hosts enter Pending.  Make sure
        # we don't do anything.
        self.god.stub_function(job, 'run')
        job.run_with_ready_delay(hqe)
        self.god.check_playback()
        self.god.unstub(job, 'run')


    def test_run_synchronous_atomic_group_ready(self):
        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that do not depend on a specific label in the
        # atomic group will use the atomic group name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'atomic1')


    def test_run_synchronous_atomic_group_with_label_ready(self):
        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        job.dependency_labels.add(self.label4)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that also specify a label in the atomic group
        # will use the label name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'label4')


    def test_run_synchronous_ready(self):
        self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEquals(queue_task.job.id, 1)
        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
        self.assertEquals(hqe_ids, [1, 2])


    def test_schedule_running_host_queue_entries_fail(self):
        self._create_job(hosts=[2])
        self._update_hqe("status='%s', execution_subdir=''" %
                         models.HostQueueEntry.Status.PENDING)
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)
        self.assertEqual(queue_entry.status,
                         models.HostQueueEntry.Status.STARTING)
        self.assert_(queue_entry.execution_subdir)
        self.god.check_playback()

        class dummy_test_agent(object):
            task = 'dummy_test_agent'
        self._dispatcher._register_agent_for_ids(
                self._dispatcher._host_agents, [queue_entry.host.id],
                dummy_test_agent)

        # Attempted to schedule on a host that already has an agent.
        self.assertRaises(scheduler_lib.SchedulerError,
                          self._dispatcher._schedule_running_host_queue_entries)


    def test_schedule_hostless_job(self):
        job = self._create_job(hostless=True)
        self.assertEqual(1, job.hostqueueentry_set.count())
        hqe_query = scheduler_models.HostQueueEntry.fetch(
                'id = %s' % job.hostqueueentry_set.all()[0].id)
        self.assertEqual(1, len(hqe_query))
        hqe = hqe_query[0]

        self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
        self.assertEqual(0, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        # No change to previously scheduled hostless job, and no additional agent
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))


class TopLevelFunctionsTest(unittest.TestCase):
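    """Tests for module-level helpers, currently _autoserv_command_line()."""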
    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_autoserv_command_line(self):
        machines = 'abcd12,efgh34'
        extra_args = ['-Z', 'hello']
        expected_command_line_base = set((monitor_db._autoserv_path, '-p',
                                          '-m', machines, '-r',
                                          drone_manager.WORKING_DIRECTORY))

        expected_command_line = expected_command_line_base.union(
                ['--verbose']).union(extra_args)
        command_line = set(
                monitor_db._autoserv_command_line(machines, extra_args))
        self.assertEqual(expected_command_line, command_line)

        class FakeJob(object):
            owner = 'Bob'
            name = 'fake job name'
            test_retry = 0
            id = 1337

        class FakeHQE(object):
            job = FakeJob

        expected_command_line = expected_command_line_base.union(
                ['-u', FakeJob.owner, '-l', FakeJob.name])
        command_line = set(monitor_db._autoserv_command_line(
                machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
        self.assertEqual(expected_command_line, command_line)


class AgentTaskTest(unittest.TestCase,
                    frontend_test_utils.FrontendTestMixin):
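    """Tests for AgentTask drone-set selection and HostlessQueueTask abort."""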
    def setUp(self):
        self._frontend_common_setup()


    def tearDown(self):
        self._frontend_common_teardown()


    def _setup_drones(self):
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)

        drones = []
        for x in xrange(4):
            drones.append(models.Drone.objects.create(hostname=str(x)))

        drone_set_1 = models.DroneSet.objects.create(name='1')
        drone_set_1.drones.add(*drones[0:2])
        drone_set_2 = models.DroneSet.objects.create(name='2')
        drone_set_2.drones.add(*drones[2:4])
        drone_set_3 = models.DroneSet.objects.create(name='3')

        job_1 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_1)
        job_2 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_2)
        job_3 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_3)

        job_4 = self._create_job_simple([self.hosts[0].id])
        job_4.drone_set = None
        job_4.save()

        hqe_1 = job_1.hostqueueentry_set.all()[0]
        hqe_2 = job_2.hostqueueentry_set.all()[0]
        hqe_3 = job_3.hostqueueentry_set.all()[0]
        hqe_4 = job_4.hostqueueentry_set.all()[0]

        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()


    def test_get_drone_hostnames_allowed_no_drones_in_set(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[2].id,)
        self.assertEqual(set(), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_drone_set(self):
        hqes, task = self._setup_drones()
        hqe = hqes[3]
        task.queue_entry_ids = (hqe.id,)

        result = object()

        self.god.stub_function(task, '_user_or_global_default_drone_set')
        task._user_or_global_default_drone_set.expect_call(
                hqe.job, hqe.job.user()).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_success(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id,)
        self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_multiple_jobs(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id, hqes[1].id)
        self.assertRaises(AssertionError,
                          task.get_drone_hostnames_allowed)
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_hqe(self):
        class MockSpecialTask(object):
            requested_by = object()

        class MockSpecialAgentTask(agent_task.SpecialAgentTask):
            task = MockSpecialTask()
            queue_entry_ids = []
            def __init__(self, *args, **kwargs):
                pass

        task = MockSpecialAgentTask()
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        self.god.stub_function(task, '_user_or_global_default_drone_set')

        result = object()
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
        task._user_or_global_default_drone_set.expect_call(
                task.task, MockSpecialTask.requested_by).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def _setup_test_user_or_global_default_drone_set(self):
        result = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return result

        self.god.stub_function(models.DroneSet, 'get_default')
        models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
        return result


    def test_user_or_global_default_drone_set(self):
        expected = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return expected
        class MockUser(object):
            drone_set = MockDroneSet()

        self._setup_test_user_or_global_default_drone_set()

        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user(self):
        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, None)

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user_drone_set(self):
        class MockUser(object):
            drone_set = None
            login = None

        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_abort_HostlessQueueTask(self):
        hqe = self.god.create_mock_class(scheduler_models.HostQueueEntry,
                                         'HostQueueEntry')
        # If hqe is still in STARTING status, aborting the task should finish
        # without changing hqe's status.
        hqe.status = models.HostQueueEntry.Status.STARTING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()

        # If hqe is in RUNNING status, aborting the task should change hqe's
        # status to Parsing, so FinalReparseTask can be scheduled.
        hqe.set_status.expect_call('Parsing')
        hqe.status = models.HostQueueEntry.Status.RUNNING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()


if __name__ == '__main__':
    unittest.main()