1#!/usr/bin/env python3
2#
3# Copyright 2018 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6#
7
8"""A tool for running puffin tests in a corpus of deflate compressed files."""
9
10import argparse
11import filecmp
12import logging
13import os
14import subprocess
15import sys
16import tempfile
17
18_PUFFHUFF = 'puffhuff'
19_PUFFDIFF = 'puffdiff'
20TESTS = (_PUFFHUFF, _PUFFDIFF)
21
22
23class Error(Exception):
24  """Puffin general processing error."""
25
26
27def ParseArguments(argv):
28  """Parses and Validates command line arguments.
29
30  Args:
31    argv: command line arguments to parse.
32
33  Returns:
34    The arguments list.
35  """
36  parser = argparse.ArgumentParser()
37
38  parser.add_argument('corpus', metavar='CORPUS',
39                      help='A corpus directory containing compressed files')
40  parser.add_argument('-d', '--disabled_tests', default=(), metavar='',
41                      nargs='*',
42                      help=('Space separated list of tests to disable. '
43                            'Allowed options include: ' + ', '.join(TESTS)),
44                      choices=TESTS)
45  parser.add_argument('--cache_size', type=int, metavar='SIZE',
46                      help='The size (in bytes) of the cache for puffpatch '
47                      'operations.')
48  parser.add_argument('--debug', action='store_true',
49                      help='Turns on verbosity.')
50
51  # Parse command-line arguments.
52  args = parser.parse_args(argv)
53
54  if not os.path.isdir(args.corpus):
55    raise Error('Corpus directory {} is non-existent or inaccesible'
56                .format(args.corpus))
57  return args
58
59
60def main(argv):
61  """The main function."""
62  args = ParseArguments(argv[1:])
63
64  if args.debug:
65    logging.getLogger().setLevel(logging.DEBUG)
66
67  # Construct list of appropriate files.
68  files = list(filter(os.path.isfile, [os.path.join(args.corpus, f)
69                                       for f in os.listdir(args.corpus)]))
70
71  # For each file in corpus run puffhuff.
72  if _PUFFHUFF not in args.disabled_tests:
73    for src in files:
74      with tempfile.NamedTemporaryFile() as tgt_file:
75
76        operation = 'puffhuff'
77        logging.debug('Running %s on %s', operation, src)
78        cmd = ['puffin',
79               '--operation={}'.format(operation),
80               '--src_file={}'.format(src),
81               '--dst_file={}'.format(tgt_file.name)]
82        if subprocess.call(cmd) != 0:
83          raise Error('Puffin failed to do {} command: {}'
84                      .format(operation, cmd))
85
86        if not filecmp.cmp(src, tgt_file.name):
87          raise Error('The generated file {} is not equivalent to the original '
88                      'file {} after {} operation.'
89                      .format(tgt_file.name, src, operation))
90
91  if _PUFFDIFF not in args.disabled_tests:
92    # Run puffdiff and puffpatch for each pairs of files in the corpus.
93    for src in files:
94      for tgt in files:
95        with tempfile.NamedTemporaryFile() as patch, \
96             tempfile.NamedTemporaryFile() as new_tgt:
97
98          operation = 'puffdiff'
99          logging.debug('Running %s on %s (%d) and %s (%d)',
100                        operation,
101                        os.path.basename(src), os.stat(src).st_size,
102                        os.path.basename(tgt), os.stat(tgt).st_size)
103          cmd = ['puffin',
104                 '--operation={}'.format(operation),
105                 '--src_file={}'.format(src),
106                 '--dst_file={}'.format(tgt),
107                 '--patch_file={}'.format(patch.name)]
108
109          # Running the puffdiff operation
110          if subprocess.call(cmd) != 0:
111            raise Error('Puffin failed to do {} command: {}'
112                        .format(operation, cmd))
113
114          logging.debug('Patch size is: %d', os.stat(patch.name).st_size)
115
116          operation = 'puffpatch'
117          logging.debug('Running %s on src file %s and patch %s',
118                        operation, os.path.basename(src), patch.name)
119          cmd = ['puffin',
120                 '--operation={}'.format(operation),
121                 '--src_file={}'.format(src),
122                 '--dst_file={}'.format(new_tgt.name),
123                 '--patch_file={}'.format(patch.name)]
124          if args.cache_size:
125            cmd += ['--cache_size={}'.format(args.cache_size)]
126
127          # Running the puffpatch operation
128          if subprocess.call(cmd) != 0:
129            raise Error('Puffin failed to do {} command: {}'
130                        .format(operation, cmd))
131
132          if not filecmp.cmp(tgt, new_tgt.name):
133            raise Error('The generated file {} is not equivalent to the '
134                        'original file {} after puffpatch operation.'
135                        .format(new_tgt.name, tgt))
136
137  return 0
138
139
140if __name__ == '__main__':
141  sys.exit(main(sys.argv))
142