1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
"""Helper class for instrumentation test jar."""
6
7import collections
8import logging
9import os
10import pickle
11import re
12
13from pylib import cmd_helper
14from pylib import constants
15
16
# Version of the pickled proguard cache format. It is stored in the cache under
# the 'VERSION' key and compared on load; a cache whose version differs is
# ignored. If you change the cached output of proguard, increment this number.
PICKLE_FORMAT_VERSION = 1
19
20
class TestJar(object):
  """Lists the test methods of an instrumentation test jar and their annotations.

  The jar is dumped with proguard and the dump is parsed for classes, methods
  and annotations. The parsed result is cached in '<jar_path>-proguard.pickle'
  and reused while that file is newer than the jar and was written with the
  current PICKLE_FORMAT_VERSION.

  Tests are identified by strings of the form 'package.Class#method'.
  """
  # Annotations this class knows about. A test method carrying none of them is
  # reported by _GetTestsMissingAnnotation().
  _ANNOTATIONS = frozenset(
      ['Smoke', 'SmallTest', 'MediumTest', 'LargeTest', 'EnormousTest',
       'FlakyTest', 'DisabledTest', 'Manual', 'PerfTest'])
  # Annotation assumed for tests that carry none of _ANNOTATIONS.
  _DEFAULT_ANNOTATION = 'SmallTest'
  # Patterns for the lines of interest in the proguard dump.
  _PROGUARD_CLASS_RE = re.compile(r'\s*?- Program class:\s*([\S]+)$')
  _PROGUARD_METHOD_RE = re.compile(r'\s*?- Method:\s*(\S*)[(].*$')
  _PROGUARD_ANNOTATION_RE = re.compile(r'\s*?- Annotation \[L(\S*);\]:$')
  _PROGUARD_ANNOTATION_CONST_RE = (
      re.compile(r'\s*?- Constant element value.*$'))
  _PROGUARD_ANNOTATION_VALUE_RE = re.compile(r'\s*?- \S+? \[(.*)\]$')

  def __init__(self, jar_path):
    """Loads the test data for |jar_path|, from the cache when possible.

    Args:
      jar_path: Path to the test jar.

    Raises:
      Exception: If |jar_path| does not exist.
      KeyError: If proguard is not found under the SDK root and
        ANDROID_BUILD_TOP is not set in the environment.
    """
    if not os.path.exists(jar_path):
      raise Exception('%s not found, please build it' % jar_path)

    sdk_root = os.getenv('ANDROID_SDK_ROOT', constants.ANDROID_SDK_ROOT)
    self._PROGUARD_PATH = os.path.join(sdk_root,
                                       'tools/proguard/bin/proguard.sh')
    if not os.path.exists(self._PROGUARD_PATH):
      # Fall back to the copy of proguard inside an Android source checkout.
      self._PROGUARD_PATH = os.path.join(os.environ['ANDROID_BUILD_TOP'],
                                         'external/proguard/bin/proguard.sh')
    self._jar_path = jar_path
    # Maps 'Class#method' to a list of annotation names; annotations that have
    # a value additionally contribute an 'Annotation:value' entry.
    self._annotation_map = collections.defaultdict(list)
    self._pickled_proguard_name = self._jar_path + '-proguard.pickle'
    self._test_methods = []
    if not self._GetCachedProguardData():
      self._GetProguardData()

  def _GetCachedProguardData(self):
    """Loads the parsed proguard data from the pickle cache, if it is usable.

    Returns:
      True if the cache exists, is newer than the jar, could be read and has
      the current PICKLE_FORMAT_VERSION; False otherwise.
    """
    if not os.path.exists(self._pickled_proguard_name):
      return False
    if (os.path.getmtime(self._pickled_proguard_name) <=
        os.path.getmtime(self._jar_path)):
      # The jar was rebuilt after the cache was written.
      return False

    logging.info('Loading cached proguard output from %s',
                 self._pickled_proguard_name)
    try:
      # Pickle data must be read in binary mode. NOTE: unpickling is only
      # acceptable here because the cache is written by _GetProguardData();
      # never point this at a file from an untrusted source.
      with open(self._pickled_proguard_name, 'rb') as f:
        d = pickle.load(f)
      if d['VERSION'] != PICKLE_FORMAT_VERSION:
        logging.warning('PICKLE_FORMAT_VERSION has changed, ignoring cache')
        return False
      annotation_map = d['ANNOTATION_MAP']
      test_methods = d['TEST_METHODS']
    except Exception:  # pylint: disable=W0703
      # A truncated or corrupt cache can raise nearly any exception while
      # unpickling; the cache is only an optimization, so fall back to
      # running proguard.
      logging.warning('Unable to load cached proguard output from %s, '
                      'ignoring cache', self._pickled_proguard_name,
                      exc_info=True)
      return False

    self._annotation_map = annotation_map
    self._test_methods = test_methods
    return True

  def _GetProguardData(self):
    """Runs proguard on the jar, parses its dump and writes the pickle cache.

    Fills self._test_methods with every 'Class#method' whose class name ends
    with 'Test' and whose method name starts with 'test', and
    self._annotation_map with the annotations found for each method.
    """
    proguard_output = cmd_helper.GetCmdOutput([self._PROGUARD_PATH,
                                               '-injars', self._jar_path,
                                               '-dontshrink',
                                               '-dontoptimize',
                                               '-dontobfuscate',
                                               '-dontpreverify',
                                               '-dump',
                                              ]).split('\n')
    clazz = None
    method = None
    annotation = None
    has_value = False
    qualified_method = None
    for line in proguard_output:
      m = self._PROGUARD_CLASS_RE.match(line)
      if m:
        clazz = m.group(1).replace('/', '.')  # Change package delim.
        annotation = None
        # NOTE(review): qualified_method is not reset here, so annotation lines
        # that appear before the first method of this class are recorded
        # against the previous class's last method -- confirm whether that is
        # intended before changing it (the cache format would change).
        continue

      m = self._PROGUARD_METHOD_RE.match(line)
      if m:
        method = m.group(1)
        annotation = None
        qualified_method = clazz + '#' + method
        if method.startswith('test') and clazz.endswith('Test'):
          self._test_methods.append(qualified_method)
        continue

      if not qualified_method:
        # Ignore non-method annotations.
        continue

      m = self._PROGUARD_ANNOTATION_RE.match(line)
      if m:
        annotation = m.group(1).split('/')[-1]  # Ignore the annotation package.
        self._annotation_map[qualified_method].append(annotation)
        has_value = False
        continue
      if annotation:
        if not has_value:
          # A 'Constant element value' line announces that a value follows.
          m = self._PROGUARD_ANNOTATION_CONST_RE.match(line)
          if m:
            has_value = True
        else:
          # The first following line matching the value pattern supplies it.
          m = self._PROGUARD_ANNOTATION_VALUE_RE.match(line)
          if m:
            value = m.group(1)
            self._annotation_map[qualified_method].append(
                annotation + ':' + value)
            has_value = False

    logging.info('Storing proguard output to %s', self._pickled_proguard_name)
    d = {'VERSION': PICKLE_FORMAT_VERSION,
         'ANNOTATION_MAP': self._annotation_map,
         'TEST_METHODS': self._test_methods}
    # Pickle data must be written in binary mode.
    with open(self._pickled_proguard_name, 'wb') as f:
      pickle.dump(d, f)

  def _GetAnnotationMap(self):
    """Returns the mapping from 'Class#method' to its list of annotations."""
    return self._annotation_map

  def _IsTestMethod(self, test):
    """Returns True if |test| names a test method.

    A test method is 'Class#method' where the class name ends with 'Test' and
    the method name starts with 'test'. Names without a '#' are never test
    methods.
    """
    class_name, _, method = test.partition('#')
    return class_name.endswith('Test') and method.startswith('test')

  def GetTestAnnotations(self, test):
    """Returns a list of all annotations for the given |test|. May be empty."""
    if not self._IsTestMethod(test):
      return []
    # Use get() so that looking up an unknown test does not add an empty entry
    # to the (defaultdict) annotation map.
    return self._GetAnnotationMap().get(test, [])

  def _AnnotationsMatchFilters(self, annotation_filter_list, annotations):
    """Checks if annotations match any of the filters.

    Args:
      annotation_filter_list: List of filters. A filter is either an annotation
        name, or 'Name=value1,value2,...' which matches when the annotation
        'Name' has any of the listed values.
      annotations: List of annotations of one test, as stored in the
        annotation map ('Name' and 'Name:value' entries).

    Returns:
      True if the filter list is empty or any filter matches, else False.
    """
    if not annotation_filter_list:
      return True
    for annotation_filter in annotation_filter_list:
      filters = annotation_filter.split('=')
      if len(filters) == 2:
        key = filters[0]
        value_list = filters[1].split(',')
        if any(key + ':' + value in annotations for value in value_list):
          return True
      elif annotation_filter in annotations:
        return True
    return False

  def GetAnnotatedTests(self, annotation_filter_list):
    """Returns a list of all tests that match the given annotation filters."""
    # items() rather than iteritems() so this runs on Python 2 and Python 3.
    return [test for test, annotations in self._GetAnnotationMap().items()
            if self._IsTestMethod(test) and self._AnnotationsMatchFilters(
                annotation_filter_list, annotations)]

  def GetTestMethods(self):
    """Returns a list of all test methods in this apk as Class#testMethod."""
    return self._test_methods

  def _GetTestsMissingAnnotation(self):
    """Get a list of test methods with no known annotations.

    Returns:
      Sorted list of the non-host-driven test methods that carry none of the
      annotations in _ANNOTATIONS.
    """
    tests_missing_annotations = []
    for test_method in self.GetTestMethods():
      annotations_ = frozenset(self.GetTestAnnotations(test_method))
      if (annotations_.isdisjoint(self._ANNOTATIONS) and
          not self.IsHostDrivenTest(test_method)):
        tests_missing_annotations.append(test_method)
    return sorted(tests_missing_annotations)

  def _GetAllMatchingTests(self, annotation_filter_list,
                           exclude_annotation_list, test_filter):
    """Get a list of tests matching any of the annotations and the filter.

    Args:
      annotation_filter_list: List of test annotations. A test must have at
        least one of these annotations. A test without any known annotations
        is considered to be SmallTest. If empty, all non-host-driven test
        methods are candidates.
      exclude_annotation_list: List of test annotations. A test must not have
        any of these annotations.
      test_filter: Filter used for partial matching on the test method names.

    Returns:
      List of all matching tests, without duplicates introduced by the
      annotation matching.
    """
    if annotation_filter_list:
      available_tests = self.GetAnnotatedTests(annotation_filter_list)
      # Include un-annotated tests in SmallTest.
      if self._DEFAULT_ANNOTATION in annotation_filter_list:
        already_selected = set(available_tests)
        for test in self._GetTestsMissingAnnotation():
          if test in already_selected:
            # Matched the filter through an annotation outside _ANNOTATIONS;
            # do not list it a second time.
            continue
          logging.warning(
              '%s has no annotations. Assuming "%s".', test,
              self._DEFAULT_ANNOTATION)
          available_tests.append(test)
    else:
      available_tests = [m for m in self.GetTestMethods()
                         if not self.IsHostDrivenTest(m)]

    # Exclusions apply whether or not an annotation filter was given.
    if exclude_annotation_list:
      # Seeding |seen| with the excluded tests drops both the excluded tests
      # and any duplicates, while keeping the original order.
      seen = set(self.GetAnnotatedTests(exclude_annotation_list))
      kept_tests = []
      for test in available_tests:
        if test not in seen:
          seen.add(test)
          kept_tests.append(test)
      available_tests = kept_tests

    if not test_filter:
      return available_tests

    # |available_tests| are in adb instrument format: package.path.class#test.
    filter_without_hash = test_filter.replace('#', '.')
    return [t for t in available_tests
            if filter_without_hash in t.replace('#', '.')]

  @staticmethod
  def IsHostDrivenTest(test):
    """Returns True if |test| contains 'pythonDrivenTests'."""
    return 'pythonDrivenTests' in test
221