1#!/usr/bin/python 2 3import cPickle 4import os, unittest 5import common 6from autotest_lib.client.bin import local_host 7from autotest_lib.client.common_lib import global_config 8from autotest_lib.client.common_lib import utils 9from autotest_lib.client.common_lib.test_utils import mock 10from autotest_lib.frontend import setup_django_lite_environment 11from autotest_lib.scheduler import drone_manager, drone_utility, drones 12from autotest_lib.scheduler import scheduler_config, site_drone_manager 13from autotest_lib.scheduler import thread_lib 14from autotest_lib.scheduler import pidfile_monitor 15from autotest_lib.server.hosts import ssh_host 16 17 18class MockDrone(drones._AbstractDrone): 19 def __init__(self, name, active_processes=0, max_processes=10, 20 allowed_users=None, support_ssp=False): 21 super(MockDrone, self).__init__() 22 self.name = name 23 self.hostname = name 24 self.active_processes = active_processes 25 self.max_processes = max_processes 26 self.allowed_users = allowed_users 27 self._host = 'mock_drone' 28 self._support_ssp = support_ssp 29 # maps method names list of tuples containing method arguments 30 self._recorded_calls = {'queue_call': [], 31 'send_file_to': []} 32 33 34 def queue_call(self, method, *args, **kwargs): 35 self._recorded_calls['queue_call'].append((method, args, kwargs)) 36 37 38 def call(self, method, *args, **kwargs): 39 # don't bother differentiating between call() and queue_call() 40 return self.queue_call(method, *args, **kwargs) 41 42 43 def send_file_to(self, drone, source_path, destination_path, 44 can_fail=False): 45 self._recorded_calls['send_file_to'].append( 46 (drone, source_path, destination_path)) 47 48 49 # method for use by tests 50 def _check_for_recorded_call(self, method_name, arguments): 51 recorded_arg_list = self._recorded_calls[method_name] 52 was_called = arguments in recorded_arg_list 53 if not was_called: 54 print 'Recorded args:', recorded_arg_list 55 print 'Expected:', arguments 56 return was_called 57 58 59 def was_call_queued(self, method, *args, **kwargs): 60 return self._check_for_recorded_call('queue_call', 61 (method, args, kwargs)) 62 63 64 def was_file_sent(self, drone, source_path, destination_path): 65 return self._check_for_recorded_call('send_file_to', 66 (drone, source_path, 67 destination_path)) 68 69 70class DroneManager(unittest.TestCase): 71 _DRONE_INSTALL_DIR = '/drone/install/dir' 72 _DRONE_RESULTS_DIR = os.path.join(_DRONE_INSTALL_DIR, 'results') 73 _RESULTS_DIR = '/results/dir' 74 _SOURCE_PATH = 'source/path' 75 _DESTINATION_PATH = 'destination/path' 76 _WORKING_DIRECTORY = 'working/directory' 77 _USERNAME = 'my_user' 78 79 def setUp(self): 80 self.god = mock.mock_god() 81 self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR', 82 self._DRONE_INSTALL_DIR) 83 self.manager = drone_manager.DroneManager() 84 self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR) 85 86 # we don't want this to ever actually get called 87 self.god.stub_function(drones, 'get_drone') 88 # we don't want the DroneManager to go messing with global config 89 def do_nothing(): 90 pass 91 self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing) 92 93 # set up some dummy drones 94 self.mock_drone = MockDrone('mock_drone') 95 self.manager._drones[self.mock_drone.name] = self.mock_drone 96 self.results_drone = MockDrone('results_drone', 0, 10) 97 self.manager._results_drone = self.results_drone 98 99 self.mock_drone_process = drone_manager.Process(self.mock_drone.name, 0) 100 101 102 def tearDown(self): 103 self.god.unstub_all() 104 105 106 def _test_choose_drone_for_execution_helper(self, processes_info_list, 107 requested_processes, 108 require_ssp=False): 109 for index, process_info in enumerate(processes_info_list): 110 if len(process_info) == 2: 111 active_processes, max_processes = process_info 112 support_ssp = False 113 else: 114 active_processes, max_processes, support_ssp = process_info 115 self.manager._enqueue_drone(MockDrone( 116 index, active_processes, max_processes, allowed_users=None, 117 support_ssp=support_ssp)) 118 119 return self.manager._choose_drone_for_execution( 120 requested_processes, self._USERNAME, None, require_ssp) 121 122 123 def test_choose_drone_for_execution(self): 124 drone = self._test_choose_drone_for_execution_helper([(1, 2), (0, 2)], 125 1) 126 self.assertEquals(drone.name, 1) 127 128 129 def test_choose_drone_for_execution_some_full(self): 130 drone = self._test_choose_drone_for_execution_helper([(0, 1), (1, 3)], 131 2) 132 self.assertEquals(drone.name, 1) 133 134 135 def test_choose_drone_for_execution_all_full(self): 136 drone = self._test_choose_drone_for_execution_helper([(2, 1), (3, 2)], 137 1) 138 self.assertEquals(drone.name, 1) 139 140 141 def test_choose_drone_for_execution_all_full_same_percentage_capacity(self): 142 drone = self._test_choose_drone_for_execution_helper([(5, 3), (10, 6)], 143 1) 144 self.assertEquals(drone.name, 1) 145 146 147 def test_choose_drone_for_execution_no_ssp_support(self): 148 drone = self._test_choose_drone_for_execution_helper( 149 [(0, 1), (1, 3)], 1, True) 150 self.assertEquals(drone.name, 0) 151 152 153 def test_choose_drone_for_execution_with_ssp_support(self): 154 self.mock_drone._support_ssp = True 155 drone = self._test_choose_drone_for_execution_helper( 156 [(0, 1), (1, 3, True)], 1, True) 157 self.assertEquals(drone.name, 1) 158 159 160 def test_user_restrictions(self): 161 # this drone is restricted to a different user 162 self.manager._enqueue_drone(MockDrone(1, max_processes=10, 163 allowed_users=['fakeuser'])) 164 # this drone is allowed but has lower capacity 165 self.manager._enqueue_drone(MockDrone(2, max_processes=2, 166 allowed_users=[self._USERNAME])) 167 168 self.assertEquals(2, 169 self.manager.max_runnable_processes(self._USERNAME, 170 None)) 171 drone = self.manager._choose_drone_for_execution( 172 1, username=self._USERNAME, drone_hostnames_allowed=None) 173 self.assertEquals(drone.name, 2) 174 175 176 def test_user_restrictions_with_full_drone(self): 177 # this drone is restricted to a different user 178 self.manager._enqueue_drone(MockDrone(1, max_processes=10, 179 allowed_users=['fakeuser'])) 180 # this drone is allowed but is full 181 self.manager._enqueue_drone(MockDrone(2, active_processes=3, 182 max_processes=2, 183 allowed_users=[self._USERNAME])) 184 185 self.assertEquals(0, 186 self.manager.max_runnable_processes(self._USERNAME, 187 None)) 188 drone = self.manager._choose_drone_for_execution( 189 1, username=self._USERNAME, drone_hostnames_allowed=None) 190 self.assertEquals(drone.name, 2) 191 192 193 def _setup_test_drone_restrictions(self, active_processes=0): 194 self.manager._enqueue_drone(MockDrone( 195 1, active_processes=active_processes, max_processes=10)) 196 self.manager._enqueue_drone(MockDrone( 197 2, active_processes=active_processes, max_processes=5)) 198 self.manager._enqueue_drone(MockDrone( 199 3, active_processes=active_processes, max_processes=2)) 200 201 202 def test_drone_restrictions_allow_any(self): 203 self._setup_test_drone_restrictions() 204 self.assertEquals(10, 205 self.manager.max_runnable_processes(self._USERNAME, 206 None)) 207 drone = self.manager._choose_drone_for_execution( 208 1, username=self._USERNAME, drone_hostnames_allowed=None) 209 self.assertEqual(drone.name, 1) 210 211 212 def test_drone_restrictions_under_capacity(self): 213 self._setup_test_drone_restrictions() 214 drone_hostnames_allowed = (2, 3) 215 self.assertEquals( 216 5, self.manager.max_runnable_processes(self._USERNAME, 217 drone_hostnames_allowed)) 218 drone = self.manager._choose_drone_for_execution( 219 1, username=self._USERNAME, 220 drone_hostnames_allowed=drone_hostnames_allowed) 221 222 self.assertEqual(drone.name, 2) 223 224 225 def test_drone_restrictions_over_capacity(self): 226 self._setup_test_drone_restrictions(active_processes=6) 227 drone_hostnames_allowed = (2, 3) 228 self.assertEquals( 229 0, self.manager.max_runnable_processes(self._USERNAME, 230 drone_hostnames_allowed)) 231 drone = self.manager._choose_drone_for_execution( 232 7, username=self._USERNAME, 233 drone_hostnames_allowed=drone_hostnames_allowed) 234 self.assertEqual(drone.name, 2) 235 236 237 def test_drone_restrictions_allow_none(self): 238 self._setup_test_drone_restrictions() 239 drone_hostnames_allowed = () 240 self.assertEquals( 241 0, self.manager.max_runnable_processes(self._USERNAME, 242 drone_hostnames_allowed)) 243 drone = self.manager._choose_drone_for_execution( 244 1, username=self._USERNAME, 245 drone_hostnames_allowed=drone_hostnames_allowed) 246 self.assertEqual(drone, None) 247 248 249 def test_initialize(self): 250 results_hostname = 'results_repo' 251 results_install_dir = '/results/install' 252 global_config.global_config.override_config_value( 253 scheduler_config.CONFIG_SECTION, 254 'results_host_installation_directory', results_install_dir) 255 256 (drones.get_drone.expect_call(self.mock_drone.name) 257 .and_return(self.mock_drone)) 258 259 results_drone = MockDrone('results_drone') 260 self.god.stub_function(results_drone, 'set_autotest_install_dir') 261 drones.get_drone.expect_call(results_hostname).and_return(results_drone) 262 results_drone.set_autotest_install_dir.expect_call(results_install_dir) 263 264 self.manager.initialize(base_results_dir=self._RESULTS_DIR, 265 drone_hostnames=[self.mock_drone.name], 266 results_repository_hostname=results_hostname) 267 268 self.assert_(self.mock_drone.was_call_queued( 269 'initialize', self._DRONE_RESULTS_DIR + '/')) 270 self.god.check_playback() 271 272 273 def test_execute_command(self): 274 self.manager._enqueue_drone(self.mock_drone) 275 276 pidfile_name = 'my_pidfile' 277 log_file = 'log_file' 278 279 pidfile_id = self.manager.execute_command( 280 command=['test', drone_manager.WORKING_DIRECTORY], 281 working_directory=self._WORKING_DIRECTORY, 282 pidfile_name=pidfile_name, 283 num_processes=1, 284 log_file=log_file) 285 286 full_working_directory = os.path.join(self._DRONE_RESULTS_DIR, 287 self._WORKING_DIRECTORY) 288 self.assertEquals(pidfile_id.path, 289 os.path.join(full_working_directory, pidfile_name)) 290 self.assert_(self.mock_drone.was_call_queued( 291 'execute_command', ['test', full_working_directory], 292 full_working_directory, 293 os.path.join(self._DRONE_RESULTS_DIR, log_file), pidfile_name)) 294 295 296 def test_attach_file_to_execution(self): 297 self.manager._enqueue_drone(self.mock_drone) 298 299 contents = 'my\ncontents' 300 attached_path = self.manager.attach_file_to_execution( 301 self._WORKING_DIRECTORY, contents) 302 self.manager.execute_command(command=['test'], 303 working_directory=self._WORKING_DIRECTORY, 304 pidfile_name='mypidfile', 305 num_processes=1, 306 drone_hostnames_allowed=None) 307 308 self.assert_(self.mock_drone.was_call_queued( 309 'write_to_file', 310 os.path.join(self._DRONE_RESULTS_DIR, attached_path), 311 contents)) 312 313 314 def test_copy_results_on_drone(self): 315 self.manager.copy_results_on_drone(self.mock_drone_process, 316 self._SOURCE_PATH, 317 self._DESTINATION_PATH) 318 self.assert_(self.mock_drone.was_call_queued( 319 'copy_file_or_directory', 320 os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH), 321 os.path.join(self._DRONE_RESULTS_DIR, self._DESTINATION_PATH))) 322 323 324 def test_copy_to_results_repository(self): 325 site_drone_manager.ENABLE_ARCHIVING = True 326 self.manager.copy_to_results_repository(self.mock_drone_process, 327 self._SOURCE_PATH) 328 self.assert_(self.mock_drone.was_file_sent( 329 self.results_drone, 330 os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH), 331 os.path.join(self._RESULTS_DIR, self._SOURCE_PATH))) 332 333 334 def test_write_lines_to_file(self): 335 file_path = 'file/path' 336 lines = ['line1', 'line2'] 337 written_data = 'line1\nline2\n' 338 339 # write to results repository 340 self.manager.write_lines_to_file(file_path, lines) 341 self.assert_(self.results_drone.was_call_queued( 342 'write_to_file', os.path.join(self._RESULTS_DIR, file_path), 343 written_data)) 344 345 # write to a drone 346 self.manager.write_lines_to_file( 347 file_path, lines, paired_with_process=self.mock_drone_process) 348 self.assert_(self.mock_drone.was_call_queued( 349 'write_to_file', 350 os.path.join(self._DRONE_RESULTS_DIR, file_path), written_data)) 351 352 353 def test_pidfile_expiration(self): 354 self.god.stub_with(self.manager, '_get_max_pidfile_refreshes', 355 lambda: 0) 356 pidfile_id = self.manager.get_pidfile_id_from('tag', 'name') 357 self.manager.register_pidfile(pidfile_id) 358 self.manager._drop_old_pidfiles() 359 self.manager._drop_old_pidfiles() 360 self.assertFalse(self.manager._registered_pidfile_info) 361 362 363class ThreadedDroneTest(unittest.TestCase): 364 _DRONE_INSTALL_DIR = '/drone/install/dir' 365 _RESULTS_DIR = '/results/dir' 366 _DRONE_CLASS = drones._RemoteDrone 367 _DRONE_HOST = ssh_host.SSHHost 368 369 370 def create_drone(self, drone_hostname, mock_hostname, 371 timestamp_remote_calls=False): 372 """Create and initialize a Remote Drone. 373 374 @return: A remote drone instance. 375 """ 376 mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname) 377 self.god.stub_function(drones.drone_utility, 'create_host') 378 drones.drone_utility.create_host.expect_call(drone_hostname).and_return( 379 mock_host) 380 mock_host.is_up.expect_call().and_return(True) 381 return self._DRONE_CLASS(drone_hostname, 382 timestamp_remote_calls=timestamp_remote_calls) 383 384 385 def create_fake_pidfile_info(self, tag='tag', name='name'): 386 pidfile_id = self.manager.get_pidfile_id_from(tag, name) 387 self.manager.register_pidfile(pidfile_id) 388 return self.manager._registered_pidfile_info 389 390 391 def setUp(self): 392 self.god = mock.mock_god() 393 self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR', 394 self._DRONE_INSTALL_DIR) 395 self.manager = drone_manager.DroneManager() 396 self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR) 397 398 # we don't want this to ever actually get called 399 self.god.stub_function(drones, 'get_drone') 400 # we don't want the DroneManager to go messing with global config 401 def do_nothing(): 402 pass 403 self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing) 404 405 self.results_drone = MockDrone('results_drone', 0, 10) 406 self.manager._results_drone = self.results_drone 407 self.drone_utility_path = 'mock-drone-utility-path' 408 self.mock_return = {'results': ['mock results'], 409 'warnings': []} 410 411 412 def tearDown(self): 413 self.god.unstub_all() 414 415 def test_trigger_refresh(self): 416 """Test drone manager trigger refresh.""" 417 self.god.stub_with(self._DRONE_CLASS, '_drone_utility_path', 418 self.drone_utility_path) 419 mock_drone = self.create_drone('fakedrone1', 'fakehost1') 420 self.manager._drones[mock_drone.hostname] = mock_drone 421 422 # Create some fake pidfiles and confirm that a refresh call is 423 # executed on each drone host, with the same pidfile paths. Then 424 # check that each drone gets a key in the returned results dictionary. 425 for i in range(0, 1): 426 pidfile_info = self.create_fake_pidfile_info( 427 'tag%s' % i, 'name%s' %i) 428 pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()] 429 refresh_call = drone_utility.call('refresh', pidfile_paths) 430 expected_results = {} 431 mock_result = utils.CmdResult( 432 stdout=cPickle.dumps(self.mock_return)) 433 for drone in self.manager.get_drones(): 434 drone._host.run.expect_call( 435 'python %s' % self.drone_utility_path, 436 stdin=cPickle.dumps([refresh_call]), stdout_tee=None, 437 connect_timeout=mock.is_instance_comparator(int) 438 ).and_return(mock_result) 439 expected_results[drone] = self.mock_return['results'] 440 self.manager.trigger_refresh() 441 self.assertTrue(self.manager._refresh_task_queue.get_results() == 442 expected_results) 443 self.god.check_playback() 444 445 446 def test_sync_refresh(self): 447 """Test drone manager sync refresh.""" 448 449 mock_drone = self.create_drone('fakedrone1', 'fakehost1') 450 self.manager._drones[mock_drone.hostname] = mock_drone 451 452 # Insert some drone_utility results into the results queue, then 453 # check that get_results returns it in the right format, and that 454 # the rest of sync_refresh populates the right datastructures for 455 # correct handling of agents. Also confirm that this method of 456 # syncing is sufficient for the monitor to pick up the exit status 457 # of the process in the same way it would in handle_agents. 458 pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute' 459 pidfiles = {pidfile_path: '123\n12\n0\n'} 460 drone_utility_results = { 461 'pidfiles': pidfiles, 462 'autoserv_processes':{}, 463 'all_processes':{}, 464 'parse_processes':{}, 465 'pidfiles_second_read':pidfiles, 466 } 467 # Our manager instance isn't the drone manager singletone that the 468 # pidfile_monitor will use by default, becuase setUp doesn't call 469 # drone_manager.instance(). 470 self.god.stub_with(drone_manager, '_the_instance', self.manager) 471 monitor = pidfile_monitor.PidfileRunMonitor() 472 monitor.pidfile_id = drone_manager.PidfileId(pidfile_path) 473 self.manager.register_pidfile(monitor.pidfile_id) 474 self.assertTrue(monitor._state.exit_status == None) 475 476 self.manager._refresh_task_queue.results_queue.put( 477 thread_lib.ThreadedTaskQueue.result( 478 mock_drone, [drone_utility_results])) 479 self.manager.sync_refresh() 480 pidfiles = self.manager._pidfiles 481 pidfile_id = pidfiles.keys()[0] 482 pidfile_contents = pidfiles[pidfile_id] 483 484 self.assertTrue( 485 pidfile_id.path == pidfile_path and 486 pidfile_contents.process.pid == 123 and 487 pidfile_contents.process.hostname == 488 mock_drone.hostname and 489 pidfile_contents.exit_status == 12 and 490 pidfile_contents.num_tests_failed == 0) 491 self.assertTrue(monitor.exit_code() == 12) 492 self.god.check_playback() 493 494 495class ThreadedLocalhostDroneTest(ThreadedDroneTest): 496 _DRONE_CLASS = drones._LocalDrone 497 _DRONE_HOST = local_host.LocalHost 498 499 500 def create_drone(self, drone_hostname, mock_hostname, 501 timestamp_remote_calls=False): 502 """Create and initialize a Remote Drone. 503 504 @return: A remote drone instance. 505 """ 506 mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname) 507 self.god.stub_function(drones.drone_utility, 'create_host') 508 local_drone = self._DRONE_CLASS( 509 timestamp_remote_calls=timestamp_remote_calls) 510 self.god.stub_with(local_drone, '_host', mock_host) 511 return local_drone 512 513 514if __name__ == '__main__': 515 unittest.main() 516