1#!/usr/bin/python
2"""
3Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile
4    If the job_id is a suite it will find all subjobs.
5You need to change the location of the log it will parse.
6The job_id needs to be in the afe database.
7"""
8import abc
9import datetime
10import os
11import re
12import pprint
13import subprocess
14import sys
15import time
16
17import common
18from autotest_lib.server import frontend
19
20
# Default scheduler log to parse when no logfile is given on the command line.
LOGFILE = './logs/scheduler.log.2014-04-17-16.51.47'
# Backward-compatible alias for the original, misspelled constant name.
LOGFIE = LOGFILE
# logfile name format: scheduler.log.2014-02-14-18.10.56
time_format = '%Y-%m-%d-%H.%M.%S'
logfile_regex = r'scheduler.log.([0-9,.,-]+)'
logdir = os.path.join('/usr/local/autotest', 'logs')
26
class StateMachineViolation(Exception):
    """Signals an unrecognized state transition found in a log."""
29
30
class LogLineException(Exception):
    """Signals a problem with how a log line was claimed by line parsers."""
33
34
def should_process_log(time_str, time_format, cutoff_days=7):
    """Tell whether a log timestamp is older than the cutoff.

    @param time_str: A string representing the time of the log.
        eg: 2014-02-14-18.10.56
    @param time_format: A string describing the format of time_str.
        ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
    @param cutoff_days: Int representing the cutoff in days.

    @return: True if time_str lies more than cutoff_days before the current
        time, False otherwise.
    """
    stamp = datetime.datetime.strptime(time_str, time_format)
    # Round-trip the current time through time_format so it is truncated to
    # the same precision as the parsed log timestamp.
    current_time = datetime.datetime.strptime(
            time.strftime(time_format), time_format)
    age_limit = datetime.timedelta(days=cutoff_days)
    return stamp < current_time - age_limit
50
51
def apply_regex(regex, line):
    """Match a regex against the start of a line.

    @param regex: Regex to apply.
    @param line: The line to apply regex on.

    @return: A tuple with the matching groups if there was a match,
        None otherwise.
    """
    matched = re.match(regex, line)
    return matched.groups() if matched else None
63
64
class StateMachineParser(object):
    """Abstract base for parsers that enforce state transition ordering.

    Classes inheriting from StateMachineParser need to define an
    expected_transitions dictionary mapping a 'from' state to the list of
    'to' states that may follow it. Each 'to' state is removed from its list
    when that transition occurs, so the same transition can only happen
    twice if it is listed twice.
    """
    __metaclass__ = abc.ABCMeta


    @abc.abstractmethod
    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {}


    def advance_state(self, from_state, to_state):
        """Record a transition if it is expected, report it otherwise.

        @param from_state: A string representing the state being left.
        @param to_state: A string representing the state being entered.

        @return: None if the transition was expected and has been recorded.
            A message string if from_state and to_state are the same state.
            A (from_state, to_state) tuple if the transition is unexpected.
        """
        # TODO: Updating to the same state is a waste of bw.
        if from_state and from_state == to_state:
            return ('Updating to the same state is a waste of BW: %s->%s' %
                    (from_state, to_state))

        transitions = self.expected_transitions
        if (from_state not in transitions or
            to_state not in transitions[from_state]):
            return (from_state, to_state)

        # Consume the transition so it cannot be taken a second time.
        transitions[from_state].remove(to_state)
        self.visited_states.append(to_state)
103
104
class SingleJobHostSMP(StateMachineParser):
    """State machine for the host status changes seen for a single job."""

    def __init__(self):
        self.visited_states = []
        # Maps a host status to the statuses allowed to follow it.
        self.expected_transitions = {
                'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
                'Resetting': ['Ready', 'Provisioning'],
                'Pending': ['Running'],
                'Provisioning': ['Repairing'],
                'Running': ['Ready']
        }


    def check_transitions(self, hostline):
        """Advance the state machine for a line that updates 'status'.

        Lines that update any other field, and lines on which no field
        update was recorded at all, are ignored. The result of advancing the
        state machine is not inspected.

        @param hostline: A line object whose line_info dictionary may hold
            the 'field', 'state' and 'value' keys of a host update.
        """
        info = hostline.line_info
        # Use .get: line_info has no 'field' key when no update was recorded
        # on the line, and that must not abort the whole crawl.
        if info.get('field') == 'status':
            self.advance_state(info.get('state'), info.get('value'))
