monitor_db_unittest.py revision 20f47064be6855277e251cee7611d8336bcc9149
#!/usr/bin/python
"""
Database-backed unit tests for the job scheduling done by monitor_db.

Each test creates a scratch MySQL database whose name is the configured
AUTOTEST_WEB database name prefixed with 'test_', loads the schema dumped
from the configured database plus a small fixture (_TEST_DATA), and runs
Dispatcher._schedule_new_jobs() with HostQueueEntry.run replaced by a stub
that records which (job id, host id) pairs the dispatcher tried to run.
"""

import subprocess
import time
import unittest

import MySQLdb

# NOTE: 'common' is deliberately imported before autotest_lib; keep the order.
import common
from autotest_lib.client.common_lib import global_config
import monitor_db

# Set to True to echo every SQL statement issued by the test fixture.
_DEBUG = False

# Fixture loaded into the scratch database before every test.  The ids used
# below (1, 2) assume the tables start empty with auto-assigned ids from 1.
_TEST_DATA = """
-- create a user and an ACL group
INSERT INTO users (login) VALUES ('my_user');
INSERT INTO acl_groups (name) VALUES ('my_acl');
INSERT INTO acl_groups_users (user_id, acl_group_id) VALUES (1, 1);

-- create some hosts
INSERT INTO hosts (hostname) VALUES ('host1'), ('host2');
-- add hosts to the ACL group
INSERT INTO acl_groups_hosts (host_id, acl_group_id) VALUES
    (1, 1), (2, 1);

-- create a label for each host and one holding both
INSERT INTO labels (name) VALUES ('label1'), ('label2');

-- add hosts to labels
INSERT INTO hosts_labels (host_id, label_id) VALUES
    (1, 1), (2, 2);
"""


class Dummy(object):
    """Dummy object that can have attribute assigned to it"""


