1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4import gc, time
5import common
6from autotest_lib.frontend import setup_django_environment
7from autotest_lib.frontend.afe import frontend_test_utils
8from autotest_lib.client.common_lib.test_utils import mock
9from autotest_lib.client.common_lib.test_utils import unittest
10from autotest_lib.database import database_connection
11from autotest_lib.frontend.afe import models
12from autotest_lib.scheduler import agent_task
13from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
14from autotest_lib.scheduler import pidfile_monitor
15from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler
16from autotest_lib.scheduler import monitor_db_functional_test
17from autotest_lib.scheduler import monitor_db_unittest
18from autotest_lib.scheduler import scheduler_models
19
20_DEBUG = False
21
22
23class AtomicGroupTest(monitor_db_unittest.DispatcherSchedulingTest):
24
25    def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
26        # Create a job scheduled to run on label6.
27        self._create_job(metahosts=[self.label6.id])
28        self._run_scheduler()
29        # label6 only has hosts that are in atomic groups associated with it,
30        # there should be no scheduling.
31        self._check_for_extra_schedulings()
32
33
34    def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
35        # Create a job scheduled to run on label5.  This is an atomic group
36        # label but this job does not request atomic group scheduling.
37        self._create_job(metahosts=[self.label5.id])
38        self._run_scheduler()
39        # label6 only has hosts that are in atomic groups associated with it,
40        # there should be no scheduling.
41        self._check_for_extra_schedulings()
42
43
44    def test_atomic_group_scheduling_basics(self):
45        # Create jobs scheduled to run on an atomic group.
46        job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
47                         atomic_group=1)
48        job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
49                         atomic_group=1)
50        self._run_scheduler()
51        # atomic_group.max_number_of_machines was 2 so we should run on 2.
52        self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
53        self._assert_job_scheduled_on(job_b.id, 8)  # label5
54        self._assert_job_scheduled_on(job_b.id, 9)  # label5
55        self._check_for_extra_schedulings()
56
57        # The three host label4 atomic group still has one host available.
58        # That means a job with a synch_count of 1 asking to be scheduled on
59        # the atomic group can still use the final machine.
60        #
61        # This may seem like a somewhat odd use case.  It allows the use of an
62        # atomic group as a set of machines to run smaller jobs within (a set
63        # of hosts configured for use in network tests with eachother perhaps?)
64        onehost_job = self._create_job(atomic_group=1)
65        self._run_scheduler()
66        self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
67        self._check_for_extra_schedulings()
68
69        # No more atomic groups have hosts available, no more jobs should
70        # be scheduled.
71        self._create_job(atomic_group=1)
72        self._run_scheduler()
73        self._check_for_extra_schedulings()
74
75
76    def test_atomic_group_scheduling_obeys_acls(self):
77        # Request scheduling on a specific atomic label but be denied by ACLs.
78        self._do_query('DELETE FROM afe_acl_groups_hosts '
79                       'WHERE host_id in (8,9)')
80        job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
81        self._run_scheduler()
82        self._check_for_extra_schedulings()
83
84
85    def test_atomic_group_scheduling_dependency_label_exclude(self):
86        # A dependency label that matches no hosts in the atomic group.
87        job_a = self._create_job(atomic_group=1)
88        job_a.dependency_labels.add(self.label3)
89        self._run_scheduler()
90        self._check_for_extra_schedulings()
91
92
93    def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
94        # A metahost and dependency label that excludes too many hosts.
95        job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
96                                 atomic_group=1)
97        job_b.dependency_labels.add(self.label7)
98        self._run_scheduler()
99        self._check_for_extra_schedulings()
100
101
102    def test_atomic_group_scheduling_dependency_label_match(self):
103        # A dependency label that exists on enough atomic group hosts in only
104        # one of the two atomic group labels.
105        job_c = self._create_job(synchronous=True, atomic_group=1)
106        job_c.dependency_labels.add(self.label7)
107        self._run_scheduler()
108        self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
109        self._check_for_extra_schedulings()
110
111
112    def test_atomic_group_scheduling_no_metahost(self):
113        # Force it to schedule on the other group for a reliable test.
114        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
115        # An atomic job without a metahost.
116        job = self._create_job(synchronous=True, atomic_group=1)
117        self._run_scheduler()
118        self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
119        self._check_for_extra_schedulings()
120
121
122    def test_atomic_group_scheduling_partial_group(self):
123        # Make one host in labels[3] unavailable so that there are only two
124        # hosts left in the group.
125        self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
126        job = self._create_job(synchronous=True, metahosts=[self.label4.id],
127                         atomic_group=1)
128        self._run_scheduler()
129        # Verify that it was scheduled on the 2 ready hosts in that group.
130        self._assert_job_scheduled_on(job.id, 6)
131        self._assert_job_scheduled_on(job.id, 7)
132        self._check_for_extra_schedulings()
133
134
135    def test_atomic_group_scheduling_not_enough_available(self):
136        # Mark some hosts in each atomic group label as not usable.
137        # One host running, another invalid in the first group label.
138        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5')
139        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6')
140        # One host invalid in the second group label.
141        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
142        # Nothing to schedule when no group label has enough (2) good hosts..
143        self._create_job(atomic_group=1, synchronous=True)
144        self._run_scheduler()
145        # There are not enough hosts in either atomic group,
146        # No more scheduling should occur.
147        self._check_for_extra_schedulings()
148
149        # Now create an atomic job that has a synch count of 1.  It should
150        # schedule on exactly one of the hosts.
151        onehost_job = self._create_job(atomic_group=1)
152        self._run_scheduler()
153        self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
154
155
156    def test_atomic_group_scheduling_no_valid_hosts(self):
157        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
158        self._create_job(synchronous=True, metahosts=[self.label5.id],
159                         atomic_group=1)
160        self._run_scheduler()
161        # no hosts in the selected group and label are valid.  no schedulings.
162        self._check_for_extra_schedulings()
163
164
165    def test_atomic_group_scheduling_metahost_works(self):
166        # Test that atomic group scheduling also obeys metahosts.
167        self._create_job(metahosts=[0], atomic_group=1)
168        self._run_scheduler()
169        # There are no atomic group hosts that also have that metahost.
170        self._check_for_extra_schedulings()
171
172        job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
173        self._run_scheduler()
174        self._assert_job_scheduled_on(job_b.id, 8)
175        self._assert_job_scheduled_on(job_b.id, 9)
176        self._check_for_extra_schedulings()
177
178
179    def test_atomic_group_skips_ineligible_hosts(self):
180        # Test hosts marked ineligible for this job are not eligible.
181        # How would this ever happen anyways?
182        job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
183        models.IneligibleHostQueue.objects.create(job=job, host_id=5)
184        models.IneligibleHostQueue.objects.create(job=job, host_id=6)
185        models.IneligibleHostQueue.objects.create(job=job, host_id=7)
186        self._run_scheduler()
187        # No scheduling should occur as all desired hosts were ineligible.
188        self._check_for_extra_schedulings()
189
190
191    def test_atomic_group_scheduling_fail(self):
192        # If synch_count is > the atomic group number of machines, the job
193        # should be aborted immediately.
194        model_job = self._create_job(synchronous=True, atomic_group=1)
195        model_job.synch_count = 4
196        model_job.save()
197        job = scheduler_models.Job(id=model_job.id)
198        self._run_scheduler()
199        self._check_for_extra_schedulings()
200        queue_entries = job.get_host_queue_entries()
201        self.assertEqual(1, len(queue_entries))
202        self.assertEqual(queue_entries[0].status,
203                         models.HostQueueEntry.Status.ABORTED)
204
205
206    def test_atomic_group_no_labels_no_scheduling(self):
207        # Never schedule on atomic groups marked invalid.
208        job = self._create_job(metahosts=[self.label5.id], synchronous=True,
209                               atomic_group=1)
210        # Deleting an atomic group via the frontend marks it invalid and
211        # removes all label references to the group.  The job now references
212        # an invalid atomic group with no labels associated with it.
213        self.label5.atomic_group.invalid = True
214        self.label5.atomic_group.save()
215        self.label5.atomic_group = None
216        self.label5.save()
217
218        self._run_scheduler()
219        self._check_for_extra_schedulings()
220
221
222    def test_schedule_directly_on_atomic_group_host_fail(self):
223        # Scheduling a job directly on hosts in an atomic group must
224        # fail to avoid users inadvertently holding up the use of an
225        # entire atomic group by using the machines individually.
226        job = self._create_job(hosts=[5])
227        self._run_scheduler()
228        self._check_for_extra_schedulings()
229
230
231    def test_schedule_directly_on_atomic_group_host(self):
232        # Scheduling a job directly on one host in an atomic group will
233        # work when the atomic group is listed on the HQE in addition
234        # to the host (assuming the sync count is 1).
235        job = self._create_job(hosts=[5], atomic_group=1)
236        self._run_scheduler()
237        self._assert_job_scheduled_on(job.id, 5)
238        self._check_for_extra_schedulings()
239
240
241    def test_schedule_directly_on_atomic_group_hosts_sync2(self):
242        job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
243        self._run_scheduler()
244        self._assert_job_scheduled_on(job.id, 5)
245        self._assert_job_scheduled_on(job.id, 8)
246        self._check_for_extra_schedulings()
247
248
249    def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
250        job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
251        self._run_scheduler()
252        self._check_for_extra_schedulings()
253
254
255    # TODO(gps): These should probably live in their own TestCase class
256    # specific to testing HostScheduler methods directly.  It was convenient
257    # to put it here for now to share existing test environment setup code.
258    def test_HostScheduler_check_atomic_group_labels(self):
259        normal_job = self._create_job(metahosts=[0])
260        atomic_job = self._create_job(atomic_group=1)
261        # Indirectly initialize the internal state of the host scheduler.
262        self._dispatcher._refresh_pending_queue_entries()
263
264        atomic_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
265                                                     atomic_job.id)[0]
266        normal_hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%d' %
267                                                     normal_job.id)[0]
268
269        host_scheduler = self._dispatcher._host_scheduler
270        self.assertTrue(host_scheduler._check_atomic_group_labels(
271                [self.label4.id], atomic_hqe))
272        self.assertFalse(host_scheduler._check_atomic_group_labels(
273                [self.label4.id], normal_hqe))
274        self.assertFalse(host_scheduler._check_atomic_group_labels(
275                [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
276        self.assertTrue(host_scheduler._check_atomic_group_labels(
277                [self.label4.id, self.label6.id], atomic_hqe))
278        self.assertTrue(host_scheduler._check_atomic_group_labels(
279                        [self.label4.id, self.label5.id],
280                        atomic_hqe))
281
282
283class OnlyIfNeededTest(monitor_db_unittest.DispatcherSchedulingTest):
284
285    def _setup_test_only_if_needed_labels(self):
286        # apply only_if_needed label3 to host1
287        models.Host.smart_get('host1').labels.add(self.label3)
288        return self._create_job_simple([1], use_metahost=True)
289
290
291    def test_only_if_needed_labels_avoids_host(self):
292        job = self._setup_test_only_if_needed_labels()
293        # if the job doesn't depend on label3, there should be no scheduling
294        self._run_scheduler()
295        self._check_for_extra_schedulings()
296
297
298    def test_only_if_needed_labels_schedules(self):
299        job = self._setup_test_only_if_needed_labels()
300        job.dependency_labels.add(self.label3)
301        self._run_scheduler()
302        self._assert_job_scheduled_on(1, 1)
303        self._check_for_extra_schedulings()
304
305
306    def test_only_if_needed_labels_via_metahost(self):
307        job = self._setup_test_only_if_needed_labels()
308        job.dependency_labels.add(self.label3)
309        # should also work if the metahost is the only_if_needed label
310        self._do_query('DELETE FROM afe_jobs_dependency_labels')
311        self._create_job(metahosts=[3])
312        self._run_scheduler()
313        self._assert_job_scheduled_on(2, 1)
314        self._check_for_extra_schedulings()
315
316
317    def test_metahosts_obey_blocks(self):
318        """
319        Metahosts can't get scheduled on hosts already scheduled for
320        that job.
321        """
322        self._create_job(metahosts=[1], hosts=[1])
323        # make the nonmetahost entry complete, so the metahost can try
324        # to get scheduled
325        self._update_hqe(set='complete = 1', where='host_id=1')
326        self._run_scheduler()
327        self._check_for_extra_schedulings()
328
329
330