1#!/usr/bin/env python2 2# pylint: disable=missing-docstring 3 4import datetime 5import mox 6import unittest 7 8import common 9from autotest_lib.client.common_lib import control_data 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.common_lib import global_config 12from autotest_lib.client.common_lib import priorities 13from autotest_lib.client.common_lib.cros import dev_server 14from autotest_lib.client.common_lib.test_utils import mock 15from autotest_lib.frontend import setup_django_environment 16from autotest_lib.frontend.afe import frontend_test_utils 17from autotest_lib.frontend.afe import model_logic 18from autotest_lib.frontend.afe import models 19from autotest_lib.frontend.afe import rpc_interface 20from autotest_lib.frontend.afe import rpc_utils 21from autotest_lib.server import frontend 22from autotest_lib.server import utils as server_utils 23from autotest_lib.server.cros import provision 24from autotest_lib.server.cros.dynamic_suite import constants 25from autotest_lib.server.cros.dynamic_suite import control_file_getter 26from autotest_lib.server.cros.dynamic_suite import frontend_wrappers 27 28CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT 29SERVER = control_data.CONTROL_TYPE_NAMES.SERVER 30 31_hqe_status = models.HostQueueEntry.Status 32 33 34class RpcInterfaceTest(unittest.TestCase, 35 frontend_test_utils.FrontendTestMixin): 36 def setUp(self): 37 self._frontend_common_setup() 38 self.god = mock.mock_god() 39 40 41 def tearDown(self): 42 self.god.unstub_all() 43 self._frontend_common_teardown() 44 global_config.global_config.reset_config_values() 45 46 47 def test_validation(self): 48 # omit a required field 49 self.assertRaises(model_logic.ValidationError, rpc_interface.add_label, 50 name=None) 51 # violate uniqueness constraint 52 self.assertRaises(model_logic.ValidationError, rpc_interface.add_host, 53 hostname='host1') 54 55 56 def test_multiple_platforms(self): 57 platform2 = models.Label.objects.create(name='platform2', platform=True) 58 self.assertRaises(model_logic.ValidationError, 59 rpc_interface. label_add_hosts, id='platform2', 60 hosts=['host1', 'host2']) 61 self.assertRaises(model_logic.ValidationError, 62 rpc_interface.host_add_labels, 63 id='host1', labels=['platform2']) 64 # make sure the platform didn't get added 65 platforms = rpc_interface.get_labels( 66 host__hostname__in=['host1', 'host2'], platform=True) 67 self.assertEquals(len(platforms), 1) 68 self.assertEquals(platforms[0]['name'], 'myplatform') 69 70 71 def _check_hostnames(self, hosts, expected_hostnames): 72 self.assertEquals(set(host['hostname'] for host in hosts), 73 set(expected_hostnames)) 74 75 76 def test_get_hosts(self): 77 hosts = rpc_interface.get_hosts() 78 self._check_hostnames(hosts, [host.hostname for host in self.hosts]) 79 80 hosts = rpc_interface.get_hosts(hostname='host1') 81 self._check_hostnames(hosts, ['host1']) 82 host = hosts[0] 83 self.assertEquals(sorted(host['labels']), ['label1', 'myplatform']) 84 self.assertEquals(host['platform'], 'myplatform') 85 self.assertEquals(host['acls'], ['my_acl']) 86 self.assertEquals(host['attributes'], {}) 87 88 89 def test_get_hosts_multiple_labels(self): 90 hosts = rpc_interface.get_hosts( 91 multiple_labels=['myplatform', 'label1']) 92 self._check_hostnames(hosts, ['host1']) 93 94 95 def test_get_hosts_exclude_only_if_needed(self): 96 self.hosts[0].labels.add(self.label3) 97 98 hosts = rpc_interface.get_hosts(hostname__in=['host1', 'host2'], 99 exclude_only_if_needed_labels=True) 100 self._check_hostnames(hosts, ['host2']) 101 102 103 def test_job_keyvals(self): 104 keyval_dict = {'mykey': 'myvalue'} 105 job_id = rpc_interface.create_job(name='test', 106 priority=priorities.Priority.DEFAULT, 107 control_file='foo', 108 control_type=CLIENT, 109 hosts=['host1'], 110 keyvals=keyval_dict) 111 jobs = rpc_interface.get_jobs(id=job_id) 112 self.assertEquals(len(jobs), 1) 113 self.assertEquals(jobs[0]['keyvals'], keyval_dict) 114 115 116 def test_test_retry(self): 117 job_id = rpc_interface.create_job(name='flake', 118 priority=priorities.Priority.DEFAULT, 119 control_file='foo', 120 control_type=CLIENT, 121 hosts=['host1'], 122 test_retry=10) 123 jobs = rpc_interface.get_jobs(id=job_id) 124 self.assertEquals(len(jobs), 1) 125 self.assertEquals(jobs[0]['test_retry'], 10) 126 127 128 def test_get_jobs_summary(self): 129 job = self._create_job(hosts=xrange(1, 4)) 130 entries = list(job.hostqueueentry_set.all()) 131 entries[1].status = _hqe_status.FAILED 132 entries[1].save() 133 entries[2].status = _hqe_status.FAILED 134 entries[2].aborted = True 135 entries[2].save() 136 137 # Mock up tko_rpc_interface.get_status_counts. 138 self.god.stub_function_to_return(rpc_interface.tko_rpc_interface, 139 'get_status_counts', 140 None) 141 142 job_summaries = rpc_interface.get_jobs_summary(id=job.id) 143 self.assertEquals(len(job_summaries), 1) 144 summary = job_summaries[0] 145 self.assertEquals(summary['status_counts'], {'Queued': 1, 146 'Failed': 2}) 147 148 149 def _check_job_ids(self, actual_job_dicts, expected_jobs): 150 self.assertEquals( 151 set(job_dict['id'] for job_dict in actual_job_dicts), 152 set(job.id for job in expected_jobs)) 153 154 155 def test_get_jobs_status_filters(self): 156 HqeStatus = models.HostQueueEntry.Status 157 def create_two_host_job(): 158 return self._create_job(hosts=[1, 2]) 159 def set_hqe_statuses(job, first_status, second_status): 160 entries = job.hostqueueentry_set.all() 161 entries[0].update_object(status=first_status) 162 entries[1].update_object(status=second_status) 163 164 queued = create_two_host_job() 165 166 queued_and_running = create_two_host_job() 167 set_hqe_statuses(queued_and_running, HqeStatus.QUEUED, 168 HqeStatus.RUNNING) 169 170 running_and_complete = create_two_host_job() 171 set_hqe_statuses(running_and_complete, HqeStatus.RUNNING, 172 HqeStatus.COMPLETED) 173 174 complete = create_two_host_job() 175 set_hqe_statuses(complete, HqeStatus.COMPLETED, HqeStatus.COMPLETED) 176 177 started_but_inactive = create_two_host_job() 178 set_hqe_statuses(started_but_inactive, HqeStatus.QUEUED, 179 HqeStatus.COMPLETED) 180 181 parsing = create_two_host_job() 182 set_hqe_statuses(parsing, HqeStatus.PARSING, HqeStatus.PARSING) 183 184 self._check_job_ids(rpc_interface.get_jobs(not_yet_run=True), [queued]) 185 self._check_job_ids(rpc_interface.get_jobs(running=True), 186 [queued_and_running, running_and_complete, 187 started_but_inactive, parsing]) 188 self._check_job_ids(rpc_interface.get_jobs(finished=True), [complete]) 189 190 191 def test_get_jobs_type_filters(self): 192 self.assertRaises(AssertionError, rpc_interface.get_jobs, 193 suite=True, sub=True) 194 self.assertRaises(AssertionError, rpc_interface.get_jobs, 195 suite=True, standalone=True) 196 self.assertRaises(AssertionError, rpc_interface.get_jobs, 197 standalone=True, sub=True) 198 199 parent_job = self._create_job(hosts=[1]) 200 child_jobs = self._create_job(hosts=[1, 2], 201 parent_job_id=parent_job.id) 202 standalone_job = self._create_job(hosts=[1]) 203 204 self._check_job_ids(rpc_interface.get_jobs(suite=True), [parent_job]) 205 self._check_job_ids(rpc_interface.get_jobs(sub=True), [child_jobs]) 206 self._check_job_ids(rpc_interface.get_jobs(standalone=True), 207 [standalone_job]) 208 209 210 def _create_job_helper(self, **kwargs): 211 return rpc_interface.create_job(name='test', 212 priority=priorities.Priority.DEFAULT, 213 control_file='control file', 214 control_type=SERVER, **kwargs) 215 216 217 def test_one_time_hosts(self): 218 job = self._create_job_helper(one_time_hosts=['testhost']) 219 host = models.Host.objects.get(hostname='testhost') 220 self.assertEquals(host.invalid, True) 221 self.assertEquals(host.labels.count(), 0) 222 self.assertEquals(host.aclgroup_set.count(), 0) 223 224 225 def test_create_job_duplicate_hosts(self): 226 self.assertRaises(model_logic.ValidationError, self._create_job_helper, 227 hosts=[1, 1]) 228 229 230 def test_create_unrunnable_metahost_job(self): 231 self.assertRaises(error.NoEligibleHostException, 232 self._create_job_helper, meta_hosts=['unused']) 233 234 235 def test_create_hostless_job(self): 236 job_id = self._create_job_helper(hostless=True) 237 job = models.Job.objects.get(pk=job_id) 238 queue_entries = job.hostqueueentry_set.all() 239 self.assertEquals(len(queue_entries), 1) 240 self.assertEquals(queue_entries[0].host, None) 241 self.assertEquals(queue_entries[0].meta_host, None) 242 243 244 def _setup_special_tasks(self): 245 host = self.hosts[0] 246 247 job1 = self._create_job(hosts=[1]) 248 job2 = self._create_job(hosts=[1]) 249 250 entry1 = job1.hostqueueentry_set.all()[0] 251 entry1.update_object(started_on=datetime.datetime(2009, 1, 2), 252 execution_subdir='host1') 253 entry2 = job2.hostqueueentry_set.all()[0] 254 entry2.update_object(started_on=datetime.datetime(2009, 1, 3), 255 execution_subdir='host1') 256 257 self.task1 = models.SpecialTask.objects.create( 258 host=host, task=models.SpecialTask.Task.VERIFY, 259 time_started=datetime.datetime(2009, 1, 1), # ran before job 1 260 is_complete=True, requested_by=models.User.current_user()) 261 self.task2 = models.SpecialTask.objects.create( 262 host=host, task=models.SpecialTask.Task.VERIFY, 263 queue_entry=entry2, # ran with job 2 264 is_active=True, requested_by=models.User.current_user()) 265 self.task3 = models.SpecialTask.objects.create( 266 host=host, task=models.SpecialTask.Task.VERIFY, 267 requested_by=models.User.current_user()) # not yet run 268 269 270 def test_get_special_tasks(self): 271 self._setup_special_tasks() 272 tasks = rpc_interface.get_special_tasks(host__hostname='host1', 273 queue_entry__isnull=True) 274 self.assertEquals(len(tasks), 2) 275 self.assertEquals(tasks[0]['task'], models.SpecialTask.Task.VERIFY) 276 self.assertEquals(tasks[0]['is_active'], False) 277 self.assertEquals(tasks[0]['is_complete'], True) 278 279 280 def test_get_latest_special_task(self): 281 # a particular usage of get_special_tasks() 282 self._setup_special_tasks() 283 self.task2.time_started = datetime.datetime(2009, 1, 2) 284 self.task2.save() 285 286 tasks = rpc_interface.get_special_tasks( 287 host__hostname='host1', task=models.SpecialTask.Task.VERIFY, 288 time_started__isnull=False, sort_by=['-time_started'], 289 query_limit=1) 290 self.assertEquals(len(tasks), 1) 291 self.assertEquals(tasks[0]['id'], 2) 292 293 294 def _common_entry_check(self, entry_dict): 295 self.assertEquals(entry_dict['host']['hostname'], 'host1') 296 self.assertEquals(entry_dict['job']['id'], 2) 297 298 299 def test_get_host_queue_entries_and_special_tasks(self): 300 self._setup_special_tasks() 301 302 host = self.hosts[0].id 303 entries_and_tasks = ( 304 rpc_interface.get_host_queue_entries_and_special_tasks(host)) 305 306 paths = [entry['execution_path'] for entry in entries_and_tasks] 307 self.assertEquals(paths, ['hosts/host1/3-verify', 308 '2-autotest_system/host1', 309 'hosts/host1/2-verify', 310 '1-autotest_system/host1', 311 'hosts/host1/1-verify']) 312 313 verify2 = entries_and_tasks[2] 314 self._common_entry_check(verify2) 315 self.assertEquals(verify2['type'], 'Verify') 316 self.assertEquals(verify2['status'], 'Running') 317 self.assertEquals(verify2['execution_path'], 'hosts/host1/2-verify') 318 319 entry2 = entries_and_tasks[1] 320 self._common_entry_check(entry2) 321 self.assertEquals(entry2['type'], 'Job') 322 self.assertEquals(entry2['status'], 'Queued') 323 self.assertEquals(entry2['started_on'], '2009-01-03 00:00:00') 324 325 326 def test_view_invalid_host(self): 327 # RPCs used by View Host page should work for invalid hosts 328 self._create_job_helper(hosts=[1]) 329 host = self.hosts[0] 330 host.delete() 331 332 self.assertEquals(1, rpc_interface.get_num_hosts(hostname='host1', 333 valid_only=False)) 334 data = rpc_interface.get_hosts(hostname='host1', valid_only=False) 335 self.assertEquals(1, len(data)) 336 337 self.assertEquals(1, rpc_interface.get_num_host_queue_entries( 338 host__hostname='host1')) 339 data = rpc_interface.get_host_queue_entries(host__hostname='host1') 340 self.assertEquals(1, len(data)) 341 342 count = rpc_interface.get_num_host_queue_entries_and_special_tasks( 343 host=host.id) 344 self.assertEquals(1, count) 345 data = rpc_interface.get_host_queue_entries_and_special_tasks( 346 host=host.id) 347 self.assertEquals(1, len(data)) 348 349 350 def test_reverify_hosts(self): 351 hostname_list = rpc_interface.reverify_hosts(id__in=[1, 2]) 352 self.assertEquals(hostname_list, ['host1', 'host2']) 353 tasks = rpc_interface.get_special_tasks() 354 self.assertEquals(len(tasks), 2) 355 self.assertEquals(set(task['host']['id'] for task in tasks), 356 set([1, 2])) 357 358 task = tasks[0] 359 self.assertEquals(task['task'], models.SpecialTask.Task.VERIFY) 360 self.assertEquals(task['requested_by'], 'autotest_system') 361 362 363 def test_repair_hosts(self): 364 hostname_list = rpc_interface.repair_hosts(id__in=[1, 2]) 365 self.assertEquals(hostname_list, ['host1', 'host2']) 366 tasks = rpc_interface.get_special_tasks() 367 self.assertEquals(len(tasks), 2) 368 self.assertEquals(set(task['host']['id'] for task in tasks), 369 set([1, 2])) 370 371 task = tasks[0] 372 self.assertEquals(task['task'], models.SpecialTask.Task.REPAIR) 373 self.assertEquals(task['requested_by'], 'autotest_system') 374 375 376 def _modify_host_helper(self, on_shard=False, host_on_shard=False): 377 shard_hostname = 'shard1' 378 if on_shard: 379 global_config.global_config.override_config_value( 380 'SHARD', 'shard_hostname', shard_hostname) 381 382 host = models.Host.objects.all()[0] 383 if host_on_shard: 384 shard = models.Shard.objects.create(hostname=shard_hostname) 385 host.shard = shard 386 host.save() 387 388 self.assertFalse(host.locked) 389 390 self.god.stub_class_method(frontend.AFE, 'run') 391 392 if host_on_shard and not on_shard: 393 mock_afe = self.god.create_mock_class_obj( 394 frontend_wrappers.RetryingAFE, 'MockAFE') 395 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 396 397 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new( 398 server=shard_hostname, user=None) 399 mock_afe2.run.expect_call('modify_host_local', id=host.id, 400 locked=True, lock_reason='_modify_host_helper lock', 401 lock_time=datetime.datetime(2015, 12, 15)) 402 elif on_shard: 403 mock_afe = self.god.create_mock_class_obj( 404 frontend_wrappers.RetryingAFE, 'MockAFE') 405 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 406 407 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new( 408 server=server_utils.get_global_afe_hostname(), user=None) 409 mock_afe2.run.expect_call('modify_host', id=host.id, 410 locked=True, lock_reason='_modify_host_helper lock', 411 lock_time=datetime.datetime(2015, 12, 15)) 412 413 rpc_interface.modify_host(id=host.id, locked=True, 414 lock_reason='_modify_host_helper lock', 415 lock_time=datetime.datetime(2015, 12, 15)) 416 417 host = models.Host.objects.get(pk=host.id) 418 if on_shard: 419 # modify_host on shard does nothing but routing the RPC to master. 420 self.assertFalse(host.locked) 421 else: 422 self.assertTrue(host.locked) 423 self.god.check_playback() 424 425 426 def test_modify_host_on_master_host_on_master(self): 427 """Call modify_host to master for host in master.""" 428 self._modify_host_helper() 429 430 431 def test_modify_host_on_master_host_on_shard(self): 432 """Call modify_host to master for host in shard.""" 433 self._modify_host_helper(host_on_shard=True) 434 435 436 def test_modify_host_on_shard(self): 437 """Call modify_host to shard for host in shard.""" 438 self._modify_host_helper(on_shard=True, host_on_shard=True) 439 440 441 def test_modify_hosts_on_master_host_on_shard(self): 442 """Ensure calls to modify_hosts are correctly forwarded to shards.""" 443 host1 = models.Host.objects.all()[0] 444 host2 = models.Host.objects.all()[1] 445 446 shard1 = models.Shard.objects.create(hostname='shard1') 447 host1.shard = shard1 448 host1.save() 449 450 shard2 = models.Shard.objects.create(hostname='shard2') 451 host2.shard = shard2 452 host2.save() 453 454 self.assertFalse(host1.locked) 455 self.assertFalse(host2.locked) 456 457 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE, 458 'MockAFE') 459 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 460 461 # The statuses of one host might differ on master and shard. 462 # Filters are always applied on the master. So the host on the shard 463 # will be affected no matter what his status is. 464 filters_to_use = {'status': 'Ready'} 465 466 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new( 467 server='shard2', user=None) 468 mock_afe2.run.expect_call( 469 'modify_hosts_local', 470 host_filter_data={'id__in': [shard1.id, shard2.id]}, 471 update_data={'locked': True, 472 'lock_reason': 'Testing forward to shard', 473 'lock_time' : datetime.datetime(2015, 12, 15) }) 474 475 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new( 476 server='shard1', user=None) 477 mock_afe1.run.expect_call( 478 'modify_hosts_local', 479 host_filter_data={'id__in': [shard1.id, shard2.id]}, 480 update_data={'locked': True, 481 'lock_reason': 'Testing forward to shard', 482 'lock_time' : datetime.datetime(2015, 12, 15)}) 483 484 rpc_interface.modify_hosts( 485 host_filter_data={'status': 'Ready'}, 486 update_data={'locked': True, 487 'lock_reason': 'Testing forward to shard', 488 'lock_time' : datetime.datetime(2015, 12, 15) }) 489 490 host1 = models.Host.objects.get(pk=host1.id) 491 self.assertTrue(host1.locked) 492 host2 = models.Host.objects.get(pk=host2.id) 493 self.assertTrue(host2.locked) 494 self.god.check_playback() 495 496 497 def test_delete_host(self): 498 """Ensure an RPC is made on delete a host, if it is on a shard.""" 499 host1 = models.Host.objects.all()[0] 500 shard1 = models.Shard.objects.create(hostname='shard1') 501 host1.shard = shard1 502 host1.save() 503 host1_id = host1.id 504 505 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE, 506 'MockAFE') 507 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 508 509 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new( 510 server='shard1', user=None) 511 mock_afe1.run.expect_call('delete_host', id=host1.id) 512 513 rpc_interface.delete_host(id=host1.id) 514 515 self.assertRaises(models.Host.DoesNotExist, 516 models.Host.smart_get, host1_id) 517 518 self.god.check_playback() 519 520 521 def test_modify_label(self): 522 label1 = models.Label.objects.all()[0] 523 self.assertEqual(label1.invalid, 0) 524 525 host2 = models.Host.objects.all()[1] 526 shard1 = models.Shard.objects.create(hostname='shard1') 527 host2.shard = shard1 528 host2.labels.add(label1) 529 host2.save() 530 531 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE, 532 'MockAFE') 533 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 534 535 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new( 536 server='shard1', user=None) 537 mock_afe1.run.expect_call('modify_label', id=label1.id, invalid=1) 538 539 rpc_interface.modify_label(label1.id, invalid=1) 540 541 self.assertEqual(models.Label.objects.all()[0].invalid, 1) 542 self.god.check_playback() 543 544 545 def test_delete_label(self): 546 label1 = models.Label.objects.all()[0] 547 548 host2 = models.Host.objects.all()[1] 549 shard1 = models.Shard.objects.create(hostname='shard1') 550 host2.shard = shard1 551 host2.labels.add(label1) 552 host2.save() 553 554 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE, 555 'MockAFE') 556 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe) 557 558 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new( 559 server='shard1', user=None) 560 mock_afe1.run.expect_call('delete_label', id=label1.id) 561 562 rpc_interface.delete_label(id=label1.id) 563 564 self.assertRaises(models.Label.DoesNotExist, 565 models.Label.smart_get, label1.id) 566 self.god.check_playback() 567 568 569 def test_get_image_for_job_with_keyval_build(self): 570 keyval_dict = {'build': 'cool-image'} 571 job_id = rpc_interface.create_job(name='test', 572 priority=priorities.Priority.DEFAULT, 573 control_file='foo', 574 control_type=CLIENT, 575 hosts=['host1'], 576 keyvals=keyval_dict) 577 job = models.Job.objects.get(id=job_id) 578 self.assertIsNotNone(job) 579 image = rpc_interface._get_image_for_job(job, True) 580 self.assertEquals('cool-image', image) 581 582 583 def test_get_image_for_job_with_keyval_builds(self): 584 keyval_dict = {'builds': {'cros-version': 'cool-image'}} 585 job_id = rpc_interface.create_job(name='test', 586 priority=priorities.Priority.DEFAULT, 587 control_file='foo', 588 control_type=CLIENT, 589 hosts=['host1'], 590 keyvals=keyval_dict) 591 job = models.Job.objects.get(id=job_id) 592 self.assertIsNotNone(job) 593 image = rpc_interface._get_image_for_job(job, True) 594 self.assertEquals('cool-image', image) 595 596 597 def test_get_image_for_job_with_control_build(self): 598 CONTROL_FILE = """build='cool-image' 599 """ 600 job_id = rpc_interface.create_job(name='test', 601 priority=priorities.Priority.DEFAULT, 602 control_file='foo', 603 control_type=CLIENT, 604 hosts=['host1']) 605 job = models.Job.objects.get(id=job_id) 606 self.assertIsNotNone(job) 607 job.control_file = CONTROL_FILE 608 image = rpc_interface._get_image_for_job(job, True) 609 self.assertEquals('cool-image', image) 610 611 612 def test_get_image_for_job_with_control_builds(self): 613 CONTROL_FILE = """builds={'cros-version': 'cool-image'} 614 """ 615 job_id = rpc_interface.create_job(name='test', 616 priority=priorities.Priority.DEFAULT, 617 control_file='foo', 618 control_type=CLIENT, 619 hosts=['host1']) 620 job = models.Job.objects.get(id=job_id) 621 self.assertIsNotNone(job) 622 job.control_file = CONTROL_FILE 623 image = rpc_interface._get_image_for_job(job, True) 624 self.assertEquals('cool-image', image) 625 626 627class ExtraRpcInterfaceTest(mox.MoxTestBase, 628 frontend_test_utils.FrontendTestMixin): 629 """Unit tests for functions originally in site_rpc_interface.py. 630 631 @var _NAME: fake suite name. 632 @var _BOARD: fake board to reimage. 633 @var _BUILD: fake build with which to reimage. 634 @var _PRIORITY: fake priority with which to reimage. 635 """ 636 _NAME = 'name' 637 _BOARD = 'link' 638 _BUILD = 'link-release/R36-5812.0.0' 639 _BUILDS = {provision.CROS_VERSION_PREFIX: _BUILD} 640 _PRIORITY = priorities.Priority.DEFAULT 641 _TIMEOUT = 24 642 643 644 def setUp(self): 645 super(ExtraRpcInterfaceTest, self).setUp() 646 self._SUITE_NAME = rpc_interface.canonicalize_suite_name( 647 self._NAME) 648 self.dev_server = self.mox.CreateMock(dev_server.ImageServer) 649 self._frontend_common_setup(fill_data=False) 650 651 652 def tearDown(self): 653 self._frontend_common_teardown() 654 655 656 def _setupDevserver(self): 657 self.mox.StubOutClassWithMocks(dev_server, 'ImageServer') 658 dev_server.resolve(self._BUILD).AndReturn(self.dev_server) 659 660 661 def _mockDevServerGetter(self, get_control_file=True): 662 self._setupDevserver() 663 if get_control_file: 664 self.getter = self.mox.CreateMock( 665 control_file_getter.DevServerGetter) 666 self.mox.StubOutWithMock(control_file_getter.DevServerGetter, 667 'create') 668 control_file_getter.DevServerGetter.create( 669 mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter) 670 671 672 def _mockRpcUtils(self, to_return, control_file_substring=''): 673 """Fake out the autotest rpc_utils module with a mockable class. 674 675 @param to_return: the value that rpc_utils.create_job_common() should 676 be mocked out to return. 677 @param control_file_substring: A substring that is expected to appear 678 in the control file output string that 679 is passed to create_job_common. 680 Default: '' 681 """ 682 download_started_time = constants.DOWNLOAD_STARTED_TIME 683 payload_finished_time = constants.PAYLOAD_FINISHED_TIME 684 self.mox.StubOutWithMock(rpc_utils, 'create_job_common') 685 rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME), 686 mox.StrContains(self._BUILD)), 687 priority=self._PRIORITY, 688 timeout_mins=self._TIMEOUT*60, 689 max_runtime_mins=self._TIMEOUT*60, 690 control_type='Server', 691 control_file=mox.And(mox.StrContains(self._BOARD), 692 mox.StrContains(self._BUILD), 693 mox.StrContains( 694 control_file_substring)), 695 hostless=True, 696 keyvals=mox.And(mox.In(download_started_time), 697 mox.In(payload_finished_time)) 698 ).AndReturn(to_return) 699 700 701 def testStageBuildFail(self): 702 """Ensure that a failure to stage the desired build fails the RPC.""" 703 self._setupDevserver() 704 705 self.dev_server.hostname = 'mox_url' 706 self.dev_server.stage_artifacts( 707 image=self._BUILD, artifacts=['test_suites']).AndRaise( 708 dev_server.DevServerException()) 709 self.mox.ReplayAll() 710 self.assertRaises(error.StageControlFileFailure, 711 rpc_interface.create_suite_job, 712 name=self._NAME, 713 board=self._BOARD, 714 builds=self._BUILDS, 715 pool=None) 716 717 718 def testGetControlFileFail(self): 719 """Ensure that a failure to get needed control file fails the RPC.""" 720 self._mockDevServerGetter() 721 722 self.dev_server.hostname = 'mox_url' 723 self.dev_server.stage_artifacts( 724 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 725 726 self.getter.get_control_file_contents_by_name( 727 self._SUITE_NAME).AndReturn(None) 728 self.mox.ReplayAll() 729 self.assertRaises(error.ControlFileEmpty, 730 rpc_interface.create_suite_job, 731 name=self._NAME, 732 board=self._BOARD, 733 builds=self._BUILDS, 734 pool=None) 735 736 737 def testGetControlFileListFail(self): 738 """Ensure that a failure to get needed control file fails the RPC.""" 739 self._mockDevServerGetter() 740 741 self.dev_server.hostname = 'mox_url' 742 self.dev_server.stage_artifacts( 743 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 744 745 self.getter.get_control_file_contents_by_name( 746 self._SUITE_NAME).AndRaise(error.NoControlFileList()) 747 self.mox.ReplayAll() 748 self.assertRaises(error.NoControlFileList, 749 rpc_interface.create_suite_job, 750 name=self._NAME, 751 board=self._BOARD, 752 builds=self._BUILDS, 753 pool=None) 754 755 756 def testBadNumArgument(self): 757 """Ensure we handle bad values for the |num| argument.""" 758 self.assertRaises(error.SuiteArgumentException, 759 rpc_interface.create_suite_job, 760 name=self._NAME, 761 board=self._BOARD, 762 builds=self._BUILDS, 763 pool=None, 764 num='goo') 765 self.assertRaises(error.SuiteArgumentException, 766 rpc_interface.create_suite_job, 767 name=self._NAME, 768 board=self._BOARD, 769 builds=self._BUILDS, 770 pool=None, 771 num=[]) 772 self.assertRaises(error.SuiteArgumentException, 773 rpc_interface.create_suite_job, 774 name=self._NAME, 775 board=self._BOARD, 776 builds=self._BUILDS, 777 pool=None, 778 num='5') 779 780 781 782 def testCreateSuiteJobFail(self): 783 """Ensure that failure to schedule the suite job fails the RPC.""" 784 self._mockDevServerGetter() 785 786 self.dev_server.hostname = 'mox_url' 787 self.dev_server.stage_artifacts( 788 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 789 790 self.getter.get_control_file_contents_by_name( 791 self._SUITE_NAME).AndReturn('f') 792 793 self.dev_server.url().AndReturn('mox_url') 794 self._mockRpcUtils(-1) 795 self.mox.ReplayAll() 796 self.assertEquals( 797 rpc_interface.create_suite_job(name=self._NAME, 798 board=self._BOARD, 799 builds=self._BUILDS, pool=None), 800 -1) 801 802 803 def testCreateSuiteJobSuccess(self): 804 """Ensures that success results in a successful RPC.""" 805 self._mockDevServerGetter() 806 807 self.dev_server.hostname = 'mox_url' 808 self.dev_server.stage_artifacts( 809 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 810 811 self.getter.get_control_file_contents_by_name( 812 self._SUITE_NAME).AndReturn('f') 813 814 self.dev_server.url().AndReturn('mox_url') 815 job_id = 5 816 self._mockRpcUtils(job_id) 817 self.mox.ReplayAll() 818 self.assertEquals( 819 rpc_interface.create_suite_job(name=self._NAME, 820 board=self._BOARD, 821 builds=self._BUILDS, 822 pool=None), 823 job_id) 824 825 826 def testCreateSuiteJobNoHostCheckSuccess(self): 827 """Ensures that success results in a successful RPC.""" 828 self._mockDevServerGetter() 829 830 self.dev_server.hostname = 'mox_url' 831 self.dev_server.stage_artifacts( 832 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 833 834 self.getter.get_control_file_contents_by_name( 835 self._SUITE_NAME).AndReturn('f') 836 837 self.dev_server.url().AndReturn('mox_url') 838 job_id = 5 839 self._mockRpcUtils(job_id) 840 self.mox.ReplayAll() 841 self.assertEquals( 842 rpc_interface.create_suite_job(name=self._NAME, 843 board=self._BOARD, 844 builds=self._BUILDS, 845 pool=None, check_hosts=False), 846 job_id) 847 848 def testCreateSuiteIntegerNum(self): 849 """Ensures that success results in a successful RPC.""" 850 self._mockDevServerGetter() 851 852 self.dev_server.hostname = 'mox_url' 853 self.dev_server.stage_artifacts( 854 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 855 856 self.getter.get_control_file_contents_by_name( 857 self._SUITE_NAME).AndReturn('f') 858 859 self.dev_server.url().AndReturn('mox_url') 860 job_id = 5 861 self._mockRpcUtils(job_id, control_file_substring='num=17') 862 self.mox.ReplayAll() 863 self.assertEquals( 864 rpc_interface.create_suite_job(name=self._NAME, 865 board=self._BOARD, 866 builds=self._BUILDS, 867 pool=None, 868 check_hosts=False, 869 num=17), 870 job_id) 871 872 873 def testCreateSuiteJobControlFileSupplied(self): 874 """Ensure we can supply the control file to create_suite_job.""" 875 self._mockDevServerGetter(get_control_file=False) 876 877 self.dev_server.hostname = 'mox_url' 878 self.dev_server.stage_artifacts( 879 image=self._BUILD, artifacts=['test_suites']).AndReturn(True) 880 self.dev_server.url().AndReturn('mox_url') 881 job_id = 5 882 self._mockRpcUtils(job_id) 883 self.mox.ReplayAll() 884 self.assertEquals( 885 rpc_interface.create_suite_job(name='%s/%s' % (self._NAME, 886 self._BUILD), 887 board=None, 888 builds=self._BUILDS, 889 pool=None, 890 control_file='CONTROL FILE'), 891 job_id) 892 893 894 def _get_records_for_sending_to_master(self): 895 return [{'control_file': 'foo', 896 'control_type': 1, 897 'created_on': datetime.datetime(2014, 8, 21), 898 'drone_set': None, 899 'email_list': '', 900 'max_runtime_hrs': 72, 901 'max_runtime_mins': 1440, 902 'name': 'dummy', 903 'owner': 'autotest_system', 904 'parse_failed_repair': True, 905 'priority': 40, 906 'reboot_after': 0, 907 'reboot_before': 1, 908 'run_reset': True, 909 'run_verify': False, 910 'synch_count': 0, 911 'test_retry': 10, 912 'timeout': 24, 913 'timeout_mins': 1440, 914 'id': 1 915 }], [{ 916 'aborted': False, 917 'active': False, 918 'complete': False, 919 'deleted': False, 920 'execution_subdir': '', 921 'finished_on': None, 922 'started_on': None, 923 'status': 'Queued', 924 'id': 1 925 }] 926 927 928 def _do_heartbeat_and_assert_response(self, shard_hostname='shard1', 929 upload_jobs=(), upload_hqes=(), 930 known_jobs=(), known_hosts=(), 931 **kwargs): 932 known_job_ids = [job.id for job in known_jobs] 933 known_host_ids = [host.id for host in known_hosts] 934 known_host_statuses = [host.status for host in known_hosts] 935 936 retval = rpc_interface.shard_heartbeat( 937 shard_hostname=shard_hostname, 938 jobs=upload_jobs, hqes=upload_hqes, 939 known_job_ids=known_job_ids, known_host_ids=known_host_ids, 940 known_host_statuses=known_host_statuses) 941 942 self._assert_shard_heartbeat_response(shard_hostname, retval, 943 **kwargs) 944 945 return shard_hostname 946 947 948 def _assert_shard_heartbeat_response(self, shard_hostname, retval, jobs=[], 949 hosts=[], hqes=[]): 950 951 retval_hosts, retval_jobs = retval['hosts'], retval['jobs'] 952 953 expected_jobs = [ 954 (job.id, job.name, shard_hostname) for job in jobs] 955 returned_jobs = [(job['id'], job['name'], job['shard']['hostname']) 956 for job in retval_jobs] 957 self.assertEqual(returned_jobs, expected_jobs) 958 959 expected_hosts = [(host.id, host.hostname) for host in hosts] 960 returned_hosts = [(host['id'], host['hostname']) 961 for host in retval_hosts] 962 self.assertEqual(returned_hosts, expected_hosts) 963 964 retval_hqes = [] 965 for job in retval_jobs: 966 retval_hqes += job['hostqueueentry_set'] 967 968 expected_hqes = [(hqe.id) for hqe in hqes] 969 returned_hqes = [(hqe['id']) for hqe in retval_hqes] 970 self.assertEqual(returned_hqes, expected_hqes) 971 972 973 def _send_records_to_master_helper( 974 self, jobs, hqes, shard_hostname='host1', 975 exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False): 976 job_id = rpc_interface.create_job( 977 name='dummy', 978 priority=self._PRIORITY, 979 control_file='foo', 980 control_type=SERVER, 981 test_retry=10, hostless=True) 982 job = models.Job.objects.get(pk=job_id) 983 shard = models.Shard.objects.create(hostname='host1') 984 job.shard = shard 985 job.save() 986 987 if aborted: 988 job.hostqueueentry_set.update(aborted=True) 989 job.shard = None 990 job.save() 991 992 hqe = job.hostqueueentry_set.all()[0] 993 if not exception_to_throw: 994 self._do_heartbeat_and_assert_response( 995 shard_hostname=shard_hostname, 996 upload_jobs=jobs, upload_hqes=hqes) 997 else: 998 self.assertRaises( 999 exception_to_throw, 1000 self._do_heartbeat_and_assert_response, 1001 shard_hostname=shard_hostname, 1002 upload_jobs=jobs, upload_hqes=hqes) 1003 1004 1005 def testSendingRecordsToMaster(self): 1006 """Send records to the master and ensure they are persisted.""" 1007 jobs, hqes = self._get_records_for_sending_to_master() 1008 hqes[0]['status'] = 'Completed' 1009 self._send_records_to_master_helper( 1010 jobs=jobs, hqes=hqes, exception_to_throw=None) 1011 1012 # Check the entry was actually written to db 1013 self.assertEqual(models.HostQueueEntry.objects.all()[0].status, 1014 'Completed') 1015 1016 1017 def testSendingRecordsToMasterAbortedOnMaster(self): 1018 """Send records to the master and ensure they are persisted.""" 1019 jobs, hqes = self._get_records_for_sending_to_master() 1020 hqes[0]['status'] = 'Completed' 1021 self._send_records_to_master_helper( 1022 jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True) 1023 1024 # Check the entry was actually written to db 1025 self.assertEqual(models.HostQueueEntry.objects.all()[0].status, 1026 'Completed') 1027 1028 1029 def testSendingRecordsToMasterJobAssignedToDifferentShard(self): 1030 """Ensure records that belong to a different shard are rejected.""" 1031 jobs, hqes = self._get_records_for_sending_to_master() 1032 models.Shard.objects.create(hostname='other_shard') 1033 self._send_records_to_master_helper( 1034 jobs=jobs, hqes=hqes, shard_hostname='other_shard') 1035 1036 1037 def testSendingRecordsToMasterJobHqeWithoutJob(self): 1038 """Ensure update for hqe without update for it's job gets rejected.""" 1039 _, hqes = self._get_records_for_sending_to_master() 1040 self._send_records_to_master_helper( 1041 jobs=[], hqes=hqes) 1042 1043 1044 def testSendingRecordsToMasterNotExistingJob(self): 1045 """Ensure update for non existing job gets rejected.""" 1046 jobs, hqes = self._get_records_for_sending_to_master() 1047 jobs[0]['id'] = 3 1048 1049 self._send_records_to_master_helper( 1050 jobs=jobs, hqes=hqes) 1051 1052 1053 def _createShardAndHostWithLabel(self, shard_hostname='shard1', 1054 host_hostname='host1', 1055 label_name='board:lumpy'): 1056 label = models.Label.objects.create(name=label_name) 1057 1058 shard = models.Shard.objects.create(hostname=shard_hostname) 1059 shard.labels.add(label) 1060 1061 host = models.Host.objects.create(hostname=host_hostname, leased=False) 1062 host.labels.add(label) 1063 1064 return shard, host, label 1065 1066 1067 def _createJobForLabel(self, label): 1068 job_id = rpc_interface.create_job(name='dummy', priority=self._PRIORITY, 1069 control_file='foo', 1070 control_type=CLIENT, 1071 meta_hosts=[label.name], 1072 dependencies=(label.name,)) 1073 return models.Job.objects.get(id=job_id) 1074 1075 1076 def testShardHeartbeatFetchHostlessJob(self): 1077 """Create a hostless job and ensure it's not assigned to a shard.""" 1078 shard1, host1, lumpy_label = self._createShardAndHostWithLabel( 1079 'shard1', 'host1', 'board:lumpy') 1080 1081 label2 = models.Label.objects.create(name='bluetooth', platform=False) 1082 1083 job1 = self._create_job(hostless=True) 1084 1085 # Hostless jobs should be executed by the global scheduler. 1086 self._do_heartbeat_and_assert_response(hosts=[host1]) 1087 1088 1089 def testShardRetrieveJobs(self): 1090 """Create jobs and retrieve them.""" 1091 # should never be returned by heartbeat 1092 leased_host = models.Host.objects.create(hostname='leased_host', 1093 leased=True) 1094 1095 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1096 shard2, host2, grumpy_label = self._createShardAndHostWithLabel( 1097 'shard2', 'host2', 'board:grumpy') 1098 1099 leased_host.labels.add(lumpy_label) 1100 1101 job1 = self._createJobForLabel(lumpy_label) 1102 1103 job2 = self._createJobForLabel(grumpy_label) 1104 1105 job_completed = self._createJobForLabel(lumpy_label) 1106 # Job is already being run, so don't sync it 1107 job_completed.hostqueueentry_set.update(complete=True) 1108 job_completed.hostqueueentry_set.create(complete=False) 1109 1110 job_active = self._createJobForLabel(lumpy_label) 1111 # Job is already started, so don't sync it 1112 job_active.hostqueueentry_set.update(active=True) 1113 job_active.hostqueueentry_set.create(complete=False, active=False) 1114 1115 self._do_heartbeat_and_assert_response( 1116 jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all()) 1117 1118 self._do_heartbeat_and_assert_response( 1119 shard_hostname=shard2.hostname, 1120 jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all()) 1121 1122 host3 = models.Host.objects.create(hostname='host3', leased=False) 1123 host3.labels.add(lumpy_label) 1124 1125 self._do_heartbeat_and_assert_response( 1126 known_jobs=[job1], known_hosts=[host1], hosts=[host3]) 1127 1128 1129 def testResendJobsAfterFailedHeartbeat(self): 1130 """Create jobs, retrieve them, fail on client, fetch them again.""" 1131 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1132 1133 job1 = self._createJobForLabel(lumpy_label) 1134 1135 self._do_heartbeat_and_assert_response( 1136 jobs=[job1], 1137 hqes=job1.hostqueueentry_set.all(), hosts=[host1]) 1138 1139 # Make sure it's resubmitted by sending last_job=None again 1140 self._do_heartbeat_and_assert_response( 1141 known_hosts=[host1], 1142 jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[]) 1143 1144 # Now it worked, make sure it's not sent again 1145 self._do_heartbeat_and_assert_response( 1146 known_jobs=[job1], known_hosts=[host1]) 1147 1148 job1 = models.Job.objects.get(pk=job1.id) 1149 job1.hostqueueentry_set.all().update(complete=True) 1150 1151 # Job is completed, make sure it's not sent again 1152 self._do_heartbeat_and_assert_response( 1153 known_hosts=[host1]) 1154 1155 job2 = self._createJobForLabel(lumpy_label) 1156 1157 # job2's creation was later, it should be returned now. 1158 self._do_heartbeat_and_assert_response( 1159 known_hosts=[host1], 1160 jobs=[job2], hqes=job2.hostqueueentry_set.all()) 1161 1162 self._do_heartbeat_and_assert_response( 1163 known_jobs=[job2], known_hosts=[host1]) 1164 1165 job2 = models.Job.objects.get(pk=job2.pk) 1166 job2.hostqueueentry_set.update(aborted=True) 1167 # Setting a job to a complete status will set the shard_id to None in 1168 # scheduler_models. We have to emulate that here, because we use Django 1169 # models in tests. 1170 job2.shard = None 1171 job2.save() 1172 1173 self._do_heartbeat_and_assert_response( 1174 known_jobs=[job2], known_hosts=[host1], 1175 jobs=[job2], 1176 hqes=job2.hostqueueentry_set.all()) 1177 1178 models.Test.objects.create(name='platform_BootPerfServer:shard', 1179 test_type=1) 1180 self.mox.StubOutWithMock(server_utils, 'read_file') 1181 server_utils.read_file(mox.IgnoreArg()).AndReturn('') 1182 self.mox.ReplayAll() 1183 rpc_interface.delete_shard(hostname=shard1.hostname) 1184 1185 self.assertRaises( 1186 models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id) 1187 1188 job1 = models.Job.objects.get(pk=job1.id) 1189 lumpy_label = models.Label.objects.get(pk=lumpy_label.id) 1190 host1 = models.Host.objects.get(pk=host1.id) 1191 super_job = models.Job.objects.get(priority=priorities.Priority.SUPER) 1192 super_job_host = models.HostQueueEntry.objects.get( 1193 job_id=super_job.id) 1194 1195 self.assertIsNone(job1.shard) 1196 self.assertEqual(len(lumpy_label.shard_set.all()), 0) 1197 self.assertIsNone(host1.shard) 1198 self.assertIsNotNone(super_job) 1199 self.assertEqual(super_job_host.host_id, host1.id) 1200 1201 1202 def testCreateListShard(self): 1203 """Retrieve a list of all shards.""" 1204 lumpy_label = models.Label.objects.create(name='board:lumpy', 1205 platform=True) 1206 stumpy_label = models.Label.objects.create(name='board:stumpy', 1207 platform=True) 1208 peppy_label = models.Label.objects.create(name='board:peppy', 1209 platform=True) 1210 1211 shard_id = rpc_interface.add_shard( 1212 hostname='host1', labels='board:lumpy,board:stumpy') 1213 self.assertRaises(error.RPCException, 1214 rpc_interface.add_shard, 1215 hostname='host1', labels='board:lumpy,board:stumpy') 1216 self.assertRaises(model_logic.ValidationError, 1217 rpc_interface.add_shard, 1218 hostname='host1', labels='board:peppy') 1219 shard = models.Shard.objects.get(pk=shard_id) 1220 self.assertEqual(shard.hostname, 'host1') 1221 self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,)) 1222 self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,)) 1223 1224 self.assertEqual(rpc_interface.get_shards(), 1225 [{'labels': ['board:lumpy','board:stumpy'], 1226 'hostname': 'host1', 1227 'id': 1}]) 1228 1229 1230 def testAddBoardsToShard(self): 1231 """Add boards to a given shard.""" 1232 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1233 stumpy_label = models.Label.objects.create(name='board:stumpy', 1234 platform=True) 1235 shard_id = rpc_interface.add_board_to_shard( 1236 hostname='shard1', labels='board:stumpy') 1237 # Test whether raise exception when board label does not exist. 1238 self.assertRaises(models.Label.DoesNotExist, 1239 rpc_interface.add_board_to_shard, 1240 hostname='shard1', labels='board:test') 1241 # Test whether raise exception when board already sharded. 1242 self.assertRaises(error.RPCException, 1243 rpc_interface.add_board_to_shard, 1244 hostname='shard1', labels='board:lumpy') 1245 shard = models.Shard.objects.get(pk=shard_id) 1246 self.assertEqual(shard.hostname, 'shard1') 1247 self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,)) 1248 self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,)) 1249 1250 self.assertEqual(rpc_interface.get_shards(), 1251 [{'labels': ['board:lumpy','board:stumpy'], 1252 'hostname': 'shard1', 1253 'id': 1}]) 1254 1255 1256 def testResendHostsAfterFailedHeartbeat(self): 1257 """Check that master accepts resending updated records after failure.""" 1258 shard1, host1, lumpy_label = self._createShardAndHostWithLabel() 1259 1260 # Send the host 1261 self._do_heartbeat_and_assert_response(hosts=[host1]) 1262 1263 # Send it again because previous one didn't persist correctly 1264 self._do_heartbeat_and_assert_response(hosts=[host1]) 1265 1266 # Now it worked, make sure it isn't sent again 1267 self._do_heartbeat_and_assert_response(known_hosts=[host1]) 1268 1269 1270if __name__ == '__main__': 1271 unittest.main() 1272