1#    Copyright 2015-2017 ARM Limited
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15
16
17import unittest
18import matplotlib
19import numpy as np
20import pandas as pd
21import tempfile
22import os
23import warnings
24
25from test_thermal import BaseTestThermal
26import trappy
27
28
29class TestPlotter(BaseTestThermal):
30
31    """No Bombing testcases for plotter"""
32
33    def __init__(self, *args, **kwargs):
34        super(TestPlotter, self).__init__(*args, **kwargs)
35
36    def test_plot_no_pivot(self):
37        """Tests LinePlot with no pivot"""
38        trace1 = trappy.FTrace(name="first")
39        l = trappy.LinePlot(trace1, trappy.thermal.Thermal, column="temp")
40        l.view(test=True)
41
42    def test_plot_multi_trace(self):
43        """Tests LinePlot with no Pivot multi traces"""
44        trace1 = trappy.FTrace(name="first")
45        trace2 = trappy.FTrace(name="second")
46        l = trappy.LinePlot(
47            [trace1, trace2], trappy.thermal.Thermal, column="temp")
48        l.view(test=True)
49
50    def test_plot_multi(self):
51        """Tests LinePlot with no Pivot multi attrs"""
52        trace1 = trappy.FTrace(name="first")
53        trace2 = trappy.FTrace(name="second")
54        l = trappy.LinePlot([trace1,
55                          trace2],
56                         [trappy.thermal.Thermal,
57                          trappy.thermal.ThermalGovernor],
58                         column=["temp",
59                                 "power_range"])
60        l.view(test=True)
61
62    def test_plot_filter(self):
63        """Tests LinePlot with no Pivot with filters"""
64        trace1 = trappy.FTrace(name="first")
65        trace2 = trappy.FTrace(name="second")
66        l = trappy.LinePlot([trace1,
67                          trace2],
68                         [trappy.cpu_power.CpuOutPower],
69                         column=["power"],
70                         filters={"cdev_state": [0]})
71        l.view(test=True)
72
73    def test_plot_pivot(self):
74        """Tests LinePlot with Pivot"""
75        trace1 = trappy.FTrace(name="first")
76        l = trappy.LinePlot(
77            trace1,
78            trappy.thermal.Thermal,
79            column="temp",
80            pivot="thermal_zone")
81        l.view(test=True)
82
83    def test_plot_multi_trace_pivot(self):
84        """Tests LinePlot with Pivot multi traces"""
85        trace1 = trappy.FTrace(name="first")
86        trace2 = trappy.FTrace(name="second")
87        l = trappy.LinePlot(
88            [trace1, trace2], trappy.cpu_power.CpuOutPower, column="power", pivot="cpus")
89        l.view(test=True)
90
91    def test_plot_multi_pivot(self):
92        """Tests LinePlot with Pivot with multi attrs"""
93        trace1 = trappy.FTrace(name="first")
94        trace2 = trappy.FTrace(name="second")
95        l = trappy.LinePlot([trace1,
96                          trace2],
97                         [trappy.cpu_power.CpuInPower,
98                          trappy.cpu_power.CpuOutPower],
99                         column=["dynamic_power",
100                                 "power"],
101                         pivot="cpus")
102        l.view(test=True)
103
104    def test_plot_multi_pivot_filter(self):
105        """Tests LinePlot with Pivot and filters"""
106        trace1 = trappy.FTrace(name="first")
107        trace2 = trappy.FTrace(name="second")
108        l = trappy.LinePlot(
109            trace1,
110            trappy.cpu_power.CpuInPower,
111            column=[
112                "dynamic_power",
113                "load1"],
114            filters={
115                "cdev_state": [
116                    1,
117                    0]},
118            pivot="cpus")
119        l.view(test=True)
120
121    def test_plot_savefig(self):
122        """Tests plotter: savefig"""
123        trace1 = trappy.FTrace(name="first")
124        trace2 = trappy.FTrace(name="second")
125        l = trappy.LinePlot(
126            trace1,
127            trappy.cpu_power.CpuInPower,
128            column=[
129                "dynamic_power",
130                "load1"],
131            filters={
132                "cdev_state": [
133                    1,
134                    0]},
135            pivot="cpus")
136        png_file = tempfile.mktemp(dir="/tmp", suffix=".png")
137        l.savefig(png_file)
138        self.assertTrue(os.path.isfile(png_file))
139        os.remove(png_file)
140
141
142    def test_signals(self):
143        """Test signals input for LinePlot"""
144
145        trace1 = trappy.FTrace(name="first")
146        trace2 = trappy.FTrace(name="second")
147
148        l = trappy.LinePlot([trace1,
149                          trace2],
150                         signals=["cpu_in_power:dynamic_power",
151                                 "cpu_out_power:power"],
152                         pivot="cpus")
153
154        self.assertTrue(isinstance(l.templates[0], type(trappy.cpu_power.CpuInPower)))
155        self.assertEquals(l._attr["column"][0], "dynamic_power")
156        self.assertTrue(l.templates[1], type(trappy.cpu_power.CpuOutPower))
157        self.assertEquals(l._attr["column"][1], "power")
158        self.assertTrue("colors" not in l._attr)
159
160        # Check that plotting doesn't barf
161        l.view(test=True)
162
163
164    def test_signals_exceptions(self):
165        """Test incorrect input combinations: signals"""
166
167        trace1 = trappy.FTrace(name="first")
168        trace2 = trappy.FTrace(name="second")
169
170        with self.assertRaises(ValueError):
171            l = trappy.LinePlot([trace1, trace2],
172                            column=[
173                                "dynamic_power",
174                                "load1"],
175                            signals=["cpu_in_power:dynamic_power",
176                                 "cpu_out_power:power"],
177                            pivot="cpus")
178
179        with self.assertRaises(ValueError):
180            l = trappy.LinePlot([trace1, trace2],
181                            trappy.cpu_power.CpuInPower,
182                            signals=["cpu_in_power:dynamic_power",
183                                 "cpu_out_power:power"],
184                            pivot="cpus")
185
186        with self.assertRaises(ValueError):
187            l = trappy.LinePlot([trace1, trace2],
188                            trappy.cpu_power.CpuInPower,
189                            column=[
190                                "dynamic_power",
191                                "load1"],
192                            signals=["cpu_in_power:dynamic_power",
193                                 "cpu_out_power:power"],
194                            pivot="cpus")
195
196    def test_signals_invalid(self):
197        """Test that invalid signal defs result in a helpful errror"""
198        trace = trappy.FTrace()
199
200        with self.assertRaises(ValueError) as assertion:
201            l = trappy.LinePlot(trace, signals=["INVALID_SIGNAL_DEF"])
202        msg = str(assertion.exception)
203        self.assertIn("Invalid signal definition", msg)
204        self.assertIn("INVALID_SIGNAL_DEF", msg)
205
206    def test_signals_colors(self):
207        """Test signals with colors in LinePlot"""
208
209        trace1 = trappy.FTrace(name="first")
210        trace2 = trappy.FTrace(name="second")
211
212        l = trappy.LinePlot([trace1, trace2],
213                         signals=["thermal:temp:1,2,3",
214                                 "cpu_in_power:load2:200,100,0"],
215                         pivot="cpus")
216
217        self.assertTrue(isinstance(l.templates[0], type(trappy.thermal.Thermal)))
218        self.assertEquals(l._attr["column"][0], "temp")
219        self.assertEquals(l._attr["colors"][0], [1, 2, 3])
220        self.assertTrue(l.templates[1], type(trappy.cpu_power.CpuInPower))
221        self.assertEquals(l._attr["column"][1], "load2")
222        self.assertEquals(l._attr["colors"][1], [200, 100, 0])
223
224        # Check that plotting doesn't barf
225        l.view(test=True)
226
227        # Test hex color
228        l = trappy.LinePlot([trace1, trace2],
229                         signals=["thermal:prev_temp:0xff,0x3a,0x3"],
230                         pivot="cpus")
231        self.assertEquals(l._attr["colors"][0], [0xff, 0x3a, 0x3])
232
233    def test_lineplot_dataframe(self):
234        """LinePlot plots DataFrames without exploding"""
235        data = np.random.randn(4, 2)
236        dfr = pd.DataFrame(data, columns=["tick", "tock"]).cumsum()
237        trappy.LinePlot(dfr, column=["tick"]).view(test=True)
238
239    def test_get_trace_event_data_corrupted_trace(self):
240        """get_trace_event_data() works with a corrupted trace"""
241        from trappy.plotter.Utils import get_trace_event_data
242
243        trace = trappy.FTrace()
244
245        # We create this trace:
246        #
247        # 1 15414 -> 15411
248        # 2 15411 -> 15414
249        # 3 15414 -> 15411 (corrupted, should be dropped)
250        # 4 15413 -> 15411
251        # 5 15411 -> 15413
252        #
253        # Which should plot like:
254        #
255        # CPU
256        #    +-------+-------+
257        #  0 | 15411 | 15414 |
258        #    +-------+-------+       +-------+
259        #  1                         | 15411 |
260        #                            +-------+
261        #    +-------+-------+-------+-------+
262        #   0.1     0.2     0.3     0.4     0.5
263
264        broken_trace = pd.DataFrame({
265            '__comm': ["task2", "task1", "task2", "task3", "task1"],
266            '__cpu':  [0, 0, 0, 1, 1],
267            '__pid':  [15414, 15411, 15414, 15413, 15411],
268            'next_comm': ["task1", "task2", "task1", "task1", "task3"],
269            'next_pid':  [15411, 15414, 15411, 15411, 15413],
270            'prev_comm': ["task2", "task1", "task2", "task3", "task1"],
271            'prev_pid':  [15414, 15411, 15414, 15413, 15411],
272            'prev_state': ["S", "R", "S", "S", "S"]},
273            index=pd.Series(range(1, 6), name="Time"))
274
275        trace.sched_switch.data_frame = broken_trace
276
277        with warnings.catch_warnings(record=True) as warn:
278            data, procs, window = get_trace_event_data(trace)
279            self.assertEquals(len(warn), 1)
280
281            warn_str = str(warn[-1])
282            self.assertTrue("15411" in warn_str)
283            self.assertTrue("4" in warn_str)
284
285        zipped_comms = zip(broken_trace["next_comm"], broken_trace["next_pid"])
286        expected_procs = set("-".join([comm, str(pid)]) for comm, pid in zipped_comms)
287
288        self.assertTrue([1, 2, 0] in data["task1-15411"])
289        self.assertTrue([2, 3, 0] in data["task2-15414"])
290        self.assertTrue([4, 5, 1] in data["task1-15411"])
291        self.assertEquals(procs, expected_procs)
292        self.assertEquals(window, [1, 5])
293
294class TestILinePlotter(unittest.TestCase):
295    def test_simple_dfr(self):
296        """ILinePlot doesn't barf when plotting DataFrames"""
297        dfr1 = pd.DataFrame([1, 2, 3, 4], columns=["a"])
298        dfr2 = pd.DataFrame([2, 3, 4, 5], columns=["a"])
299
300        trappy.ILinePlot([dfr1, dfr2], column=["a", "a"]).view(test=True)
301
302        with self.assertRaises(ValueError):
303            trappy.ILinePlot([dfr1, dfr2]).view(test=True)
304
305    def test_df_to_dygraph(self):
306        """Test the ILinePlot util function: df_to_dygraph"""
307
308        dfr1 = pd.DataFrame([[1, 2],
309                             [3, 4],
310                             [5, 6]],
311                             index=[0., 1., 2.], columns=["a", "b"])
312
313        dfr2 = pd.DataFrame([1, 2, 3, 4],
314                            index=[0., 1., 2., 3.], columns=["a"])
315
316        expected_result_1 = {
317            'labels': ['index', 'a', 'b'],
318            'data': [[0.0, 1, 2], [1.0, 3, 4], [2.0, 5, 6]]
319        }
320        expected_result_2 = {
321            'labels': ['index', 'a'],
322            'data': [[0.0, 1], [1.0, 2], [2.0, 3], [3.0, 4]]
323        }
324
325        result_1 = trappy.plotter.ILinePlotGen.df_to_dygraph(dfr1)
326        result_2 = trappy.plotter.ILinePlotGen.df_to_dygraph(dfr2)
327
328        self.assertDictEqual(result_1, expected_result_1)
329        self.assertDictEqual(result_2, expected_result_2)
330
331    def test_duplicate_merging(self):
332        dfr1 = pd.DataFrame([1, 2, 3, 4], index=[0., 0., 1., 2.], columns=["a"])
333        dfr2 = pd.DataFrame([2, 3, 4, 5], index=[1., 1., 1., 2.], columns=["a"])
334
335        trappy.ILinePlot([dfr1, dfr2], column=["a", "a"]).view(test=True)
336
337    def test_independent_series_merging(self):
338        """ILinePlot fixes indexes of independent series"""
339        index1 = [0., 1., 2., 3.]
340        s1 = pd.Series([1, 2, 3, 4], index=index1)
341        index2 = [0.5, 1.5, 2.5, 3.5]
342        s2 = pd.Series([2, 3, 4, 5], index=index2)
343
344        dfr = pd.DataFrame([0, 1, 2, 3], columns=["a"])
345        iplot = trappy.ILinePlot(dfr, column=["a"])
346        s = {"s1": s1, "s2": s2}
347        merged = iplot._fix_indexes(s)
348
349        expected_index = index1 + index2
350        expected_index.sort()
351        self.assertEquals(expected_index, sorted(merged["s1"].keys()))
352
353    def test_dygraph_colors(self):
354        """Check that to_dygraph_colors() constructs a valid dygraph colors argument"""
355        from trappy.plotter.ColorMap import to_dygraph_colors
356
357        color_map = [[86, 58, 206]]
358        expected = '["rgb(86, 58, 206)"]'
359        self.assertEquals(to_dygraph_colors(color_map), expected)
360
361        color_map = [[0, 0, 0], [123, 23, 45]]
362        expected = '["rgb(0, 0, 0)", "rgb(123, 23, 45)"]'
363        self.assertEquals(to_dygraph_colors(color_map), expected)
364
365class TestBarPlot(unittest.TestCase):
366    def setUp(self):
367        self.dfr = pd.DataFrame({"foo": [1, 2, 3],
368                                 "bar": [2, 3, 1],
369                                 "baz": [3, 2, 1]})
370
371    def test_barplot_dfr(self):
372        """BarPlot plots dataframes without exploding"""
373        trappy.BarPlot(self.dfr, column=["foo", "bar"]).view(test=True)
374
375    def test_barplot_trace(self):
376        """BarPlot plots traces without exploding"""
377        trace = trappy.BareTrace()
378        trace.add_parsed_event("event", self.dfr)
379
380        trappy.BarPlot(trace, signals=["event:foo", "event:bar"]).view(test=True)
381