monitor_db_unittest.py revision 04c82c5dc70a6de95f9cd77371d0a99cbdcf0959
1#!/usr/bin/python
2
3import unittest, time, subprocess
4import MySQLdb
5import common
6from autotest_lib.client.common_lib import global_config
7import monitor_db
8
# When true, DispatcherTest._do_query prints each SQL statement before
# executing it.
_DEBUG = False

# Fixture rows loaded into the scratch database by
# DispatcherTest._fill_in_test_data.  The text is split on ';' by
# DispatcherTest._do_queries and each piece is executed separately, so no
# statement (or SQL comment) in here may contain a semicolon.
# NOTE(review): the hard-coded ids (1, 2) presumably rely on the tables
# being empty with auto-increment keys starting at 1 -- confirm against
# the schema.
_TEST_DATA = """
-- create a user and an ACL group
INSERT INTO users (login) VALUES ('my_user');
INSERT INTO acl_groups (name) VALUES ('my_acl');
INSERT INTO acl_groups_users (user_id, acl_group_id) VALUES (1, 1);

-- create some hosts
INSERT INTO hosts (hostname) VALUES ('host1'), ('host2');
-- add hosts to the ACL group
INSERT INTO acl_groups_hosts (host_id, acl_group_id) VALUES
  (1, 1), (2, 1);

-- create a label for each host and one holding both
INSERT INTO labels (name) VALUES ('label1'), ('label2');

-- add hosts to labels
INSERT INTO hosts_labels (host_id, label_id) VALUES
  (1, 1), (2, 2);
"""
30
class Dummy(object):
	"""
	Empty placeholder object.

	It defines nothing of its own, so callers can attach whatever
	attributes they like to an instance.
	"""
	pass
