monitor_db_unittest.py revision ce38e0c281b1046574b8112209944e9daf2c3641
#!/usr/bin/python

import unittest, time, subprocess
import MySQLdb
import common
from autotest_lib.client.common_lib import global_config
import monitor_db

# Set to True to echo every SQL statement issued by the test fixture.
_DEBUG = False

# Fixture rows loaded into the freshly created test database.  Statements are
# separated by ';' (see DispatcherTest._do_queries).
_TEST_DATA = """
-- create a user and an ACL group
INSERT INTO users (login) VALUES ('my_user');
INSERT INTO acl_groups (name) VALUES ('my_acl');
INSERT INTO acl_groups_users (user_id, acl_group_id) VALUES (1, 1);

-- create some hosts
INSERT INTO hosts (hostname) VALUES ('host1'), ('host2');
-- add hosts to the ACL group
INSERT INTO acl_groups_hosts (host_id, acl_group_id) VALUES
  (1, 1), (2, 1);

-- create a label for each host and one holding both
INSERT INTO labels (name) VALUES ('label1'), ('label2');

-- add hosts to labels
INSERT INTO hosts_labels (host_id, label_id) VALUES
  (1, 1), (2, 2);
"""


class DispatcherTest(unittest.TestCase):
    """
    Tests for monitor_db.Dispatcher scheduling decisions.

    Each test runs against a scratch database named 'test_<database>' that
    is created from the schema of the configured AUTOTEST_WEB database in
    setUp and dropped again in tearDown.  HostQueueEntry.run is replaced by
    a stub that only records (job_id, host_id) pairs, so no job is actually
    executed.
    """

    def _read_db_info(self):
        """Load the database connection settings from the global config."""
        config = global_config.global_config
        section = 'AUTOTEST_WEB'
        self._host = config.get_config_value(section, "host")
        self._db_name = config.get_config_value(section, "database")
        self._user = config.get_config_value(section, "user")
        self._password = config.get_config_value(section, "password")


    def _connect_to_db(self, db_name=''):
        """
        Open an autocommitting connection and cursor.

        @param db_name: database to select; the default '' connects
                without selecting one.
        """
        self._con = MySQLdb.connect(host=self._host, user=self._user,
                                    passwd=self._password, db=db_name)
        self._con.autocommit(True)
        self._cur = self._con.cursor()


    def _disconnect_from_db(self):
        """Close the connection opened by _connect_to_db."""
        self._con.close()


    def _do_query(self, sql):
        """Execute a single SQL statement, echoing it when _DEBUG is set."""
        if _DEBUG:
            # Single-argument form prints the same text as the old
            # "print 'SQL:', sql" statement.
            print('SQL: %s' % sql)
        self._cur.execute(sql)


    def _do_queries(self, sql_queries):
        """
        Execute every statement in a ';'-separated SQL script.

        The split is purely textual, so the script must not contain ';'
        inside string literals or comments.  Blank fragments are skipped.
        """
        for query in sql_queries.split(';'):
            query = query.strip()
            if query:
                self._do_query(query)


    def _get_db_schema(self):
        """
        Dump the schema (no rows) of the configured database.

        @return the stdout of 'mysqldump --no-data' for self._db_name.
        """
        # Pass an argument list rather than a shell string so that special
        # characters in the configured user, password or host cannot be
        # interpreted by a shell.
        command = ['mysqldump', '--no-data',
                   '-u', self._user,
                   '-p' + self._password,
                   '-h', self._host,
                   self._db_name]
        proc = subprocess.Popen(command, stdout=subprocess.PIPE)
        return proc.communicate()[0]


    def _open_test_db(self, schema):
        """
        Create the scratch database and load the given schema into it.

        Renames self._db_name to 'test_' + the configured name; all later
        queries, and the final DROP DATABASE, use that name.
        """
        self._db_name = 'test_' + self._db_name
        # First connect without a database so that it can be created.
        self._connect_to_db()
        self._do_query('CREATE DATABASE ' + self._db_name)
        self._disconnect_from_db()
        self._connect_to_db(self._db_name)
        self._do_queries(schema)


    def _close_test_db(self):
        """Drop the scratch database and close the connection."""
        self._do_query('DROP DATABASE ' + self._db_name)
        self._disconnect_from_db()


    def _fill_in_test_data(self):
        """Load the _TEST_DATA fixture rows."""
        self._do_queries(_TEST_DATA)


    def _set_monitor_stubs(self):
        """
        Point monitor_db at the scratch database and stub out job execution.

        monitor_db.HostQueueEntry.run is replaced with a function that
        records which host each queue entry was given instead of running it.
        """
        monitor_db._db = monitor_db.DatabaseConn()
        monitor_db._db.connect(db_name=self._db_name)
        def run_stub(hqe_self, assigned_host=None):
            # Metahost entries take the host passed in; other entries use
            # the host already stored on the entry.
            if hqe_self.meta_host:
                host = assigned_host
            else:
                host = hqe_self.host
            self._record_job_scheduled(hqe_self.job.id, host.id)
        monitor_db.HostQueueEntry.run = run_stub


    def _record_job_scheduled(self, job_id, host_id):
        """Record a scheduling, failing if the same pair was seen before."""
        record = (job_id, host_id)
        self.assertTrue(record not in self._jobs_scheduled,
                        'Job %d scheduled on host %d twice' %
                        (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        """
        Assert that the pair was recorded, then consume the record so that
        _check_for_extra_schedulings only sees unexpected ones.
        """
        record = (job_id, host_id)
        self.assertTrue(record in self._jobs_scheduled,
                        'Job %d not scheduled on host %d as expected' %
                        (job_id, host_id))
        self._jobs_scheduled.remove(record)


    def _check_for_extra_schedulings(self):
        """Fail if any recorded scheduling has not been asserted."""
        if self._jobs_scheduled:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _create_job(self, hosts=(), metahosts=(), priority=0, active=0):
        """
        Insert a job and its host queue entries.

        @param hosts: host ids; each gets a queue entry with that host_id
                and a matching ineligible_host_queues row.
        @param metahosts: label ids; each gets a queue entry with that
                meta_host and a NULL host_id.
        @param priority: priority stored on the job and on every entry.
        @param active: value of the 'active' column on every entry.
        """
        self._do_query('INSERT INTO jobs (name, priority) VALUES '
                       '("test", %d)' % priority)
        # The job id is tracked locally rather than read back from the
        # database; this relies on ids being handed out as 1, 2, 3, ...
        self._job_counter += 1
        job_id = self._job_counter
        # '%%s' leaves two placeholders (host_id, meta_host) to be filled
        # in per entry below.
        queue_entry_sql = (
            'INSERT INTO host_queue_entries '
            '(job_id, priority, host_id, meta_host, active) '
            'VALUES (%d, %d, %%s, %%s, %d)' %
            (job_id, priority, active))
        for host_id in hosts:
            self._do_query(queue_entry_sql % (host_id, 'NULL'))
            self._do_query('INSERT INTO ineligible_host_queues '
                           '(job_id, host_id) VALUES (%d, %d)' %
                           (job_id, host_id))
        for label_id in metahosts:
            self._do_query(queue_entry_sql % ('NULL', label_id))


    def _create_job_simple(self, hosts, use_metahost=False,
                           priority=0, active=0):
        """
        An alternative interface to _create_job.

        @param hosts: list of ids, passed as 'metahosts' (label ids) when
                use_metahost is true and as 'hosts' (host ids) otherwise.
        """
        args = {'hosts' : [], 'metahosts' : []}
        if use_metahost:
            args['metahosts'] = hosts
        else:
            args['hosts'] = hosts
        self._create_job(priority=priority, active=active, **args)


    def _convert_jobs_to_metahosts(self, *job_ids):
        """Turn the host entries of the given jobs into metahost entries."""
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        """Mark the given host as locked."""
        self._do_query('UPDATE hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        # Per-test state.  These must live on the instance: a list kept on
        # the class would be shared, so records left behind by one failing
        # test would leak into every test that runs after it.
        self._jobs_scheduled = []
        self._job_counter = 0

        self._read_db_info()
        schema = self._get_db_schema()
        self._open_test_db(schema)
        self._fill_in_test_data()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._close_test_db()


    def _test_basic_scheduling_helper(self, use_metahosts):
        'Basic scheduling: one job per host (or per single-host label)'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._dispatcher._find_more_work()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        'Test prioritization ordering'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._dispatcher._find_more_work()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE hosts SET status="Running" WHERE id=1')
        self._dispatcher._find_more_work()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._dispatcher._find_more_work()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._dispatcher._find_more_work()
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        'Only idle hosts get scheduled'
        # The first job's entry on host 1 is already active, so the second
        # job must not be started on that host.
        self._create_job(hosts=[1], active=1)
        self._create_job_simple([1], use_metahosts)
        self._dispatcher._find_more_work()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_metahost_scheduling(self):
        'Basic metahost scheduling'
        self._test_basic_scheduling_helper(True)


    # Previously this was a second 'test_priorities', which silently
    # replaced the non-metahost test of the same name defined above.
    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._dispatcher._find_more_work()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_metahosts_obey_blocks(self):
        """
        Metahosts can't get scheduled on hosts already scheduled for
        that job.
        """
        self._create_job(metahosts=[1], hosts=[1])
        self._dispatcher._find_more_work()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_metahosts_obey_ACLs(self):
        "ACL-inaccessible hosts can't get scheduled for metahosts"
        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')
        self._create_job(metahosts=[1])
        # NOTE(review): this also marks host 1 as ineligible for job 1,
        # which may by itself keep the job off that host -- confirm that
        # the ACL restriction is what this test actually exercises.
        self._do_query('INSERT INTO ineligible_host_queues '
                       '(job_id, host_id) VALUES (1, 1)')
        self._dispatcher._find_more_work()
        self._check_for_extra_schedulings()


if __name__ == '__main__':
    unittest.main()