1#!/usr/bin/python -u
2
3import datetime
4import json
5import os, sys, optparse, fcntl, errno, traceback, socket
6
7import common
8from autotest_lib.client.common_lib import mail, pidfile
9from autotest_lib.client.common_lib import utils
10from autotest_lib.frontend import setup_django_environment
11from autotest_lib.frontend.tko import models as tko_models
12from autotest_lib.server.cros.dynamic_suite import constants
13from autotest_lib.site_utils import job_overhead
14from autotest_lib.tko import db as tko_db, utils as tko_utils
15from autotest_lib.tko import models, status_lib
16from autotest_lib.tko.perf_upload import perf_uploader
17
18
def parse_args():
    """Build the option parser and parse sys.argv.

    Exits with an error message and usage text when no results
    directory argument is supplied.

    @return: An (options, args) pair as produced by optparse.
    """
    parser = optparse.OptionParser()
    # Simple boolean flags (default None when unset).
    for flag, dest, help_text in (
            ("-m", "mailit", "Send mail for FAILED tests"),
            ("-r", "reparse", "Reparse the results of a job"),
            ("-o", "singledir", "Parse a single results directory")):
        parser.add_option(flag, help=help_text, dest=dest,
                          action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    # Database connection parameters.
    for flag, dest, help_text in (
            ("-s", "db_host", "Database server hostname"),
            ("-u", "db_user", "Database username"),
            ("-p", "db_pass", "Database password"),
            ("-d", "db_name", "Database name")):
        parser.add_option(flag, help=help_text, dest=dest, action="store")
    # Long-form flags that explicitly default to False.
    for flag, dest, help_text in (
            ("--write-pidfile", "write_pidfile",
             "write pidfile (.parser_execute)"),
            ("--record-duration", "record_duration",
             "Record timing to metadata db")):
        parser.add_option(flag, help=help_text, dest=dest,
                          action="store_true", default=False)
    options, args = parser.parse_args()

    # At least one results directory is required.
    if not args:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    return options, args
61
62
def format_failure_message(jobname, kernel, testname, status, reason):
    """Render one aligned line of a failure report table.

    Columns are left-justified to fixed widths (12/20/12/10) with the
    free-form reason last.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
    fields = (jobname, kernel, testname, status, reason)
    return "%-12s %-20s %-12s %-10s %s" % fields
76
77
def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    # Assemble the report header: a link to the results followed by an
    # aligned table heading matching format_failure_message's columns.
    header_lines = [
        "",
        "The following tests FAILED for this job",
        "http://%s/results/%s" % (socket.gethostname(), jobname),
        "",
        format_failure_message("Job name", "Kernel", "Test name",
                               "FAIL/WARN", "Failure reason"),
        format_failure_message("=" * 8, "=" * 6, "=" * 8, "=" * 8,
                               "=" * 14),
    ]
    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject,
              "\n".join(header_lines) + message)
99
100
def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Retry tests invalidates original tests.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, such that the consumers of the tko database
    (e.g. tko frontend, wmatrix) could figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to tko job_idx and
    passed into this method as |orig_job_idx|.

    In this method, we are going to invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry results
    to the original results, so that later we can always know which rows in
    tko_tests are retries and which are the corresponding original results.
    This is done by setting the field 'invalidates_test_idx' of the tests
    associated with the retry job.

    For example, assume Job(job_idx=105) are retried by Job(job_idx=108), after
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx| job_idx | test            | ... | invalid | invalidates_test_idx
    10      | 105     | dummy_Fail.Error| ... | 1       | NULL
    11      | 105     | dummy_Fail.Fail | ... | 1       | NULL
    ...
    20      | 108     | dummy_Fail.Error| ... | 0       | 10
    21      | 108     | dummy_Fail.Fail | ... | 0       | 11
    __________________________________________________________________________
    Note the invalid bits of the rows for Job(job_idx=105) are set to '1'.
    And the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
        # Bug fix: bail out here. Previously execution fell through and
        # still ran the queries below with a missing job index, and then
        # logged the success DEBUG message despite doing nothing useful.
        return
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid race condition. In the long run, we might consider
    # to make the rest of parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag', for example,
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a dut goes offline
        # since the beginning of the job, in which case invalidated_tests
        # will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated to job: ' + msg)
178
179
def parse_one(db, jobname, path, reparse, mail_on_failure):
    """Parse a single job. Optionally send email on failure.

    Reads the status logs under |path|, writes the job and its tests into
    the tko database, uploads perf values, invalidates the original job's
    tests if this is a retry, serializes the job to a binary file, and —
    for successful jobs — updates the gs_offloader instructions file.

    NOTE(review): statement order matters here — db.commit() must run
    before any django model access (see comment below) and the
    gs_offloader update is deliberately last.

    @param db: database object.
    @param jobname: the tag used to search for existing job in db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param reparse: True/False, whether this is reparsing of the job.
    @param mail_on_failure: whether to send email on FAILED test.


    """
    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    # old tests is a dict from tuple (test_name, subdir) to test_idx
    old_tests = {}
    if old_job_idx is not None:
        # Job already exists in the db; without -r there is nothing to do.
        if not reparse:
            tko_utils.dprint("! Job is already parsed, done")
            return

        raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                                  {"job_idx": old_job_idx})
        if raw_old_tests:
            old_tests = dict(((test, subdir), test_idx)
                             for test_idx, subdir, test in raw_old_tests)

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    # parse out the job
    parser = status_lib.parser(status_version)
    job = parser.make_job(path)
    # Prefer "status.log"; fall back to a file named "status".
    status_log = os.path.join(path, "status.log")
    if not os.path.exists(status_log):
        status_log = os.path.join(path, "status")
    if not os.path.exists(status_log):
        tko_utils.dprint("! Unable to parse job, no status file")
        return

    # parse the status logs
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    # NOTE(review): the file handle is never closed explicitly; relies on GC.
    status_lines = open(status_log).readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)

    # try and port test_idx over from the old tests, but if old tests stop
    # matching up with new ones just give up
    if reparse and old_job_idx is not None:
        job.index = old_job_idx
        for test in job.tests:
            test_idx = old_tests.pop((test.testname, test.subdir), None)
            if test_idx is not None:
                test.test_idx = test_idx
            else:
                tko_utils.dprint("! Reparse returned new test "
                                 "testname=%r subdir=%r" %
                                 (test.testname, test.subdir))
        # Whatever is left in old_tests was not produced by this reparse:
        # purge those stale tests and all their per-test rows.
        for test_idx in old_tests.itervalues():
            where = {'test_idx' : test_idx}
            db.delete('tko_iteration_result', where)
            db.delete('tko_iteration_perf_value', where)
            db.delete('tko_iteration_attributes', where)
            db.delete('tko_test_attributes', where)
            db.delete('tko_test_labels_tests', {'test_id': test_idx})
            db.delete('tko_tests', where)

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        # Tests without a subdir are skipped for failure reporting.
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, status, reason: %s %s %s"
                         % (test.subdir, test.status, test.reason))
        if test.status != 'GOOD':
            job_successful = False
            message_lines.append(format_failure_message(
                jobname, test.kernel.base, test.subdir,
                test.status, test.reason))

    message = "\n".join(message_lines)

    # send out an email report of failure
    # (len > 2 means at least one failure line was appended above)
    if len(message) > 2 and mail_on_failure:
        tko_utils.dprint("Sending email report of failure on %s to %s"
                         % (jobname, job.user))
        mailfailure(jobname, job, message)

    # write the job into the database.
    db.insert_job(jobname, job,
                  parent_job_id=job_keyval.get(constants.PARENT_JOB_ID, None))

    # Upload perf values to the perf dashboard, if applicable.
    for test in job.tests:
        perf_uploader.upload_test(job, test)

    # Although the cursor has autocommit, we still need to force it to commit
    # existing changes before we can use django models, otherwise it
    # will go into deadlock when django models try to start a new transaction
    # while the current one has not finished yet.
    db.commit()

    # Handle retry job: look up the original job's tko index and mark its
    # tests invalid (see _invalidate_original_tests).
    orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID, None)
    if orig_afe_job_id:
        orig_job_idx = tko_models.Job.objects.get(
                afe_job_id=orig_afe_job_id).job_idx
        _invalidate_original_tests(orig_job_idx, job.index)

    # Serializing job into a binary file
    try:
        from autotest_lib.tko import tko_pb2
        from autotest_lib.tko import job_serializer

        serializer = job_serializer.JobSerializer()
        binary_file_name = os.path.join(path, "job.serialize")
        serializer.serialize_to_binary(job, jobname, binary_file_name)

        if reparse:
            # Optional site-specific export hook; falls back to a no-op.
            site_export_file = "autotest_lib.tko.site_export"
            site_export = utils.import_site_function(__file__,
                                                     site_export_file,
                                                     "site_export",
                                                     _site_export_dummy)
            site_export(binary_file_name)

    except ImportError:
        # Serialization is best-effort: skip it if protobuf support is absent.
        tko_utils.dprint("DEBUG: tko_pb2.py doesn't exist. Create by "
                         "compiling tko/tko.proto.")

    db.commit()

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so any failure, e.g., db connection error, will stop
    # gs_offloader_instructions being updated, and logs can be uploaded for
    # troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)
341
def _site_export_dummy(binary_file_name):
    """No-op fallback for the optional site_export hook.

    @param binary_file_name: Path to the serialized job file (ignored).
    """
344
345
def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the test
    is itself a job directory. Does not recurse into the subdirs.

    @param path: Directory to inspect.
    @return: A set of subdirectory names, or None when |path| looks like a
             job directory (or something else we don't understand).
    """
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        # Fix: use open() in a context manager instead of the deprecated
        # file() builtin, which also leaked the file handle.
        with open(machine_list) as machines_file:
            subdirs = set(line.strip() for line in machines_file)
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None
370
371
def parse_leaf_path(db, path, level, reparse, mail_on_failure):
    """Parse a single leaf results directory.

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param reparse: True/False, whether this is reparsing of the job.
    @param mail_on_failure: whether to send email on FAILED test.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    # The job name is the last |level| components of the path.
    jobname = "/".join(path.split("/")[-level:])
    try:
        db.run_with_retry(parse_one, db, jobname, path, reparse,
                          mail_on_failure)
    except Exception:
        # A failure to parse one job must not abort the remaining jobs;
        # log the traceback and keep going.
        traceback.print_exc()
    return jobname
391
392
def parse_path(db, path, level, reparse, mail_on_failure):
    """Parse a path

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param reparse: True/False, whether this is reparsing of the job.
    @param mail_on_failure: whether to send email on FAILED test.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is None:
        # single machine job
        processed_jobs.add(
                parse_leaf_path(db, path, level, reparse, mail_on_failure))
        return processed_jobs

    # parse status.log in current directory, if it exists. multi-machine
    # synchronous server side tests record output in this directory. without
    # this check, we do not parse these results.
    if os.path.exists(os.path.join(path, 'status.log')):
        processed_jobs.add(
                parse_leaf_path(db, path, level, reparse, mail_on_failure))
    # multi-machine job: recurse into each host subdirectory.
    for subdir in job_subdirs:
        subdir_path = os.path.join(path, subdir)
        processed_jobs.update(parse_path(db, subdir_path, level + 1,
                                         reparse, mail_on_failure))
    return processed_jobs
424
425
def record_parsing(processed_jobs, duration_secs):
    """Record the time spent on parsing to metadata db.

    @param processed_jobs: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    @param duration_secs: Total time spent on parsing, in seconds.
    """
    for job_name in processed_jobs:
        job_id, hostname = tko_utils.get_afe_job_id_and_hostname(job_name)
        if job_id and hostname:
            job_overhead.record_state_duration(
                    job_id, hostname, job_overhead.STATUS.PARSING,
                    duration_secs)
        else:
            # Job names that don't decompose into (afe_job_id, hostname)
            # cannot be attributed in the metadata db; skip them.
            tko_utils.dprint('ERROR: can not parse job name %s, '
                             'will not send duration to metadata db.'
                             % job_name)
445
446
def main():
    """Main entrance.

    Parses command line options, locks each job results directory,
    parses it into the tko database, and records parse duration to the
    metadata db when requested. Pidfile status reflects success/failure.
    """
    start_time = datetime.datetime.now()
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    options, args = parse_args()
    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            # Fix: 'except E as e' instead of the deprecated Python 2-only
            # 'except E, e' comma syntax (removed in Python 3).
            except IOError as e:
                # lock is not available and nonblock has been requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise # something unexpected happened
            try:
                new_jobs = parse_path(db, path, options.level, options.reparse,
                                      options.mailit)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except:
        # Bare except on purpose: record failure in the pidfile for ANY
        # exception (including SystemExit/KeyboardInterrupt), then re-raise.
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)
    duration_secs = (datetime.datetime.now() - start_time).total_seconds()
    if options.record_duration:
        record_parsing(processed_jobs, duration_secs)
508
509
510if __name__ == "__main__":
511    main()
512