1#    Copyright 2015-2016 ARM Limited
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15
16"""
17:mod:`bart.sched.SchedAssert` provides ability to assert scheduler behaviour.
18The analysis is based on TRAPpy's statistics framework and is potent enough
19to aggregate statistics over processor hierarchies.
20"""
21
22import trappy
23import itertools
24import math
25from trappy.stats.Aggregator import MultiTriggerAggregator
26from bart.sched import functions as sched_funcs
27from bart.common import Utils
28import numpy as np
29
30# pylint: disable=invalid-name
31# pylint: disable=too-many-arguments
class SchedAssert(object):

    """The primary focus of this class is to assert and verify
    predefined scheduler scenarios. This does not compare parameters
    across runs

    :param ftrace: A single trappy.FTrace object
        or a path that can be passed to trappy.FTrace
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param topology: A topology that describes the arrangement of
        CPU's on a system. This is useful for multi-cluster systems
        where data needs to be aggregated at different topological
        levels
    :type topology: :mod:`trappy.stats.Topology.Topology`

    :param execname: The execname of the task to be analysed

        .. note::

                There should be only one PID that maps to the specified
                execname. If there are multiple PIDs :mod:`bart.sched.SchedMultiAssert`
                should be used

    :type execname: str

    :param pid: The process ID of the task to be analysed
    :type pid: int

    .. note::

        One of pid or execname is mandatory. If only execname
        is specified, the current implementation will fail if
        there are more than one processes with the same execname
    """

    def __init__(self, ftrace, topology, execname=None, pid=None):

        # Accept either a ready trappy.FTrace object or a path to a trace
        ftrace = Utils.init_ftrace(ftrace)

        if not execname and not pid:
            raise ValueError("Need to specify at least one of pid or execname")

        # NOTE: self.execname must be assigned before _validate_pid() is
        # called, because PID validation reads it to resolve/cross-check
        # the PID.
        self.execname = execname
        self._ftrace = ftrace
        self._pid = self._validate_pid(pid)
        self._aggs = {}  # aggregators memoized by aggfunc (see _aggregator)
        self._topology = topology
        # Triggers that fire on sched_switch events for this task; these
        # drive all statistics aggregation in the methods below.
        self._triggers = sched_funcs.sched_triggers(self._ftrace, self._pid,
                                              trappy.sched.SchedSwitch)
        self.name = "{}-{}".format(self.execname, self._pid)
