monitor_db_unittest.py revision ce38e0c281b1046574b8112209944e9daf2c3641
1#!/usr/bin/python
2
3import unittest, time, subprocess
4import MySQLdb
5import common
6from autotest_lib.client.common_lib import global_config
7import monitor_db
8
9_DEBUG = False
10
11_TEST_DATA = """
12-- create a user and an ACL group
13INSERT INTO users (login) VALUES ('my_user');
14INSERT INTO acl_groups (name) VALUES ('my_acl');
15INSERT INTO acl_groups_users (user_id, acl_group_id) VALUES (1, 1);
16
17-- create some hosts
18INSERT INTO hosts (hostname) VALUES ('host1'), ('host2');
19-- add hosts to the ACL group
20INSERT INTO acl_groups_hosts (host_id, acl_group_id) VALUES
21  (1, 1), (2, 1);
22
23-- create a label for each host and one holding both
24INSERT INTO labels (name) VALUES ('label1'), ('label2');
25
26-- add hosts to labels
27INSERT INTO hosts_labels (host_id, label_id) VALUES
28  (1, 1), (2, 2);
29"""
30
31class DispatcherTest(unittest.TestCase):
32	_jobs_scheduled = []
33	_job_counter = 0
34
35
36	def _read_db_info(self):
37		config = global_config.global_config
38		section = 'AUTOTEST_WEB'
39		self._host = config.get_config_value(section, "host")
40		self._db_name = config.get_config_value(section, "database")
41		self._user = config.get_config_value(section, "user")
42		self._password = config.get_config_value(section, "password")
43
44
45	def _connect_to_db(self, db_name=''):
46		self._con = MySQLdb.connect(host=self._host, user=self._user,
47					    passwd=self._password, db=db_name)
48		self._con.autocommit(True)
49		self._cur = self._con.cursor()
50
51
52	def _disconnect_from_db(self):
53		self._con.close()
54
55
56	def _do_query(self, sql):
57		if _DEBUG:
58			print 'SQL:', sql
59		self._cur.execute(sql)
60
61
62	def _do_queries(self, sql_queries):
63		for query in sql_queries.split(';'):
64			query = query.strip()
65			if query:
66				self._do_query(query)
67
68
69	def _get_db_schema(self):
70		command = 'mysqldump --no-data -u %s -p%s -h %s %s' % (
71		    self._user, self._password, self._host, self._db_name)
72		proc = subprocess.Popen(command, stdout=subprocess.PIPE,
73					shell=True)
74		return proc.communicate()[0]
75
76
77	def _open_test_db(self, schema):
78		self._db_name = 'test_' + self._db_name
79		self._connect_to_db()
80		self._do_query('CREATE DATABASE ' + self._db_name)
81		self._disconnect_from_db()
82		self._connect_to_db(self._db_name)
83		self._do_queries(schema)
84
85
86	def _close_test_db(self):
87		self._do_query('DROP DATABASE ' + self._db_name)
88		self._disconnect_from_db()
89
90
91	def _fill_in_test_data(self):
92		self._do_queries(_TEST_DATA)
93
94
95	def _set_monitor_stubs(self):
96		monitor_db._db = monitor_db.DatabaseConn()
97		monitor_db._db.connect(db_name=self._db_name)
98		def run_stub(hqe_self, assigned_host=None):
99			if hqe_self.meta_host:
100				host = assigned_host
101			else:
102				host = hqe_self.host
103			self._record_job_scheduled(hqe_self.job.id, host.id)
104		monitor_db.HostQueueEntry.run = run_stub
105
106
107	def _record_job_scheduled(self, job_id, host_id):
108		record = (job_id, host_id)
109		self.assert_(record not in self._jobs_scheduled,
110			     'Job %d scheduled on host %d twice' %
111			     (job_id, host_id))
112		self._jobs_scheduled.append(record)
113
114
115	def _assert_job_scheduled_on(self, job_id, host_id):
116		record = (job_id, host_id)
117		self.assert_(record in self._jobs_scheduled,
118			     'Job %d not scheduled on host %d as expected' %
119			     (job_id, host_id))
120		self._jobs_scheduled.remove(record)
121
122
123	def _check_for_extra_schedulings(self):
124		if len(self._jobs_scheduled) != 0:
125			self.fail('Extra jobs scheduled: ' +
126				  str(self._jobs_scheduled))
127
128
129	def _create_job(self, hosts=[], metahosts=[], priority=0, active=0):
130		self._do_query('INSERT INTO jobs (name, priority) VALUES '
131			       '("test", %d)' % priority)
132		self._job_counter += 1
133		job_id = self._job_counter
134		queue_entry_sql = (
135		    'INSERT INTO host_queue_entries '
136		    '(job_id, priority, host_id, meta_host, active) '
137		    'VALUES (%d, %d, %%s, %%s, %d)' %
138		    (job_id, priority, active))
139		for host_id in hosts:
140			self._do_query(queue_entry_sql % (host_id, 'NULL'))
141			self._do_query('INSERT INTO ineligible_host_queues '
142				       '(job_id, host_id) VALUES (%d, %d)' %
143				       (job_id, host_id))
144		for label_id in metahosts:
145			self._do_query(queue_entry_sql % ('NULL', label_id))
146
147
148	def _create_job_simple(self, hosts, use_metahost=False,
149			      priority=0, active=0):
150		'An alternative interface to _create_job'
151		args = {'hosts' : [], 'metahosts' : []}
152		if use_metahost:
153			args['metahosts'] = hosts
154		else:
155			args['hosts'] = hosts
156		self._create_job(priority=priority, active=active, **args)
157
158
159	def _convert_jobs_to_metahosts(self, *job_ids):
160		sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
161		self._do_query('UPDATE host_queue_entries SET '
162			       'meta_host=host_id, host_id=NULL '
163			       'WHERE job_id IN ' + sql_tuple)
164
165
166	def _lock_host(self, host_id):
167		self._do_query('UPDATE hosts SET locked=1 WHERE id=' +
168			       str(host_id))
169
170
171	def setUp(self):
172		self._read_db_info()
173		schema = self._get_db_schema()
174		self._open_test_db(schema)
175		self._fill_in_test_data()
176		self._set_monitor_stubs()
177		self._dispatcher = monitor_db.Dispatcher()
178
179
180	def tearDown(self):
181		self._close_test_db()
182
183
184	def _test_basic_scheduling_helper(self, use_metahosts):
185		'Basic nonmetahost scheduling'
186		self._create_job_simple([1], use_metahosts)
187		self._create_job_simple([2], use_metahosts)
188		self._dispatcher._find_more_work()
189		self._assert_job_scheduled_on(1, 1)
190		self._assert_job_scheduled_on(2, 2)
191		self._check_for_extra_schedulings()
192
193
194	def _test_priorities_helper(self, use_metahosts):
195		'Test prioritization ordering'
196		self._create_job_simple([1], use_metahosts)
197		self._create_job_simple([2], use_metahosts)
198		self._create_job_simple([1,2], use_metahosts)
199		self._create_job_simple([1], use_metahosts, priority=1)
200		self._dispatcher._find_more_work()
201		self._assert_job_scheduled_on(4, 1) # higher priority
202		self._assert_job_scheduled_on(2, 2) # earlier job over later
203		self._check_for_extra_schedulings()
204
205
206	def _test_hosts_ready_helper(self, use_metahosts):
207		"""
208		Only hosts that are status=Ready, unlocked and not invalid get
209		scheduled.
210		"""
211		self._create_job_simple([1], use_metahosts)
212		self._do_query('UPDATE hosts SET status="Running" WHERE id=1')
213		self._dispatcher._find_more_work()
214		self._check_for_extra_schedulings()
215
216		self._do_query('UPDATE hosts SET status="Ready", locked=1 '
217			       'WHERE id=1')
218		self._dispatcher._find_more_work()
219		self._check_for_extra_schedulings()
220
221		self._do_query('UPDATE hosts SET locked=0, invalid=1 '
222			       'WHERE id=1')
223		self._dispatcher._find_more_work()
224		self._check_for_extra_schedulings()
225
226
227	def _test_hosts_idle_helper(self, use_metahosts):
228		'Only idle hosts get scheduled'
229		self._create_job(hosts=[1], active=1)
230		self._create_job_simple([1], use_metahosts)
231		self._dispatcher._find_more_work()
232		self._check_for_extra_schedulings()
233
234
235	def test_basic_scheduling(self):
236		self._test_basic_scheduling_helper(False)
237
238
239	def test_priorities(self):
240		self._test_priorities_helper(False)
241
242
243	def test_hosts_ready(self):
244		self._test_hosts_ready_helper(False)
245
246
247	def test_hosts_idle(self):
248		self._test_hosts_idle_helper(False)
249
250
251	def test_metahost_scheduling(self):
252		'Basic metahost scheduling'
253		self._test_basic_scheduling_helper(True)
254
255
256	def test_priorities(self):
257		self._test_priorities_helper(True)
258
259
260	def test_metahost_hosts_ready(self):
261		self._test_hosts_ready_helper(True)
262
263
264	def test_metahost_hosts_idle(self):
265		self._test_hosts_idle_helper(True)
266
267
268	def test_nonmetahost_over_metahost(self):
269		"""
270		Non-metahost entries should take priority over metahost entries
271		for the same host
272		"""
273		self._create_job(metahosts=[1])
274		self._create_job(hosts=[1])
275		self._dispatcher._find_more_work()
276		self._assert_job_scheduled_on(2, 1)
277		self._check_for_extra_schedulings()
278
279
280	def test_metahosts_obey_blocks(self):
281		"""
282		Metahosts can't get scheduled on hosts already scheduled for
283		that job.
284		"""
285		self._create_job(metahosts=[1], hosts=[1])
286		self._dispatcher._find_more_work()
287		self._assert_job_scheduled_on(1, 1)
288		self._check_for_extra_schedulings()
289
290
291	def test_metahosts_obey_ACLs(self):
292		"ACL-inaccessible hosts can't get scheduled for metahosts"
293		self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')
294		self._create_job(metahosts=[1])
295		self._do_query('INSERT INTO ineligible_host_queues '
296			       '(job_id, host_id) VALUES (1, 1)')
297		self._dispatcher._find_more_work()
298		self._check_for_extra_schedulings()
299
300
301if __name__ == '__main__':
302	unittest.main()
303