1#!/usr/bin/env python3 2 3"""Analyze the test outcomes from a full CI run. 4 5This script can also run on outcomes from a partial run, but the results are 6less likely to be useful. 7""" 8 9import argparse 10import sys 11import traceback 12import re 13 14import check_test_cases 15 16class Results: 17 """Process analysis results.""" 18 19 def __init__(self): 20 self.error_count = 0 21 self.warning_count = 0 22 23 @staticmethod 24 def log(fmt, *args, **kwargs): 25 sys.stderr.write((fmt + '\n').format(*args, **kwargs)) 26 27 def error(self, fmt, *args, **kwargs): 28 self.log('Error: ' + fmt, *args, **kwargs) 29 self.error_count += 1 30 31 def warning(self, fmt, *args, **kwargs): 32 self.log('Warning: ' + fmt, *args, **kwargs) 33 self.warning_count += 1 34 35class TestCaseOutcomes: 36 """The outcomes of one test case across many configurations.""" 37 # pylint: disable=too-few-public-methods 38 39 def __init__(self): 40 # Collect a list of witnesses of the test case succeeding or failing. 41 # Currently we don't do anything with witnesses except count them. 42 # The format of a witness is determined by the read_outcome_file 43 # function; it's the platform and configuration joined by ';'. 44 self.successes = [] 45 self.failures = [] 46 47 def hits(self): 48 """Return the number of times a test case has been run. 49 50 This includes passes and failures, but not skips. 51 """ 52 return len(self.successes) + len(self.failures) 53 54def analyze_coverage(results, outcomes): 55 """Check that all available test cases are executed at least once.""" 56 available = check_test_cases.collect_available_test_cases() 57 for key in available: 58 hits = outcomes[key].hits() if key in outcomes else 0 59 if hits == 0: 60 # Make this a warning, not an error, as long as we haven't 61 # fixed this branch to have full coverage of test cases. 62 results.warning('Test case not executed: {}', key) 63 64def analyze_driver_vs_reference(outcomes, component_ref, component_driver, 65 ignored_suites, ignored_test=None): 66 """Check that all tests executed in the reference component are also 67 executed in the corresponding driver component. 68 Skip: 69 - full test suites provided in ignored_suites list 70 - only some specific test inside a test suite, for which the corresponding 71 output string is provided 72 """ 73 available = check_test_cases.collect_available_test_cases() 74 result = True 75 76 for key in available: 77 # Continue if test was not executed by any component 78 hits = outcomes[key].hits() if key in outcomes else 0 79 if hits == 0: 80 continue 81 # Skip ignored test suites 82 full_test_suite = key.split(';')[0] # retrieve full test suite name 83 test_string = key.split(';')[1] # retrieve the text string of this test 84 test_suite = full_test_suite.split('.')[0] # retrieve main part of test suite name 85 if test_suite in ignored_suites: 86 continue 87 if ((full_test_suite in ignored_test) and 88 (test_string in ignored_test[full_test_suite])): 89 continue 90 # Search for tests that run in reference component and not in driver component 91 driver_test_passed = False 92 reference_test_passed = False 93 for entry in outcomes[key].successes: 94 if component_driver in entry: 95 driver_test_passed = True 96 if component_ref in entry: 97 reference_test_passed = True 98 if(reference_test_passed and not driver_test_passed): 99 Results.log(key) 100 result = False 101 return result 102 103def analyze_outcomes(outcomes): 104 """Run all analyses on the given outcome collection.""" 105 results = Results() 106 analyze_coverage(results, outcomes) 107 return results 108 109def read_outcome_file(outcome_file): 110 """Parse an outcome file and return an outcome collection. 111 112An outcome collection is a dictionary mapping keys to TestCaseOutcomes objects. 113The keys are the test suite name and the test case description, separated 114by a semicolon. 115""" 116 outcomes = {} 117 with open(outcome_file, 'r', encoding='utf-8') as input_file: 118 for line in input_file: 119 (platform, config, suite, case, result, _cause) = line.split(';') 120 key = ';'.join([suite, case]) 121 setup = ';'.join([platform, config]) 122 if key not in outcomes: 123 outcomes[key] = TestCaseOutcomes() 124 if result == 'PASS': 125 outcomes[key].successes.append(setup) 126 elif result == 'FAIL': 127 outcomes[key].failures.append(setup) 128 return outcomes 129 130def do_analyze_coverage(outcome_file, args): 131 """Perform coverage analysis.""" 132 del args # unused 133 outcomes = read_outcome_file(outcome_file) 134 Results.log("\n*** Analyze coverage ***\n") 135 results = analyze_outcomes(outcomes) 136 return results.error_count == 0 137 138def do_analyze_driver_vs_reference(outcome_file, args): 139 """Perform driver vs reference analyze.""" 140 ignored_suites = ['test_suite_' + x for x in args['ignored_suites']] 141 142 outcomes = read_outcome_file(outcome_file) 143 Results.log("\n*** Analyze driver {} vs reference {} ***\n".format( 144 args['component_driver'], args['component_ref'])) 145 return analyze_driver_vs_reference(outcomes, args['component_ref'], 146 args['component_driver'], ignored_suites, 147 args['ignored_tests']) 148 149# List of tasks with a function that can handle this task and additional arguments if required 150TASKS = { 151 'analyze_coverage': { 152 'test_function': do_analyze_coverage, 153 'args': {} 154 }, 155 # How to use analyze_driver_vs_reference_xxx locally: 156 # 1. tests/scripts/all.sh --outcome-file "$PWD/out.csv" <component_ref> <component_driver> 157 # 2. tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx 158 'analyze_driver_vs_reference_hash': { 159 'test_function': do_analyze_driver_vs_reference, 160 'args': { 161 'component_ref': 'test_psa_crypto_config_reference_hash_use_psa', 162 'component_driver': 'test_psa_crypto_config_accel_hash_use_psa', 163 'ignored_suites': [ 164 'shax', 'mdx', # the software implementations that are being excluded 165 'md', # the legacy abstraction layer that's being excluded 166 ], 167 'ignored_tests': { 168 } 169 } 170 }, 171 'analyze_driver_vs_reference_ecdsa': { 172 'test_function': do_analyze_driver_vs_reference, 173 'args': { 174 'component_ref': 'test_psa_crypto_config_reference_ecdsa_use_psa', 175 'component_driver': 'test_psa_crypto_config_accel_ecdsa_use_psa', 176 'ignored_suites': [ 177 'ecdsa', # the software implementation that's excluded 178 ], 179 'ignored_tests': { 180 'test_suite_random': [ 181 'PSA classic wrapper: ECDSA signature (SECP256R1)', 182 ], 183 } 184 } 185 }, 186 'analyze_driver_vs_reference_ecdh': { 187 'test_function': do_analyze_driver_vs_reference, 188 'args': { 189 'component_ref': 'test_psa_crypto_config_reference_ecdh_use_psa', 190 'component_driver': 'test_psa_crypto_config_accel_ecdh_use_psa', 191 'ignored_suites': [ 192 'ecdh', # the software implementation that's excluded 193 ], 194 'ignored_tests': { 195 } 196 } 197 }, 198 'analyze_driver_vs_reference_ecjpake': { 199 'test_function': do_analyze_driver_vs_reference, 200 'args': { 201 'component_ref': 'test_psa_crypto_config_reference_ecjpake_use_psa', 202 'component_driver': 'test_psa_crypto_config_accel_ecjpake_use_psa', 203 'ignored_suites': [ 204 'ecjpake', # the software implementation that's excluded 205 ], 206 'ignored_tests': { 207 } 208 } 209 }, 210} 211 212def main(): 213 try: 214 parser = argparse.ArgumentParser(description=__doc__) 215 parser.add_argument('outcomes', metavar='OUTCOMES.CSV', 216 help='Outcome file to analyze') 217 parser.add_argument('task', default='all', nargs='?', 218 help='Analysis to be done. By default, run all tasks. ' 219 'With one or more TASK, run only those. ' 220 'TASK can be the name of a single task or ' 221 'comma/space-separated list of tasks. ') 222 parser.add_argument('--list', action='store_true', 223 help='List all available tasks and exit.') 224 options = parser.parse_args() 225 226 if options.list: 227 for task in TASKS: 228 Results.log(task) 229 sys.exit(0) 230 231 result = True 232 233 if options.task == 'all': 234 tasks = TASKS.keys() 235 else: 236 tasks = re.split(r'[, ]+', options.task) 237 238 for task in tasks: 239 if task not in TASKS: 240 Results.log('Error: invalid task: {}'.format(task)) 241 sys.exit(1) 242 243 for task in TASKS: 244 if task in tasks: 245 if not TASKS[task]['test_function'](options.outcomes, TASKS[task]['args']): 246 result = False 247 248 if result is False: 249 sys.exit(1) 250 Results.log("SUCCESS :-)") 251 except Exception: # pylint: disable=broad-except 252 # Print the backtrace and exit explicitly with our chosen status. 253 traceback.print_exc() 254 sys.exit(120) 255 256if __name__ == '__main__': 257 main() 258