83
84    def _validate_pid(self, pid):
85        """Validate the passed pid argument"""
86
87        if not pid:
88            pids = sched_funcs.get_pids_for_process(self._ftrace,
89                                              self.execname)
90
91            if len(pids) != 1:
92                raise RuntimeError(
93                    "There should be exactly one PID {0} for {1}".format(
94                        pids,
95                        self.execname))
96
97            return pids[0]
98
99        elif self.execname:
100
101            pids = sched_funcs.get_pids_for_process(self._ftrace,
102                                              self.execname)
103            if pid not in pids:
104                raise RuntimeError(
105                    "PID {0} not mapped to {1}".format(
106                        pid,
107                        self.execname))
108        else:
109            self.execname = sched_funcs.get_task_name(self._ftrace, pid)
110
111        return pid
112
113    def _aggregator(self, aggfunc):
114        """
115        Return an aggregator corresponding to the
116        aggfunc, the aggregators are memoized for performance
117
118        :param aggfunc: Function parameter that
119            accepts a :mod:`pandas.Series` object and
120            returns a vector/scalar
121
122        :type: function(:mod:`pandas.Series`)
123        """
124
125        if aggfunc not in self._aggs.keys():
126            self._aggs[aggfunc] = MultiTriggerAggregator(self._triggers,
127                                                         self._topology,
128                                                         aggfunc)
129        return self._aggs[aggfunc]
130
131    def getResidency(self, level, node, window=None, percent=False):
132        """
133        Residency of the task is the amount of time it spends executing
134        a particular group of a topological level. For example:
135        ::
136
137            from trappy.stats.Topology import Topology
138
139            big = [1, 2]
140            little = [0, 3, 4, 5]
141
142            topology = Topology(clusters=[little, big])
143
144            s = SchedAssert(trace, topology, pid=123)
145            s.getResidency("cluster", big)
146
147        This will return the residency of the task on the big cluster. If
148        percent is specified it will be normalized to the total runtime
149        of the task
150
151        :param level: The topological level to which the group belongs
152        :type level: str
153
154        :param node: The group of CPUs for which residency
155            needs to calculated
156        :type node: list
157
158        :param window: A (start, end) tuple to limit the scope of the
159            residency calculation.
160        :type window: tuple
161
162        :param percent: If true the result is normalized to the total runtime
163            of the task and returned as a percentage
164        :type percent: bool
165
166        .. math::
167
168            R = \\frac{T_{group} \\times 100}{T_{total}}
169
170        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertResidency`
171        """
172
173        # Get the index of the node in the level
174        node_index = self._topology.get_index(level, node)
175
176        agg = self._aggregator(sched_funcs.residency_sum)
177        level_result = agg.aggregate(level=level, window=window)
178
179        node_value = level_result[node_index]
180
181        if percent:
182            total = agg.aggregate(level="all", window=window)[0]
183            node_value = node_value * 100
184            node_value = node_value / total
185
186        return node_value
187
188    def assertResidency(
189            self,
190            level,
191            node,
192            expected_value,
193            operator,
194            window=None,
195            percent=False):
196        """
197        :param level: The topological level to which the group belongs
198        :type level: str
199
200        :param node: The group of CPUs for which residency
201            needs to calculated
202        :type node: list
203
204        :param expected_value: The expected value of the residency
205        :type expected_value: double
206
207        :param operator: A binary operator function that returns
208            a boolean. For example:
209            ::
210
211                import operator
212                op = operator.ge
213                assertResidency(level, node, expected_value, op)
214
215            Will do the following check:
216            ::
217
218                getResidency(level, node) >= expected_value
219
220            A custom function can also be passed:
221            ::
222
223                THRESHOLD=5
224                def between_threshold(a, expected):
225                    return abs(a - expected) <= THRESHOLD
226
227        :type operator: function
228
229        :param window: A (start, end) tuple to limit the scope of the
230            residency calculation.
231        :type window: tuple
232
233        :param percent: If true the result is normalized to the total runtime
234            of the task and returned as a percentage
235        :type percent: bool
236
237        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getResidency`
238        """
239        node_value = self.getResidency(level, node, window, percent)
240        return operator(node_value, expected_value)
241
242    def getStartTime(self):
243        """
244        :return: The first time the task ran across all the CPUs
245        """
246
247        agg = self._aggregator(sched_funcs.first_time)
248        result = agg.aggregate(level="all", value=sched_funcs.TASK_RUNNING)
249        return min(result[0])
250
251    def getEndTime(self):
252        """
253        :return: The first last time the task ran across
254            all the CPUs
255        """
256
257        agg = self._aggregator(sched_funcs.first_time)
258        agg = self._aggregator(sched_funcs.last_time)
259        result = agg.aggregate(level="all", value=sched_funcs.TASK_RUNNING)
260        return max(result[0])
261
262    def _relax_switch_window(self, series, direction, window):
263        """
264            direction == "left"
265                return the last time the task was running
266                if no such time exists in the window,
267                extend the window's left extent to
268                getStartTime
269
270            direction == "right"
271                return the first time the task was running
272                in the window. If no such time exists in the
273                window, extend the window's right extent to
274                getEndTime()
275
276            The function returns a None if
277            len(series[series == TASK_RUNNING]) == 0
278            even in the extended window
279        """
280
281        series = series[series == sched_funcs.TASK_RUNNING]
282        w_series = sched_funcs.select_window(series, window)
283        start, stop = window
284
285        if direction == "left":
286            if len(w_series):
287                return w_series.index.values[-1]
288            else:
289                start_time = self.getStartTime()
290                w_series = sched_funcs.select_window(
291                    series,
292                    window=(
293                        start_time,
294                        start))
295
296                if not len(w_series):
297                    return None
298                else:
299                    return w_series.index.values[-1]
300
301        elif direction == "right":
302            if len(w_series):
303                return w_series.index.values[0]
304            else:
305                end_time = self.getEndTime()
306                w_series = sched_funcs.select_window(series, window=(stop, end_time))
307
308                if not len(w_series):
309                    return None
310                else:
311                    return w_series.index.values[0]
312        else:
313            raise ValueError("direction should be either left or right")
314
315    def assertSwitch(
316            self,
317            level,
318            from_node,
319            to_node,
320            window,
321            ignore_multiple=True):
322        """
323        This function asserts that there is context switch from the
324           :code:`from_node` to the :code:`to_node`:
325
326        :param level: The topological level to which the group belongs
327        :type level: str
328
329        :param from_node: The node from which the task switches out
330        :type from_node: list
331
332        :param to_node: The node to which the task switches
333        :type to_node: list
334
335        :param window: A (start, end) tuple to limit the scope of the
336            residency calculation.
337        :type window: tuple
338
339        :param ignore_multiple: If true, the function will ignore multiple
340           switches in the window, If false the assert will be true if and
341           only if there is a single switch within the specified window
342        :type ignore_multiple: bool
343        """
344
345        from_node_index = self._topology.get_index(level, from_node)
346        to_node_index = self._topology.get_index(level, to_node)
347
348        agg = self._aggregator(sched_funcs.csum)
349        level_result = agg.aggregate(level=level)
350
351        from_node_result = level_result[from_node_index]
352        to_node_result = level_result[to_node_index]
353
354        from_time = self._relax_switch_window(from_node_result, "left", window)
355        if ignore_multiple:
356            to_time = self._relax_switch_window(to_node_result, "left", window)
357        else:
358            to_time = self._relax_switch_window(
359                to_node_result,
360                "right", window)
361
362        if from_time and to_time:
363            if from_time < to_time:
364                return True
365
366        return False
367
368    def getRuntime(self, window=None, percent=False):
369        """Return the Total Runtime of a task
370
371        :param window: A (start, end) tuple to limit the scope of the
372            residency calculation.
373        :type window: tuple
374
375        :param percent: If True, the result is returned
376            as a percentage of the total execution time
377            of the run.
378        :type percent: bool
379
380        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertRuntime`
381        """
382
383        agg = self._aggregator(sched_funcs.residency_sum)
384        run_time = agg.aggregate(level="all", window=window)[0]
385
386        if percent:
387
388            if window:
389                begin, end = window
390                total_time = end - begin
391            else:
392                total_time = self._ftrace.get_duration()
393
394            run_time = run_time * 100
395            run_time = run_time / total_time
396
397        return run_time
398
399    def assertRuntime(
400            self,
401            expected_value,
402            operator,
403            window=None,
404            percent=False):
405        """Assert on the total runtime of the task
406
407        :param expected_value: The expected value of the runtime
408        :type expected_value: double
409
410        :param operator: A binary operator function that returns
411            a boolean. For example:
412            ::
413
414                import operator
415                op = operator.ge
416                assertRuntime(expected_value, op)
417
418            Will do the following check:
419            ::
420
421                getRuntime() >= expected_value
422
423            A custom function can also be passed:
424            ::
425
426                THRESHOLD=5
427                def between_threshold(a, expected):
428                    return abs(a - expected) <= THRESHOLD
429
430        :type operator: function
431
432        :param window: A (start, end) tuple to limit the scope of the
433            residency calculation.
434        :type window: tuple
435
436        :param percent: If True, the result is returned
437            as a percentage of the total execution time
438            of the run.
439        :type percent: bool
440
441        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getRuntime`
442        """
443
444        run_time = self.getRuntime(window, percent)
445        return operator(run_time, expected_value)
446
447    def getPeriod(self, window=None, align="start"):
448        """Return the period of the task in (ms)
449
450        Let's say a task started execution at the following times:
451
452            .. math::
453
454                T_1, T_2, ...T_n
455
456        The period is defined as:
457
458            .. math::
459
460                Median((T_2 - T_1), (T_4 - T_3), ....(T_n - T_{n-1}))
461
462        :param window: A (start, end) tuple to limit the scope of the
463            residency calculation.
464        :type window: tuple
465
466        :param align:
467            :code:`"start"` aligns period calculation to switch-in events
468            :code:`"end"` aligns the calculation to switch-out events
469        :type param: str
470
471        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertPeriod`
472        """
473
474        agg = self._aggregator(sched_funcs.period)
475        deltas = agg.aggregate(level="all", window=window)[0]
476
477        if not len(deltas):
478            return float("NaN")
479        else:
480            return np.median(deltas) * 1000
481
482    def assertPeriod(
483            self,
484            expected_value,
485            operator,
486            window=None,
487            align="start"):
488        """Assert on the period of the task
489
490        :param expected_value: The expected value of the runtime
491        :type expected_value: double
492
493        :param operator: A binary operator function that returns
494            a boolean. For example:
495            ::
496
497                import operator
498                op = operator.ge
499                assertPeriod(expected_value, op)
500
501            Will do the following check:
502            ::
503
504                getPeriod() >= expected_value
505
506            A custom function can also be passed:
507            ::
508
509                THRESHOLD=5
510                def between_threshold(a, expected):
511                    return abs(a - expected) <= THRESHOLD
512
513        :param window: A (start, end) tuple to limit the scope of the
514            calculation.
515        :type window: tuple
516
517        :param align:
518            :code:`"start"` aligns period calculation to switch-in events
519            :code:`"end"` aligns the calculation to switch-out events
520        :type param: str
521
522        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getPeriod`
523        """
524
525        period = self.getPeriod(window, align)
526        return operator(period, expected_value)
527
528    def getDutyCycle(self, window):
529        """Return the duty cycle of the task
530
531        :param window: A (start, end) tuple to limit the scope of the
532            calculation.
533        :type window: tuple
534
535        Duty Cycle:
536            The percentage of time the task spends executing
537            in the given window of time
538
539            .. math::
540
541                \delta_{cycle} = \\frac{T_{exec} \\times 100}{T_{window}}
542
543        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertDutyCycle`
544        """
545
546        return self.getRuntime(window, percent=True)
547
548    def assertDutyCycle(self, expected_value, operator, window):
549        """
550        :param operator: A binary operator function that returns
551            a boolean. For example:
552            ::
553
554                import operator
555                op = operator.ge
556                assertPeriod(expected_value, op)
557
558            Will do the following check:
559            ::
560
561                getPeriod() >= expected_value
562
563            A custom function can also be passed:
564            ::
565
566                THRESHOLD=5
567                def between_threshold(a, expected):
568                    return abs(a - expected) <= THRESHOLD
569
570        :param window: A (start, end) tuple to limit the scope of the
571            calculation.
572        :type window: tuple
573
574        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getDutyCycle`
575
576        """
577        return self.assertRuntime(
578            expected_value,
579            operator,
580            window,
581            percent=True)
582
583    def getFirstCpu(self, window=None):
584        """
585        :return: The first CPU the task ran on
586
587        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertFirstCPU`
588        """
589
590        agg = self._aggregator(sched_funcs.first_cpu)
591        result = agg.aggregate(level="cpu", window=window)
592        result = list(itertools.chain.from_iterable(result))
593
594        min_time = min(result)
595        if math.isinf(min_time):
596            return -1
597        index = result.index(min_time)
598        return self._topology.get_node("cpu", index)[0]
599
600    def assertFirstCpu(self, cpus, window=None):
601        """
602        Check if the Task started (first ran on in the duration
603        of the trace) on a particular CPU(s)
604
605        :param cpus: A list of acceptable CPUs
606        :type cpus: int, list
607
608        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getFirstCPU`
609        """
610
611        first_cpu = self.getFirstCpu(window=window)
612        cpus = Utils.listify(cpus)
613        return first_cpu in cpus
614
615    def getLastCpu(self, window=None):
616        """Return the last CPU the task ran on"""
617
618        agg = self._aggregator(sched_funcs.last_cpu)
619        result = agg.aggregate(level="cpu", window=window)
620        result = list(itertools.chain.from_iterable(result))
621
622        end_time = max(result)
623        if not end_time:
624            return -1
625
626        return result.index(end_time)
627
628    def generate_events(self, level, start_id=0, window=None):
629        """Generate events for the trace plot
630
631        .. note::
632            This is an internal function accessed by the
633            :mod:`bart.sched.SchedMultiAssert` class for plotting data
634        """
635
636        agg = self._aggregator(sched_funcs.trace_event)
637        result = agg.aggregate(level=level, window=window)
638        events = []
639
640        for idx, level_events in enumerate(result):
641            if not len(level_events):
642                continue
643            events += np.column_stack((level_events, np.full(len(level_events), idx))).tolist()
644
645        return sorted(events, key = lambda x : x[0])
646
647    def plot(self, level="cpu", window=None, xlim=None):
648        """
649        :return: :mod:`trappy.plotter.AbstractDataPlotter` instance
650            Call :func:`view` to draw the graph
651        """
652
653        if not xlim:
654            if not window:
655                xlim = [0, self._ftrace.get_duration()]
656            else:
657                xlim = list(window)
658
659        events = {}
660        events[self.name] = self.generate_events(level, window)
661        names = [self.name]
662        num_lanes = self._topology.level_span(level)
663        lane_prefix = level.upper() + ": "
664        return trappy.EventPlot(events, names, xlim,
665                                lane_prefix=lane_prefix,
666                                num_lanes=num_lanes)
667