1# SPDX-License-Identifier: Apache-2.0 2from __future__ import annotations 3from asyncio.log import logger 4import platform 5import re 6import os 7import sys 8import subprocess 9import shlex 10from collections import OrderedDict 11import xml.etree.ElementTree as ET 12import logging 13import threading 14import time 15import shutil 16import json 17 18from twisterlib.error import ConfigurationError 19from twisterlib.environment import ZEPHYR_BASE, PYTEST_PLUGIN_INSTALLED 20from twisterlib.handlers import Handler, terminate_process, SUPPORTED_SIMS_IN_PYTEST 21from twisterlib.testinstance import TestInstance 22 23 24logger = logging.getLogger('twister') 25logger.setLevel(logging.DEBUG) 26 27_WINDOWS = platform.system() == 'Windows' 28 29 30result_re = re.compile(r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds") 31class Harness: 32 GCOV_START = "GCOV_COVERAGE_DUMP_START" 33 GCOV_END = "GCOV_COVERAGE_DUMP_END" 34 FAULT = "ZEPHYR FATAL ERROR" 35 RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL" 36 RUN_FAILED = "PROJECT EXECUTION FAILED" 37 run_id_pattern = r"RunID: (?P<run_id>.*)" 38 39 40 ztest_to_status = { 41 'PASS': 'passed', 42 'SKIP': 'skipped', 43 'BLOCK': 'blocked', 44 'FAIL': 'failed' 45 } 46 47 def __init__(self): 48 self.state = None 49 self.reason = None 50 self.type = None 51 self.regex = [] 52 self.matches = OrderedDict() 53 self.ordered = True 54 self.id = None 55 self.fail_on_fault = True 56 self.fault = False 57 self.capture_coverage = False 58 self.next_pattern = 0 59 self.record = None 60 self.record_pattern = None 61 self.record_as_json = None 62 self.recording = [] 63 self.ztest = False 64 self.detected_suite_names = [] 65 self.run_id = None 66 self.matched_run_id = False 67 self.run_id_exists = False 68 self.instance: TestInstance | None = None 69 self.testcase_output = "" 70 self._match = False 71 72 def configure(self, instance): 73 self.instance = instance 74 config = instance.testsuite.harness_config 75 self.id = instance.testsuite.id 76 self.run_id = instance.run_id 77 if instance.testsuite.ignore_faults: 78 self.fail_on_fault = False 79 80 if config: 81 self.type = config.get('type', None) 82 self.regex = config.get('regex', []) 83 self.ordered = config.get('ordered', True) 84 self.record = config.get('record', {}) 85 if self.record: 86 self.record_pattern = re.compile(self.record.get("regex", "")) 87 self.record_as_json = self.record.get("as_json") 88 89 def build(self): 90 pass 91 92 def get_testcase_name(self): 93 """ 94 Get current TestCase name. 95 """ 96 return self.id 97 98 def translate_record(self, record: dict) -> dict: 99 if self.record_as_json: 100 for k in self.record_as_json: 101 if not k in record: 102 continue 103 try: 104 record[k] = json.loads(record[k]) if record[k] else {} 105 except json.JSONDecodeError as parse_error: 106 logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:" 107 f" {parse_error} for '{k}':'{record[k]}'") 108 # Don't set the Harness state to failed for recordings. 109 record[k] = { 'ERROR': { 'msg': str(parse_error), 'doc': record[k] } } 110 return record 111 112 def parse_record(self, line) -> re.Match: 113 match = None 114 if self.record_pattern: 115 match = self.record_pattern.search(line) 116 if match: 117 rec = self.translate_record({ k:v.strip() for k,v in match.groupdict(default="").items() }) 118 self.recording.append(rec) 119 return match 120 # 121 122 def process_test(self, line): 123 124 self.parse_record(line) 125 126 runid_match = re.search(self.run_id_pattern, line) 127 if runid_match: 128 run_id = runid_match.group("run_id") 129 self.run_id_exists = True 130 if run_id == str(self.run_id): 131 self.matched_run_id = True 132 133 if self.RUN_PASSED in line: 134 if self.fault: 135 self.state = "failed" 136 self.reason = "Fault detected while running test" 137 else: 138 self.state = "passed" 139 140 if self.RUN_FAILED in line: 141 self.state = "failed" 142 self.reason = "Testsuite failed" 143 144 if self.fail_on_fault: 145 if self.FAULT == line: 146 self.fault = True 147 148 if self.GCOV_START in line: 149 self.capture_coverage = True 150 elif self.GCOV_END in line: 151 self.capture_coverage = False 152 153class Robot(Harness): 154 155 is_robot_test = True 156 157 def configure(self, instance): 158 super(Robot, self).configure(instance) 159 self.instance = instance 160 161 config = instance.testsuite.harness_config 162 if config: 163 self.path = config.get('robot_testsuite', None) 164 self.option = config.get('robot_option', None) 165 166 def handle(self, line): 167 ''' Test cases that make use of this harness care about results given 168 by Robot Framework which is called in run_robot_test(), so works of this 169 handle is trying to give a PASS or FAIL to avoid timeout, nothing 170 is writen into handler.log 171 ''' 172 self.instance.state = "passed" 173 tc = self.instance.get_case_or_create(self.id) 174 tc.status = "passed" 175 176 def run_robot_test(self, command, handler): 177 start_time = time.time() 178 env = os.environ.copy() 179 180 if self.option: 181 if isinstance(self.option, list): 182 for option in self.option: 183 for v in str(option).split(): 184 command.append(f'{v}') 185 else: 186 for v in str(self.option).split(): 187 command.append(f'{v}') 188 189 if self.path is None: 190 raise PytestHarnessException(f'The parameter robot_testsuite is mandatory') 191 192 if isinstance(self.path, list): 193 for suite in self.path: 194 command.append(os.path.join(handler.sourcedir, suite)) 195 else: 196 command.append(os.path.join(handler.sourcedir, self.path)) 197 198 with subprocess.Popen(command, stdout=subprocess.PIPE, 199 stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as renode_test_proc: 200 out, _ = renode_test_proc.communicate() 201 202 self.instance.execution_time = time.time() - start_time 203 204 if renode_test_proc.returncode == 0: 205 self.instance.status = "passed" 206 # all tests in one Robot file are treated as a single test case, 207 # so its status should be set accordingly to the instance status 208 # please note that there should be only one testcase in testcases list 209 self.instance.testcases[0].status = "passed" 210 else: 211 logger.error("Robot test failure: %s for %s" % 212 (handler.sourcedir, self.instance.platform.name)) 213 self.instance.status = "failed" 214 self.instance.testcases[0].status = "failed" 215 216 if out: 217 with open(os.path.join(self.instance.build_dir, handler.log), "wt") as log: 218 log_msg = out.decode(sys.getdefaultencoding()) 219 log.write(log_msg) 220 221class Console(Harness): 222 223 def get_testcase_name(self): 224 ''' 225 Get current TestCase name. 226 227 Console Harness id has only TestSuite id without TestCase name suffix. 228 Only the first TestCase name might be taken if available when a Ztest with 229 a single test case is configured to use this harness type for simplified 230 output parsing instead of the Ztest harness as Ztest suite should do. 231 ''' 232 if self.instance and len(self.instance.testcases) == 1: 233 return self.instance.testcases[0].name 234 return super(Console, self).get_testcase_name() 235 236 def configure(self, instance): 237 super(Console, self).configure(instance) 238 if self.regex is None or len(self.regex) == 0: 239 self.state = "failed" 240 tc = self.instance.set_case_status_by_name( 241 self.get_testcase_name(), 242 "failed", 243 f"HARNESS:{self.__class__.__name__}:no regex patterns configured." 244 ) 245 raise ConfigurationError(self.instance.name, tc.reason) 246 if self.type == "one_line": 247 self.pattern = re.compile(self.regex[0]) 248 self.patterns_expected = 1 249 elif self.type == "multi_line": 250 self.patterns = [] 251 for r in self.regex: 252 self.patterns.append(re.compile(r)) 253 self.patterns_expected = len(self.patterns) 254 else: 255 self.state = "failed" 256 tc = self.instance.set_case_status_by_name( 257 self.get_testcase_name(), 258 "failed", 259 f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}" 260 ) 261 raise ConfigurationError(self.instance.name, tc.reason) 262 # 263 264 def handle(self, line): 265 if self.type == "one_line": 266 if self.pattern.search(line): 267 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:" 268 f"'{self.pattern.pattern}'") 269 self.next_pattern += 1 270 self.state = "passed" 271 elif self.type == "multi_line" and self.ordered: 272 if (self.next_pattern < len(self.patterns) and 273 self.patterns[self.next_pattern].search(line)): 274 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED(" 275 f"{self.next_pattern + 1}/{self.patterns_expected}):" 276 f"'{self.patterns[self.next_pattern].pattern}'") 277 self.next_pattern += 1 278 if self.next_pattern >= len(self.patterns): 279 self.state = "passed" 280 elif self.type == "multi_line" and not self.ordered: 281 for i, pattern in enumerate(self.patterns): 282 r = self.regex[i] 283 if pattern.search(line) and not r in self.matches: 284 self.matches[r] = line 285 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED(" 286 f"{len(self.matches)}/{self.patterns_expected}):" 287 f"'{pattern.pattern}'") 288 if len(self.matches) == len(self.regex): 289 self.state = "passed" 290 else: 291 logger.error("Unknown harness_config type") 292 293 if self.fail_on_fault: 294 if self.FAULT in line: 295 self.fault = True 296 297 if self.GCOV_START in line: 298 self.capture_coverage = True 299 elif self.GCOV_END in line: 300 self.capture_coverage = False 301 302 self.process_test(line) 303 # Reset the resulting test state to 'failed' when not all of the patterns were 304 # found in the output, but just ztest's 'PROJECT EXECUTION SUCCESSFUL'. 305 # It might happen because of the pattern sequence diverged from the 306 # test code, the test platform has console issues, or even some other 307 # test image was executed. 308 # TODO: Introduce explicit match policy type to reject 309 # unexpected console output, allow missing patterns, deny duplicates. 310 if self.state == "passed" and self.ordered and self.next_pattern < self.patterns_expected: 311 logger.error(f"HARNESS:{self.__class__.__name__}: failed with" 312 f" {self.next_pattern} of {self.patterns_expected}" 313 f" expected ordered patterns.") 314 self.state = "failed" 315 self.reason = "patterns did not match (ordered)" 316 if self.state == "passed" and not self.ordered and len(self.matches) < self.patterns_expected: 317 logger.error(f"HARNESS:{self.__class__.__name__}: failed with" 318 f" {len(self.matches)} of {self.patterns_expected}" 319 f" expected unordered patterns.") 320 self.state = "failed" 321 self.reason = "patterns did not match (unordered)" 322 323 tc = self.instance.get_case_or_create(self.get_testcase_name()) 324 if self.state == "passed": 325 tc.status = "passed" 326 else: 327 tc.status = "failed" 328 329 330class PytestHarnessException(Exception): 331 """General exception for pytest.""" 332 333 334class Pytest(Harness): 335 336 def configure(self, instance: TestInstance): 337 super(Pytest, self).configure(instance) 338 self.running_dir = instance.build_dir 339 self.source_dir = instance.testsuite.source_dir 340 self.report_file = os.path.join(self.running_dir, 'report.xml') 341 self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log') 342 self.reserved_serial = None 343 344 def pytest_run(self, timeout): 345 try: 346 cmd = self.generate_command() 347 self.run_command(cmd, timeout) 348 except PytestHarnessException as pytest_exception: 349 logger.error(str(pytest_exception)) 350 self.state = 'failed' 351 self.instance.reason = str(pytest_exception) 352 finally: 353 if self.reserved_serial: 354 self.instance.handler.make_device_available(self.reserved_serial) 355 self.instance.record(self.recording) 356 self._update_test_status() 357 358 def generate_command(self): 359 config = self.instance.testsuite.harness_config 360 handler: Handler = self.instance.handler 361 pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest'] 362 pytest_args_yaml = config.get('pytest_args', []) if config else [] 363 pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None 364 command = [ 365 'pytest', 366 '--twister-harness', 367 '-s', '-v', 368 f'--build-dir={self.running_dir}', 369 f'--junit-xml={self.report_file}', 370 '--log-file-level=DEBUG', 371 '--log-file-format=%(asctime)s.%(msecs)d:%(levelname)s:%(name)s: %(message)s', 372 f'--log-file={self.pytest_log_file_path}' 373 ] 374 command.extend([os.path.normpath(os.path.join( 375 self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root]) 376 377 if pytest_dut_scope: 378 command.append(f'--dut-scope={pytest_dut_scope}') 379 380 # Always pass output from the pytest test and the test image up to Twister log. 381 command.extend([ 382 '--log-cli-level=DEBUG', 383 '--log-cli-format=%(levelname)s: %(message)s' 384 ]) 385 386 # Use the test timeout as the base timeout for pytest 387 base_timeout = handler.get_test_timeout() 388 command.append(f'--base-timeout={base_timeout}') 389 390 if handler.type_str == 'device': 391 command.extend( 392 self._generate_parameters_for_hardware(handler) 393 ) 394 elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST: 395 command.append(f'--device-type={handler.type_str}') 396 elif handler.type_str == 'build': 397 command.append('--device-type=custom') 398 else: 399 raise PytestHarnessException(f'Support for handler {handler.type_str} not implemented yet') 400 401 if handler.type_str != 'device': 402 for fixture in handler.options.fixture: 403 command.append(f'--twister-fixture={fixture}') 404 405 if handler.options.pytest_args: 406 command.extend(handler.options.pytest_args) 407 if pytest_args_yaml: 408 logger.warning(f'The pytest_args ({handler.options.pytest_args}) specified ' 409 'in the command line will override the pytest_args defined ' 410 f'in the YAML file {pytest_args_yaml}') 411 else: 412 command.extend(pytest_args_yaml) 413 414 return command 415 416 def _generate_parameters_for_hardware(self, handler: Handler): 417 command = ['--device-type=hardware'] 418 hardware = handler.get_hardware() 419 if not hardware: 420 raise PytestHarnessException('Hardware is not available') 421 # update the instance with the device id to have it in the summary report 422 self.instance.dut = hardware.id 423 424 self.reserved_serial = hardware.serial_pty or hardware.serial 425 if hardware.serial_pty: 426 command.append(f'--device-serial-pty={hardware.serial_pty}') 427 else: 428 command.extend([ 429 f'--device-serial={hardware.serial}', 430 f'--device-serial-baud={hardware.baud}' 431 ]) 432 433 options = handler.options 434 if runner := hardware.runner or options.west_runner: 435 command.append(f'--runner={runner}') 436 437 if hardware.runner_params: 438 for param in hardware.runner_params: 439 command.append(f'--runner-params={param}') 440 441 if options.west_flash and options.west_flash != []: 442 command.append(f'--west-flash-extra-args={options.west_flash}') 443 444 if board_id := hardware.probe_id or hardware.id: 445 command.append(f'--device-id={board_id}') 446 447 if hardware.product: 448 command.append(f'--device-product={hardware.product}') 449 450 if hardware.pre_script: 451 command.append(f'--pre-script={hardware.pre_script}') 452 453 if hardware.post_flash_script: 454 command.append(f'--post-flash-script={hardware.post_flash_script}') 455 456 if hardware.post_script: 457 command.append(f'--post-script={hardware.post_script}') 458 459 if hardware.flash_before: 460 command.append(f'--flash-before={hardware.flash_before}') 461 462 for fixture in hardware.fixtures: 463 command.append(f'--twister-fixture={fixture}') 464 465 return command 466 467 def run_command(self, cmd, timeout): 468 cmd, env = self._update_command_with_env_dependencies(cmd) 469 with subprocess.Popen( 470 cmd, 471 stdout=subprocess.PIPE, 472 stderr=subprocess.STDOUT, 473 env=env 474 ) as proc: 475 try: 476 reader_t = threading.Thread(target=self._output_reader, args=(proc, self), daemon=True) 477 reader_t.start() 478 reader_t.join(timeout) 479 if reader_t.is_alive(): 480 terminate_process(proc) 481 logger.warning('Timeout has occurred. Can be extended in testspec file. ' 482 f'Currently set to {timeout} seconds.') 483 self.instance.reason = 'Pytest timeout' 484 self.state = 'failed' 485 proc.wait(timeout) 486 except subprocess.TimeoutExpired: 487 self.state = 'failed' 488 proc.kill() 489 490 @staticmethod 491 def _update_command_with_env_dependencies(cmd): 492 ''' 493 If python plugin wasn't installed by pip, then try to indicate it to 494 pytest by update PYTHONPATH and append -p argument to pytest command. 495 ''' 496 env = os.environ.copy() 497 if not PYTEST_PLUGIN_INSTALLED: 498 cmd.extend(['-p', 'twister_harness.plugin']) 499 pytest_plugin_path = os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'pytest-twister-harness', 'src') 500 env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '') 501 if _WINDOWS: 502 cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && ' 503 else: 504 cmd_append_python_path = f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && ' 505 else: 506 cmd_append_python_path = '' 507 cmd_to_print = cmd_append_python_path + shlex.join(cmd) 508 logger.debug('Running pytest command: %s', cmd_to_print) 509 510 return cmd, env 511 512 @staticmethod 513 def _output_reader(proc, harness): 514 while proc.stdout.readable() and proc.poll() is None: 515 line = proc.stdout.readline().decode().strip() 516 if not line: 517 continue 518 logger.debug('PYTEST: %s', line) 519 harness.parse_record(line) 520 proc.communicate() 521 522 def _update_test_status(self): 523 if not self.state: 524 self.instance.testcases = [] 525 try: 526 self._parse_report_file(self.report_file) 527 except Exception as e: 528 logger.error(f'Error when parsing file {self.report_file}: {e}') 529 self.state = 'failed' 530 finally: 531 if not self.instance.testcases: 532 self.instance.init_cases() 533 534 self.instance.status = self.state or 'failed' 535 if self.instance.status in ['error', 'failed']: 536 self.instance.reason = self.instance.reason or 'Pytest failed' 537 self.instance.add_missing_case_status('blocked', self.instance.reason) 538 539 def _parse_report_file(self, report): 540 tree = ET.parse(report) 541 root = tree.getroot() 542 if elem_ts := root.find('testsuite'): 543 if elem_ts.get('failures') != '0': 544 self.state = 'failed' 545 self.instance.reason = f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed" 546 elif elem_ts.get('errors') != '0': 547 self.state = 'error' 548 self.instance.reason = 'Error during pytest execution' 549 elif elem_ts.get('skipped') == elem_ts.get('tests'): 550 self.state = 'skipped' 551 else: 552 self.state = 'passed' 553 self.instance.execution_time = float(elem_ts.get('time')) 554 555 for elem_tc in elem_ts.findall('testcase'): 556 tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}") 557 tc.duration = float(elem_tc.get('time')) 558 elem = elem_tc.find('*') 559 if elem is None: 560 tc.status = 'passed' 561 else: 562 if elem.tag == 'skipped': 563 tc.status = 'skipped' 564 elif elem.tag == 'failure': 565 tc.status = 'failed' 566 else: 567 tc.status = 'error' 568 tc.reason = elem.get('message') 569 tc.output = elem.text 570 else: 571 self.state = 'skipped' 572 self.instance.reason = 'No tests collected' 573 574 575class Gtest(Harness): 576 ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') 577 TEST_START_PATTERN = r".*\[ RUN \] (?P<suite_name>[a-zA-Z_][a-zA-Z0-9_]*)\.(?P<test_name>[a-zA-Z_][a-zA-Z0-9_]*)" 578 TEST_PASS_PATTERN = r".*\[ OK \] (?P<suite_name>[a-zA-Z_][a-zA-Z0-9_]*)\.(?P<test_name>[a-zA-Z_][a-zA-Z0-9_]*)" 579 TEST_SKIP_PATTERN = r".*\[ DISABLED \] (?P<suite_name>[a-zA-Z_][a-zA-Z0-9_]*)\.(?P<test_name>[a-zA-Z_][a-zA-Z0-9_]*)" 580 TEST_FAIL_PATTERN = r".*\[ FAILED \] (?P<suite_name>[a-zA-Z_][a-zA-Z0-9_]*)\.(?P<test_name>[a-zA-Z_][a-zA-Z0-9_]*)" 581 FINISHED_PATTERN = r".*\[==========\] Done running all tests\." 582 583 def __init__(self): 584 super().__init__() 585 self.tc = None 586 self.has_failures = False 587 588 def handle(self, line): 589 # Strip the ANSI characters, they mess up the patterns 590 non_ansi_line = self.ANSI_ESCAPE.sub('', line) 591 592 if self.state: 593 return 594 595 # Check if we started running a new test 596 test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line) 597 if test_start_match: 598 # Add the suite name 599 suite_name = test_start_match.group("suite_name") 600 if suite_name not in self.detected_suite_names: 601 self.detected_suite_names.append(suite_name) 602 603 # Generate the internal name of the test 604 name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name")) 605 606 # Assert that we don't already have a running test 607 assert ( 608 self.tc is None 609 ), "gTest error, {} didn't finish".format(self.tc) 610 611 # Check that the instance doesn't exist yet (prevents re-running) 612 tc = self.instance.get_case_by_name(name) 613 assert tc is None, "gTest error, {} running twice".format(tc) 614 615 # Create the test instance and set the context 616 tc = self.instance.get_case_or_create(name) 617 self.tc = tc 618 self.tc.status = "started" 619 self.testcase_output += line + "\n" 620 self._match = True 621 622 # Check if the test run finished 623 finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line) 624 if finished_match: 625 tc = self.instance.get_case_or_create(self.id) 626 if self.has_failures or self.tc is not None: 627 self.state = "failed" 628 tc.status = "failed" 629 else: 630 self.state = "passed" 631 tc.status = "passed" 632 return 633 634 # Check if the individual test finished 635 state, name = self._check_result(non_ansi_line) 636 if state is None or name is None: 637 # Nothing finished, keep processing lines 638 return 639 640 # Get the matching test and make sure it's the same as the current context 641 tc = self.instance.get_case_by_name(name) 642 assert ( 643 tc is not None and tc == self.tc 644 ), "gTest error, mismatched tests. Expected {} but got {}".format(self.tc, tc) 645 646 # Test finished, clear the context 647 self.tc = None 648 649 # Update the status of the test 650 tc.status = state 651 if tc.status == "failed": 652 self.has_failures = True 653 tc.output = self.testcase_output 654 self.testcase_output = "" 655 self._match = False 656 657 def _check_result(self, line): 658 test_pass_match = re.search(self.TEST_PASS_PATTERN, line) 659 if test_pass_match: 660 return "passed", "{}.{}.{}".format(self.id, test_pass_match.group("suite_name"), test_pass_match.group("test_name")) 661 test_skip_match = re.search(self.TEST_SKIP_PATTERN, line) 662 if test_skip_match: 663 return "skipped", "{}.{}.{}".format(self.id, test_skip_match.group("suite_name"), test_skip_match.group("test_name")) 664 test_fail_match = re.search(self.TEST_FAIL_PATTERN, line) 665 if test_fail_match: 666 return "failed", "{}.{}.{}".format(self.id, test_fail_match.group("suite_name"), test_fail_match.group("test_name")) 667 return None, None 668 669 670class Test(Harness): 671 RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL" 672 RUN_FAILED = "PROJECT EXECUTION FAILED" 673 test_suite_start_pattern = r"Running TESTSUITE (?P<suite_name>.*)" 674 ZTEST_START_PATTERN = r"START - (test_)?([a-zA-Z0-9_-]+)" 675 676 def handle(self, line): 677 test_suite_match = re.search(self.test_suite_start_pattern, line) 678 if test_suite_match: 679 suite_name = test_suite_match.group("suite_name") 680 self.detected_suite_names.append(suite_name) 681 682 testcase_match = re.search(self.ZTEST_START_PATTERN, line) 683 if testcase_match: 684 name = "{}.{}".format(self.id, testcase_match.group(2)) 685 tc = self.instance.get_case_or_create(name) 686 # Mark the test as started, if something happens here, it is mostly 687 # due to this tests, for example timeout. This should in this case 688 # be marked as failed and not blocked (not run). 689 tc.status = "started" 690 691 if testcase_match or self._match: 692 self.testcase_output += line + "\n" 693 self._match = True 694 695 result_match = result_re.match(line) 696 # some testcases are skipped based on predicates and do not show up 697 # during test execution, however they are listed in the summary. Parse 698 # the summary for status and use that status instead. 699 700 summary_re = re.compile(r"- (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds") 701 summary_match = summary_re.match(line) 702 703 if result_match: 704 matched_status = result_match.group(1) 705 name = "{}.{}".format(self.id, result_match.group(3)) 706 tc = self.instance.get_case_or_create(name) 707 tc.status = self.ztest_to_status[matched_status] 708 if tc.status == "skipped": 709 tc.reason = "ztest skip" 710 tc.duration = float(result_match.group(4)) 711 if tc.status == "failed": 712 tc.output = self.testcase_output 713 self.testcase_output = "" 714 self._match = False 715 self.ztest = True 716 elif summary_match: 717 matched_status = summary_match.group(1) 718 self.detected_suite_names.append(summary_match.group(2)) 719 name = "{}.{}".format(self.id, summary_match.group(4)) 720 tc = self.instance.get_case_or_create(name) 721 tc.status = self.ztest_to_status[matched_status] 722 if tc.status == "skipped": 723 tc.reason = "ztest skip" 724 tc.duration = float(summary_match.group(5)) 725 if tc.status == "failed": 726 tc.output = self.testcase_output 727 self.testcase_output = "" 728 self._match = False 729 self.ztest = True 730 731 self.process_test(line) 732 733 if not self.ztest and self.state: 734 logger.debug(f"not a ztest and no state for {self.id}") 735 tc = self.instance.get_case_or_create(self.id) 736 if self.state == "passed": 737 tc.status = "passed" 738 else: 739 tc.status = "failed" 740 tc.reason = "Test failure" 741 742 743class Ztest(Test): 744 pass 745 746 747class Bsim(Harness): 748 749 def build(self): 750 """ 751 Copying the application executable to BabbleSim's bin directory enables 752 running multidevice bsim tests after twister has built them. 753 """ 754 755 if self.instance is None: 756 return 757 758 original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe') 759 if not os.path.exists(original_exe_path): 760 logger.warning('Cannot copy bsim exe - cannot find original executable.') 761 return 762 763 bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '') 764 if not bsim_out_path: 765 logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.') 766 return 767 768 new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '') 769 if new_exe_name: 770 new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}' 771 else: 772 new_exe_name = self.instance.name 773 new_exe_name = f'bs_{new_exe_name}' 774 775 new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_') 776 777 new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name) 778 logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}') 779 shutil.copy(original_exe_path, new_exe_path) 780 781 782class HarnessImporter: 783 784 @staticmethod 785 def get_harness(harness_name): 786 thismodule = sys.modules[__name__] 787 try: 788 if harness_name: 789 harness_class = getattr(thismodule, harness_name) 790 else: 791 harness_class = getattr(thismodule, 'Test') 792 return harness_class() 793 except AttributeError as e: 794 logger.debug(f"harness {harness_name} not implemented: {e}") 795 return None 796