class DispatcherTest(unittest.TestCase):
    """
    Exercises monitor_db.Dispatcher._schedule_new_jobs() against a scratch
    database.

    Tests create jobs with _create_job()/_create_job_simple(), run the
    scheduler, then consume the expected schedulings with
    _assert_job_scheduled_on() and finish with
    _check_for_extra_schedulings().
    """

    # (job_id, host_id) pairs recorded by the HostQueueEntry.run stub.
    # Rebound to a fresh list for every test in setUp().
    _jobs_scheduled = []
    # Number of jobs inserted by _create_job() during the current test; used
    # as the id of the most recently inserted job.
    _job_counter = 0
    # True while monitor_db is patched by _set_monitor_stubs().
    _stubs_installed = False


    def _read_db_info(self):
        """Load the connection settings from the AUTOTEST_WEB config section."""
        config = global_config.global_config
        section = 'AUTOTEST_WEB'
        self._host = config.get_config_value(section, "host")
        self._db_name = config.get_config_value(section, "database")
        self._user = config.get_config_value(section, "user")
        self._password = config.get_config_value(section, "password")


    def _connect_to_db(self, db_name=''):
        """
        Open an autocommitting connection and cursor.

        @param db_name: database to select; the default '' is passed
                through to MySQLdb.connect() as-is.
        """
        self._con = MySQLdb.connect(host=self._host, user=self._user,
                                    passwd=self._password, db=db_name)
        self._con.autocommit(True)
        self._cur = self._con.cursor()


    def _disconnect_from_db(self):
        """Close the connection opened by _connect_to_db()."""
        self._con.close()


    def _do_query(self, sql):
        """Execute a single SQL statement, echoing it first when _DEBUG."""
        if _DEBUG:
            # Single-argument form: same output as the print statement, and
            # valid syntax for both the print statement and print function.
            print('SQL: %s' % sql)
        self._cur.execute(sql)


    def _do_queries(self, sql_queries):
        """
        Execute every statement in a ';'-separated SQL script.

        The split is purely textual, so a ';' inside a string literal or
        comment would break a statement apart; blank fragments are skipped.
        """
        for query in sql_queries.split(';'):
            query = query.strip()
            if query:
                self._do_query(query)


    def _get_db_schema(self):
        """
        Dump the schema (no row data) of the configured database.

        @return the mysqldump output.
        @raise RuntimeError if mysqldump exits with a non-zero status.
        """
        # An argument list (no shell) keeps credentials containing spaces or
        # shell metacharacters from being re-parsed.  The long '--opt=value'
        # spellings keep each value attached to its option even when empty.
        command = ['mysqldump', '--no-data',
                   '--user=%s' % self._user,
                   '--password=%s' % self._password,
                   '--host=%s' % self._host,
                   self._db_name]
        proc = subprocess.Popen(command, stdout=subprocess.PIPE)
        schema = proc.communicate()[0]
        if proc.returncode != 0:
            # Fail here rather than later with an empty schema and a
            # confusing "table doesn't exist" error.
            raise RuntimeError(
                    'mysqldump exited with status %d while dumping the '
                    'schema of %s' % (proc.returncode, self._db_name))
        return schema


    def _open_test_db(self, schema):
        """
        Create the scratch database and load the given schema into it.

        Prefixes self._db_name with 'test_' and leaves self._con/self._cur
        connected to the new database.  If loading the schema fails the
        scratch database is dropped again before the error propagates.
        """
        self._db_name = 'test_' + self._db_name
        self._connect_to_db()
        try:
            self._do_query('CREATE DATABASE ' + self._db_name)
        finally:
            self._disconnect_from_db()
        self._connect_to_db(self._db_name)
        try:
            self._do_queries(schema)
        except:
            # Bare except + re-raise: clean up on any failure, including
            # KeyboardInterrupt, without swallowing it.
            self._close_test_db()
            raise


    def _close_test_db(self):
        """Drop the scratch database and close the connection to it."""
        try:
            self._do_query('DROP DATABASE ' + self._db_name)
        finally:
            self._disconnect_from_db()


    def _fill_in_test_data(self):
        """Load the _TEST_DATA fixture into the scratch database."""
        self._do_queries(_TEST_DATA)


    def _set_monitor_stubs(self):
        """
        Point monitor_db at the scratch database and stub out job execution.

        HostQueueEntry.run is replaced with a function that records the
        (job id, host id) pair instead of running anything.  The previous
        values are saved so _restore_monitor_stubs() can undo the patching.
        """
        self._saved_db = getattr(monitor_db, '_db', None)
        self._saved_run = monitor_db.HostQueueEntry.run
        self._stubs_installed = True

        # NOTE(review): this connection is never explicitly closed here;
        # whether DatabaseConn offers a way to do so is not visible from
        # this file.
        monitor_db._db = monitor_db.DatabaseConn()
        monitor_db._db.connect(db_name=self._db_name)

        def run_stub(hqe_self, assigned_host=None):
            # Metahost entries are run on the host passed in; other entries
            # on the entry's own host.
            if hqe_self.meta_host:
                host = assigned_host
            else:
                host = hqe_self.host
            self._record_job_scheduled(hqe_self.job.id, host.id)
            return Dummy()
        monitor_db.HostQueueEntry.run = run_stub


    def _restore_monitor_stubs(self):
        """Undo _set_monitor_stubs(); a no-op if nothing is patched."""
        if not self._stubs_installed:
            return
        monitor_db.HostQueueEntry.run = self._saved_run
        monitor_db._db = self._saved_db
        self._stubs_installed = False


    def _record_job_scheduled(self, job_id, host_id):
        """Record a scheduling, failing if the same pair was already seen."""
        record = (job_id, host_id)
        self.assertTrue(record not in self._jobs_scheduled,
                        'Job %d scheduled on host %d twice' %
                        (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        """
        Assert that the job was scheduled on the host and consume the record,
        so that _check_for_extra_schedulings() only sees unexpected ones.
        """
        record = (job_id, host_id)
        self.assertTrue(record in self._jobs_scheduled,
                        'Job %d not scheduled on host %d as expected\n'
                        'Jobs scheduled: %s' %
                        (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _check_for_extra_schedulings(self):
        """Fail if any recorded scheduling has not been asserted on."""
        if self._jobs_scheduled:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _create_job(self, hosts=(), metahosts=(), priority=0, active=0):
        """
        Insert a job plus one queue entry per host and per metahost.

        @param hosts: host ids to queue the job on directly; each also gets
                an ineligible_host_queues row for this job.
        @param metahosts: label ids to queue the job on as metahost entries.
        @param priority: stored on the job and on every queue entry.
        @param active: value of the 'active' column of every queue entry.
        """
        self._do_query('INSERT INTO jobs (name, owner, priority) '
                       'VALUES ("test", "my_user", %d)' % priority)
        # Assumes the n-th job inserted into the fresh database gets id n.
        self._job_counter += 1
        job_id = self._job_counter
        # host_id and meta_host stay as %s placeholders to be filled per
        # entry below.
        queue_entry_sql = (
                'INSERT INTO host_queue_entries '
                '(job_id, priority, host_id, meta_host, active) '
                'VALUES (%d, %d, %%s, %%s, %d)' %
                (job_id, priority, active))
        for host_id in hosts:
            self._do_query(queue_entry_sql % (host_id, 'NULL'))
            self._do_query('INSERT INTO ineligible_host_queues '
                           '(job_id, host_id) VALUES (%d, %d)' %
                           (job_id, host_id))
        for label_id in metahosts:
            self._do_query(queue_entry_sql % ('NULL', label_id))


    def _create_job_simple(self, hosts, use_metahost=False,
                           priority=0, active=0):
        """
        An alternative interface to _create_job.

        @param hosts: ids, treated as label ids (metahost entries) when
                use_metahost is true and as host ids otherwise.
        """
        if use_metahost:
            self._create_job(metahosts=hosts, priority=priority,
                             active=active)
        else:
            self._create_job(hosts=hosts, priority=priority, active=active)


    def _convert_jobs_to_metahosts(self, *job_ids):
        """Turn the given jobs' queue entries into metahost entries by
        moving host_id into meta_host."""
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        """Mark the given host as locked."""
        self._do_query('UPDATE hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        """Build the scratch database, patch monitor_db, create a Dispatcher."""
        self._jobs_scheduled = []
        self._job_counter = 0
        self._read_db_info()
        schema = self._get_db_schema()
        self._open_test_db(schema)
        try:
            self._fill_in_test_data()
            self._set_monitor_stubs()
            self._dispatcher = monitor_db.Dispatcher()
        except:
            # unittest does not call tearDown() when setUp() raises, which
            # would leave the scratch database behind and make the next
            # CREATE DATABASE fail.
            self.tearDown()
            raise


    def tearDown(self):
        """Restore monitor_db and drop the scratch database."""
        self._restore_monitor_stubs()
        self._close_test_db()


    def _test_basic_scheduling_helper(self, use_metahosts):
        """Basic scheduling: two jobs, one per host (or per label)."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        """Test prioritization ordering"""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE hosts SET status="Running" WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        """Only idle hosts get scheduled"""
        # Host 1 already has an active queue entry, so the second job must
        # not be scheduled on it.
        self._create_job(hosts=[1], active=1)
        self._create_job_simple([1], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_metahost_scheduling(self):
        """Basic metahost scheduling"""
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_metahosts_obey_blocks(self):
        """
        Metahosts can't get scheduled on hosts already scheduled for
        that job.
        """
        self._create_job(metahosts=[1], hosts=[1])
        # make the nonmetahost entry complete, so the metahost can try
        # to get scheduled
        self._do_query('UPDATE host_queue_entries SET complete = 1 '
                       'WHERE host_id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_metahosts_obey_ACLs(self):
        """ACL-inaccessible hosts can't get scheduled for metahosts"""
        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')
        self._create_job(metahosts=[1])
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


if __name__ == '__main__':
    unittest.main()