1#!/usr/bin/env python2
2# pylint: disable=missing-docstring
3
4import datetime
5import mox
6import unittest
7
8import common
9from autotest_lib.client.common_lib import control_data
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib import global_config
12from autotest_lib.client.common_lib import priorities
13from autotest_lib.client.common_lib.cros import dev_server
14from autotest_lib.client.common_lib.test_utils import mock
15from autotest_lib.frontend import setup_django_environment
16from autotest_lib.frontend.afe import frontend_test_utils
17from autotest_lib.frontend.afe import model_logic
18from autotest_lib.frontend.afe import models
19from autotest_lib.frontend.afe import rpc_interface
20from autotest_lib.frontend.afe import rpc_utils
21from autotest_lib.server import frontend
22from autotest_lib.server import utils as server_utils
23from autotest_lib.server.cros import provision
24from autotest_lib.server.cros.dynamic_suite import constants
25from autotest_lib.server.cros.dynamic_suite import control_file_getter
26from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
27
28CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
29SERVER = control_data.CONTROL_TYPE_NAMES.SERVER
30
31_hqe_status = models.HostQueueEntry.Status
32
33
34class RpcInterfaceTest(unittest.TestCase,
35                       frontend_test_utils.FrontendTestMixin):
36    def setUp(self):
37        self._frontend_common_setup()
38        self.god = mock.mock_god()
39
40
41    def tearDown(self):
42        self.god.unstub_all()
43        self._frontend_common_teardown()
44        global_config.global_config.reset_config_values()
45
46
47    def test_validation(self):
48        # omit a required field
49        self.assertRaises(model_logic.ValidationError, rpc_interface.add_label,
50                          name=None)
51        # violate uniqueness constraint
52        self.assertRaises(model_logic.ValidationError, rpc_interface.add_host,
53                          hostname='host1')
54
55
56    def test_multiple_platforms(self):
57        platform2 = models.Label.objects.create(name='platform2', platform=True)
58        self.assertRaises(model_logic.ValidationError,
59                          rpc_interface. label_add_hosts, id='platform2',
60                          hosts=['host1', 'host2'])
61        self.assertRaises(model_logic.ValidationError,
62                          rpc_interface.host_add_labels,
63                          id='host1', labels=['platform2'])
64        # make sure the platform didn't get added
65        platforms = rpc_interface.get_labels(
66            host__hostname__in=['host1', 'host2'], platform=True)
67        self.assertEquals(len(platforms), 1)
68        self.assertEquals(platforms[0]['name'], 'myplatform')
69
70
71    def _check_hostnames(self, hosts, expected_hostnames):
72        self.assertEquals(set(host['hostname'] for host in hosts),
73                          set(expected_hostnames))
74
75
76    def test_get_hosts(self):
77        hosts = rpc_interface.get_hosts()
78        self._check_hostnames(hosts, [host.hostname for host in self.hosts])
79
80        hosts = rpc_interface.get_hosts(hostname='host1')
81        self._check_hostnames(hosts, ['host1'])
82        host = hosts[0]
83        self.assertEquals(sorted(host['labels']), ['label1', 'myplatform'])
84        self.assertEquals(host['platform'], 'myplatform')
85        self.assertEquals(host['acls'], ['my_acl'])
86        self.assertEquals(host['attributes'], {})
87
88
89    def test_get_hosts_multiple_labels(self):
90        hosts = rpc_interface.get_hosts(
91                multiple_labels=['myplatform', 'label1'])
92        self._check_hostnames(hosts, ['host1'])
93
94
95    def test_get_hosts_exclude_only_if_needed(self):
96        self.hosts[0].labels.add(self.label3)
97
98        hosts = rpc_interface.get_hosts(hostname__in=['host1', 'host2'],
99                                        exclude_only_if_needed_labels=True)
100        self._check_hostnames(hosts, ['host2'])
101
102
103    def test_job_keyvals(self):
104        keyval_dict = {'mykey': 'myvalue'}
105        job_id = rpc_interface.create_job(name='test',
106                                          priority=priorities.Priority.DEFAULT,
107                                          control_file='foo',
108                                          control_type=CLIENT,
109                                          hosts=['host1'],
110                                          keyvals=keyval_dict)
111        jobs = rpc_interface.get_jobs(id=job_id)
112        self.assertEquals(len(jobs), 1)
113        self.assertEquals(jobs[0]['keyvals'], keyval_dict)
114
115
116    def test_test_retry(self):
117        job_id = rpc_interface.create_job(name='flake',
118                                          priority=priorities.Priority.DEFAULT,
119                                          control_file='foo',
120                                          control_type=CLIENT,
121                                          hosts=['host1'],
122                                          test_retry=10)
123        jobs = rpc_interface.get_jobs(id=job_id)
124        self.assertEquals(len(jobs), 1)
125        self.assertEquals(jobs[0]['test_retry'], 10)
126
127
128    def test_get_jobs_summary(self):
129        job = self._create_job(hosts=xrange(1, 4))
130        entries = list(job.hostqueueentry_set.all())
131        entries[1].status = _hqe_status.FAILED
132        entries[1].save()
133        entries[2].status = _hqe_status.FAILED
134        entries[2].aborted = True
135        entries[2].save()
136
137        # Mock up tko_rpc_interface.get_status_counts.
138        self.god.stub_function_to_return(rpc_interface.tko_rpc_interface,
139                                         'get_status_counts',
140                                         None)
141
142        job_summaries = rpc_interface.get_jobs_summary(id=job.id)
143        self.assertEquals(len(job_summaries), 1)
144        summary = job_summaries[0]
145        self.assertEquals(summary['status_counts'], {'Queued': 1,
146                                                     'Failed': 2})
147
148
149    def _check_job_ids(self, actual_job_dicts, expected_jobs):
150        self.assertEquals(
151                set(job_dict['id'] for job_dict in actual_job_dicts),
152                set(job.id for job in expected_jobs))
153
154
155    def test_get_jobs_status_filters(self):
156        HqeStatus = models.HostQueueEntry.Status
157        def create_two_host_job():
158            return self._create_job(hosts=[1, 2])
159        def set_hqe_statuses(job, first_status, second_status):
160            entries = job.hostqueueentry_set.all()
161            entries[0].update_object(status=first_status)
162            entries[1].update_object(status=second_status)
163
164        queued = create_two_host_job()
165
166        queued_and_running = create_two_host_job()
167        set_hqe_statuses(queued_and_running, HqeStatus.QUEUED,
168                           HqeStatus.RUNNING)
169
170        running_and_complete = create_two_host_job()
171        set_hqe_statuses(running_and_complete, HqeStatus.RUNNING,
172                           HqeStatus.COMPLETED)
173
174        complete = create_two_host_job()
175        set_hqe_statuses(complete, HqeStatus.COMPLETED, HqeStatus.COMPLETED)
176
177        started_but_inactive = create_two_host_job()
178        set_hqe_statuses(started_but_inactive, HqeStatus.QUEUED,
179                           HqeStatus.COMPLETED)
180
181        parsing = create_two_host_job()
182        set_hqe_statuses(parsing, HqeStatus.PARSING, HqeStatus.PARSING)
183
184        self._check_job_ids(rpc_interface.get_jobs(not_yet_run=True), [queued])
185        self._check_job_ids(rpc_interface.get_jobs(running=True),
186                      [queued_and_running, running_and_complete,
187                       started_but_inactive, parsing])
188        self._check_job_ids(rpc_interface.get_jobs(finished=True), [complete])
189
190
191    def test_get_jobs_type_filters(self):
192        self.assertRaises(AssertionError, rpc_interface.get_jobs,
193                          suite=True, sub=True)
194        self.assertRaises(AssertionError, rpc_interface.get_jobs,
195                          suite=True, standalone=True)
196        self.assertRaises(AssertionError, rpc_interface.get_jobs,
197                          standalone=True, sub=True)
198
199        parent_job = self._create_job(hosts=[1])
200        child_jobs = self._create_job(hosts=[1, 2],
201                                      parent_job_id=parent_job.id)
202        standalone_job = self._create_job(hosts=[1])
203
204        self._check_job_ids(rpc_interface.get_jobs(suite=True), [parent_job])
205        self._check_job_ids(rpc_interface.get_jobs(sub=True), [child_jobs])
206        self._check_job_ids(rpc_interface.get_jobs(standalone=True),
207                            [standalone_job])
208
209
210    def _create_job_helper(self, **kwargs):
211        return rpc_interface.create_job(name='test',
212                                        priority=priorities.Priority.DEFAULT,
213                                        control_file='control file',
214                                        control_type=SERVER, **kwargs)
215
216
217    def test_one_time_hosts(self):
218        job = self._create_job_helper(one_time_hosts=['testhost'])
219        host = models.Host.objects.get(hostname='testhost')
220        self.assertEquals(host.invalid, True)
221        self.assertEquals(host.labels.count(), 0)
222        self.assertEquals(host.aclgroup_set.count(), 0)
223
224
225    def test_create_job_duplicate_hosts(self):
226        self.assertRaises(model_logic.ValidationError, self._create_job_helper,
227                          hosts=[1, 1])
228
229
230    def test_create_unrunnable_metahost_job(self):
231        self.assertRaises(error.NoEligibleHostException,
232                          self._create_job_helper, meta_hosts=['unused'])
233
234
235    def test_create_hostless_job(self):
236        job_id = self._create_job_helper(hostless=True)
237        job = models.Job.objects.get(pk=job_id)
238        queue_entries = job.hostqueueentry_set.all()
239        self.assertEquals(len(queue_entries), 1)
240        self.assertEquals(queue_entries[0].host, None)
241        self.assertEquals(queue_entries[0].meta_host, None)
242
243
244    def _setup_special_tasks(self):
245        host = self.hosts[0]
246
247        job1 = self._create_job(hosts=[1])
248        job2 = self._create_job(hosts=[1])
249
250        entry1 = job1.hostqueueentry_set.all()[0]
251        entry1.update_object(started_on=datetime.datetime(2009, 1, 2),
252                             execution_subdir='host1')
253        entry2 = job2.hostqueueentry_set.all()[0]
254        entry2.update_object(started_on=datetime.datetime(2009, 1, 3),
255                             execution_subdir='host1')
256
257        self.task1 = models.SpecialTask.objects.create(
258                host=host, task=models.SpecialTask.Task.VERIFY,
259                time_started=datetime.datetime(2009, 1, 1), # ran before job 1
260                is_complete=True, requested_by=models.User.current_user())
261        self.task2 = models.SpecialTask.objects.create(
262                host=host, task=models.SpecialTask.Task.VERIFY,
263                queue_entry=entry2, # ran with job 2
264                is_active=True, requested_by=models.User.current_user())
265        self.task3 = models.SpecialTask.objects.create(
266                host=host, task=models.SpecialTask.Task.VERIFY,
267                requested_by=models.User.current_user()) # not yet run
268
269
270    def test_get_special_tasks(self):
271        self._setup_special_tasks()
272        tasks = rpc_interface.get_special_tasks(host__hostname='host1',
273                                                queue_entry__isnull=True)
274        self.assertEquals(len(tasks), 2)
275        self.assertEquals(tasks[0]['task'], models.SpecialTask.Task.VERIFY)
276        self.assertEquals(tasks[0]['is_active'], False)
277        self.assertEquals(tasks[0]['is_complete'], True)
278
279
280    def test_get_latest_special_task(self):
281        # a particular usage of get_special_tasks()
282        self._setup_special_tasks()
283        self.task2.time_started = datetime.datetime(2009, 1, 2)
284        self.task2.save()
285
286        tasks = rpc_interface.get_special_tasks(
287                host__hostname='host1', task=models.SpecialTask.Task.VERIFY,
288                time_started__isnull=False, sort_by=['-time_started'],
289                query_limit=1)
290        self.assertEquals(len(tasks), 1)
291        self.assertEquals(tasks[0]['id'], 2)
292
293
294    def _common_entry_check(self, entry_dict):
295        self.assertEquals(entry_dict['host']['hostname'], 'host1')
296        self.assertEquals(entry_dict['job']['id'], 2)
297
298
299    def test_get_host_queue_entries_and_special_tasks(self):
300        self._setup_special_tasks()
301
302        host = self.hosts[0].id
303        entries_and_tasks = (
304                rpc_interface.get_host_queue_entries_and_special_tasks(host))
305
306        paths = [entry['execution_path'] for entry in entries_and_tasks]
307        self.assertEquals(paths, ['hosts/host1/3-verify',
308                                  '2-autotest_system/host1',
309                                  'hosts/host1/2-verify',
310                                  '1-autotest_system/host1',
311                                  'hosts/host1/1-verify'])
312
313        verify2 = entries_and_tasks[2]
314        self._common_entry_check(verify2)
315        self.assertEquals(verify2['type'], 'Verify')
316        self.assertEquals(verify2['status'], 'Running')
317        self.assertEquals(verify2['execution_path'], 'hosts/host1/2-verify')
318
319        entry2 = entries_and_tasks[1]
320        self._common_entry_check(entry2)
321        self.assertEquals(entry2['type'], 'Job')
322        self.assertEquals(entry2['status'], 'Queued')
323        self.assertEquals(entry2['started_on'], '2009-01-03 00:00:00')
324
325
326    def test_view_invalid_host(self):
327        # RPCs used by View Host page should work for invalid hosts
328        self._create_job_helper(hosts=[1])
329        host = self.hosts[0]
330        host.delete()
331
332        self.assertEquals(1, rpc_interface.get_num_hosts(hostname='host1',
333                                                         valid_only=False))
334        data = rpc_interface.get_hosts(hostname='host1', valid_only=False)
335        self.assertEquals(1, len(data))
336
337        self.assertEquals(1, rpc_interface.get_num_host_queue_entries(
338                host__hostname='host1'))
339        data = rpc_interface.get_host_queue_entries(host__hostname='host1')
340        self.assertEquals(1, len(data))
341
342        count = rpc_interface.get_num_host_queue_entries_and_special_tasks(
343                host=host.id)
344        self.assertEquals(1, count)
345        data = rpc_interface.get_host_queue_entries_and_special_tasks(
346                host=host.id)
347        self.assertEquals(1, len(data))
348
349
350    def test_reverify_hosts(self):
351        hostname_list = rpc_interface.reverify_hosts(id__in=[1, 2])
352        self.assertEquals(hostname_list, ['host1', 'host2'])
353        tasks = rpc_interface.get_special_tasks()
354        self.assertEquals(len(tasks), 2)
355        self.assertEquals(set(task['host']['id'] for task in tasks),
356                          set([1, 2]))
357
358        task = tasks[0]
359        self.assertEquals(task['task'], models.SpecialTask.Task.VERIFY)
360        self.assertEquals(task['requested_by'], 'autotest_system')
361
362
363    def test_repair_hosts(self):
364        hostname_list = rpc_interface.repair_hosts(id__in=[1, 2])
365        self.assertEquals(hostname_list, ['host1', 'host2'])
366        tasks = rpc_interface.get_special_tasks()
367        self.assertEquals(len(tasks), 2)
368        self.assertEquals(set(task['host']['id'] for task in tasks),
369                          set([1, 2]))
370
371        task = tasks[0]
372        self.assertEquals(task['task'], models.SpecialTask.Task.REPAIR)
373        self.assertEquals(task['requested_by'], 'autotest_system')
374
375
376    def _modify_host_helper(self, on_shard=False, host_on_shard=False):
377        shard_hostname = 'shard1'
378        if on_shard:
379            global_config.global_config.override_config_value(
380                'SHARD', 'shard_hostname', shard_hostname)
381
382        host = models.Host.objects.all()[0]
383        if host_on_shard:
384            shard = models.Shard.objects.create(hostname=shard_hostname)
385            host.shard = shard
386            host.save()
387
388        self.assertFalse(host.locked)
389
390        self.god.stub_class_method(frontend.AFE, 'run')
391
392        if host_on_shard and not on_shard:
393            mock_afe = self.god.create_mock_class_obj(
394                    frontend_wrappers.RetryingAFE, 'MockAFE')
395            self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
396
397            mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
398                    server=shard_hostname, user=None)
399            mock_afe2.run.expect_call('modify_host_local', id=host.id,
400                    locked=True, lock_reason='_modify_host_helper lock',
401                    lock_time=datetime.datetime(2015, 12, 15))
402        elif on_shard:
403            mock_afe = self.god.create_mock_class_obj(
404                    frontend_wrappers.RetryingAFE, 'MockAFE')
405            self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
406
407            mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
408                    server=server_utils.get_global_afe_hostname(), user=None)
409            mock_afe2.run.expect_call('modify_host', id=host.id,
410                    locked=True, lock_reason='_modify_host_helper lock',
411                    lock_time=datetime.datetime(2015, 12, 15))
412
413        rpc_interface.modify_host(id=host.id, locked=True,
414                                  lock_reason='_modify_host_helper lock',
415                                  lock_time=datetime.datetime(2015, 12, 15))
416
417        host = models.Host.objects.get(pk=host.id)
418        if on_shard:
419            # modify_host on shard does nothing but routing the RPC to master.
420            self.assertFalse(host.locked)
421        else:
422            self.assertTrue(host.locked)
423        self.god.check_playback()
424
425
426    def test_modify_host_on_master_host_on_master(self):
427        """Call modify_host to master for host in master."""
428        self._modify_host_helper()
429
430
431    def test_modify_host_on_master_host_on_shard(self):
432        """Call modify_host to master for host in shard."""
433        self._modify_host_helper(host_on_shard=True)
434
435
436    def test_modify_host_on_shard(self):
437        """Call modify_host to shard for host in shard."""
438        self._modify_host_helper(on_shard=True, host_on_shard=True)
439
440
441    def test_modify_hosts_on_master_host_on_shard(self):
442        """Ensure calls to modify_hosts are correctly forwarded to shards."""
443        host1 = models.Host.objects.all()[0]
444        host2 = models.Host.objects.all()[1]
445
446        shard1 = models.Shard.objects.create(hostname='shard1')
447        host1.shard = shard1
448        host1.save()
449
450        shard2 = models.Shard.objects.create(hostname='shard2')
451        host2.shard = shard2
452        host2.save()
453
454        self.assertFalse(host1.locked)
455        self.assertFalse(host2.locked)
456
457        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
458                                                  'MockAFE')
459        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
460
461        # The statuses of one host might differ on master and shard.
462        # Filters are always applied on the master. So the host on the shard
463        # will be affected no matter what his status is.
464        filters_to_use = {'status': 'Ready'}
465
466        mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(
467                server='shard2', user=None)
468        mock_afe2.run.expect_call(
469            'modify_hosts_local',
470            host_filter_data={'id__in': [shard1.id, shard2.id]},
471            update_data={'locked': True,
472                         'lock_reason': 'Testing forward to shard',
473                         'lock_time' : datetime.datetime(2015, 12, 15) })
474
475        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
476                server='shard1', user=None)
477        mock_afe1.run.expect_call(
478            'modify_hosts_local',
479            host_filter_data={'id__in': [shard1.id, shard2.id]},
480            update_data={'locked': True,
481                         'lock_reason': 'Testing forward to shard',
482                         'lock_time' : datetime.datetime(2015, 12, 15)})
483
484        rpc_interface.modify_hosts(
485                host_filter_data={'status': 'Ready'},
486                update_data={'locked': True,
487                             'lock_reason': 'Testing forward to shard',
488                             'lock_time' : datetime.datetime(2015, 12, 15) })
489
490        host1 = models.Host.objects.get(pk=host1.id)
491        self.assertTrue(host1.locked)
492        host2 = models.Host.objects.get(pk=host2.id)
493        self.assertTrue(host2.locked)
494        self.god.check_playback()
495
496
497    def test_delete_host(self):
498        """Ensure an RPC is made on delete a host, if it is on a shard."""
499        host1 = models.Host.objects.all()[0]
500        shard1 = models.Shard.objects.create(hostname='shard1')
501        host1.shard = shard1
502        host1.save()
503        host1_id = host1.id
504
505        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
506                                                 'MockAFE')
507        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
508
509        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
510                server='shard1', user=None)
511        mock_afe1.run.expect_call('delete_host', id=host1.id)
512
513        rpc_interface.delete_host(id=host1.id)
514
515        self.assertRaises(models.Host.DoesNotExist,
516                          models.Host.smart_get, host1_id)
517
518        self.god.check_playback()
519
520
521    def test_modify_label(self):
522        label1 = models.Label.objects.all()[0]
523        self.assertEqual(label1.invalid, 0)
524
525        host2 = models.Host.objects.all()[1]
526        shard1 = models.Shard.objects.create(hostname='shard1')
527        host2.shard = shard1
528        host2.labels.add(label1)
529        host2.save()
530
531        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
532                                                  'MockAFE')
533        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
534
535        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
536                server='shard1', user=None)
537        mock_afe1.run.expect_call('modify_label', id=label1.id, invalid=1)
538
539        rpc_interface.modify_label(label1.id, invalid=1)
540
541        self.assertEqual(models.Label.objects.all()[0].invalid, 1)
542        self.god.check_playback()
543
544
545    def test_delete_label(self):
546        label1 = models.Label.objects.all()[0]
547
548        host2 = models.Host.objects.all()[1]
549        shard1 = models.Shard.objects.create(hostname='shard1')
550        host2.shard = shard1
551        host2.labels.add(label1)
552        host2.save()
553
554        mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,
555                                                  'MockAFE')
556        self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)
557
558        mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(
559                server='shard1', user=None)
560        mock_afe1.run.expect_call('delete_label', id=label1.id)
561
562        rpc_interface.delete_label(id=label1.id)
563
564        self.assertRaises(models.Label.DoesNotExist,
565                          models.Label.smart_get, label1.id)
566        self.god.check_playback()
567
568
569    def test_get_image_for_job_with_keyval_build(self):
570        keyval_dict = {'build': 'cool-image'}
571        job_id = rpc_interface.create_job(name='test',
572                                          priority=priorities.Priority.DEFAULT,
573                                          control_file='foo',
574                                          control_type=CLIENT,
575                                          hosts=['host1'],
576                                          keyvals=keyval_dict)
577        job = models.Job.objects.get(id=job_id)
578        self.assertIsNotNone(job)
579        image = rpc_interface._get_image_for_job(job, True)
580        self.assertEquals('cool-image', image)
581
582
583    def test_get_image_for_job_with_keyval_builds(self):
584        keyval_dict = {'builds': {'cros-version': 'cool-image'}}
585        job_id = rpc_interface.create_job(name='test',
586                                          priority=priorities.Priority.DEFAULT,
587                                          control_file='foo',
588                                          control_type=CLIENT,
589                                          hosts=['host1'],
590                                          keyvals=keyval_dict)
591        job = models.Job.objects.get(id=job_id)
592        self.assertIsNotNone(job)
593        image = rpc_interface._get_image_for_job(job, True)
594        self.assertEquals('cool-image', image)
595
596
597    def test_get_image_for_job_with_control_build(self):
598        CONTROL_FILE = """build='cool-image'
599        """
600        job_id = rpc_interface.create_job(name='test',
601                                          priority=priorities.Priority.DEFAULT,
602                                          control_file='foo',
603                                          control_type=CLIENT,
604                                          hosts=['host1'])
605        job = models.Job.objects.get(id=job_id)
606        self.assertIsNotNone(job)
607        job.control_file = CONTROL_FILE
608        image = rpc_interface._get_image_for_job(job, True)
609        self.assertEquals('cool-image', image)
610
611
612    def test_get_image_for_job_with_control_builds(self):
613        CONTROL_FILE = """builds={'cros-version': 'cool-image'}
614        """
615        job_id = rpc_interface.create_job(name='test',
616                                          priority=priorities.Priority.DEFAULT,
617                                          control_file='foo',
618                                          control_type=CLIENT,
619                                          hosts=['host1'])
620        job = models.Job.objects.get(id=job_id)
621        self.assertIsNotNone(job)
622        job.control_file = CONTROL_FILE
623        image = rpc_interface._get_image_for_job(job, True)
624        self.assertEquals('cool-image', image)
625
626
627class ExtraRpcInterfaceTest(mox.MoxTestBase,
628                           frontend_test_utils.FrontendTestMixin):
629    """Unit tests for functions originally in site_rpc_interface.py.
630
631    @var _NAME: fake suite name.
632    @var _BOARD: fake board to reimage.
633    @var _BUILD: fake build with which to reimage.
634    @var _PRIORITY: fake priority with which to reimage.
635    """
636    _NAME = 'name'
637    _BOARD = 'link'
638    _BUILD = 'link-release/R36-5812.0.0'
639    _BUILDS = {provision.CROS_VERSION_PREFIX: _BUILD}
640    _PRIORITY = priorities.Priority.DEFAULT
641    _TIMEOUT = 24
642
643
644    def setUp(self):
645        super(ExtraRpcInterfaceTest, self).setUp()
646        self._SUITE_NAME = rpc_interface.canonicalize_suite_name(
647            self._NAME)
648        self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
649        self._frontend_common_setup(fill_data=False)
650
651
652    def tearDown(self):
653        self._frontend_common_teardown()
654
655
656    def _setupDevserver(self):
657        self.mox.StubOutClassWithMocks(dev_server, 'ImageServer')
658        dev_server.resolve(self._BUILD).AndReturn(self.dev_server)
659
660
661    def _mockDevServerGetter(self, get_control_file=True):
662        self._setupDevserver()
663        if get_control_file:
664          self.getter = self.mox.CreateMock(
665              control_file_getter.DevServerGetter)
666          self.mox.StubOutWithMock(control_file_getter.DevServerGetter,
667                                   'create')
668          control_file_getter.DevServerGetter.create(
669              mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter)
670
671
672    def _mockRpcUtils(self, to_return, control_file_substring=''):
673        """Fake out the autotest rpc_utils module with a mockable class.
674
675        @param to_return: the value that rpc_utils.create_job_common() should
676                          be mocked out to return.
677        @param control_file_substring: A substring that is expected to appear
678                                       in the control file output string that
679                                       is passed to create_job_common.
680                                       Default: ''
681        """
682        download_started_time = constants.DOWNLOAD_STARTED_TIME
683        payload_finished_time = constants.PAYLOAD_FINISHED_TIME
684        self.mox.StubOutWithMock(rpc_utils, 'create_job_common')
685        rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME),
686                                    mox.StrContains(self._BUILD)),
687                            priority=self._PRIORITY,
688                            timeout_mins=self._TIMEOUT*60,
689                            max_runtime_mins=self._TIMEOUT*60,
690                            control_type='Server',
691                            control_file=mox.And(mox.StrContains(self._BOARD),
692                                                 mox.StrContains(self._BUILD),
693                                                 mox.StrContains(
694                                                     control_file_substring)),
695                            hostless=True,
696                            keyvals=mox.And(mox.In(download_started_time),
697                                            mox.In(payload_finished_time))
698                            ).AndReturn(to_return)
699
700
701    def testStageBuildFail(self):
702        """Ensure that a failure to stage the desired build fails the RPC."""
703        self._setupDevserver()
704
705        self.dev_server.hostname = 'mox_url'
706        self.dev_server.stage_artifacts(
707                image=self._BUILD, artifacts=['test_suites']).AndRaise(
708                dev_server.DevServerException())
709        self.mox.ReplayAll()
710        self.assertRaises(error.StageControlFileFailure,
711                          rpc_interface.create_suite_job,
712                          name=self._NAME,
713                          board=self._BOARD,
714                          builds=self._BUILDS,
715                          pool=None)
716
717
718    def testGetControlFileFail(self):
719        """Ensure that a failure to get needed control file fails the RPC."""
720        self._mockDevServerGetter()
721
722        self.dev_server.hostname = 'mox_url'
723        self.dev_server.stage_artifacts(
724                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
725
726        self.getter.get_control_file_contents_by_name(
727            self._SUITE_NAME).AndReturn(None)
728        self.mox.ReplayAll()
729        self.assertRaises(error.ControlFileEmpty,
730                          rpc_interface.create_suite_job,
731                          name=self._NAME,
732                          board=self._BOARD,
733                          builds=self._BUILDS,
734                          pool=None)
735
736
737    def testGetControlFileListFail(self):
738        """Ensure that a failure to get needed control file fails the RPC."""
739        self._mockDevServerGetter()
740
741        self.dev_server.hostname = 'mox_url'
742        self.dev_server.stage_artifacts(
743                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
744
745        self.getter.get_control_file_contents_by_name(
746            self._SUITE_NAME).AndRaise(error.NoControlFileList())
747        self.mox.ReplayAll()
748        self.assertRaises(error.NoControlFileList,
749                          rpc_interface.create_suite_job,
750                          name=self._NAME,
751                          board=self._BOARD,
752                          builds=self._BUILDS,
753                          pool=None)
754
755
756    def testBadNumArgument(self):
757        """Ensure we handle bad values for the |num| argument."""
758        self.assertRaises(error.SuiteArgumentException,
759                          rpc_interface.create_suite_job,
760                          name=self._NAME,
761                          board=self._BOARD,
762                          builds=self._BUILDS,
763                          pool=None,
764                          num='goo')
765        self.assertRaises(error.SuiteArgumentException,
766                          rpc_interface.create_suite_job,
767                          name=self._NAME,
768                          board=self._BOARD,
769                          builds=self._BUILDS,
770                          pool=None,
771                          num=[])
772        self.assertRaises(error.SuiteArgumentException,
773                          rpc_interface.create_suite_job,
774                          name=self._NAME,
775                          board=self._BOARD,
776                          builds=self._BUILDS,
777                          pool=None,
778                          num='5')
779
780
781
782    def testCreateSuiteJobFail(self):
783        """Ensure that failure to schedule the suite job fails the RPC."""
784        self._mockDevServerGetter()
785
786        self.dev_server.hostname = 'mox_url'
787        self.dev_server.stage_artifacts(
788                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
789
790        self.getter.get_control_file_contents_by_name(
791            self._SUITE_NAME).AndReturn('f')
792
793        self.dev_server.url().AndReturn('mox_url')
794        self._mockRpcUtils(-1)
795        self.mox.ReplayAll()
796        self.assertEquals(
797            rpc_interface.create_suite_job(name=self._NAME,
798                                           board=self._BOARD,
799                                           builds=self._BUILDS, pool=None),
800            -1)
801
802
803    def testCreateSuiteJobSuccess(self):
804        """Ensures that success results in a successful RPC."""
805        self._mockDevServerGetter()
806
807        self.dev_server.hostname = 'mox_url'
808        self.dev_server.stage_artifacts(
809                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
810
811        self.getter.get_control_file_contents_by_name(
812            self._SUITE_NAME).AndReturn('f')
813
814        self.dev_server.url().AndReturn('mox_url')
815        job_id = 5
816        self._mockRpcUtils(job_id)
817        self.mox.ReplayAll()
818        self.assertEquals(
819            rpc_interface.create_suite_job(name=self._NAME,
820                                           board=self._BOARD,
821                                           builds=self._BUILDS,
822                                           pool=None),
823            job_id)
824
825
826    def testCreateSuiteJobNoHostCheckSuccess(self):
827        """Ensures that success results in a successful RPC."""
828        self._mockDevServerGetter()
829
830        self.dev_server.hostname = 'mox_url'
831        self.dev_server.stage_artifacts(
832                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
833
834        self.getter.get_control_file_contents_by_name(
835            self._SUITE_NAME).AndReturn('f')
836
837        self.dev_server.url().AndReturn('mox_url')
838        job_id = 5
839        self._mockRpcUtils(job_id)
840        self.mox.ReplayAll()
841        self.assertEquals(
842          rpc_interface.create_suite_job(name=self._NAME,
843                                         board=self._BOARD,
844                                         builds=self._BUILDS,
845                                         pool=None, check_hosts=False),
846          job_id)
847
848    def testCreateSuiteIntegerNum(self):
849        """Ensures that success results in a successful RPC."""
850        self._mockDevServerGetter()
851
852        self.dev_server.hostname = 'mox_url'
853        self.dev_server.stage_artifacts(
854                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
855
856        self.getter.get_control_file_contents_by_name(
857            self._SUITE_NAME).AndReturn('f')
858
859        self.dev_server.url().AndReturn('mox_url')
860        job_id = 5
861        self._mockRpcUtils(job_id, control_file_substring='num=17')
862        self.mox.ReplayAll()
863        self.assertEquals(
864            rpc_interface.create_suite_job(name=self._NAME,
865                                           board=self._BOARD,
866                                           builds=self._BUILDS,
867                                           pool=None,
868                                           check_hosts=False,
869                                           num=17),
870            job_id)
871
872
873    def testCreateSuiteJobControlFileSupplied(self):
874        """Ensure we can supply the control file to create_suite_job."""
875        self._mockDevServerGetter(get_control_file=False)
876
877        self.dev_server.hostname = 'mox_url'
878        self.dev_server.stage_artifacts(
879                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
880        self.dev_server.url().AndReturn('mox_url')
881        job_id = 5
882        self._mockRpcUtils(job_id)
883        self.mox.ReplayAll()
884        self.assertEquals(
885            rpc_interface.create_suite_job(name='%s/%s' % (self._NAME,
886                                                           self._BUILD),
887                                           board=None,
888                                           builds=self._BUILDS,
889                                           pool=None,
890                                           control_file='CONTROL FILE'),
891            job_id)
892
893
894    def _get_records_for_sending_to_master(self):
895        return [{'control_file': 'foo',
896                 'control_type': 1,
897                 'created_on': datetime.datetime(2014, 8, 21),
898                 'drone_set': None,
899                 'email_list': '',
900                 'max_runtime_hrs': 72,
901                 'max_runtime_mins': 1440,
902                 'name': 'dummy',
903                 'owner': 'autotest_system',
904                 'parse_failed_repair': True,
905                 'priority': 40,
906                 'reboot_after': 0,
907                 'reboot_before': 1,
908                 'run_reset': True,
909                 'run_verify': False,
910                 'synch_count': 0,
911                 'test_retry': 10,
912                 'timeout': 24,
913                 'timeout_mins': 1440,
914                 'id': 1
915                 }], [{
916                    'aborted': False,
917                    'active': False,
918                    'complete': False,
919                    'deleted': False,
920                    'execution_subdir': '',
921                    'finished_on': None,
922                    'started_on': None,
923                    'status': 'Queued',
924                    'id': 1
925                }]
926
927
928    def _do_heartbeat_and_assert_response(self, shard_hostname='shard1',
929                                          upload_jobs=(), upload_hqes=(),
930                                          known_jobs=(), known_hosts=(),
931                                          **kwargs):
932        known_job_ids = [job.id for job in known_jobs]
933        known_host_ids = [host.id for host in known_hosts]
934        known_host_statuses = [host.status for host in known_hosts]
935
936        retval = rpc_interface.shard_heartbeat(
937            shard_hostname=shard_hostname,
938            jobs=upload_jobs, hqes=upload_hqes,
939            known_job_ids=known_job_ids, known_host_ids=known_host_ids,
940            known_host_statuses=known_host_statuses)
941
942        self._assert_shard_heartbeat_response(shard_hostname, retval,
943                                              **kwargs)
944
945        return shard_hostname
946
947
948    def _assert_shard_heartbeat_response(self, shard_hostname, retval, jobs=[],
949                                         hosts=[], hqes=[]):
950
951        retval_hosts, retval_jobs = retval['hosts'], retval['jobs']
952
953        expected_jobs = [
954            (job.id, job.name, shard_hostname) for job in jobs]
955        returned_jobs = [(job['id'], job['name'], job['shard']['hostname'])
956                         for job in retval_jobs]
957        self.assertEqual(returned_jobs, expected_jobs)
958
959        expected_hosts = [(host.id, host.hostname) for host in hosts]
960        returned_hosts = [(host['id'], host['hostname'])
961                          for host in retval_hosts]
962        self.assertEqual(returned_hosts, expected_hosts)
963
964        retval_hqes = []
965        for job in retval_jobs:
966            retval_hqes += job['hostqueueentry_set']
967
968        expected_hqes = [(hqe.id) for hqe in hqes]
969        returned_hqes = [(hqe['id']) for hqe in retval_hqes]
970        self.assertEqual(returned_hqes, expected_hqes)
971
972
973    def _send_records_to_master_helper(
974        self, jobs, hqes, shard_hostname='host1',
975        exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False):
976        job_id = rpc_interface.create_job(
977                name='dummy',
978                priority=self._PRIORITY,
979                control_file='foo',
980                control_type=SERVER,
981                test_retry=10, hostless=True)
982        job = models.Job.objects.get(pk=job_id)
983        shard = models.Shard.objects.create(hostname='host1')
984        job.shard = shard
985        job.save()
986
987        if aborted:
988            job.hostqueueentry_set.update(aborted=True)
989            job.shard = None
990            job.save()
991
992        hqe = job.hostqueueentry_set.all()[0]
993        if not exception_to_throw:
994            self._do_heartbeat_and_assert_response(
995                shard_hostname=shard_hostname,
996                upload_jobs=jobs, upload_hqes=hqes)
997        else:
998            self.assertRaises(
999                exception_to_throw,
1000                self._do_heartbeat_and_assert_response,
1001                shard_hostname=shard_hostname,
1002                upload_jobs=jobs, upload_hqes=hqes)
1003
1004
1005    def testSendingRecordsToMaster(self):
1006        """Send records to the master and ensure they are persisted."""
1007        jobs, hqes = self._get_records_for_sending_to_master()
1008        hqes[0]['status'] = 'Completed'
1009        self._send_records_to_master_helper(
1010            jobs=jobs, hqes=hqes, exception_to_throw=None)
1011
1012        # Check the entry was actually written to db
1013        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
1014                         'Completed')
1015
1016
1017    def testSendingRecordsToMasterAbortedOnMaster(self):
1018        """Send records to the master and ensure they are persisted."""
1019        jobs, hqes = self._get_records_for_sending_to_master()
1020        hqes[0]['status'] = 'Completed'
1021        self._send_records_to_master_helper(
1022            jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True)
1023
1024        # Check the entry was actually written to db
1025        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
1026                         'Completed')
1027
1028
1029    def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
1030        """Ensure records that belong to a different shard are rejected."""
1031        jobs, hqes = self._get_records_for_sending_to_master()
1032        models.Shard.objects.create(hostname='other_shard')
1033        self._send_records_to_master_helper(
1034            jobs=jobs, hqes=hqes, shard_hostname='other_shard')
1035
1036
1037    def testSendingRecordsToMasterJobHqeWithoutJob(self):
1038        """Ensure update for hqe without update for it's job gets rejected."""
1039        _, hqes = self._get_records_for_sending_to_master()
1040        self._send_records_to_master_helper(
1041            jobs=[], hqes=hqes)
1042
1043
1044    def testSendingRecordsToMasterNotExistingJob(self):
1045        """Ensure update for non existing job gets rejected."""
1046        jobs, hqes = self._get_records_for_sending_to_master()
1047        jobs[0]['id'] = 3
1048
1049        self._send_records_to_master_helper(
1050            jobs=jobs, hqes=hqes)
1051
1052
1053    def _createShardAndHostWithLabel(self, shard_hostname='shard1',
1054                                     host_hostname='host1',
1055                                     label_name='board:lumpy'):
1056        label = models.Label.objects.create(name=label_name)
1057
1058        shard = models.Shard.objects.create(hostname=shard_hostname)
1059        shard.labels.add(label)
1060
1061        host = models.Host.objects.create(hostname=host_hostname, leased=False)
1062        host.labels.add(label)
1063
1064        return shard, host, label
1065
1066
1067    def _createJobForLabel(self, label):
1068        job_id = rpc_interface.create_job(name='dummy', priority=self._PRIORITY,
1069                                          control_file='foo',
1070                                          control_type=CLIENT,
1071                                          meta_hosts=[label.name],
1072                                          dependencies=(label.name,))
1073        return models.Job.objects.get(id=job_id)
1074
1075
1076    def testShardHeartbeatFetchHostlessJob(self):
1077        """Create a hostless job and ensure it's not assigned to a shard."""
1078        shard1, host1, lumpy_label = self._createShardAndHostWithLabel(
1079            'shard1', 'host1', 'board:lumpy')
1080
1081        label2 = models.Label.objects.create(name='bluetooth', platform=False)
1082
1083        job1 = self._create_job(hostless=True)
1084
1085        # Hostless jobs should be executed by the global scheduler.
1086        self._do_heartbeat_and_assert_response(hosts=[host1])
1087
1088
1089    def testShardRetrieveJobs(self):
1090        """Create jobs and retrieve them."""
1091        # should never be returned by heartbeat
1092        leased_host = models.Host.objects.create(hostname='leased_host',
1093                                                 leased=True)
1094
1095        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
1096        shard2, host2, grumpy_label = self._createShardAndHostWithLabel(
1097            'shard2', 'host2', 'board:grumpy')
1098
1099        leased_host.labels.add(lumpy_label)
1100
1101        job1 = self._createJobForLabel(lumpy_label)
1102
1103        job2 = self._createJobForLabel(grumpy_label)
1104
1105        job_completed = self._createJobForLabel(lumpy_label)
1106        # Job is already being run, so don't sync it
1107        job_completed.hostqueueentry_set.update(complete=True)
1108        job_completed.hostqueueentry_set.create(complete=False)
1109
1110        job_active = self._createJobForLabel(lumpy_label)
1111        # Job is already started, so don't sync it
1112        job_active.hostqueueentry_set.update(active=True)
1113        job_active.hostqueueentry_set.create(complete=False, active=False)
1114
1115        self._do_heartbeat_and_assert_response(
1116            jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all())
1117
1118        self._do_heartbeat_and_assert_response(
1119            shard_hostname=shard2.hostname,
1120            jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all())
1121
1122        host3 = models.Host.objects.create(hostname='host3', leased=False)
1123        host3.labels.add(lumpy_label)
1124
1125        self._do_heartbeat_and_assert_response(
1126            known_jobs=[job1], known_hosts=[host1], hosts=[host3])
1127
1128
1129    def testResendJobsAfterFailedHeartbeat(self):
1130        """Create jobs, retrieve them, fail on client, fetch them again."""
1131        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
1132
1133        job1 = self._createJobForLabel(lumpy_label)
1134
1135        self._do_heartbeat_and_assert_response(
1136            jobs=[job1],
1137            hqes=job1.hostqueueentry_set.all(), hosts=[host1])
1138
1139        # Make sure it's resubmitted by sending last_job=None again
1140        self._do_heartbeat_and_assert_response(
1141            known_hosts=[host1],
1142            jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[])
1143
1144        # Now it worked, make sure it's not sent again
1145        self._do_heartbeat_and_assert_response(
1146            known_jobs=[job1], known_hosts=[host1])
1147
1148        job1 = models.Job.objects.get(pk=job1.id)
1149        job1.hostqueueentry_set.all().update(complete=True)
1150
1151        # Job is completed, make sure it's not sent again
1152        self._do_heartbeat_and_assert_response(
1153            known_hosts=[host1])
1154
1155        job2 = self._createJobForLabel(lumpy_label)
1156
1157        # job2's creation was later, it should be returned now.
1158        self._do_heartbeat_and_assert_response(
1159            known_hosts=[host1],
1160            jobs=[job2], hqes=job2.hostqueueentry_set.all())
1161
1162        self._do_heartbeat_and_assert_response(
1163            known_jobs=[job2], known_hosts=[host1])
1164
1165        job2 = models.Job.objects.get(pk=job2.pk)
1166        job2.hostqueueentry_set.update(aborted=True)
1167        # Setting a job to a complete status will set the shard_id to None in
1168        # scheduler_models. We have to emulate that here, because we use Django
1169        # models in tests.
1170        job2.shard = None
1171        job2.save()
1172
1173        self._do_heartbeat_and_assert_response(
1174            known_jobs=[job2], known_hosts=[host1],
1175            jobs=[job2],
1176            hqes=job2.hostqueueentry_set.all())
1177
1178        models.Test.objects.create(name='platform_BootPerfServer:shard',
1179                                   test_type=1)
1180        self.mox.StubOutWithMock(server_utils, 'read_file')
1181        server_utils.read_file(mox.IgnoreArg()).AndReturn('')
1182        self.mox.ReplayAll()
1183        rpc_interface.delete_shard(hostname=shard1.hostname)
1184
1185        self.assertRaises(
1186            models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id)
1187
1188        job1 = models.Job.objects.get(pk=job1.id)
1189        lumpy_label = models.Label.objects.get(pk=lumpy_label.id)
1190        host1 = models.Host.objects.get(pk=host1.id)
1191        super_job = models.Job.objects.get(priority=priorities.Priority.SUPER)
1192        super_job_host = models.HostQueueEntry.objects.get(
1193                job_id=super_job.id)
1194
1195        self.assertIsNone(job1.shard)
1196        self.assertEqual(len(lumpy_label.shard_set.all()), 0)
1197        self.assertIsNone(host1.shard)
1198        self.assertIsNotNone(super_job)
1199        self.assertEqual(super_job_host.host_id, host1.id)
1200
1201
1202    def testCreateListShard(self):
1203        """Retrieve a list of all shards."""
1204        lumpy_label = models.Label.objects.create(name='board:lumpy',
1205                                                  platform=True)
1206        stumpy_label = models.Label.objects.create(name='board:stumpy',
1207                                                  platform=True)
1208        peppy_label = models.Label.objects.create(name='board:peppy',
1209                                                  platform=True)
1210
1211        shard_id = rpc_interface.add_shard(
1212            hostname='host1', labels='board:lumpy,board:stumpy')
1213        self.assertRaises(error.RPCException,
1214                          rpc_interface.add_shard,
1215                          hostname='host1', labels='board:lumpy,board:stumpy')
1216        self.assertRaises(model_logic.ValidationError,
1217                          rpc_interface.add_shard,
1218                          hostname='host1', labels='board:peppy')
1219        shard = models.Shard.objects.get(pk=shard_id)
1220        self.assertEqual(shard.hostname, 'host1')
1221        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
1222        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
1223
1224        self.assertEqual(rpc_interface.get_shards(),
1225                         [{'labels': ['board:lumpy','board:stumpy'],
1226                           'hostname': 'host1',
1227                           'id': 1}])
1228
1229
1230    def testAddBoardsToShard(self):
1231        """Add boards to a given shard."""
1232        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
1233        stumpy_label = models.Label.objects.create(name='board:stumpy',
1234                                                   platform=True)
1235        shard_id = rpc_interface.add_board_to_shard(
1236            hostname='shard1', labels='board:stumpy')
1237        # Test whether raise exception when board label does not exist.
1238        self.assertRaises(models.Label.DoesNotExist,
1239                          rpc_interface.add_board_to_shard,
1240                          hostname='shard1', labels='board:test')
1241        # Test whether raise exception when board already sharded.
1242        self.assertRaises(error.RPCException,
1243                          rpc_interface.add_board_to_shard,
1244                          hostname='shard1', labels='board:lumpy')
1245        shard = models.Shard.objects.get(pk=shard_id)
1246        self.assertEqual(shard.hostname, 'shard1')
1247        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
1248        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
1249
1250        self.assertEqual(rpc_interface.get_shards(),
1251                         [{'labels': ['board:lumpy','board:stumpy'],
1252                           'hostname': 'shard1',
1253                           'id': 1}])
1254
1255
1256    def testResendHostsAfterFailedHeartbeat(self):
1257        """Check that master accepts resending updated records after failure."""
1258        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
1259
1260        # Send the host
1261        self._do_heartbeat_and_assert_response(hosts=[host1])
1262
1263        # Send it again because previous one didn't persist correctly
1264        self._do_heartbeat_and_assert_response(hosts=[host1])
1265
1266        # Now it worked, make sure it isn't sent again
1267        self._do_heartbeat_and_assert_response(known_hosts=[host1])
1268
1269
1270if __name__ == '__main__':
1271    unittest.main()
1272