1#!/usr/bin/python
2
3import cPickle
4import os, unittest
5import common
6from autotest_lib.client.bin import local_host
7from autotest_lib.client.common_lib import global_config
8from autotest_lib.client.common_lib import utils
9from autotest_lib.client.common_lib.test_utils import mock
10from autotest_lib.frontend import setup_django_lite_environment
11from autotest_lib.scheduler import drone_manager, drone_utility, drones
12from autotest_lib.scheduler import scheduler_config, site_drone_manager
13from autotest_lib.scheduler import thread_lib
14from autotest_lib.scheduler import pidfile_monitor
15from autotest_lib.server.hosts import ssh_host
16
17
18class MockDrone(drones._AbstractDrone):
19    def __init__(self, name, active_processes=0, max_processes=10,
20                 allowed_users=None, support_ssp=False):
21        super(MockDrone, self).__init__()
22        self.name = name
23        self.hostname = name
24        self.active_processes = active_processes
25        self.max_processes = max_processes
26        self.allowed_users = allowed_users
27        self._host = 'mock_drone'
28        self._support_ssp = support_ssp
29        # maps method names list of tuples containing method arguments
30        self._recorded_calls = {'queue_call': [],
31                                'send_file_to': []}
32
33
34    def queue_call(self, method, *args, **kwargs):
35        self._recorded_calls['queue_call'].append((method, args, kwargs))
36
37
38    def call(self, method, *args, **kwargs):
39        # don't bother differentiating between call() and queue_call()
40        return self.queue_call(method, *args, **kwargs)
41
42
43    def send_file_to(self, drone, source_path, destination_path,
44                     can_fail=False):
45        self._recorded_calls['send_file_to'].append(
46                (drone, source_path, destination_path))
47
48
49    # method for use by tests
50    def _check_for_recorded_call(self, method_name, arguments):
51        recorded_arg_list = self._recorded_calls[method_name]
52        was_called = arguments in recorded_arg_list
53        if not was_called:
54            print 'Recorded args:', recorded_arg_list
55            print 'Expected:', arguments
56        return was_called
57
58
59    def was_call_queued(self, method, *args, **kwargs):
60        return self._check_for_recorded_call('queue_call',
61                                             (method, args, kwargs))
62
63
64    def was_file_sent(self, drone, source_path, destination_path):
65        return self._check_for_recorded_call('send_file_to',
66                                             (drone, source_path,
67                                              destination_path))
68
69
70class DroneManager(unittest.TestCase):
71    _DRONE_INSTALL_DIR = '/drone/install/dir'
72    _DRONE_RESULTS_DIR = os.path.join(_DRONE_INSTALL_DIR, 'results')
73    _RESULTS_DIR = '/results/dir'
74    _SOURCE_PATH = 'source/path'
75    _DESTINATION_PATH = 'destination/path'
76    _WORKING_DIRECTORY = 'working/directory'
77    _USERNAME = 'my_user'
78
79    def setUp(self):
80        self.god = mock.mock_god()
81        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
82                           self._DRONE_INSTALL_DIR)
83        self.manager = drone_manager.DroneManager()
84        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
85
86        # we don't want this to ever actually get called
87        self.god.stub_function(drones, 'get_drone')
88        # we don't want the DroneManager to go messing with global config
89        def do_nothing():
90            pass
91        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
92
93        # set up some dummy drones
94        self.mock_drone = MockDrone('mock_drone')
95        self.manager._drones[self.mock_drone.name] = self.mock_drone
96        self.results_drone = MockDrone('results_drone', 0, 10)
97        self.manager._results_drone = self.results_drone
98
99        self.mock_drone_process = drone_manager.Process(self.mock_drone.name, 0)
100
101
102    def tearDown(self):
103        self.god.unstub_all()
104
105
106    def _test_choose_drone_for_execution_helper(self, processes_info_list,
107                                                requested_processes,
108                                                require_ssp=False):
109        for index, process_info in enumerate(processes_info_list):
110            if len(process_info) == 2:
111                active_processes, max_processes = process_info
112                support_ssp = False
113            else:
114                active_processes, max_processes, support_ssp = process_info
115            self.manager._enqueue_drone(MockDrone(
116                    index, active_processes, max_processes, allowed_users=None,
117                    support_ssp=support_ssp))
118
119        return self.manager._choose_drone_for_execution(
120                requested_processes, self._USERNAME, None, require_ssp)
121
122
123    def test_choose_drone_for_execution(self):
124        drone = self._test_choose_drone_for_execution_helper([(1, 2), (0, 2)],
125                                                             1)
126        self.assertEquals(drone.name, 1)
127
128
129    def test_choose_drone_for_execution_some_full(self):
130        drone = self._test_choose_drone_for_execution_helper([(0, 1), (1, 3)],
131                                                             2)
132        self.assertEquals(drone.name, 1)
133
134
135    def test_choose_drone_for_execution_all_full(self):
136        drone = self._test_choose_drone_for_execution_helper([(2, 1), (3, 2)],
137                                                             1)
138        self.assertEquals(drone.name, 1)
139
140
141    def test_choose_drone_for_execution_all_full_same_percentage_capacity(self):
142        drone = self._test_choose_drone_for_execution_helper([(5, 3), (10, 6)],
143                                                             1)
144        self.assertEquals(drone.name, 1)
145
146
147    def test_choose_drone_for_execution_no_ssp_support(self):
148        drone = self._test_choose_drone_for_execution_helper(
149                [(0, 1), (1, 3)], 1, True)
150        self.assertEquals(drone.name, 0)
151
152
153    def test_choose_drone_for_execution_with_ssp_support(self):
154        self.mock_drone._support_ssp = True
155        drone = self._test_choose_drone_for_execution_helper(
156                [(0, 1), (1, 3, True)], 1, True)
157        self.assertEquals(drone.name, 1)
158
159
160    def test_user_restrictions(self):
161        # this drone is restricted to a different user
162        self.manager._enqueue_drone(MockDrone(1, max_processes=10,
163                                              allowed_users=['fakeuser']))
164        # this drone is allowed but has lower capacity
165        self.manager._enqueue_drone(MockDrone(2, max_processes=2,
166                                              allowed_users=[self._USERNAME]))
167
168        self.assertEquals(2,
169                          self.manager.max_runnable_processes(self._USERNAME,
170                                                              None))
171        drone = self.manager._choose_drone_for_execution(
172                1, username=self._USERNAME, drone_hostnames_allowed=None)
173        self.assertEquals(drone.name, 2)
174
175
176    def test_user_restrictions_with_full_drone(self):
177        # this drone is restricted to a different user
178        self.manager._enqueue_drone(MockDrone(1, max_processes=10,
179                                              allowed_users=['fakeuser']))
180        # this drone is allowed but is full
181        self.manager._enqueue_drone(MockDrone(2, active_processes=3,
182                                              max_processes=2,
183                                              allowed_users=[self._USERNAME]))
184
185        self.assertEquals(0,
186                          self.manager.max_runnable_processes(self._USERNAME,
187                                                              None))
188        drone = self.manager._choose_drone_for_execution(
189                1, username=self._USERNAME, drone_hostnames_allowed=None)
190        self.assertEquals(drone.name, 2)
191
192
193    def _setup_test_drone_restrictions(self, active_processes=0):
194        self.manager._enqueue_drone(MockDrone(
195                1, active_processes=active_processes, max_processes=10))
196        self.manager._enqueue_drone(MockDrone(
197                2, active_processes=active_processes, max_processes=5))
198        self.manager._enqueue_drone(MockDrone(
199                3, active_processes=active_processes, max_processes=2))
200
201
202    def test_drone_restrictions_allow_any(self):
203        self._setup_test_drone_restrictions()
204        self.assertEquals(10,
205                          self.manager.max_runnable_processes(self._USERNAME,
206                                                              None))
207        drone = self.manager._choose_drone_for_execution(
208                1, username=self._USERNAME, drone_hostnames_allowed=None)
209        self.assertEqual(drone.name, 1)
210
211
212    def test_drone_restrictions_under_capacity(self):
213        self._setup_test_drone_restrictions()
214        drone_hostnames_allowed = (2, 3)
215        self.assertEquals(
216                5, self.manager.max_runnable_processes(self._USERNAME,
217                                                       drone_hostnames_allowed))
218        drone = self.manager._choose_drone_for_execution(
219                1, username=self._USERNAME,
220                drone_hostnames_allowed=drone_hostnames_allowed)
221
222        self.assertEqual(drone.name, 2)
223
224
225    def test_drone_restrictions_over_capacity(self):
226        self._setup_test_drone_restrictions(active_processes=6)
227        drone_hostnames_allowed = (2, 3)
228        self.assertEquals(
229                0, self.manager.max_runnable_processes(self._USERNAME,
230                                                       drone_hostnames_allowed))
231        drone = self.manager._choose_drone_for_execution(
232                7, username=self._USERNAME,
233                drone_hostnames_allowed=drone_hostnames_allowed)
234        self.assertEqual(drone.name, 2)
235
236
237    def test_drone_restrictions_allow_none(self):
238        self._setup_test_drone_restrictions()
239        drone_hostnames_allowed = ()
240        self.assertEquals(
241                0, self.manager.max_runnable_processes(self._USERNAME,
242                                                       drone_hostnames_allowed))
243        drone = self.manager._choose_drone_for_execution(
244                1, username=self._USERNAME,
245                drone_hostnames_allowed=drone_hostnames_allowed)
246        self.assertEqual(drone, None)
247
248
249    def test_initialize(self):
250        results_hostname = 'results_repo'
251        results_install_dir = '/results/install'
252        global_config.global_config.override_config_value(
253                scheduler_config.CONFIG_SECTION,
254                'results_host_installation_directory', results_install_dir)
255
256        (drones.get_drone.expect_call(self.mock_drone.name)
257         .and_return(self.mock_drone))
258
259        results_drone = MockDrone('results_drone')
260        self.god.stub_function(results_drone, 'set_autotest_install_dir')
261        drones.get_drone.expect_call(results_hostname).and_return(results_drone)
262        results_drone.set_autotest_install_dir.expect_call(results_install_dir)
263
264        self.manager.initialize(base_results_dir=self._RESULTS_DIR,
265                                drone_hostnames=[self.mock_drone.name],
266                                results_repository_hostname=results_hostname)
267
268        self.assert_(self.mock_drone.was_call_queued(
269                'initialize', self._DRONE_RESULTS_DIR + '/'))
270        self.god.check_playback()
271
272
273    def test_execute_command(self):
274        self.manager._enqueue_drone(self.mock_drone)
275
276        pidfile_name = 'my_pidfile'
277        log_file = 'log_file'
278
279        pidfile_id = self.manager.execute_command(
280                command=['test', drone_manager.WORKING_DIRECTORY],
281                working_directory=self._WORKING_DIRECTORY,
282                pidfile_name=pidfile_name,
283                num_processes=1,
284                log_file=log_file)
285
286        full_working_directory = os.path.join(self._DRONE_RESULTS_DIR,
287                                              self._WORKING_DIRECTORY)
288        self.assertEquals(pidfile_id.path,
289                          os.path.join(full_working_directory, pidfile_name))
290        self.assert_(self.mock_drone.was_call_queued(
291                'execute_command', ['test', full_working_directory],
292                full_working_directory,
293                os.path.join(self._DRONE_RESULTS_DIR, log_file), pidfile_name))
294
295
296    def test_attach_file_to_execution(self):
297        self.manager._enqueue_drone(self.mock_drone)
298
299        contents = 'my\ncontents'
300        attached_path = self.manager.attach_file_to_execution(
301                self._WORKING_DIRECTORY, contents)
302        self.manager.execute_command(command=['test'],
303                                     working_directory=self._WORKING_DIRECTORY,
304                                     pidfile_name='mypidfile',
305                                     num_processes=1,
306                                     drone_hostnames_allowed=None)
307
308        self.assert_(self.mock_drone.was_call_queued(
309                'write_to_file',
310                os.path.join(self._DRONE_RESULTS_DIR, attached_path),
311                contents))
312
313
314    def test_copy_results_on_drone(self):
315        self.manager.copy_results_on_drone(self.mock_drone_process,
316                                           self._SOURCE_PATH,
317                                           self._DESTINATION_PATH)
318        self.assert_(self.mock_drone.was_call_queued(
319                'copy_file_or_directory',
320                os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
321                os.path.join(self._DRONE_RESULTS_DIR, self._DESTINATION_PATH)))
322
323
324    def test_copy_to_results_repository(self):
325        site_drone_manager.ENABLE_ARCHIVING = True
326        self.manager.copy_to_results_repository(self.mock_drone_process,
327                                                self._SOURCE_PATH)
328        self.assert_(self.mock_drone.was_file_sent(
329                self.results_drone,
330                os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
331                os.path.join(self._RESULTS_DIR, self._SOURCE_PATH)))
332
333
334    def test_write_lines_to_file(self):
335        file_path = 'file/path'
336        lines = ['line1', 'line2']
337        written_data = 'line1\nline2\n'
338
339        # write to results repository
340        self.manager.write_lines_to_file(file_path, lines)
341        self.assert_(self.results_drone.was_call_queued(
342                'write_to_file', os.path.join(self._RESULTS_DIR, file_path),
343                written_data))
344
345        # write to a drone
346        self.manager.write_lines_to_file(
347                file_path, lines, paired_with_process=self.mock_drone_process)
348        self.assert_(self.mock_drone.was_call_queued(
349                'write_to_file',
350                os.path.join(self._DRONE_RESULTS_DIR, file_path), written_data))
351
352
353    def test_pidfile_expiration(self):
354        self.god.stub_with(self.manager, '_get_max_pidfile_refreshes',
355                           lambda: 0)
356        pidfile_id = self.manager.get_pidfile_id_from('tag', 'name')
357        self.manager.register_pidfile(pidfile_id)
358        self.manager._drop_old_pidfiles()
359        self.manager._drop_old_pidfiles()
360        self.assertFalse(self.manager._registered_pidfile_info)
361
362
363class ThreadedDroneTest(unittest.TestCase):
364    _DRONE_INSTALL_DIR = '/drone/install/dir'
365    _RESULTS_DIR = '/results/dir'
366    _DRONE_CLASS = drones._RemoteDrone
367    _DRONE_HOST = ssh_host.SSHHost
368
369
370    def create_drone(self, drone_hostname, mock_hostname,
371                     timestamp_remote_calls=False):
372        """Create and initialize a Remote Drone.
373
374        @return: A remote drone instance.
375        """
376        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
377        self.god.stub_function(drones.drone_utility, 'create_host')
378        drones.drone_utility.create_host.expect_call(drone_hostname).and_return(
379                mock_host)
380        mock_host.is_up.expect_call().and_return(True)
381        return self._DRONE_CLASS(drone_hostname,
382                                 timestamp_remote_calls=timestamp_remote_calls)
383
384
385    def create_fake_pidfile_info(self, tag='tag', name='name'):
386        pidfile_id = self.manager.get_pidfile_id_from(tag, name)
387        self.manager.register_pidfile(pidfile_id)
388        return self.manager._registered_pidfile_info
389
390
391    def setUp(self):
392        self.god = mock.mock_god()
393        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
394                           self._DRONE_INSTALL_DIR)
395        self.manager = drone_manager.DroneManager()
396        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
397
398        # we don't want this to ever actually get called
399        self.god.stub_function(drones, 'get_drone')
400        # we don't want the DroneManager to go messing with global config
401        def do_nothing():
402            pass
403        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
404
405        self.results_drone = MockDrone('results_drone', 0, 10)
406        self.manager._results_drone = self.results_drone
407        self.drone_utility_path = 'mock-drone-utility-path'
408        self.mock_return = {'results': ['mock results'],
409                            'warnings': []}
410
411
412    def tearDown(self):
413        self.god.unstub_all()
414
415    def test_trigger_refresh(self):
416        """Test drone manager trigger refresh."""
417        self.god.stub_with(self._DRONE_CLASS, '_drone_utility_path',
418                           self.drone_utility_path)
419        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
420        self.manager._drones[mock_drone.hostname] = mock_drone
421
422        # Create some fake pidfiles and confirm that a refresh call is
423        # executed on each drone host, with the same pidfile paths. Then
424        # check that each drone gets a key in the returned results dictionary.
425        for i in range(0, 1):
426            pidfile_info = self.create_fake_pidfile_info(
427                    'tag%s' % i, 'name%s' %i)
428        pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()]
429        refresh_call = drone_utility.call('refresh', pidfile_paths)
430        expected_results = {}
431        mock_result = utils.CmdResult(
432                stdout=cPickle.dumps(self.mock_return))
433        for drone in self.manager.get_drones():
434            drone._host.run.expect_call(
435                    'python %s' % self.drone_utility_path,
436                    stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
437                    connect_timeout=mock.is_instance_comparator(int)
438                ).and_return(mock_result)
439            expected_results[drone] = self.mock_return['results']
440        self.manager.trigger_refresh()
441        self.assertTrue(self.manager._refresh_task_queue.get_results() ==
442                        expected_results)
443        self.god.check_playback()
444
445
446    def test_sync_refresh(self):
447        """Test drone manager sync refresh."""
448
449        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
450        self.manager._drones[mock_drone.hostname] = mock_drone
451
452        # Insert some drone_utility results into the results queue, then
453        # check that get_results returns it in the right format, and that
454        # the rest of sync_refresh populates the right datastructures for
455        # correct handling of agents. Also confirm that this method of
456        # syncing is sufficient for the monitor to pick up the exit status
457        # of the process in the same way it would in handle_agents.
458        pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
459        pidfiles = {pidfile_path: '123\n12\n0\n'}
460        drone_utility_results = {
461                'pidfiles': pidfiles,
462                'autoserv_processes':{},
463                'all_processes':{},
464                'parse_processes':{},
465                'pidfiles_second_read':pidfiles,
466        }
467        # Our manager instance isn't the drone manager singletone that the
468        # pidfile_monitor will use by default, becuase setUp doesn't call
469        # drone_manager.instance().
470        self.god.stub_with(drone_manager, '_the_instance', self.manager)
471        monitor = pidfile_monitor.PidfileRunMonitor()
472        monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
473        self.manager.register_pidfile(monitor.pidfile_id)
474        self.assertTrue(monitor._state.exit_status == None)
475
476        self.manager._refresh_task_queue.results_queue.put(
477                thread_lib.ThreadedTaskQueue.result(
478                    mock_drone, [drone_utility_results]))
479        self.manager.sync_refresh()
480        pidfiles = self.manager._pidfiles
481        pidfile_id = pidfiles.keys()[0]
482        pidfile_contents = pidfiles[pidfile_id]
483
484        self.assertTrue(
485                pidfile_id.path == pidfile_path and
486                pidfile_contents.process.pid == 123 and
487                pidfile_contents.process.hostname ==
488                        mock_drone.hostname and
489                pidfile_contents.exit_status == 12 and
490                pidfile_contents.num_tests_failed == 0)
491        self.assertTrue(monitor.exit_code() == 12)
492        self.god.check_playback()
493
494
495class ThreadedLocalhostDroneTest(ThreadedDroneTest):
496    _DRONE_CLASS = drones._LocalDrone
497    _DRONE_HOST = local_host.LocalHost
498
499
500    def create_drone(self, drone_hostname, mock_hostname,
501                     timestamp_remote_calls=False):
502        """Create and initialize a Remote Drone.
503
504        @return: A remote drone instance.
505        """
506        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
507        self.god.stub_function(drones.drone_utility, 'create_host')
508        local_drone = self._DRONE_CLASS(
509                timestamp_remote_calls=timestamp_remote_calls)
510        self.god.stub_with(local_drone, '_host', mock_host)
511        return local_drone
512
513
514if __name__ == '__main__':
515    unittest.main()
516