monitor_db_unittest.py revision 20f47064be6855277e251cee7611d8336bcc9149
1#!/usr/bin/python
2
3import unittest, time, subprocess
4import MySQLdb
5import common
6from autotest_lib.client.common_lib import global_config
7import monitor_db
8
9_DEBUG = False
10
11_TEST_DATA = """
12-- create a user and an ACL group
13INSERT INTO users (login) VALUES ('my_user');
14INSERT INTO acl_groups (name) VALUES ('my_acl');
15INSERT INTO acl_groups_users (user_id, acl_group_id) VALUES (1, 1);
16
17-- create some hosts
18INSERT INTO hosts (hostname) VALUES ('host1'), ('host2');
19-- add hosts to the ACL group
20INSERT INTO acl_groups_hosts (host_id, acl_group_id) VALUES
21  (1, 1), (2, 1);
22
23-- create a label for each host and one holding both
24INSERT INTO labels (name) VALUES ('label1'), ('label2');
25
26-- add hosts to labels
27INSERT INTO hosts_labels (host_id, label_id) VALUES
28  (1, 1), (2, 2);
29"""
30
31class Dummy(object):
32	'Dummy object that can have attribute assigned to it'
33
34class DispatcherTest(unittest.TestCase):
35	_jobs_scheduled = []
36	_job_counter = 0
37
38
39	def _read_db_info(self):
40		config = global_config.global_config
41		section = 'AUTOTEST_WEB'
42		self._host = config.get_config_value(section, "host")
43		self._db_name = config.get_config_value(section, "database")
44		self._user = config.get_config_value(section, "user")
45		self._password = config.get_config_value(section, "password")
46
47
48	def _connect_to_db(self, db_name=''):
49		self._con = MySQLdb.connect(host=self._host, user=self._user,
50					    passwd=self._password, db=db_name)
51		self._con.autocommit(True)
52		self._cur = self._con.cursor()
53
54
55	def _disconnect_from_db(self):
56		self._con.close()
57
58
59	def _do_query(self, sql):
60		if _DEBUG:
61			print 'SQL:', sql
62		self._cur.execute(sql)
63
64
65	def _do_queries(self, sql_queries):
66		for query in sql_queries.split(';'):
67			query = query.strip()
68			if query:
69				self._do_query(query)
70
71
72	def _get_db_schema(self):
73		command = 'mysqldump --no-data -u %s -p%s -h %s %s' % (
74		    self._user, self._password, self._host, self._db_name)
75		proc = subprocess.Popen(command, stdout=subprocess.PIPE,
76					shell=True)
77		return proc.communicate()[0]
78
79
80	def _open_test_db(self, schema):
81		self._db_name = 'test_' + self._db_name
82		self._connect_to_db()
83		self._do_query('CREATE DATABASE ' + self._db_name)
84		self._disconnect_from_db()
85		self._connect_to_db(self._db_name)
86		self._do_queries(schema)
87
88
89	def _close_test_db(self):
90		self._do_query('DROP DATABASE ' + self._db_name)
91		self._disconnect_from_db()
92
93
94	def _fill_in_test_data(self):
95		self._do_queries(_TEST_DATA)
96
97
98	def _set_monitor_stubs(self):
99		monitor_db._db = monitor_db.DatabaseConn()
100		monitor_db._db.connect(db_name=self._db_name)
101		def run_stub(hqe_self, assigned_host=None):
102			if hqe_self.meta_host:
103				host = assigned_host
104			else:
105				host = hqe_self.host
106			self._record_job_scheduled(hqe_self.job.id, host.id)
107			return Dummy()
108		monitor_db.HostQueueEntry.run = run_stub
109
110
111	def _record_job_scheduled(self, job_id, host_id):
112		record = (job_id, host_id)
113		self.assert_(record not in self._jobs_scheduled,
114			     'Job %d scheduled on host %d twice' %
115			     (job_id, host_id))
116		self._jobs_scheduled.append(record)
117
118
119	def _assert_job_scheduled_on(self, job_id, host_id):
120		record = (job_id, host_id)
121		self.assert_(record in self._jobs_scheduled,
122			     'Job %d not scheduled on host %d as expected\n'
123			     'Jobs scheduled: %s' %
124			     (job_id, host_id, self._jobs_scheduled))
125		self._jobs_scheduled.remove(record)
126
127
128	def _check_for_extra_schedulings(self):
129		if len(self._jobs_scheduled) != 0:
130			self.fail('Extra jobs scheduled: ' +
131				  str(self._jobs_scheduled))
132
133
134	def _create_job(self, hosts=[], metahosts=[], priority=0, active=0):
135		self._do_query('INSERT INTO jobs (name, owner, priority) '
136			       'VALUES ("test", "my_user", %d)' % priority)
137		self._job_counter += 1
138		job_id = self._job_counter
139		queue_entry_sql = (
140		    'INSERT INTO host_queue_entries '
141		    '(job_id, priority, host_id, meta_host, active) '
142		    'VALUES (%d, %d, %%s, %%s, %d)' %
143		    (job_id, priority, active))
144		for host_id in hosts:
145			self._do_query(queue_entry_sql % (host_id, 'NULL'))
146			self._do_query('INSERT INTO ineligible_host_queues '
147				       '(job_id, host_id) VALUES (%d, %d)' %
148				       (job_id, host_id))
149		for label_id in metahosts:
150			self._do_query(queue_entry_sql % ('NULL', label_id))
151
152
153	def _create_job_simple(self, hosts, use_metahost=False,
154			      priority=0, active=0):
155		'An alternative interface to _create_job'
156		args = {'hosts' : [], 'metahosts' : []}
157		if use_metahost:
158			args['metahosts'] = hosts
159		else:
160			args['hosts'] = hosts
161		self._create_job(priority=priority, active=active, **args)
162
163
164	def _convert_jobs_to_metahosts(self, *job_ids):
165		sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
166		self._do_query('UPDATE host_queue_entries SET '
167			       'meta_host=host_id, host_id=NULL '
168			       'WHERE job_id IN ' + sql_tuple)
169
170
171	def _lock_host(self, host_id):
172		self._do_query('UPDATE hosts SET locked=1 WHERE id=' +
173			       str(host_id))
174
175
176	def setUp(self):
177		self._read_db_info()
178		schema = self._get_db_schema()
179		self._open_test_db(schema)
180		self._fill_in_test_data()
181		self._set_monitor_stubs()
182		self._dispatcher = monitor_db.Dispatcher()
183		self._jobs_scheduled = []
184		self._job_counter = 0
185
186
187	def tearDown(self):
188		self._close_test_db()
189
190
191	def _test_basic_scheduling_helper(self, use_metahosts):
192		'Basic nonmetahost scheduling'
193		self._create_job_simple([1], use_metahosts)
194		self._create_job_simple([2], use_metahosts)
195		self._dispatcher._schedule_new_jobs()
196		self._assert_job_scheduled_on(1, 1)
197		self._assert_job_scheduled_on(2, 2)
198		self._check_for_extra_schedulings()
199
200
201	def _test_priorities_helper(self, use_metahosts):
202		'Test prioritization ordering'
203		self._create_job_simple([1], use_metahosts)
204		self._create_job_simple([2], use_metahosts)
205		self._create_job_simple([1,2], use_metahosts)
206		self._create_job_simple([1], use_metahosts, priority=1)
207		self._dispatcher._schedule_new_jobs()
208		self._assert_job_scheduled_on(4, 1) # higher priority
209		self._assert_job_scheduled_on(2, 2) # earlier job over later
210		self._check_for_extra_schedulings()
211
212
213	def _test_hosts_ready_helper(self, use_metahosts):
214		"""
215		Only hosts that are status=Ready, unlocked and not invalid get
216		scheduled.
217		"""
218		self._create_job_simple([1], use_metahosts)
219		self._do_query('UPDATE hosts SET status="Running" WHERE id=1')
220		self._dispatcher._schedule_new_jobs()
221		self._check_for_extra_schedulings()
222
223		self._do_query('UPDATE hosts SET status="Ready", locked=1 '
224			       'WHERE id=1')
225		self._dispatcher._schedule_new_jobs()
226		self._check_for_extra_schedulings()
227
228		self._do_query('UPDATE hosts SET locked=0, invalid=1 '
229			       'WHERE id=1')
230		self._dispatcher._schedule_new_jobs()
231		self._check_for_extra_schedulings()
232
233
234	def _test_hosts_idle_helper(self, use_metahosts):
235		'Only idle hosts get scheduled'
236		self._create_job(hosts=[1], active=1)
237		self._create_job_simple([1], use_metahosts)
238		self._dispatcher._schedule_new_jobs()
239		self._check_for_extra_schedulings()
240
241
242	def test_basic_scheduling(self):
243		self._test_basic_scheduling_helper(False)
244
245
246	def test_priorities(self):
247		self._test_priorities_helper(False)
248
249
250	def test_hosts_ready(self):
251		self._test_hosts_ready_helper(False)
252
253
254	def test_hosts_idle(self):
255		self._test_hosts_idle_helper(False)
256
257
258	def test_metahost_scheduling(self):
259		'Basic metahost scheduling'
260		self._test_basic_scheduling_helper(True)
261
262
263	def test_metahost_priorities(self):
264		self._test_priorities_helper(True)
265
266
267	def test_metahost_hosts_ready(self):
268		self._test_hosts_ready_helper(True)
269
270
271	def test_metahost_hosts_idle(self):
272		self._test_hosts_idle_helper(True)
273
274
275	def test_nonmetahost_over_metahost(self):
276		"""
277		Non-metahost entries should take priority over metahost entries
278		for the same host
279		"""
280		self._create_job(metahosts=[1])
281		self._create_job(hosts=[1])
282		self._dispatcher._schedule_new_jobs()
283		self._assert_job_scheduled_on(2, 1)
284		self._check_for_extra_schedulings()
285
286
287	def test_metahosts_obey_blocks(self):
288		"""
289		Metahosts can't get scheduled on hosts already scheduled for
290		that job.
291		"""
292		self._create_job(metahosts=[1], hosts=[1])
293		# make the nonmetahost entry complete, so the metahost can try
294		# to get scheduled
295		self._do_query('UPDATE host_queue_entries SET complete = 1 '
296		               'WHERE host_id=1')
297		self._dispatcher._schedule_new_jobs()
298		self._check_for_extra_schedulings()
299
300
301	def test_metahosts_obey_ACLs(self):
302		"ACL-inaccessible hosts can't get scheduled for metahosts"
303		self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')
304		self._create_job(metahosts=[1])
305		self._dispatcher._schedule_new_jobs()
306		self._check_for_extra_schedulings()
307
308
309if __name__ == '__main__':
310	unittest.main()
311