1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8import unittest
9
10import common
11from autotest_lib.client.common_lib import global_config
12from autotest_lib.frontend import setup_django_environment
13from autotest_lib.frontend.afe import frontend_test_utils
14from autotest_lib.frontend.afe import models
15from autotest_lib.scheduler import rdb_testing_utils
16from autotest_lib.scheduler import scheduler_models
17from autotest_lib.scheduler.shard import shard_client
18
19
class ShardClientIntegrationTest(rdb_testing_utils.AbstractBaseRDBTester,
                                 unittest.TestCase):
    """Integration tests for the shard_client.

    The fixture helpers used below (create_job, db_helper, god) are not
    defined in this class; they are expected to be provided by
    AbstractBaseRDBTester.

    Checks use the unittest assert* methods rather than bare assert
    statements, so they are not stripped when Python runs with -O and they
    report the offending values on failure.
    """


    def setup_global_config(self):
        """Mock out global_config for shard client creation."""
        global_config.global_config.override_config_value(
                'SHARD', 'is_slave_shard', 'True')
        global_config.global_config.override_config_value(
                'SHARD', 'shard_hostname', 'host1')


    def initialize_shard_client(self):
        """Apply the SHARD config overrides and create a shard client.

        @return: The object returned by shard_client.get_shard_client().
        """
        self.setup_global_config()
        return shard_client.get_shard_client()


    def _create_shard_job_and_hqe(self, client):
        """Create a job for the client's shard and fetch its queue entry.

        @param client: Shard client whose hostname is passed as the job's
                shard_hostname.

        @return: A (job, hqe) tuple, where hqe is the first scheduler
                HostQueueEntry fetched for the new job.
        """
        job = self.create_job(deps=set(['a']), shard_hostname=client.hostname)
        scheduler_models.initialize()
        hqe = scheduler_models.HostQueueEntry.fetch(
                where='job_id = %s' % job.id)[0]
        return job, hqe


    def testCompleteStatusBasic(self):
        """Test that complete jobs are uploaded properly."""

        client = self.initialize_shard_client()
        job, hqe = self._create_shard_job_and_hqe(client)

        # This should set both the shard_id and the complete bit.
        hqe.set_status('Completed')

        # Only incomplete jobs should be in known ids.
        job_ids, _, _ = client._get_known_jobs_and_hosts()
        self.assertEqual(job_ids, [])

        # Jobs that have successfully gone through a set_status should
        # be ready for upload.
        jobs = client._get_jobs_to_upload()
        self.assertIn(job.id, [j.id for j in jobs])


    def testOnlyShardId(self):
        """Test that setting only the shardid prevents the job from upload."""

        client = self.initialize_shard_client()
        job, hqe = self._create_shard_job_and_hqe(client)

        def _local_update_field(entry, field_name, value):
            """Turns update_field on the complete field into a no-op.

            Every other field is written straight to the database through
            the Django model and mirrored onto the in-memory entry.

            @param entry: The HostQueueEntry the stub is invoked on.
            @param field_name: Name of the field being updated.
            @param value: New value for the field.
            """
            if field_name == 'complete':
                return
            models.HostQueueEntry.objects.filter(id=entry.id).update(
                    **{field_name: value})
            setattr(entry, field_name, value)

        self.god.stub_with(scheduler_models.HostQueueEntry, 'update_field',
                _local_update_field)

        # This should only update the shard_id; the stub above swallows the
        # write to the complete field.
        hqe.set_status('Completed')

        # Retrieve the hqe along an independent code path so we're assured of
        # freshness, then make sure it has shard=None and an unset complete bit.
        # Checked one condition at a time so a failure names the culprit.
        modified_hqe = self.db_helper.get_hqes(job_id=job.id)[0]
        self.assertEqual(modified_hqe.id, hqe.id)
        self.assertEqual(modified_hqe.complete, 0)
        self.assertIsNone(modified_hqe.job.shard)

        # Make sure the job with a shard but without complete is still
        # in known_ids.
        job_ids, _, _ = client._get_known_jobs_and_hosts()
        self.assertEqual(set(job_ids), set([job.id]))

        # Make sure the job with a shard but without complete is not
        # in uploaded jobs.
        jobs = client._get_jobs_to_upload()
        self.assertEqual(jobs, [])


    def testHostSerialization(self):
        """Test simple host serialization."""
        # The client itself is not needed; the call is kept because it
        # applies the SHARD config overrides.
        self.initialize_shard_client()
        host = self.db_helper.create_host(name='test_host')
        serialized_host = host.serialize()
        models.Host.objects.all().delete()
        models.Host.deserialize(serialized_host)
        # get() doubles as the assertion: it raises if the host is missing.
        models.Host.objects.get(hostname='test_host')


    def testUserExists(self):
        """Test user related race conditions."""
        # The client itself is not needed; the call is kept because it
        # applies the SHARD config overrides.
        self.initialize_shard_client()
        user = self.db_helper.create_user(name='test_user')
        serialized_user = user.serialize()

        # Each get() below doubles as the assertion: it raises if no
        # matching user exists after deserialization.

        # Master sends a user with the same login but different id
        serialized_user['id'] = '3'
        models.User.deserialize(serialized_user)
        models.User.objects.get(id=3, login='test_user')

        # Master sends a user with the same id, different login
        serialized_user['login'] = 'fake_user'
        models.User.deserialize(serialized_user)
        models.User.objects.get(id=3, login='fake_user')

        # Master sends a new user
        user = self.db_helper.create_user(name='new_user')
        serialized_user = user.serialize()
        models.User.objects.all().delete()
        models.User.deserialize(serialized_user)
        models.User.objects.get(login='new_user')
133
134
# Run every test in this module through unittest's command-line runner when
# the file is executed directly; importing the module runs nothing.
if __name__ == '__main__':
    unittest.main()
137
138