1#!/usr/bin/env python3 2# vim: set syntax=python ts=4 : 3# 4# Copyright (c) 2018 Intel Corporation 5# SPDX-License-Identifier: Apache-2.0 6 7import os 8import json 9import logging 10from colorama import Fore 11import xml.etree.ElementTree as ET 12import string 13from datetime import datetime 14 15logger = logging.getLogger('twister') 16logger.setLevel(logging.DEBUG) 17 18class Reporting: 19 20 def __init__(self, plan, env) -> None: 21 self.plan = plan #FIXME 22 self.instances = plan.instances 23 self.platforms = plan.platforms 24 self.selected_platforms = plan.selected_platforms 25 self.filtered_platforms = plan.filtered_platforms 26 self.env = env 27 self.timestamp = datetime.now().isoformat() 28 self.outdir = os.path.abspath(env.options.outdir) 29 30 @staticmethod 31 def process_log(log_file): 32 filtered_string = "" 33 if os.path.exists(log_file): 34 with open(log_file, "rb") as f: 35 log = f.read().decode("utf-8") 36 filtered_string = ''.join(filter(lambda x: x in string.printable, log)) 37 38 return filtered_string 39 40 41 @staticmethod 42 def xunit_testcase(eleTestsuite, name, classname, status, ts_status, reason, duration, runnable, stats, log, build_only_as_skip): 43 fails, passes, errors, skips = stats 44 45 if status in ['skipped', 'filtered']: 46 duration = 0 47 48 eleTestcase = ET.SubElement( 49 eleTestsuite, "testcase", 50 classname=classname, 51 name=f"{name}", 52 time=f"{duration}") 53 54 if status in ['skipped', 'filtered']: 55 skips += 1 56 # temporarily add build_only_as_skip to restore existing CI report behaviour 57 if ts_status == "passed" and not runnable: 58 tc_type = "build" 59 else: 60 tc_type = status 61 ET.SubElement(eleTestcase, 'skipped', type=f"{tc_type}", message=f"{reason}") 62 elif status in ["failed", "blocked"]: 63 fails += 1 64 el = ET.SubElement(eleTestcase, 'failure', type="failure", message=f"{reason}") 65 if log: 66 el.text = log 67 elif status == "error": 68 errors += 1 69 el = ET.SubElement(eleTestcase, 'error', type="failure", message=f"{reason}") 70 if log: 71 el.text = log 72 elif status == 'passed': 73 if not runnable and build_only_as_skip: 74 ET.SubElement(eleTestcase, 'skipped', type="build", message="built only") 75 skips += 1 76 else: 77 passes += 1 78 else: 79 if not status: 80 logger.debug(f"{name}: No status") 81 ET.SubElement(eleTestcase, 'skipped', type=f"untested", message="No results captured, testsuite misconfiguration?") 82 else: 83 logger.error(f"{name}: Unknown status '{status}'") 84 85 return (fails, passes, errors, skips) 86 87 # Generate a report with all testsuites instead of doing this per platform 88 def xunit_report_suites(self, json_file, filename): 89 90 json_data = {} 91 with open(json_file, "r") as json_results: 92 json_data = json.load(json_results) 93 94 95 env = json_data.get('environment', {}) 96 version = env.get('zephyr_version', None) 97 98 eleTestsuites = ET.Element('testsuites') 99 all_suites = json_data.get("testsuites", []) 100 101 suites_to_report = all_suites 102 # do not create entry if everything is filtered out 103 if not self.env.options.detailed_skipped_report: 104 suites_to_report = list(filter(lambda d: d.get('status') != "filtered", all_suites)) 105 106 for suite in suites_to_report: 107 duration = 0 108 eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', 109 name=suite.get("name"), time="0", 110 timestamp = self.timestamp, 111 tests="0", 112 failures="0", 113 errors="0", skipped="0") 114 eleTSPropetries = ET.SubElement(eleTestsuite, 'properties') 115 # Multiple 'property' can be added to 'properties' 116 # differing by name and value 117 ET.SubElement(eleTSPropetries, 'property', name="version", value=version) 118 ET.SubElement(eleTSPropetries, 'property', name="platform", value=suite.get("platform")) 119 ET.SubElement(eleTSPropetries, 'property', name="architecture", value=suite.get("arch")) 120 121 total = 0 122 fails = passes = errors = skips = 0 123 handler_time = suite.get('execution_time', 0) 124 runnable = suite.get('runnable', 0) 125 duration += float(handler_time) 126 ts_status = suite.get('status') 127 for tc in suite.get("testcases", []): 128 status = tc.get('status') 129 reason = tc.get('reason', suite.get('reason', 'Unknown')) 130 log = tc.get("log", suite.get("log")) 131 132 tc_duration = tc.get('execution_time', handler_time) 133 name = tc.get("identifier") 134 classname = ".".join(name.split(".")[:2]) 135 fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, 136 name, classname, status, ts_status, reason, tc_duration, runnable, 137 (fails, passes, errors, skips), log, True) 138 139 total = errors + passes + fails + skips 140 141 eleTestsuite.attrib['time'] = f"{duration}" 142 eleTestsuite.attrib['failures'] = f"{fails}" 143 eleTestsuite.attrib['errors'] = f"{errors}" 144 eleTestsuite.attrib['skipped'] = f"{skips}" 145 eleTestsuite.attrib['tests'] = f"{total}" 146 147 result = ET.tostring(eleTestsuites) 148 with open(filename, 'wb') as report: 149 report.write(result) 150 151 def xunit_report(self, json_file, filename, selected_platform=None, full_report=False): 152 if selected_platform: 153 selected = [selected_platform] 154 logger.info(f"Writing target report for {selected_platform}...") 155 else: 156 logger.info(f"Writing xunit report {filename}...") 157 selected = self.selected_platforms 158 159 json_data = {} 160 with open(json_file, "r") as json_results: 161 json_data = json.load(json_results) 162 163 164 env = json_data.get('environment', {}) 165 version = env.get('zephyr_version', None) 166 167 eleTestsuites = ET.Element('testsuites') 168 all_suites = json_data.get("testsuites", []) 169 170 for platform in selected: 171 suites = list(filter(lambda d: d['platform'] == platform, all_suites)) 172 # do not create entry if everything is filtered out 173 if not self.env.options.detailed_skipped_report: 174 non_filtered = list(filter(lambda d: d.get('status') != "filtered", suites)) 175 if not non_filtered: 176 continue 177 178 duration = 0 179 eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', 180 name=platform, 181 timestamp = self.timestamp, 182 time="0", 183 tests="0", 184 failures="0", 185 errors="0", skipped="0") 186 eleTSPropetries = ET.SubElement(eleTestsuite, 'properties') 187 # Multiple 'property' can be added to 'properties' 188 # differing by name and value 189 ET.SubElement(eleTSPropetries, 'property', name="version", value=version) 190 191 total = 0 192 fails = passes = errors = skips = 0 193 for ts in suites: 194 handler_time = ts.get('execution_time', 0) 195 runnable = ts.get('runnable', 0) 196 duration += float(handler_time) 197 198 ts_status = ts.get('status') 199 # Do not report filtered testcases 200 if ts_status == 'filtered' and not self.env.options.detailed_skipped_report: 201 continue 202 if full_report: 203 for tc in ts.get("testcases", []): 204 status = tc.get('status') 205 reason = tc.get('reason', ts.get('reason', 'Unknown')) 206 log = tc.get("log", ts.get("log")) 207 208 tc_duration = tc.get('execution_time', handler_time) 209 name = tc.get("identifier") 210 classname = ".".join(name.split(".")[:2]) 211 fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, 212 name, classname, status, ts_status, reason, tc_duration, runnable, 213 (fails, passes, errors, skips), log, True) 214 else: 215 reason = ts.get('reason', 'Unknown') 216 name = ts.get("name") 217 classname = f"{platform}:{name}" 218 log = ts.get("log") 219 fails, passes, errors, skips = self.xunit_testcase(eleTestsuite, 220 name, classname, ts_status, ts_status, reason, duration, runnable, 221 (fails, passes, errors, skips), log, False) 222 223 total = errors + passes + fails + skips 224 225 eleTestsuite.attrib['time'] = f"{duration}" 226 eleTestsuite.attrib['failures'] = f"{fails}" 227 eleTestsuite.attrib['errors'] = f"{errors}" 228 eleTestsuite.attrib['skipped'] = f"{skips}" 229 eleTestsuite.attrib['tests'] = f"{total}" 230 231 result = ET.tostring(eleTestsuites) 232 with open(filename, 'wb') as report: 233 report.write(result) 234 235 def json_report(self, filename, version="NA", platform=None): 236 logger.info(f"Writing JSON report {filename}") 237 report = {} 238 report["environment"] = {"os": os.name, 239 "zephyr_version": version, 240 "toolchain": self.env.toolchain, 241 "commit_date": self.env.commit_date, 242 "run_date": self.env.run_date 243 } 244 suites = [] 245 246 for instance in self.instances.values(): 247 if platform and platform != instance.platform.name: 248 continue 249 suite = {} 250 handler_log = os.path.join(instance.build_dir, "handler.log") 251 pytest_log = os.path.join(instance.build_dir, "twister_harness.log") 252 build_log = os.path.join(instance.build_dir, "build.log") 253 device_log = os.path.join(instance.build_dir, "device.log") 254 255 handler_time = instance.metrics.get('handler_time', 0) 256 used_ram = instance.metrics.get ("used_ram", 0) 257 used_rom = instance.metrics.get("used_rom",0) 258 available_ram = instance.metrics.get("available_ram", 0) 259 available_rom = instance.metrics.get("available_rom", 0) 260 suite = { 261 "name": instance.testsuite.name, 262 "arch": instance.platform.arch, 263 "platform": instance.platform.name, 264 "path": instance.testsuite.source_dir_rel 265 } 266 if instance.run_id: 267 suite['run_id'] = instance.run_id 268 269 suite["runnable"] = False 270 if instance.status != 'filtered': 271 suite["runnable"] = instance.run 272 273 if used_ram: 274 suite["used_ram"] = used_ram 275 if used_rom: 276 suite["used_rom"] = used_rom 277 278 suite['retries'] = instance.retries 279 280 if instance.dut: 281 suite["dut"] = instance.dut 282 if available_ram: 283 suite["available_ram"] = available_ram 284 if available_rom: 285 suite["available_rom"] = available_rom 286 if instance.status in ["error", "failed"]: 287 suite['status'] = instance.status 288 suite["reason"] = instance.reason 289 # FIXME 290 if os.path.exists(pytest_log): 291 suite["log"] = self.process_log(pytest_log) 292 elif os.path.exists(handler_log): 293 suite["log"] = self.process_log(handler_log) 294 elif os.path.exists(device_log): 295 suite["log"] = self.process_log(device_log) 296 else: 297 suite["log"] = self.process_log(build_log) 298 elif instance.status == 'filtered': 299 suite["status"] = "filtered" 300 suite["reason"] = instance.reason 301 elif instance.status == 'passed': 302 suite["status"] = "passed" 303 elif instance.status == 'skipped': 304 suite["status"] = "skipped" 305 suite["reason"] = instance.reason 306 307 if instance.status is not None: 308 suite["execution_time"] = f"{float(handler_time):.2f}" 309 suite["build_time"] = f"{float(instance.build_time):.2f}" 310 311 testcases = [] 312 313 if len(instance.testcases) == 1: 314 single_case_duration = f"{float(handler_time):.2f}" 315 else: 316 single_case_duration = 0 317 318 for case in instance.testcases: 319 # freeform was set when no sub testcases were parsed, however, 320 # if we discover those at runtime, the fallback testcase wont be 321 # needed anymore and can be removed from the output, it does 322 # not have a status and would otherwise be reported as skipped. 323 if case.freeform and case.status is None and len(instance.testcases) > 1: 324 continue 325 testcase = {} 326 testcase['identifier'] = case.name 327 if instance.status: 328 if single_case_duration: 329 testcase['execution_time'] = single_case_duration 330 else: 331 testcase['execution_time'] = f"{float(case.duration):.2f}" 332 333 if case.output != "": 334 testcase['log'] = case.output 335 336 if case.status == "skipped": 337 if instance.status == "filtered": 338 testcase["status"] = "filtered" 339 else: 340 testcase["status"] = "skipped" 341 testcase["reason"] = case.reason or instance.reason 342 else: 343 testcase["status"] = case.status 344 if case.reason: 345 testcase["reason"] = case.reason 346 347 testcases.append(testcase) 348 349 suite['testcases'] = testcases 350 351 if instance.recording is not None: 352 suite['recording'] = instance.recording 353 354 suites.append(suite) 355 356 report["testsuites"] = suites 357 with open(filename, "wt") as json_file: 358 json.dump(report, json_file, indent=4, separators=(',',':')) 359 360 361 def compare_metrics(self, filename): 362 # name, datatype, lower results better 363 interesting_metrics = [("used_ram", int, True), 364 ("used_rom", int, True)] 365 366 if not os.path.exists(filename): 367 logger.error("Cannot compare metrics, %s not found" % filename) 368 return [] 369 370 results = [] 371 saved_metrics = {} 372 with open(filename) as fp: 373 jt = json.load(fp) 374 for ts in jt.get("testsuites", []): 375 d = {} 376 for m, _, _ in interesting_metrics: 377 d[m] = ts.get(m, 0) 378 ts_name = ts.get('name') 379 ts_platform = ts.get('platform') 380 saved_metrics[(ts_name, ts_platform)] = d 381 382 for instance in self.instances.values(): 383 mkey = (instance.testsuite.name, instance.platform.name) 384 if mkey not in saved_metrics: 385 continue 386 sm = saved_metrics[mkey] 387 for metric, mtype, lower_better in interesting_metrics: 388 if metric not in instance.metrics: 389 continue 390 if sm[metric] == "": 391 continue 392 delta = instance.metrics.get(metric, 0) - mtype(sm[metric]) 393 if delta == 0: 394 continue 395 results.append((instance, metric, instance.metrics.get(metric, 0), delta, 396 lower_better)) 397 return results 398 399 def footprint_reports(self, report, show_footprint, all_deltas, 400 footprint_threshold, last_metrics): 401 if not report: 402 return 403 404 logger.debug("running footprint_reports") 405 deltas = self.compare_metrics(report) 406 warnings = 0 407 if deltas and show_footprint: 408 for i, metric, value, delta, lower_better in deltas: 409 if not all_deltas and ((delta < 0 and lower_better) or 410 (delta > 0 and not lower_better)): 411 continue 412 413 percentage = 0 414 if value > delta: 415 percentage = (float(delta) / float(value - delta)) 416 417 if not all_deltas and (percentage < (footprint_threshold / 100.0)): 418 continue 419 420 logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format( 421 i.platform.name, i.testsuite.name, Fore.YELLOW, 422 "INFO" if all_deltas else "WARNING", Fore.RESET, 423 metric, delta, value, percentage)) 424 warnings += 1 425 426 if warnings: 427 logger.warning("Deltas based on metrics from last %s" % 428 ("release" if not last_metrics else "run")) 429 430 def synopsis(self): 431 cnt = 0 432 example_instance = None 433 detailed_test_id = self.env.options.detailed_test_id 434 for instance in self.instances.values(): 435 if instance.status not in ["passed", "filtered", "skipped"]: 436 cnt = cnt + 1 437 if cnt == 1: 438 logger.info("-+" * 40) 439 logger.info("The following issues were found (showing the top 10 items):") 440 441 logger.info(f"{cnt}) {instance.testsuite.name} on {instance.platform.name} {instance.status} ({instance.reason})") 442 example_instance = instance 443 if cnt == 10: 444 break 445 446 if cnt and example_instance: 447 logger.info("") 448 logger.info("To rerun the tests, call twister using the following commandline:") 449 extra_parameters = '' if detailed_test_id else ' --no-detailed-test-id' 450 logger.info(f"west twister -p <PLATFORM> -s <TEST ID>{extra_parameters}, for example:") 451 logger.info("") 452 logger.info(f"west twister -p {example_instance.platform.name} -s {example_instance.testsuite.name}" 453 f"{extra_parameters}") 454 logger.info(f"or with west:") 455 logger.info(f"west build -p -b {example_instance.platform.name} " 456 f"{example_instance.testsuite.source_dir_rel} -T {example_instance.testsuite.id}") 457 logger.info("-+" * 40) 458 459 def summary(self, results, unrecognized_sections, duration): 460 failed = 0 461 run = 0 462 for instance in self.instances.values(): 463 if instance.status == "failed": 464 failed += 1 465 elif instance.metrics.get("unrecognized") and not unrecognized_sections: 466 logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" % 467 (Fore.RED, Fore.RESET, instance.name, 468 str(instance.metrics.get("unrecognized", [])))) 469 failed += 1 470 471 # FIXME: need a better way to identify executed tests 472 handler_time = instance.metrics.get('handler_time', 0) 473 if float(handler_time) > 0: 474 run += 1 475 476 if results.total and results.total != results.skipped_configs: 477 pass_rate = (float(results.passed) / float(results.total - results.skipped_configs)) 478 else: 479 pass_rate = 0 480 481 logger.info( 482 "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {}{}{} errored, {} skipped with {}{}{} warnings in {:.2f} seconds".format( 483 Fore.RED if failed else Fore.GREEN, 484 results.passed, 485 results.total, 486 Fore.RESET, 487 pass_rate, 488 Fore.RED if results.failed else Fore.RESET, 489 results.failed, 490 Fore.RESET, 491 Fore.RED if results.error else Fore.RESET, 492 results.error, 493 Fore.RESET, 494 results.skipped_configs, 495 Fore.YELLOW if self.plan.warnings else Fore.RESET, 496 self.plan.warnings, 497 Fore.RESET, 498 duration)) 499 500 total_platforms = len(self.platforms) 501 # if we are only building, do not report about tests being executed. 502 if self.platforms and not self.env.options.build_only: 503 logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format( 504 results.cases - results.skipped_cases, 505 results.skipped_cases, 506 len(self.filtered_platforms), 507 total_platforms, 508 (100 * len(self.filtered_platforms) / len(self.platforms)) 509 )) 510 511 built_only = results.total - run - results.skipped_configs 512 logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, \ 513{Fore.RED}{built_only}{Fore.RESET} test configurations were only built.") 514 515 def save_reports(self, name, suffix, report_dir, no_update, platform_reports): 516 if not self.instances: 517 return 518 519 logger.info("Saving reports...") 520 if name: 521 report_name = name 522 else: 523 report_name = "twister" 524 525 if report_dir: 526 os.makedirs(report_dir, exist_ok=True) 527 filename = os.path.join(report_dir, report_name) 528 outdir = report_dir 529 else: 530 outdir = self.outdir 531 filename = os.path.join(outdir, report_name) 532 533 if suffix: 534 filename = "{}_{}".format(filename, suffix) 535 536 if not no_update: 537 json_file = filename + ".json" 538 self.json_report(json_file, version=self.env.version) 539 self.xunit_report(json_file, filename + ".xml", full_report=False) 540 self.xunit_report(json_file, filename + "_report.xml", full_report=True) 541 self.xunit_report_suites(json_file, filename + "_suite_report.xml") 542 543 if platform_reports: 544 self.target_report(json_file, outdir, suffix) 545 546 547 def target_report(self, json_file, outdir, suffix): 548 platforms = {inst.platform.name for _, inst in self.instances.items()} 549 for platform in platforms: 550 if suffix: 551 filename = os.path.join(outdir,"{}_{}.xml".format(platform, suffix)) 552 json_platform_file = os.path.join(outdir,"{}_{}.json".format(platform, suffix)) 553 else: 554 filename = os.path.join(outdir,"{}.xml".format(platform)) 555 json_platform_file = os.path.join(outdir,"{}.json".format(platform)) 556 self.xunit_report(json_file, filename, platform, full_report=True) 557 self.json_report(json_platform_file, version=self.env.version, platform=platform) 558