monitor_db_unittest.py revision 04c82c5dc70a6de95f9cd77371d0a99cbdcf0959
1#!/usr/bin/python 2 3import unittest, time, subprocess 4import MySQLdb 5import common 6from autotest_lib.client.common_lib import global_config 7import monitor_db 8 9_DEBUG = False 10 11_TEST_DATA = """ 12-- create a user and an ACL group 13INSERT INTO users (login) VALUES ('my_user'); 14INSERT INTO acl_groups (name) VALUES ('my_acl'); 15INSERT INTO acl_groups_users (user_id, acl_group_id) VALUES (1, 1); 16 17-- create some hosts 18INSERT INTO hosts (hostname) VALUES ('host1'), ('host2'); 19-- add hosts to the ACL group 20INSERT INTO acl_groups_hosts (host_id, acl_group_id) VALUES 21 (1, 1), (2, 1); 22 23-- create a label for each host and one holding both 24INSERT INTO labels (name) VALUES ('label1'), ('label2'); 25 26-- add hosts to labels 27INSERT INTO hosts_labels (host_id, label_id) VALUES 28 (1, 1), (2, 2); 29""" 30 31class Dummy(object): 32 'Dummy object that can have attribute assigned to it' 33 34class DispatcherTest(unittest.TestCase): 35 _jobs_scheduled = [] 36 _job_counter = 0 37 38 39 def _read_db_info(self): 40 config = global_config.global_config 41 section = 'AUTOTEST_WEB' 42 self._host = config.get_config_value(section, "host") 43 self._db_name = config.get_config_value(section, "database") 44 self._user = config.get_config_value(section, "user") 45 self._password = config.get_config_value(section, "password") 46 47 48 def _connect_to_db(self, db_name=''): 49 self._con = MySQLdb.connect(host=self._host, user=self._user, 50 passwd=self._password, db=db_name) 51 self._con.autocommit(True) 52 self._cur = self._con.cursor() 53 54 55 def _disconnect_from_db(self): 56 self._con.close() 57 58 59 def _do_query(self, sql): 60 if _DEBUG: 61 print 'SQL:', sql 62 self._cur.execute(sql) 63 64 65 def _do_queries(self, sql_queries): 66 for query in sql_queries.split(';'): 67 query = query.strip() 68 if query: 69 self._do_query(query) 70 71 72 def _get_db_schema(self): 73 command = 'mysqldump --no-data -u %s -p%s -h %s %s' % ( 74 self._user, self._password, self._host, self._db_name) 75 proc = subprocess.Popen(command, stdout=subprocess.PIPE, 76 shell=True) 77 return proc.communicate()[0] 78 79 80 def _open_test_db(self, schema): 81 self._db_name = 'test_' + self._db_name 82 self._connect_to_db() 83 self._do_query('CREATE DATABASE ' + self._db_name) 84 self._disconnect_from_db() 85 self._connect_to_db(self._db_name) 86 self._do_queries(schema) 87 88 89 def _close_test_db(self): 90 self._do_query('DROP DATABASE ' + self._db_name) 91 self._disconnect_from_db() 92 93 94 def _fill_in_test_data(self): 95 self._do_queries(_TEST_DATA) 96 97 98 def _set_monitor_stubs(self): 99 monitor_db._db = monitor_db.DatabaseConn() 100 monitor_db._db.connect(db_name=self._db_name) 101 def run_stub(hqe_self, assigned_host=None): 102 if hqe_self.meta_host: 103 host = assigned_host 104 else: 105 host = hqe_self.host 106 self._record_job_scheduled(hqe_self.job.id, host.id) 107 return Dummy() 108 monitor_db.HostQueueEntry.run = run_stub 109 110 111 def _record_job_scheduled(self, job_id, host_id): 112 record = (job_id, host_id) 113 self.assert_(record not in self._jobs_scheduled, 114 'Job %d scheduled on host %d twice' % 115 (job_id, host_id)) 116 self._jobs_scheduled.append(record) 117 118 119 def _assert_job_scheduled_on(self, job_id, host_id): 120 record = (job_id, host_id) 121 self.assert_(record in self._jobs_scheduled, 122 'Job %d not scheduled on host %d as expected' % 123 (job_id, host_id)) 124 self._jobs_scheduled.remove(record) 125 126 127 def _check_for_extra_schedulings(self): 128 if len(self._jobs_scheduled) != 0: 129 self.fail('Extra jobs scheduled: ' + 130 str(self._jobs_scheduled)) 131 132 133 def _create_job(self, hosts=[], metahosts=[], priority=0, active=0): 134 self._do_query('INSERT INTO jobs (name, priority) VALUES ' 135 '("test", %d)' % priority) 136 self._job_counter += 1 137 job_id = self._job_counter 138 queue_entry_sql = ( 139 'INSERT INTO host_queue_entries ' 140 '(job_id, priority, host_id, meta_host, active) ' 141 'VALUES (%d, %d, %%s, %%s, %d)' % 142 (job_id, priority, active)) 143 for host_id in hosts: 144 self._do_query(queue_entry_sql % (host_id, 'NULL')) 145 self._do_query('INSERT INTO ineligible_host_queues ' 146 '(job_id, host_id) VALUES (%d, %d)' % 147 (job_id, host_id)) 148 for label_id in metahosts: 149 self._do_query(queue_entry_sql % ('NULL', label_id)) 150 151 152 def _create_job_simple(self, hosts, use_metahost=False, 153 priority=0, active=0): 154 'An alternative interface to _create_job' 155 args = {'hosts' : [], 'metahosts' : []} 156 if use_metahost: 157 args['metahosts'] = hosts 158 else: 159 args['hosts'] = hosts 160 self._create_job(priority=priority, active=active, **args) 161 162 163 def _convert_jobs_to_metahosts(self, *job_ids): 164 sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')' 165 self._do_query('UPDATE host_queue_entries SET ' 166 'meta_host=host_id, host_id=NULL ' 167 'WHERE job_id IN ' + sql_tuple) 168 169 170 def _lock_host(self, host_id): 171 self._do_query('UPDATE hosts SET locked=1 WHERE id=' + 172 str(host_id)) 173 174 175 def setUp(self): 176 self._read_db_info() 177 schema = self._get_db_schema() 178 self._open_test_db(schema) 179 self._fill_in_test_data() 180 self._set_monitor_stubs() 181 self._dispatcher = monitor_db.Dispatcher() 182 183 184 def tearDown(self): 185 self._close_test_db() 186 187 188 def _test_basic_scheduling_helper(self, use_metahosts): 189 'Basic nonmetahost scheduling' 190 self._create_job_simple([1], use_metahosts) 191 self._create_job_simple([2], use_metahosts) 192 self._dispatcher._schedule_new_jobs() 193 self._assert_job_scheduled_on(1, 1) 194 self._assert_job_scheduled_on(2, 2) 195 self._check_for_extra_schedulings() 196 197 198 def _test_priorities_helper(self, use_metahosts): 199 'Test prioritization ordering' 200 self._create_job_simple([1], use_metahosts) 201 self._create_job_simple([2], use_metahosts) 202 self._create_job_simple([1,2], use_metahosts) 203 self._create_job_simple([1], use_metahosts, priority=1) 204 self._dispatcher._schedule_new_jobs() 205 self._assert_job_scheduled_on(4, 1) # higher priority 206 self._assert_job_scheduled_on(2, 2) # earlier job over later 207 self._check_for_extra_schedulings() 208 209 210 def _test_hosts_ready_helper(self, use_metahosts): 211 """ 212 Only hosts that are status=Ready, unlocked and not invalid get 213 scheduled. 214 """ 215 self._create_job_simple([1], use_metahosts) 216 self._do_query('UPDATE hosts SET status="Running" WHERE id=1') 217 self._dispatcher._schedule_new_jobs() 218 self._check_for_extra_schedulings() 219 220 self._do_query('UPDATE hosts SET status="Ready", locked=1 ' 221 'WHERE id=1') 222 self._dispatcher._schedule_new_jobs() 223 self._check_for_extra_schedulings() 224 225 self._do_query('UPDATE hosts SET locked=0, invalid=1 ' 226 'WHERE id=1') 227 self._dispatcher._schedule_new_jobs() 228 self._check_for_extra_schedulings() 229 230 231 def _test_hosts_idle_helper(self, use_metahosts): 232 'Only idle hosts get scheduled' 233 self._create_job(hosts=[1], active=1) 234 self._create_job_simple([1], use_metahosts) 235 self._dispatcher._schedule_new_jobs() 236 self._check_for_extra_schedulings() 237 238 239 def test_basic_scheduling(self): 240 self._test_basic_scheduling_helper(False) 241 242 243 def test_priorities(self): 244 self._test_priorities_helper(False) 245 246 247 def test_hosts_ready(self): 248 self._test_hosts_ready_helper(False) 249 250 251 def test_hosts_idle(self): 252 self._test_hosts_idle_helper(False) 253 254 255 def test_metahost_scheduling(self): 256 'Basic metahost scheduling' 257 self._test_basic_scheduling_helper(True) 258 259 260 def test_priorities(self): 261 self._test_priorities_helper(True) 262 263 264 def test_metahost_hosts_ready(self): 265 self._test_hosts_ready_helper(True) 266 267 268 def test_metahost_hosts_idle(self): 269 self._test_hosts_idle_helper(True) 270 271 272 def test_nonmetahost_over_metahost(self): 273 """ 274 Non-metahost entries should take priority over metahost entries 275 for the same host 276 """ 277 self._create_job(metahosts=[1]) 278 self._create_job(hosts=[1]) 279 self._dispatcher._schedule_new_jobs() 280 self._assert_job_scheduled_on(2, 1) 281 self._check_for_extra_schedulings() 282 283 284 def test_metahosts_obey_blocks(self): 285 """ 286 Metahosts can't get scheduled on hosts already scheduled for 287 that job. 288 """ 289 self._create_job(metahosts=[1], hosts=[1]) 290 self._dispatcher._schedule_new_jobs() 291 self._assert_job_scheduled_on(1, 1) 292 self._check_for_extra_schedulings() 293 294 295 def test_metahosts_obey_ACLs(self): 296 "ACL-inaccessible hosts can't get scheduled for metahosts" 297 self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1') 298 self._create_job(metahosts=[1]) 299 self._do_query('INSERT INTO ineligible_host_queues ' 300 '(job_id, host_id) VALUES (1, 1)') 301 self._dispatcher._schedule_new_jobs() 302 self._check_for_extra_schedulings() 303 304 305if __name__ == '__main__': 306 unittest.main() 307