33
class DispatcherTest(unittest.TestCase):
	"""
	Scheduling tests for monitor_db.Dispatcher run against a scratch
	MySQL database.

	For every test, setUp dumps the schema of the database named in the
	AUTOTEST_WEB config section, loads it into a new database called
	'test_<database>', inserts _TEST_DATA and replaces
	monitor_db.HostQueueEntry.run with a stub that only records
	(job id, host id) pairs.  tearDown drops the scratch database.
	"""
	# Class-level defaults only.  setUp rebinds both on the instance so
	# that records left behind by a failing test cannot leak into later
	# tests through a shared list.
	_jobs_scheduled = []
	_job_counter = 0


	def _read_db_info(self):
		"""Read the connection settings from the AUTOTEST_WEB section."""
		config = global_config.global_config
		section = 'AUTOTEST_WEB'
		self._host = config.get_config_value(section, "host")
		self._db_name = config.get_config_value(section, "database")
		self._user = config.get_config_value(section, "user")
		self._password = config.get_config_value(section, "password")


	def _connect_to_db(self, db_name=''):
		"""
		Open an autocommit connection and a cursor, stored on self.

		db_name is passed straight to MySQLdb.connect; _open_test_db
		uses the default '' for the connection it makes before the
		scratch database exists.
		"""
		self._con = MySQLdb.connect(host=self._host, user=self._user,
			passwd=self._password, db=db_name)
		self._con.autocommit(True)
		self._cur = self._con.cursor()


	def _disconnect_from_db(self):
		"""Close the connection opened by _connect_to_db."""
		self._con.close()


	def _do_query(self, sql):
		"""Execute one SQL statement, echoing it first when _DEBUG is set."""
		if _DEBUG:
			# Single-argument form prints the same text under the
			# print statement and the print function.
			print('SQL: %s' % sql)
		self._cur.execute(sql)


	def _do_queries(self, sql_queries):
		"""
		Execute every non-empty statement of a ';'-separated script.

		The split is purely textual, so the script must not contain
		semicolons inside string literals or comments.
		"""
		for query in sql_queries.split(';'):
			query = query.strip()
			if query:
				self._do_query(query)


	def _get_db_schema(self):
		"""
		Return the output of 'mysqldump --no-data' for the configured
		database.

		Raises RuntimeError when mysqldump exits with a non-zero
		status, instead of silently handing back a partial or empty
		schema.
		"""
		# An argument list without shell=True keeps shell
		# metacharacters in the credentials from being interpreted.
		command = ['mysqldump', '--no-data',
			'-u', self._user,
			'-p%s' % self._password,
			'-h', self._host,
			self._db_name]
		proc = subprocess.Popen(command, stdout=subprocess.PIPE)
		schema = proc.communicate()[0]
		if proc.returncode != 0:
			raise RuntimeError(
				'mysqldump exited with status %d while dumping '
				'the schema of %s' %
				(proc.returncode, self._db_name))
		return schema


	def _open_test_db(self, schema):
		"""
		Create the scratch database 'test_<database>', connect to it
		and load schema into it.

		self._db_name is rewritten to the scratch name.  If loading
		the schema fails the scratch database is dropped again before
		the error propagates.
		"""
		self._db_name = 'test_' + self._db_name
		self._connect_to_db()
		self._do_query('CREATE DATABASE ' + self._db_name)
		self._disconnect_from_db()
		self._connect_to_db(self._db_name)
		try:
			self._do_queries(schema)
		except Exception:
			self._close_test_db()
			raise


	def _close_test_db(self):
		"""Drop the scratch database and close the connection."""
		self._do_query('DROP DATABASE ' + self._db_name)
		self._disconnect_from_db()


	def _fill_in_test_data(self):
		"""Insert the _TEST_DATA fixture rows."""
		self._do_queries(_TEST_DATA)


	def _set_monitor_stubs(self):
		"""
		Point monitor_db at the scratch database and stub out job
		execution.

		monitor_db.HostQueueEntry.run is replaced for the whole module
		(and is not restored afterwards) by a stub that records the
		(job id, host id) pair and returns a Dummy.
		"""
		monitor_db._db = monitor_db.DatabaseConn()
		monitor_db._db.connect(db_name=self._db_name)
		def run_stub(hqe_self, assigned_host=None):
			# A metahost entry has no host of its own; presumably
			# the dispatcher passes the host it picked as
			# assigned_host.
			if hqe_self.meta_host:
				host = assigned_host
			else:
				host = hqe_self.host
			self._record_job_scheduled(hqe_self.job.id, host.id)
			return Dummy()
		monitor_db.HostQueueEntry.run = run_stub


	def _record_job_scheduled(self, job_id, host_id):
		"""
		Remember that job_id was run on host_id; fail the test if that
		exact pair has already been recorded.
		"""
		record = (job_id, host_id)
		# assertTrue replaces the deprecated assert_ alias.
		self.assertTrue(record not in self._jobs_scheduled,
			'Job %d scheduled on host %d twice' %
			(job_id, host_id))
		self._jobs_scheduled.append(record)


	def _assert_job_scheduled_on(self, job_id, host_id):
		"""
		Fail unless (job_id, host_id) was recorded, then consume the
		record so _check_for_extra_schedulings only sees leftovers.
		"""
		record = (job_id, host_id)
		self.assertTrue(record in self._jobs_scheduled,
			'Job %d not scheduled on host %d as expected' %
			(job_id, host_id))
		self._jobs_scheduled.remove(record)


	def _check_for_extra_schedulings(self):
		"""Fail if any recorded scheduling has not been consumed."""
		if self._jobs_scheduled:
			self.fail('Extra jobs scheduled: ' +
				str(self._jobs_scheduled))


	def _create_job(self, hosts=(), metahosts=(), priority=0, active=0):
		"""
		Insert a job plus one host_queue_entries row per target.

		hosts     -- host ids; each gets a queue entry bound to that
		             host and an ineligible_host_queues row for this
		             job.
		metahosts -- label ids; each gets a queue entry with
		             meta_host set and no host.
		priority, active -- copied into every queue entry (priority
		             also into the job row).

		The job id is taken from a local counter.
		NOTE(review): this presumably relies on jobs.id being an
		auto-increment key starting at 1 in a fresh database.
		"""
		self._do_query('INSERT INTO jobs (name, priority) VALUES '
			'("test", %d)' % priority)
		self._job_counter += 1
		job_id = self._job_counter
		# %%s leaves two placeholders (host_id, meta_host) to be
		# filled in per queue entry below.
		queue_entry_sql = (
			'INSERT INTO host_queue_entries '
			'(job_id, priority, host_id, meta_host, active) '
			'VALUES (%d, %d, %%s, %%s, %d)' %
			(job_id, priority, active))
		for host_id in hosts:
			self._do_query(queue_entry_sql % (host_id, 'NULL'))
			self._do_query('INSERT INTO ineligible_host_queues '
				'(job_id, host_id) VALUES (%d, %d)' %
				(job_id, host_id))
		for label_id in metahosts:
			self._do_query(queue_entry_sql % ('NULL', label_id))


	def _create_job_simple(self, hosts, use_metahost=False,
			priority=0, active=0):
		"""
		Alternative interface to _create_job: the ids in hosts are
		treated as label ids when use_metahost is true and as host ids
		otherwise.
		"""
		args = {'hosts' : [], 'metahosts' : []}
		if use_metahost:
			args['metahosts'] = hosts
		else:
			args['hosts'] = hosts
		self._create_job(priority=priority, active=active, **args)


	def _convert_jobs_to_metahosts(self, *job_ids):
		"""
		Turn the queue entries of the given jobs into metahost entries
		by moving host_id into meta_host.
		"""
		sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
		self._do_query('UPDATE host_queue_entries SET '
			'meta_host=host_id, host_id=NULL '
			'WHERE job_id IN ' + sql_tuple)


	def _lock_host(self, host_id):
		"""Set locked=1 on the given host."""
		self._do_query('UPDATE hosts SET locked=1 WHERE id=' +
			str(host_id))


	def setUp(self):
		"""
		Build the scratch database, install the stubs and create the
		Dispatcher under test.
		"""
		# Fresh per-test state; see the note on the class attributes.
		self._jobs_scheduled = []
		self._job_counter = 0
		self._read_db_info()
		schema = self._get_db_schema()
		self._open_test_db(schema)
		try:
			self._fill_in_test_data()
			self._set_monitor_stubs()
			self._dispatcher = monitor_db.Dispatcher()
		except Exception:
			# unittest does not call tearDown when setUp raises,
			# so drop the scratch database here; otherwise the
			# next CREATE DATABASE would fail.
			self._close_test_db()
			raise


	def tearDown(self):
		"""Drop the scratch database."""
		self._close_test_db()


	def _test_basic_scheduling_helper(self, use_metahosts):
		"""
		Basic scheduling: two jobs aimed at different hosts (or, with
		use_metahosts, different labels) are each scheduled.
		"""
		self._create_job_simple([1], use_metahosts)
		self._create_job_simple([2], use_metahosts)
		self._dispatcher._schedule_new_jobs()
		self._assert_job_scheduled_on(1, 1)
		self._assert_job_scheduled_on(2, 2)
		self._check_for_extra_schedulings()


	def _test_priorities_helper(self, use_metahosts):
		"""Test prioritization ordering."""
		self._create_job_simple([1], use_metahosts)
		self._create_job_simple([2], use_metahosts)
		self._create_job_simple([1,2], use_metahosts)
		self._create_job_simple([1], use_metahosts, priority=1)
		self._dispatcher._schedule_new_jobs()
		self._assert_job_scheduled_on(4, 1) # higher priority
		self._assert_job_scheduled_on(2, 2) # earlier job over later
		self._check_for_extra_schedulings()


	def _test_hosts_ready_helper(self, use_metahosts):
		"""
		Only hosts that are status=Ready, unlocked and not invalid get
		scheduled.
		"""
		self._create_job_simple([1], use_metahosts)
		# not Ready
		self._do_query('UPDATE hosts SET status="Running" WHERE id=1')
		self._dispatcher._schedule_new_jobs()
		self._check_for_extra_schedulings()

		# Ready but locked
		self._do_query('UPDATE hosts SET status="Ready", locked=1 '
			'WHERE id=1')
		self._dispatcher._schedule_new_jobs()
		self._check_for_extra_schedulings()

		# Ready and unlocked but invalid
		self._do_query('UPDATE hosts SET locked=0, invalid=1 '
			'WHERE id=1')
		self._dispatcher._schedule_new_jobs()
		self._check_for_extra_schedulings()


	def _test_hosts_idle_helper(self, use_metahosts):
		"""Only idle hosts get scheduled."""
		# host 1 already has an active queue entry
		self._create_job(hosts=[1], active=1)
		self._create_job_simple([1], use_metahosts)
		self._dispatcher._schedule_new_jobs()
		self._check_for_extra_schedulings()


	def test_basic_scheduling(self):
		"""Basic non-metahost scheduling."""
		self._test_basic_scheduling_helper(False)


	def test_priorities(self):
		"""Prioritization ordering for non-metahost entries."""
		self._test_priorities_helper(False)


	def test_hosts_ready(self):
		"""Non-metahost entries only run on ready hosts."""
		self._test_hosts_ready_helper(False)


	def test_hosts_idle(self):
		"""Non-metahost entries only run on idle hosts."""
		self._test_hosts_idle_helper(False)


	def test_metahost_scheduling(self):
		"""Basic metahost scheduling."""
		self._test_basic_scheduling_helper(True)


	# This method used to be a second 'test_priorities', which replaced
	# the non-metahost definition in the class namespace so that only
	# one of the two variants ever ran.
	def test_metahost_priorities(self):
		"""Prioritization ordering for metahost entries."""
		self._test_priorities_helper(True)


	def test_metahost_hosts_ready(self):
		"""Metahost entries only run on ready hosts."""
		self._test_hosts_ready_helper(True)


	def test_metahost_hosts_idle(self):
		"""Metahost entries only run on idle hosts."""
		self._test_hosts_idle_helper(True)


	def test_nonmetahost_over_metahost(self):
		"""
		Non-metahost entries should take priority over metahost entries
		for the same host
		"""
		self._create_job(metahosts=[1])
		self._create_job(hosts=[1])
		self._dispatcher._schedule_new_jobs()
		self._assert_job_scheduled_on(2, 1)
		self._check_for_extra_schedulings()


	def test_metahosts_obey_blocks(self):
		"""
		Metahosts can't get scheduled on hosts already scheduled for
		that job.
		"""
		self._create_job(metahosts=[1], hosts=[1])
		self._dispatcher._schedule_new_jobs()
		self._assert_job_scheduled_on(1, 1)
		self._check_for_extra_schedulings()


	def test_metahosts_obey_ACLs(self):
		"ACL-inaccessible hosts can't get scheduled for metahosts"
		self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')
		self._create_job(metahosts=[1])
		# NOTE(review): this row also blocks host 1 for job 1 through
		# ineligible_host_queues, so the test may pass without the ACL
		# check ever being exercised -- confirm it is intended.
		self._do_query('INSERT INTO ineligible_host_queues '
			'(job_id, host_id) VALUES (1, 1)')
		self._dispatcher._schedule_new_jobs()
		self._check_for_extra_schedulings()
303
304
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
	unittest.main()
307