#!/usr/bin/python
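"""Unit tests for the Autotest scheduler (monitor_db): dispatcher scheduling
and throttling, pidfile monitoring, agents, agent tasks and job scheduling."""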

import gc, time
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import agent_task
from autotest_lib.scheduler import monitor_db, drone_manager
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.scheduler import scheduler_config, gc_stats
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models

_DEBUG = False


class DummyAgentTask(object):
    num_processes = 1
    owner_username = 'my_user'

    def get_drone_hostnames_allowed(self):
        return None


class DummyAgent(object):
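    """Minimal stand-in for monitor_db.Agent used by the throttling tests."""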
    started = False
    _is_done = False
    host_ids = ()
    hostnames = {}
    queue_entry_ids = ()

    def __init__(self):
        self.task = DummyAgentTask()


    def tick(self):
        self.started = True


    def is_done(self):
        return self._is_done


    def set_done(self, done):
        self._is_done = done


class IsRow(mock.argument_comparator):
    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


class IsAgentWithTask(mock.argument_comparator):
    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
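    # Map each id to a placeholder hostname.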
    agent_or_task.hostnames = dict((host_id, '192.168.1.1')
                                   for host_id in id_list)


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=scheduler_lib._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
        self.god.stub_with(connection_manager, 'db_connection', self._database)
        self.god.stub_with(monitor_db, '_db_manager', connection_manager)
        self.god.stub_with(monitor_db, '_db', self._database)

        # These tests only make sense if hosts are acquired inline with the
        # rest of the tick.
        self.god.stub_with(monitor_db, '_inline_host_acquisition', True)
        self.god.stub_with(monitor_db.BaseDispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')
        self.god.stub_with(drone_manager.instance(), 'initialize',
                           lambda *args: None)
        self.god.stub_with(drone_manager.instance(), 'execute_actions',
                           lambda *args: None)

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


    def _update_hqe(self, set, where=''):
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


    def _get_pending_hqes(self):
        query_string=('afe_jobs.priority DESC, '
                      'ifnull(nullif(host_id, NULL), host_id) DESC, '
                      'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                      'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=query_string))


class DispatcherSchedulingTest(BaseSchedulerTest):
    _jobs_scheduled = []


    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)


    def _record_job_scheduled(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _run_scheduler(self):
        self._dispatcher._host_scheduler.tick()
        for _ in xrange(2): # metahost scheduling can take two ticks
            self._dispatcher._schedule_new_jobs()


    def _test_basic_scheduling_helper(self, use_metahosts):
        'Basic nonmetahost scheduling'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        'Test prioritization ordering'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        'Only idle hosts get scheduled'
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


#    TODO: Revive this test.
#    def test_HostScheduler_get_host_atomic_group_id(self):
#        job = self._create_job(metahosts=[self.label6.id])
#        queue_entry = scheduler_models.HostQueueEntry.fetch(
#                where='job_id=%d' % job.id)[0]
#        # Indirectly initialize the internal state of the host scheduler.
#        self._dispatcher._refresh_pending_queue_entries()
#
#        # Test the host scheduler
#        host_scheduler = self._dispatcher._host_scheduler
#
#
#        # Two labels each in a different atomic group.  This should log an
#        # error and continue.
#        orig_logging_error = logging.error
#        def mock_logging_error(message, *args):
#            mock_logging_error._num_calls += 1
#            # Test the logging call itself, we just wrapped it to count it.
#            orig_logging_error(message, *args)
#        mock_logging_error._num_calls = 0
#        self.god.stub_with(logging, 'error', mock_logging_error)
#        host_scheduler.refresh([])
#        self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label8.id], queue_entry))
#        self.assertTrue(mock_logging_error._num_calls > 0)
#        self.god.unstub(logging, 'error')
#
#        # Two labels both in the same atomic group, this should not raise an
#        # error, it will merely cause the job to schedule on the intersection.
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label5.id]))
#
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id([]))
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label3.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label7.id, self.label5.id]))


    def test_no_execution_subdir_not_found(self):
        """Reproduce bug crosbug.com/334353 and recover from it."""

        global_config.global_config.override_config_value(
                'SCHEDULER', 'drones', 'localhost')

        job = self._create_job(hostless=True)

        # Ensure execution_subdir is set before status
        original_set_status = scheduler_models.HostQueueEntry.set_status
        def fake_set_status(hqe, *args, **kwargs):
            self.assertEqual(hqe.execution_subdir, 'hostless')
            original_set_status(hqe, *args, **kwargs)

        self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',
                           fake_set_status)

        self._dispatcher._schedule_new_jobs()

        hqe = job.hostqueueentry_set.all()[0]
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual('hostless', hqe.execution_subdir)


    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
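        # The entry is now active rather than Queued, so the scheduler must
        # leave it alone.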
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_garbage_collection(self):
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, so it won't do
        # anything the second time.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING

        def fake_max_runnable_processes(fake_self, username,
                                        drone_hostnames_allowed):
            running = sum(agent.task.num_processes
                          for agent in self._agents
                          if agent.started and not agent.is_done())
            return self._MAX_RUNNING - running
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)


    def _setup_some_agents(self, num_agents):
        self._agents = [DummyAgent() for i in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_ticks(self):
        for i in xrange(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        for i in indexes:
            self.assert_(self._agents[i].started == is_started,
                         'Agent %d %sstarted' %
                         (i, is_started and 'not ' or ''))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
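        # Agent 0 alone consumes the full process cap, so agent 1 must not
        # start.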
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
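        # Agent 1 needs more capacity than remains once agent 0 starts; the
        # dispatcher should hold agent 2 back rather than skip past agent 1.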
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_ticks()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
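        # A zero-process agent consumes no capacity, so it starts even though
        # agent 3 is still throttled.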
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


class PidfileRunMonitorTest(unittest.TestCase):
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(drone_manager, '_the_instance',
                           self.mock_drone_manager)
        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))

        self.monitor = pidfile_monitor.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()


    def _mock_get_pidfile_timeout_secs(self):
        return 300


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        if expected_pid is None:
            self.assertEquals(self.monitor._state.process, None)
        else:
            self.assertEquals(self.monitor._state.process.pid, expected_pid)
        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
        self.assertEquals(self.monitor._state.num_tests_failed,
                          expected_num_tests_failed)


        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
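        # With no matching process found, the monitor reports a lost process:
        # exit status 1 and 0 tests failed.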
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        self.monitor._start_time = (time.time() -
                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        task = self.god.create_mock_class(agent_task.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        while not agent.is_done():
            agent.tick()


    def test_agent_abort(self):
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True

        agent = self._create_agent(task)
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent(task)
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


class JobSchedulingTest(BaseSchedulerTest):
    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_delay_tasks()
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEquals(expected_status, actual_status)

        if not expect_agent:
            self.assertEquals(agent, None)
            return

        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        return agent.task


    def test_run_if_ready_delays(self):
        # Also tests Job.run_with_ready_delay() on atomic group jobs.
        django_job = self._create_job(hosts=[5, 6], atomic_group=1)
        job = scheduler_models.Job(django_job.id)
        self.assertEqual(1, job.synch_count)
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        self.assertEqual(2, len(django_hqes))
        self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)

        def set_hqe_status(django_hqe, status):
            django_hqe.status = status
            django_hqe.save()
            scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)

        # An initial state, our synch_count is 1
        set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
        set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)

        # So that we don't depend on the config file value during the test.
        self.assert_(scheduler_config.config
                     .secs_to_wait_for_atomic_group_hosts is not None)
        self.god.stub_with(scheduler_config.config,
                           'secs_to_wait_for_atomic_group_hosts', 123456)

        # Get the pending one as a scheduler_models.HostQueueEntry object.
        hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)
        self.assert_(not job._delay_ready_task)
        self.assertTrue(job.is_ready())
        # Ready with one pending, one verifying and an atomic group should
        # result in a DelayedCallTask that re-checks readiness a while later.
        job.run_if_ready(hqe)
        self.assertEquals('Waiting', hqe.status)
        self._dispatcher._schedule_delay_tasks()
        self.assertEquals('Pending', hqe.status)
        agent = self._dispatcher._agents[0]
        self.assert_(job._delay_ready_task)
        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        delay_task = agent.task
        self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))
        self.assert_(not delay_task.is_done())

        self.god.stub_function(delay_task, 'abort')

        self.god.stub_function(job, 'run')

        self.god.stub_function(job, '_pending_count')
        self.god.stub_with(job, 'synch_count', 9)
        self.god.stub_function(job, 'request_abort')

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and does not call run if there are not enough hosts
        # in pending after the delay.
        job._pending_count.expect_call().and_return(0)
        job.request_abort.expect_call()
        delay_task._callback()
        self.god.check_playback()

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and calls job.run() if there are still enough hosts
        # pending after the delay.
        job.synch_count = 4
        job._pending_count.expect_call().and_return(4)
        job.run.expect_call(hqe)
        delay_task._callback()
        self.god.check_playback()

        job._pending_count.expect_call().and_return(4)

        # Adjust the delay deadline so that enough time has passed.
        job._delay_ready_task.end_time = time.time() - 111111
        job.run.expect_call(hqe)
        # ...the delay_expired condition should cause us to call run()
        self._dispatcher._handle_agents()
        self.god.check_playback()
        delay_task.success = False

        # Adjust the delay deadline back so that enough time has not passed.
        job._delay_ready_task.end_time = time.time() + 111111
        self._dispatcher._handle_agents()
        self.god.check_playback()

        # Now max_number_of_machines HQEs are in pending state.  Remaining
        # delay will now be ignored.
        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.god.unstub(job, 'run')
        self.god.unstub(job, '_pending_count')
        self.god.unstub(job, 'synch_count')
        self.god.unstub(job, 'request_abort')
        # ...the over_max_threshold test should cause us to call run()
        delay_task.abort.expect_call()
        other_hqe.on_pending()
        self.assertEquals('Starting', other_hqe.status)
        self.assertEquals('Starting', hqe.status)
        self.god.stub_function(job, 'run')
        self.god.unstub(delay_task, 'abort')

        hqe.set_status('Pending')
        other_hqe.set_status('Pending')
        # Now we're not over the max for the atomic group.  But all assigned
        # hosts are in pending state.  over_max_threshold should make us run().
        hqe.atomic_group.max_number_of_machines += 1
        hqe.atomic_group.save()
        job.run.expect_call(hqe)
        hqe.on_pending()
        self.god.check_playback()
        hqe.atomic_group.max_number_of_machines -= 1
        hqe.atomic_group.save()

        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.assertTrue(hqe.job is other_hqe.job)
        # DBObject classes should reuse instances so these should be the same.
        self.assertEqual(job, other_hqe.job)
        self.assertEqual(other_hqe.job, hqe.job)
        # Be sure our delay was not lost during the other_hqe construction.
        self.assertEqual(job._delay_ready_task, delay_task)
        self.assert_(job._delay_ready_task)
        self.assertFalse(job._delay_ready_task.is_done())
        self.assertFalse(job._delay_ready_task.aborted)

        # We want the real run() to be called below.
        self.god.unstub(job, 'run')

        # We pass in the other HQE this time the same way it would happen
        # for real when one host finishes verifying and enters pending.
        job.run_if_ready(other_hqe)

        # The delayed task must be aborted by the actual run() call above.
        self.assertTrue(job._delay_ready_task.aborted)
        self.assertFalse(job._delay_ready_task.success)
        self.assertTrue(job._delay_ready_task.is_done())

        # Check that job run() and _finish_run() were called by the above:
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]
        self.assert_(agent.task)
        task = agent.task
        self.assert_(isinstance(task, monitor_db.QueueTask))
        # Requery these hqes in order to verify the status from the DB.
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        for entry in django_hqes:
            self.assertEqual(models.HostQueueEntry.Status.STARTING,
                             entry.status)

        # We're already running, but more calls to run_with_ready_delay can
        # continue to come in as straggler hosts enter Pending.  Make sure
        # we don't do anything.
        self.god.stub_function(job, 'run')
        job.run_with_ready_delay(hqe)
        self.god.check_playback()
        self.god.unstub(job, 'run')


    def test_run_synchronous_atomic_group_ready(self):
        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")
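        # Force both entries to Pending so the synchronous atomic group job is
        # ready to run.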

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that do not depend on a specific label in the
        # atomic group will use the atomic group name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'atomic1')


    def test_run_synchronous_atomic_group_with_label_ready(self):
        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        job.dependency_labels.add(self.label4)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that also specify a label in the atomic group
        # will use the label name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'label4')


    def test_run_synchronous_ready(self):
        self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEquals(queue_task.job.id, 1)
        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
        self.assertEquals(hqe_ids, [1, 2])


    def test_schedule_running_host_queue_entries_fail(self):
        self._create_job(hosts=[2])
        self._update_hqe("status='%s', execution_subdir=''" %
                         models.HostQueueEntry.Status.PENDING)
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)
        self.assertEqual(queue_entry.status,
                         models.HostQueueEntry.Status.STARTING)
        self.assert_(queue_entry.execution_subdir)
        self.god.check_playback()

        class dummy_test_agent(object):
            task = 'dummy_test_agent'
        self._dispatcher._register_agent_for_ids(
                self._dispatcher._host_agents, [queue_entry.host.id],
                dummy_test_agent)

        # Attempted to schedule on a host that already has an agent.
        self.assertRaises(scheduler_lib.SchedulerError,
                          self._dispatcher._schedule_running_host_queue_entries)


    def test_schedule_hostless_job(self):
        job = self._create_job(hostless=True)
        self.assertEqual(1, job.hostqueueentry_set.count())
        hqe_query = scheduler_models.HostQueueEntry.fetch(
                'id = %s' % job.hostqueueentry_set.all()[0].id)
        self.assertEqual(1, len(hqe_query))
        hqe = hqe_query[0]

        self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
        self.assertEqual(0, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        # No change to the previously scheduled hostless job, and no
        # additional agent.
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))


class TopLevelFunctionsTest(unittest.TestCase):
    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_autoserv_command_line(self):
        machines = 'abcd12,efgh34'
        extra_args = ['-Z', 'hello']
        expected_command_line_base = set((monitor_db._autoserv_path, '-p',
                                          '-m', machines, '-r',
                                          '--lab', 'True',
                                          drone_manager.WORKING_DIRECTORY))

        expected_command_line = expected_command_line_base.union(
                ['--verbose']).union(extra_args)
        command_line = set(
                monitor_db._autoserv_command_line(machines, extra_args))
        self.assertEqual(expected_command_line, command_line)

        class FakeJob(object):
            owner = 'Bob'
            name = 'fake job name'
            test_retry = 0
            id = 1337

        class FakeHQE(object):
            job = FakeJob

        expected_command_line = expected_command_line_base.union(
                ['-u', FakeJob.owner, '-l', FakeJob.name])
        command_line = set(monitor_db._autoserv_command_line(
                machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
        self.assertEqual(expected_command_line, command_line)


class AgentTaskTest(unittest.TestCase,
                    frontend_test_utils.FrontendTestMixin):
    def setUp(self):
        self._frontend_common_setup()


    def tearDown(self):
        self._frontend_common_teardown()


    def _setup_drones(self):
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)

        drones = []
        for x in xrange(4):
            drones.append(models.Drone.objects.create(hostname=str(x)))

        drone_set_1 = models.DroneSet.objects.create(name='1')
        drone_set_1.drones.add(*drones[0:2])
        drone_set_2 = models.DroneSet.objects.create(name='2')
        drone_set_2.drones.add(*drones[2:4])
        drone_set_3 = models.DroneSet.objects.create(name='3')

        job_1 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_1)
        job_2 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_2)
        job_3 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_3)

        job_4 = self._create_job_simple([self.hosts[0].id])
        job_4.drone_set = None
        job_4.save()

        hqe_1 = job_1.hostqueueentry_set.all()[0]
        hqe_2 = job_2.hostqueueentry_set.all()[0]
        hqe_3 = job_3.hostqueueentry_set.all()[0]
        hqe_4 = job_4.hostqueueentry_set.all()[0]

        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()


    def test_get_drone_hostnames_allowed_no_drones_in_set(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[2].id,)
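        # hqes[2] belongs to drone_set_3, which has no drones, so the allowed
        # set is empty.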
        self.assertEqual(set(), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_drone_set(self):
        hqes, task = self._setup_drones()
        hqe = hqes[3]
        task.queue_entry_ids = (hqe.id,)

        result = object()

        self.god.stub_function(task, '_user_or_global_default_drone_set')
        task._user_or_global_default_drone_set.expect_call(
                hqe.job, hqe.job.user()).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_success(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id,)
        self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed([]))
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_multiple_jobs(self):
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id, hqes[1].id)
        self.assertRaises(AssertionError,
                          task.get_drone_hostnames_allowed)
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_hqe(self):
        class MockSpecialTask(object):
            requested_by = object()

        class MockSpecialAgentTask(agent_task.SpecialAgentTask):
            task = MockSpecialTask()
            queue_entry_ids = []
            def __init__(self, *args, **kwargs):
                super(agent_task.SpecialAgentTask, self).__init__()

        task = MockSpecialAgentTask()
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        self.god.stub_function(task, '_user_or_global_default_drone_set')

        result = object()
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
        task._user_or_global_default_drone_set.expect_call(
                task.task, MockSpecialTask.requested_by).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def _setup_test_user_or_global_default_drone_set(self):
        result = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return result

        self.god.stub_function(models.DroneSet, 'get_default')
        models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
        return result


    def test_user_or_global_default_drone_set(self):
        expected = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return expected
        class MockUser(object):
            drone_set = MockDroneSet()

        self._setup_test_user_or_global_default_drone_set()

        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user(self):
        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, None)

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user_drone_set(self):
        class MockUser(object):
            drone_set = None
            login = None

        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_abort_HostlessQueueTask(self):
        hqe = self.god.create_mock_class(scheduler_models.HostQueueEntry,
                                         'HostQueueEntry')
        # If hqe is still in STARTING status, aborting the task should finish
        # without changing hqe's status.
        hqe.status = models.HostQueueEntry.Status.STARTING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()

        # If hqe is in RUNNING status, aborting the task should change hqe's
        # status to Parsing, so FinalReparseTask can be scheduled.
        hqe.set_status.expect_call('Parsing')
        hqe.status = models.HostQueueEntry.Status.RUNNING
        hqe.job = None
        hqe.id = 0
        task = monitor_db.HostlessQueueTask(hqe)
        task.abort()


if __name__ == '__main__':
    unittest.main()
