1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, re, utils
6
7from autotest_lib.client.bin import utils
8from autotest_lib.client.common_lib import error
9
10class KernelTrace(object):
11    """Allows access and control to Kernel tracing facilities.
12
13    Example code snippet:
14        trace = KernelTrace(events=['mali_dvfs:mali_dvfs_set_clock'])
15        results = trace.read(regexp=r'frequency=(\d+)')
16
17    Public methods:
18        on          : Enables tracing
19        off         : Disables tracing
20        is_tracing  : Returns Boolean of tracing status.
21        event_on    : Turns event on.  Returns boolean of success
22        event_off   : Turns event off.  Returns boolean of success
23        flush       : Flushes trace buffer
24        read        : Reads trace buffer returns list of
25                      - tuples if regexp provided
26                      - else matching string
27        uptime_secs : Returns float of current uptime.
28
29    Private functions:
30        _onoff       : Disable/enable tracing
31        _onoff_event : Disable/enable events
32
33    Private attributes:
34        _buffer      : list to hold parsed results from trace buffer
35        _buffer_ptr  : integer pointing to last byte read
36
37    TODO(tbroch):  List of potential enhancements
38       - currently only supports trace events.  Add other tracers.
39    """
40    _TRACE_ROOT = '/sys/kernel/debug/tracing'
41    _TRACE_EN_PATH = os.path.join(_TRACE_ROOT, 'tracing_enabled')
42
43    def __init__(self, flush=True, events=None, on=True):
44        """Constructor for KernelTrace class"""
45        self._buffer = []
46        self._buffer_ptr = 0
47        self._events = []
48        self._on = on
49
50        if flush:
51            self.flush()
52        for event in events:
53            if self.event_on(event):
54                self._events.append(event)
55        if on:
56            self.on()
57
58
59    def __del__(self, flush=True, events=None, on=True):
60        """Deconstructor for KernelTrace class"""
61        for event in self._events:
62            self.event_off(event)
63        if self._on:
64            self.off()
65
66
67    def _onoff(self, val):
68        """Enable/Disable tracing.
69
70        Arguments:
71            val: integer, 1 for on, 0 for off
72
73        Raises:
74            error.TestFail: If unable to enable/disable tracing
75              boolean of tracing on/off status
76        """
77        utils.write_one_line(self._TRACE_EN_PATH, val)
78        fname = os.path.join(self._TRACE_ROOT, 'tracing_on')
79        result = int(utils.read_one_line(fname).strip())
80        if not result == val:
81            raise error.TestFail("Unable to %sable tracing" %
82                                 'en' if val == 1 else 'dis')
83
84
85    def on(self):
86        """Enable tracing."""
87        return self._onoff(1)
88
89
90    def off(self):
91        """Disable tracing."""
92        self._onoff(0)
93
94
95    def is_tracing(self):
96        """Is tracing on?
97
98        Returns:
99            True if tracing enabled and at least one event is enabled.
100        """
101        fname = os.path.join(self._TRACE_ROOT, 'tracing_on')
102        result = int(utils.read_one_line(fname).strip())
103        if result == 1 and len(self._events) > 0:
104            return True
105        return False
106
107
108    def _event_onoff(self, event, val):
109        """Enable/Disable tracing event.
110
111        TODO(tbroch) Consider allowing wild card enabling of trace events via
112            /sys/kernel/debug/tracing/set_event although it makes filling buffer
113            really easy
114
115        Arguments:
116            event: list of events.
117                   See kernel(Documentation/trace/events.txt) for formatting.
118            val: integer, 1 for on, 0 for off
119
120         Returns:
121            True if success, false otherwise
122        """
123        logging.debug("event_onoff: event:%s val:%d", event, val)
124        event_path = event.replace(':', '/')
125        fname = os.path.join(self._TRACE_ROOT, 'events', event_path, 'enable')
126
127        if not os.path.exists(fname):
128            logging.warning("Unable to locate tracing event %s", fname)
129            return False
130        utils.write_one_line(fname, val)
131
132        fname = os.path.join(self._TRACE_ROOT, "set_event")
133        found = False
134        with open(fname) as fd:
135            for ln in fd.readlines():
136                logging.debug("set_event ln:%s", ln)
137                if re.findall(event, ln):
138                    found = True
139                    break
140
141        if val == 1 and not found:
142            logging.warning("Event %s not enabled", event)
143            return False
144
145        if val == 0 and found:
146            logging.warning("Event %s not disabled", event)
147            return False
148
149        return True
150
151
152    def event_on(self, event):
153        return self._event_onoff(event, 1)
154
155
156    def event_off(self, event):
157        return self._event_onoff(event, 0)
158
159
160    def flush(self):
161        """Flush trace buffer.
162
163        Raises:
164            error.TestFail: If unable to flush
165        """
166        self.off()
167        fname = os.path.join(self._TRACE_ROOT, 'free_buffer')
168        utils.write_one_line(fname, 1)
169        self._buffer_ptr = 0
170
171        fname = os.path.join(self._TRACE_ROOT, 'buffer_size_kb')
172        result = utils.read_one_line(fname).strip()
173        if result is '0':
174            return True
175        return False
176
177
178    def read(self, regexp=None):
179        fname = os.path.join(self._TRACE_ROOT, 'trace')
180        fd = open(fname)
181        fd.seek(self._buffer_ptr)
182        for ln in fd.readlines():
183            if regexp is None:
184                self._buffer.append(ln)
185                continue
186            results = re.findall(regexp, ln)
187            if results:
188                logging.debug(ln)
189                self._buffer.append(results[0])
190        self._buffer_ptr = fd.tell()
191        fd.close()
192        return self._buffer
193
194
195    @staticmethod
196    def uptime_secs():
197        results = utils.read_one_line("/proc/uptime")
198        return float(results.split()[0])
199