| # Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Helper class for instrumenation test jar.""" |
| # pylint: disable=W0702 |
| |
| import logging |
| import os |
| import pickle |
| import re |
| import sys |
| import tempfile |
| |
| from pylib import cmd_helper |
| from pylib import constants |
| from pylib.device import device_utils |
| |
| sys.path.insert(0, |
| os.path.join(constants.DIR_SOURCE_ROOT, |
| 'build', 'util', 'lib', 'common')) |
| |
| import unittest_util # pylint: disable=F0401 |
| |
| # If you change the cached output of proguard, increment this number |
| PICKLE_FORMAT_VERSION = 3 |
| |
| |
| class TestJar(object): |
| _ANNOTATIONS = frozenset( |
| ['Smoke', 'SmallTest', 'MediumTest', 'LargeTest', 'EnormousTest', |
| 'FlakyTest', 'DisabledTest', 'Manual', 'PerfTest', 'HostDrivenTest', |
| 'IntegrationTest']) |
| _DEFAULT_ANNOTATION = 'SmallTest' |
| _PROGUARD_CLASS_RE = re.compile(r'\s*?- Program class:\s*([\S]+)$') |
| _PROGUARD_SUPERCLASS_RE = re.compile(r'\s*? Superclass:\s*([\S]+)$') |
| _PROGUARD_METHOD_RE = re.compile(r'\s*?- Method:\s*(\S*)[(].*$') |
| _PROGUARD_ANNOTATION_RE = re.compile(r'\s*?- Annotation \[L(\S*);\]:$') |
| _PROGUARD_ANNOTATION_CONST_RE = ( |
| re.compile(r'\s*?- Constant element value.*$')) |
| _PROGUARD_ANNOTATION_VALUE_RE = re.compile(r'\s*?- \S+? \[(.*)\]$') |
| |
| def __init__(self, jar_path): |
| if not os.path.exists(jar_path): |
| raise Exception('%s not found, please build it' % jar_path) |
| |
| self._PROGUARD_PATH = os.path.join(constants.ANDROID_SDK_ROOT, |
| 'tools/proguard/lib/proguard.jar') |
| if not os.path.exists(self._PROGUARD_PATH): |
| self._PROGUARD_PATH = os.path.join(os.environ['ANDROID_BUILD_TOP'], |
| 'external/proguard/lib/proguard.jar') |
| self._jar_path = jar_path |
| self._pickled_proguard_name = self._jar_path + '-proguard.pickle' |
| self._test_methods = {} |
| if not self._GetCachedProguardData(): |
| self._GetProguardData() |
| |
| @staticmethod |
| def _CalculateMd5(path): |
| # TODO(jbudorick): Move MD5sum calculations out of here and |
| # AndroidCommands to their own module. |
| out = cmd_helper.GetCmdOutput( |
| [os.path.join(constants.GetOutDirectory(), |
| 'md5sum_bin_host'), |
| path]) |
| return out |
| |
| def _GetCachedProguardData(self): |
| if (os.path.exists(self._pickled_proguard_name) and |
| (os.path.getmtime(self._pickled_proguard_name) > |
| os.path.getmtime(self._jar_path))): |
| logging.info('Loading cached proguard output from %s', |
| self._pickled_proguard_name) |
| try: |
| with open(self._pickled_proguard_name, 'r') as r: |
| d = pickle.loads(r.read()) |
| jar_md5 = self._CalculateMd5(self._jar_path) |
| if (d['JAR_MD5SUM'] == jar_md5 and |
| d['VERSION'] == PICKLE_FORMAT_VERSION): |
| self._test_methods = d['TEST_METHODS'] |
| return True |
| except: |
| logging.warning('PICKLE_FORMAT_VERSION has changed, ignoring cache') |
| return False |
| |
| def _GetProguardData(self): |
| logging.info('Retrieving test methods via proguard.') |
| |
| with tempfile.NamedTemporaryFile() as proguard_output: |
| cmd_helper.RunCmd(['java', '-jar', |
| self._PROGUARD_PATH, |
| '-injars', self._jar_path, |
| '-dontshrink', |
| '-dontoptimize', |
| '-dontobfuscate', |
| '-dontpreverify', |
| '-dump', proguard_output.name]) |
| |
| clazzez = {} |
| |
| annotation = None |
| annotation_has_value = False |
| clazz = None |
| method = None |
| |
| for line in proguard_output: |
| if len(line) == 0: |
| annotation = None |
| annotation_has_value = False |
| method = None |
| continue |
| |
| m = self._PROGUARD_CLASS_RE.match(line) |
| if m: |
| clazz = m.group(1).replace('/', '.') |
| clazzez[clazz] = { |
| 'methods': {}, |
| 'annotations': {} |
| } |
| annotation = None |
| annotation_has_value = False |
| method = None |
| continue |
| |
| if not clazz: |
| continue |
| |
| m = self._PROGUARD_SUPERCLASS_RE.match(line) |
| if m: |
| clazzez[clazz]['superclass'] = m.group(1).replace('/', '.') |
| continue |
| |
| if clazz.endswith('Test'): |
| m = self._PROGUARD_METHOD_RE.match(line) |
| if m: |
| method = m.group(1) |
| clazzez[clazz]['methods'][method] = {'annotations': {}} |
| annotation = None |
| annotation_has_value = False |
| continue |
| |
| m = self._PROGUARD_ANNOTATION_RE.match(line) |
| if m: |
| # Ignore the annotation package. |
| annotation = m.group(1).split('/')[-1] |
| if method: |
| clazzez[clazz]['methods'][method]['annotations'][annotation] = None |
| else: |
| clazzez[clazz]['annotations'][annotation] = None |
| continue |
| |
| if annotation: |
| if not annotation_has_value: |
| m = self._PROGUARD_ANNOTATION_CONST_RE.match(line) |
| annotation_has_value = bool(m) |
| else: |
| m = self._PROGUARD_ANNOTATION_VALUE_RE.match(line) |
| if m: |
| if method: |
| clazzez[clazz]['methods'][method]['annotations'][annotation] = ( |
| m.group(1)) |
| else: |
| clazzez[clazz]['annotations'][annotation] = m.group(1) |
| annotation_has_value = None |
| |
| test_clazzez = ((n, i) for n, i in clazzez.items() if n.endswith('Test')) |
| for clazz_name, clazz_info in test_clazzez: |
| logging.info('Processing %s' % clazz_name) |
| c = clazz_name |
| min_sdk_level = None |
| |
| while c in clazzez: |
| c_info = clazzez[c] |
| if not min_sdk_level: |
| min_sdk_level = c_info['annotations'].get('MinAndroidSdkLevel', None) |
| c = c_info.get('superclass', None) |
| |
| for method_name, method_info in clazz_info['methods'].items(): |
| if method_name.startswith('test'): |
| qualified_method = '%s#%s' % (clazz_name, method_name) |
| method_annotations = [] |
| for annotation_name, annotation_value in ( |
| method_info['annotations'].items()): |
| method_annotations.append(annotation_name) |
| if annotation_value: |
| method_annotations.append( |
| annotation_name + ':' + annotation_value) |
| self._test_methods[qualified_method] = { |
| 'annotations': method_annotations |
| } |
| if min_sdk_level is not None: |
| self._test_methods[qualified_method]['min_sdk_level'] = ( |
| int(min_sdk_level)) |
| |
| logging.info('Storing proguard output to %s', self._pickled_proguard_name) |
| d = {'VERSION': PICKLE_FORMAT_VERSION, |
| 'TEST_METHODS': self._test_methods, |
| 'JAR_MD5SUM': self._CalculateMd5(self._jar_path)} |
| with open(self._pickled_proguard_name, 'w') as f: |
| f.write(pickle.dumps(d)) |
| |
| |
| @staticmethod |
| def _IsTestMethod(test): |
| class_name, method = test.split('#') |
| return class_name.endswith('Test') and method.startswith('test') |
| |
| def GetTestAnnotations(self, test): |
| """Returns a list of all annotations for the given |test|. May be empty.""" |
| if not self._IsTestMethod(test) or not test in self._test_methods: |
| return [] |
| return self._test_methods[test]['annotations'] |
| |
| @staticmethod |
| def _AnnotationsMatchFilters(annotation_filter_list, annotations): |
| """Checks if annotations match any of the filters.""" |
| if not annotation_filter_list: |
| return True |
| for annotation_filter in annotation_filter_list: |
| filters = annotation_filter.split('=') |
| if len(filters) == 2: |
| key = filters[0] |
| value_list = filters[1].split(',') |
| for value in value_list: |
| if key + ':' + value in annotations: |
| return True |
| elif annotation_filter in annotations: |
| return True |
| return False |
| |
| def GetAnnotatedTests(self, annotation_filter_list): |
| """Returns a list of all tests that match the given annotation filters.""" |
| return [test for test, attr in self.GetTestMethods().iteritems() |
| if self._IsTestMethod(test) and self._AnnotationsMatchFilters( |
| annotation_filter_list, attr['annotations'])] |
| |
| def GetTestMethods(self): |
| """Returns a dict of all test methods and relevant attributes. |
| |
| Test methods are retrieved as Class#testMethod. |
| """ |
| return self._test_methods |
| |
| def _GetTestsMissingAnnotation(self): |
| """Get a list of test methods with no known annotations.""" |
| tests_missing_annotations = [] |
| for test_method in self.GetTestMethods().iterkeys(): |
| annotations_ = frozenset(self.GetTestAnnotations(test_method)) |
| if (annotations_.isdisjoint(self._ANNOTATIONS) and |
| not self.IsHostDrivenTest(test_method)): |
| tests_missing_annotations.append(test_method) |
| return sorted(tests_missing_annotations) |
| |
| def _IsTestValidForSdkRange(self, test_name, attached_min_sdk_level): |
| required_min_sdk_level = self.GetTestMethods()[test_name].get( |
| 'min_sdk_level', None) |
| return (required_min_sdk_level is None or |
| attached_min_sdk_level >= required_min_sdk_level) |
| |
| def GetAllMatchingTests(self, annotation_filter_list, |
| exclude_annotation_list, test_filter): |
| """Get a list of tests matching any of the annotations and the filter. |
| |
| Args: |
| annotation_filter_list: List of test annotations. A test must have at |
| least one of these annotations. A test without any annotations is |
| considered to be SmallTest. |
| exclude_annotation_list: List of test annotations. A test must not have |
| any of these annotations. |
| test_filter: Filter used for partial matching on the test method names. |
| |
| Returns: |
| List of all matching tests. |
| """ |
| if annotation_filter_list: |
| available_tests = self.GetAnnotatedTests(annotation_filter_list) |
| # Include un-annotated tests in SmallTest. |
| if annotation_filter_list.count(self._DEFAULT_ANNOTATION) > 0: |
| for test in self._GetTestsMissingAnnotation(): |
| logging.warning( |
| '%s has no annotations. Assuming "%s".', test, |
| self._DEFAULT_ANNOTATION) |
| available_tests.append(test) |
| else: |
| available_tests = [m for m in self.GetTestMethods() |
| if not self.IsHostDrivenTest(m)] |
| |
| if exclude_annotation_list: |
| excluded_tests = self.GetAnnotatedTests(exclude_annotation_list) |
| available_tests = list(set(available_tests) - set(excluded_tests)) |
| |
| tests = [] |
| if test_filter: |
| # |available_tests| are in adb instrument format: package.path.class#test. |
| |
| # Maps a 'class.test' name to each 'package.path.class#test' name. |
| sanitized_test_names = dict([ |
| (t.split('.')[-1].replace('#', '.'), t) for t in available_tests]) |
| # Filters 'class.test' names and populates |tests| with the corresponding |
| # 'package.path.class#test' names. |
| tests = [ |
| sanitized_test_names[t] for t in unittest_util.FilterTestNames( |
| sanitized_test_names.keys(), test_filter.replace('#', '.'))] |
| else: |
| tests = available_tests |
| |
| # Filter out any tests with SDK level requirements that don't match the set |
| # of attached devices. |
| sdk_versions = [ |
| int(v) for v in |
| device_utils.DeviceUtils.parallel().GetProp( |
| 'ro.build.version.sdk').pGet(None)] |
| tests = filter( |
| lambda t: self._IsTestValidForSdkRange(t, min(sdk_versions)), |
| tests) |
| |
| return tests |
| |
| @staticmethod |
| def IsHostDrivenTest(test): |
| return 'pythonDrivenTests' in test |