121
122
class SingleJobHqeSMP(StateMachineParser):
    """State machine for the HQE status changes seen for a single job."""

    def __init__(self):
        self.visited_states = []
        # Maps an HQE status to the statuses allowed to follow it.
        self.expected_transitions = {
                'Queued': ['Starting', 'Resetting', 'Aborted'],
                'Resetting': ['Pending', 'Provisioning'],
                'Provisioning': ['Pending', 'Queued', 'Repairing'],
                'Pending': ['Starting'],
                'Starting': ['Running'],
                'Running': ['Gathering', 'Parsing'],
                'Gathering': ['Parsing'],
                'Parsing': ['Completed', 'Aborted']
        }


    def check_transitions(self, hqeline):
        """Advance the state machine with the transition held by a line.

        Lines on which no status change was recorded are ignored.

        @param hqeline: A line object whose line_info dictionary may hold
            the 'from_state' and 'to_state' keys of a status change.

        @raises StateMachineViolation: If an unexpected transition out of
            'Queued' is seen after 'Running' has already been visited.
        """
        info = hqeline.line_info
        from_state = info.get('from_state')
        to_state = info.get('to_state')
        if from_state is None or to_state is None:
            # No status change was recorded on this line; indexing the keys
            # directly would raise KeyError and abort the whole crawl.
            return

        invalid_states = self.advance_state(from_state, to_state)
        if not invalid_states:
            return

        # Deal with repair.
        if (invalid_states[0] == 'Queued' and
            'Running' in self.visited_states):
            raise StateMachineViolation('Unrecognized state transition '
                    '%s->%s, expected transitions are %s' %
                    (invalid_states[0], invalid_states[1],
                     self.expected_transitions))
151
152
class LogLine(object):
    """Base class for line objects.

    Every class inheriting from LogLine represents a line of some sort.
    A line parses itself and hands itself to its state machine parsers so
    they can validate state transitions. A line can be part of several
    state machines.
    """
    line_format = '%s'


    def __init__(self, state_machine_parsers):
        """
        @param state_machine_parsers: A list of smp objects used to validate
            state changes on this type of line.
        """
        # Kept in a single dictionary so flush can reset it in one go.
        self.line_info = {}
        self.smps = state_machine_parsers


    def parse_line(self, line):
        """Apply the line regex and store what the parsed line contains.

        @param line: A string representing a line.
        """
        # Regex for all the things.
        groups = apply_regex('(.*)', line)
        if groups:
            self.line_info['line'] = groups[0]


    def flush(self):
        """Run every state machine parser on this line, then reset line_info.
        """
        for state_machine in self.smps:
            state_machine.check_transitions(self)
        # TODO: persist this?
        self.line_info = {}


    def format_line(self):
        """Render line_info with line_format.

        @return: The formatted line, or the stored 'line' entry if
            line_format needs a key that line_info does not have.
        """
        info = self.line_info
        try:
            rendered = self.line_format % info
        except KeyError:
            rendered = info['line']
        return rendered
200
201
class TimeLine(LogLine):
    """A line that starts with a scheduler log timestamp.
    """

    def parse_line(self, line):
        """Split the leading timestamp off the line.

        On a match the timestamp is stored under 'time' and the rest of the
        line replaces the 'line' entry of line_info.

        @param line: A string representing a line.
        """
        super(TimeLine, self).parse_line(line)

        # Regex for isolating the date and time from scheduler logs, eg:
        # 02/16 16:04:36.573 INFO |scheduler_:0574|...
        timestamp_rgx = '([0-9,/,:,., ]+)(.*)'
        groups = apply_regex(timestamp_rgx, self.line_info['line'])
        if not groups:
            return
        self.line_info['time'], self.line_info['line'] = groups
216
217
class HostLine(TimeLine):
    """Parses lines that report on a host.
    """
    line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
                'updated %(field)s->%(value)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        On a match, line_info gets the state the host is currently in under
        'state', the name of the updated field under 'field' and the new
        value, with single quotes removed, under 'value'.

        @param line: The line we're expecting to contain a state transition.
        """
        state_transition_rgx = ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*"
        groups = apply_regex(state_transition_rgx, line)
        if not groups:
            return
        current_state, field, raw_value = groups
        self.line_info['state'] = current_state
        self.line_info['field'] = field
        self.line_info['value'] = raw_value.replace("'", "")


    def parse_line(self, line):
        """Parse a line and, if it is about a host, record its details.

        @param line: A string representing a line.

        @return: The formatted line if the line is about a host,
            None otherwise.
        """
        super(HostLine, self).parse_line(line)

        # Regex for getting host status. Everything after 'Host ' is kept
        # as the line, and its leading token is the host. Eg:
        # 172.22.4 in Running updating {'status': 'Running'}
        host_rgx = '.*Host (([0-9,.,a-z,-]+).*)'
        groups = apply_regex(host_rgx, self.line_info['line'])
        if not groups:
            return None
        remainder, host = groups
        self.line_info['line'] = remainder
        self.line_info['host'] = host
        self.record_state_transition(remainder)
        return self.format_line()
250
251
class HQELine(TimeLine):
    """Manages HQE line parsing.
    """
    line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
            'updated to %(to_state)s. Flags: %(flags)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        On a match, line_info gets the old status under 'from_state', the
        new status under 'to_state' and the optional bracketed flags under
        'flags' (None when the line has no flags).

        @param line: The line we're expecting to contain a state transition.
        """
        # Regex for getting hqe status. Eg:
        # status:Running [active] -> Gathering
        # Raw string: '\[' and '\,' are invalid escape sequences in a plain
        # string literal. The pattern itself is unchanged.
        state_transition_rgx = r".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)"
        match = apply_regex(state_transition_rgx, line)
        if match:
            self.line_info['from_state'] = match[0]
            self.line_info['flags'] = match[1]
            self.line_info['to_state'] = match[2]


    def parse_line(self, line):
        """Parse a line and, if it is about an HQE, record its details.

        @param line: A string representing a line.

        @return: The formatted line if the line is about an HQE,
            None otherwise.
        """
        super(HQELine, self).parse_line(line)
        # Everything after '| HQE: ' is kept as the line, and its leading
        # run of digits is the hqe id.
        line_rgx = r'.*\| HQE: (([0-9]+).*)'
        parsed_line = apply_regex(line_rgx, self.line_info['line'])
        if parsed_line:
            self.line_info['line'] = parsed_line[0]
            self.line_info['hqe'] = parsed_line[1]
            self.record_state_transition(self.line_info['line'])
            return self.format_line()
283
284
class LogCrawler(object):
    """Crawl logs.

    Log crawlers apply some basic preprocessing to a log and crawl the
    output, validating state changes. They manage line and state machine
    creation. The initial filtering applied to the log needs to grab all
    lines that match an action, such as the running of a job.
    """

    def __init__(self, log_name):
        """
        @param log_name: Path of the log to crawl.
        """
        self.log = log_name
        # Shell command producing the lines to crawl.
        self.filter_command = 'cat %s' % log_name


    def preprocess_log(self):
        """Apply some basic filtering to the log.

        @return: The standard output of running filter_command in a shell.
        """
        filter_proc = subprocess.Popen(
                self.filter_command, shell=True, stdout=subprocess.PIPE)
        # Only stdout is piped, so the stderr half of the result is unused.
        return filter_proc.communicate()[0]
306
307
class SchedulerLogCrawler(LogCrawler):
    """A log crawler for the scheduler logs.

    This crawler is only capable of processing information about a single job.
    """

    def __init__(self, log_name, **kwargs):
        """
        @param log_name: Path of the scheduler log to crawl.
        @param kwargs: Must contain 'job_id', the id of the job whose lines
            are selected from the log.
        """
        super(SchedulerLogCrawler, self).__init__(log_name)
        self.job_id = kwargs['job_id']
        self.line_processors = [HostLine([SingleJobHostSMP()]),
                HQELine([SingleJobHqeSMP()])]
        # The id must be followed by a non-digit or the end of the line,
        # otherwise filtering for job 123 would also select job 1234.
        self.filter_command = (
                "%s | grep -E 'for job: %s([^0-9]|$)'" %
                (self.filter_command, self.job_id))


    def parse_log(self):
        """Parse each line of the preprocessed log output.

        Pass each line through each possible line_processor. The one that
        matches will populate itself and call flush, which walks the state
        machines of that line to the next step.

        @return: A list with one entry per line of the preprocessed output:
            the formatted line if a processor claimed it, the raw line
            otherwise.

        @raises LogLineException: If more than one processor claims a line.
        @raises StateMachineViolation: If a state machine rejects a
            transition. The exception carries the response built so far,
            followed by the description of the violation.
        """
        out = self.preprocess_log()
        response = []
        for job_line in out.split('\n'):
            parsed_line = None
            for processor in self.line_processors:
                line = processor.parse_line(job_line)
                if line and parsed_line:
                    raise LogLineException('Multiple Parsers claiming the line %s: '
                            'previous parsing: %s, current parsing: %s ' %
                            (job_line, parsed_line, line))
                elif line:
                    parsed_line = line
                    try:
                        processor.flush()
                    except StateMachineViolation as e:
                        response.append(str(e))
                        raise StateMachineViolation(response)
            response.append(parsed_line if parsed_line else job_line)
        return response
349
350
def process_logs():
    """Print the distilled scheduler log lines of a job or of a suite.

    The job id is read from sys.argv[1] and the logfile path from
    sys.argv[2], falling back to LOGFILE when no path is given. If the job
    has no parent job its child jobs are processed; if it has no child jobs
    either, the job itself is processed.

    Exits with status 1 if no job id was given or the job is unknown.
    """
    if len(sys.argv) < 2:
        print('Usage: ./cron_scripts/log_distiller.py job_id '
              '[path_to_logfile]\n'
              'Without a logfile you need to change the location of the '
              'log it will parse.\n'
              'The job_id needs to be in the afe database.')
        sys.exit(1)

    job_id = int(sys.argv[1])
    logfile = sys.argv[2] if len(sys.argv) > 2 else LOGFILE

    rpc = frontend.AFE()
    suite_jobs = rpc.run('get_jobs', id=job_id)
    if not suite_jobs:
        print('Job %s was not found in the afe database.' % job_id)
        sys.exit(1)
    if not suite_jobs[0]['parent_job']:
        # A job without a parent may be a suite: prefer its subjobs, but
        # keep the job itself when it turns out to have none.
        suite_jobs = rpc.run('get_jobs', parent_job=job_id) or suite_jobs

    for job in suite_jobs:
        log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
        for line in log_crawler.parse_log():
            print(line)
373
374
# Only distill logs when executed as a script, not when imported.
if __name__ == '__main__':
    process_logs()
377