1# SPDX-License-Identifier: Apache-2.0 2from __future__ import annotations 3 4import json 5import logging 6import os 7import platform 8import re 9import shlex 10import shutil 11import subprocess 12import sys 13import threading 14import time 15import xml.etree.ElementTree as ET 16from collections import OrderedDict 17from enum import Enum 18 19import junitparser.junitparser as junit 20import yaml 21from pytest import ExitCode 22from twisterlib.constants import SUPPORTED_SIMS_IN_PYTEST 23from twisterlib.environment import PYTEST_PLUGIN_INSTALLED, ZEPHYR_BASE 24from twisterlib.error import ConfigurationError, StatusAttributeError 25from twisterlib.handlers import Handler, terminate_process 26from twisterlib.reports import ReportStatus 27from twisterlib.statuses import TwisterStatus 28from twisterlib.testinstance import TestInstance 29 30logger = logging.getLogger('twister') 31logger.setLevel(logging.DEBUG) 32 33_WINDOWS = platform.system() == 'Windows' 34 35 36class Harness: 37 GCOV_START = "GCOV_COVERAGE_DUMP_START" 38 GCOV_END = "GCOV_COVERAGE_DUMP_END" 39 FAULT = "ZEPHYR FATAL ERROR" 40 RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL" 41 RUN_FAILED = "PROJECT EXECUTION FAILED" 42 run_id_pattern = r"RunID: (?P<run_id>.*)" 43 44 def __init__(self): 45 self._status = TwisterStatus.NONE 46 self.reason = None 47 self.type = None 48 self.regex = [] 49 self.matches = OrderedDict() 50 self.ordered = True 51 self.id = None 52 self.fail_on_fault = True 53 self.fault = False 54 self.capture_coverage = False 55 self.next_pattern = 0 56 self.record = None 57 self.record_patterns = [] 58 self.record_merge = False 59 self.record_as_json = None 60 self.recording = [] 61 self.ztest = False 62 self.detected_suite_names = [] 63 self.run_id = None 64 self.started_suites = {} 65 self.started_cases = {} 66 self.matched_run_id = False 67 self.run_id_exists = False 68 self.instance: TestInstance | None = None 69 self.testcase_output = "" 70 self._match = False 71 72 73 @property 74 def trace(self) -> bool: 75 return self.instance.handler.options.verbose > 2 76 77 @property 78 def status(self) -> TwisterStatus: 79 return self._status 80 81 @status.setter 82 def status(self, value : TwisterStatus) -> None: 83 # Check for illegal assignments by value 84 try: 85 key = value.name if isinstance(value, Enum) else value 86 self._status = TwisterStatus[key] 87 except KeyError as err: 88 raise StatusAttributeError(self.__class__, value) from err 89 90 def configure(self, instance): 91 self.instance = instance 92 config = instance.testsuite.harness_config 93 self.id = instance.testsuite.id 94 self.run_id = instance.run_id 95 if instance.testsuite.ignore_faults: 96 self.fail_on_fault = False 97 98 if config: 99 self.type = config.get('type', None) 100 self.regex = config.get('regex', []) 101 self.ordered = config.get('ordered', True) 102 self.record = config.get('record', {}) 103 if self.record: 104 self.record_patterns = [re.compile(p) for p in self.record.get("regex", [])] 105 self.record_merge = self.record.get("merge", False) 106 self.record_as_json = self.record.get("as_json") 107 108 def build(self): 109 pass 110 111 def get_testcase_name(self): 112 """ 113 Get current TestCase name. 114 """ 115 return self.id 116 117 def translate_record(self, record: dict) -> dict: 118 if self.record_as_json: 119 for k in self.record_as_json: 120 if k not in record: 121 continue 122 try: 123 record[k] = json.loads(record[k]) if record[k] else {} 124 except json.JSONDecodeError as parse_error: 125 logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:" 126 f" {parse_error} for '{k}':'{record[k]}'") 127 # Don't set the Harness state to failed for recordings. 128 record[k] = { 'ERROR': { 'msg': str(parse_error), 'doc': record[k] } } 129 return record 130 131 def parse_record(self, line) -> int: 132 match_cnt = 0 133 for record_pattern in self.record_patterns: 134 match = record_pattern.search(line) 135 if match: 136 match_cnt += 1 137 rec = self.translate_record( 138 { k:v.strip() for k,v in match.groupdict(default="").items() } 139 ) 140 if self.record_merge and len(self.recording) > 0: 141 for k,v in rec.items(): 142 if k in self.recording[0]: 143 if isinstance(self.recording[0][k], list): 144 self.recording[0][k].append(v) 145 else: 146 self.recording[0][k] = [self.recording[0][k], v] 147 else: 148 self.recording[0][k] = v 149 else: 150 self.recording.append(rec) 151 return match_cnt 152 153 def process_test(self, line): 154 155 self.parse_record(line) 156 157 runid_match = re.search(self.run_id_pattern, line) 158 if runid_match: 159 run_id = runid_match.group("run_id") 160 self.run_id_exists = True 161 if run_id == str(self.run_id): 162 self.matched_run_id = True 163 164 if self.RUN_PASSED in line: 165 if self.fault: 166 self.status = TwisterStatus.FAIL 167 self.reason = "Fault detected while running test" 168 else: 169 self.status = TwisterStatus.PASS 170 171 if self.RUN_FAILED in line: 172 self.status = TwisterStatus.FAIL 173 self.reason = "Testsuite failed" 174 175 if self.fail_on_fault and line == self.FAULT: 176 self.fault = True 177 178 if self.GCOV_START in line: 179 self.capture_coverage = True 180 elif self.GCOV_END in line: 181 self.capture_coverage = False 182 183class Robot(Harness): 184 185 is_robot_test = True 186 187 def configure(self, instance): 188 super().configure(instance) 189 self.instance = instance 190 191 config = instance.testsuite.harness_config 192 if config: 193 self.path = config.get('robot_testsuite', None) 194 self.option = config.get('robot_option', None) 195 196 def handle(self, line): 197 ''' Test cases that make use of this harness care about results given 198 by Robot Framework which is called in run_robot_test(), so works of this 199 handle is trying to give a PASS or FAIL to avoid timeout, nothing 200 is written into handler.log 201 ''' 202 self.instance.status = TwisterStatus.PASS 203 tc = self.instance.get_case_or_create(self.id) 204 tc.status = TwisterStatus.PASS 205 206 def run_robot_test(self, command, handler): 207 start_time = time.time() 208 env = os.environ.copy() 209 210 if self.option: 211 if isinstance(self.option, list): 212 for option in self.option: 213 for v in str(option).split(): 214 command.append(f'{v}') 215 else: 216 for v in str(self.option).split(): 217 command.append(f'{v}') 218 219 if self.path is None: 220 raise PytestHarnessException('The parameter robot_testsuite is mandatory') 221 222 if isinstance(self.path, list): 223 for suite in self.path: 224 command.append(os.path.join(handler.sourcedir, suite)) 225 else: 226 command.append(os.path.join(handler.sourcedir, self.path)) 227 228 with subprocess.Popen(command, stdout=subprocess.PIPE, 229 stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as renode_test_proc: 230 out, _ = renode_test_proc.communicate() 231 232 self.instance.execution_time = time.time() - start_time 233 234 if renode_test_proc.returncode == 0: 235 self.instance.status = TwisterStatus.PASS 236 # all tests in one Robot file are treated as a single test case, 237 # so its status should be set accordingly to the instance status 238 # please note that there should be only one testcase in testcases list 239 self.instance.testcases[0].status = TwisterStatus.PASS 240 else: 241 logger.error( 242 f"Robot test failure: {handler.sourcedir} for {self.instance.platform.name}" 243 ) 244 self.instance.status = TwisterStatus.FAIL 245 self.instance.testcases[0].status = TwisterStatus.FAIL 246 247 if out: 248 with open(os.path.join(self.instance.build_dir, handler.log), 'w') as log: 249 log_msg = out.decode(sys.getdefaultencoding()) 250 log.write(log_msg) 251 252class Console(Harness): 253 254 def get_testcase_name(self): 255 ''' 256 Get current TestCase name. 257 258 Console Harness id has only TestSuite id without TestCase name suffix. 259 Only the first TestCase name might be taken if available when a Ztest with 260 a single test case is configured to use this harness type for simplified 261 output parsing instead of the Ztest harness as Ztest suite should do. 262 ''' 263 if self.instance and len(self.instance.testcases) == 1: 264 return self.instance.testcases[0].name 265 return super().get_testcase_name() 266 267 def configure(self, instance): 268 super().configure(instance) 269 if self.regex is None or len(self.regex) == 0: 270 self.status = TwisterStatus.FAIL 271 tc = self.instance.set_case_status_by_name( 272 self.get_testcase_name(), 273 TwisterStatus.FAIL, 274 f"HARNESS:{self.__class__.__name__}:no regex patterns configured." 275 ) 276 raise ConfigurationError(self.instance.name, tc.reason) 277 if self.type == "one_line": 278 self.pattern = re.compile(self.regex[0]) 279 self.patterns_expected = 1 280 elif self.type == "multi_line": 281 self.patterns = [] 282 for r in self.regex: 283 self.patterns.append(re.compile(r)) 284 self.patterns_expected = len(self.patterns) 285 else: 286 self.status = TwisterStatus.FAIL 287 tc = self.instance.set_case_status_by_name( 288 self.get_testcase_name(), 289 TwisterStatus.FAIL, 290 f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}" 291 ) 292 raise ConfigurationError(self.instance.name, tc.reason) 293 # 294 295 def handle(self, line): 296 if self.type == "one_line": 297 if self.pattern.search(line): 298 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:" 299 f"'{self.pattern.pattern}'") 300 self.next_pattern += 1 301 self.status = TwisterStatus.PASS 302 elif self.type == "multi_line" and self.ordered: 303 if (self.next_pattern < len(self.patterns) and 304 self.patterns[self.next_pattern].search(line)): 305 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED(" 306 f"{self.next_pattern + 1}/{self.patterns_expected}):" 307 f"'{self.patterns[self.next_pattern].pattern}'") 308 self.next_pattern += 1 309 if self.next_pattern >= len(self.patterns): 310 self.status = TwisterStatus.PASS 311 elif self.type == "multi_line" and not self.ordered: 312 for i, pattern in enumerate(self.patterns): 313 r = self.regex[i] 314 if pattern.search(line) and r not in self.matches: 315 self.matches[r] = line 316 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED(" 317 f"{len(self.matches)}/{self.patterns_expected}):" 318 f"'{pattern.pattern}'") 319 if len(self.matches) == len(self.regex): 320 self.status = TwisterStatus.PASS 321 else: 322 logger.error("Unknown harness_config type") 323 324 if self.fail_on_fault and self.FAULT in line: 325 self.fault = True 326 327 if self.GCOV_START in line: 328 self.capture_coverage = True 329 elif self.GCOV_END in line: 330 self.capture_coverage = False 331 332 self.process_test(line) 333 # Reset the resulting test state to FAIL when not all of the patterns were 334 # found in the output, but just ztest's 'PROJECT EXECUTION SUCCESSFUL'. 335 # It might happen because of the pattern sequence diverged from the 336 # test code, the test platform has console issues, or even some other 337 # test image was executed. 338 # TODO: Introduce explicit match policy type to reject 339 # unexpected console output, allow missing patterns, deny duplicates. 340 if self.status == TwisterStatus.PASS and \ 341 self.ordered and \ 342 self.next_pattern < self.patterns_expected: 343 logger.error(f"HARNESS:{self.__class__.__name__}: failed with" 344 f" {self.next_pattern} of {self.patterns_expected}" 345 f" expected ordered patterns.") 346 self.status = TwisterStatus.FAIL 347 self.reason = "patterns did not match (ordered)" 348 if self.status == TwisterStatus.PASS and \ 349 not self.ordered and \ 350 len(self.matches) < self.patterns_expected: 351 logger.error(f"HARNESS:{self.__class__.__name__}: failed with" 352 f" {len(self.matches)} of {self.patterns_expected}" 353 f" expected unordered patterns.") 354 self.status = TwisterStatus.FAIL 355 self.reason = "patterns did not match (unordered)" 356 357 tc = self.instance.get_case_or_create(self.get_testcase_name()) 358 if self.status == TwisterStatus.PASS: 359 tc.status = TwisterStatus.PASS 360 else: 361 tc.status = TwisterStatus.FAIL 362 363 364class PytestHarnessException(Exception): 365 """General exception for pytest.""" 366 367 368class Pytest(Harness): 369 370 def configure(self, instance: TestInstance): 371 super().configure(instance) 372 self.running_dir = instance.build_dir 373 self.source_dir = instance.testsuite.source_dir 374 self.report_file = os.path.join(self.running_dir, 'report.xml') 375 self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log') 376 self.reserved_dut = None 377 self._output = [] 378 379 def pytest_run(self, timeout): 380 try: 381 cmd = self.generate_command() 382 self.run_command(cmd, timeout) 383 except PytestHarnessException as pytest_exception: 384 logger.error(str(pytest_exception)) 385 self.status = TwisterStatus.FAIL 386 self.instance.reason = str(pytest_exception) 387 finally: 388 self.instance.record(self.recording) 389 self._update_test_status() 390 if self.reserved_dut: 391 self.instance.handler.make_dut_available(self.reserved_dut) 392 393 def generate_command(self): 394 config = self.instance.testsuite.harness_config 395 handler: Handler = self.instance.handler 396 pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest'] 397 pytest_args_yaml = config.get('pytest_args', []) if config else [] 398 pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None 399 command = [ 400 'pytest', 401 '--twister-harness', 402 '-s', '-v', 403 f'--build-dir={self.running_dir}', 404 f'--junit-xml={self.report_file}', 405 f'--platform={self.instance.platform.name}' 406 ] 407 408 command.extend([os.path.normpath(os.path.join( 409 self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root]) 410 411 if pytest_dut_scope: 412 command.append(f'--dut-scope={pytest_dut_scope}') 413 414 # Always pass output from the pytest test and the test image up to Twister log. 415 command.extend([ 416 '--log-cli-level=DEBUG', 417 '--log-cli-format=%(levelname)s: %(message)s' 418 ]) 419 420 # Use the test timeout as the base timeout for pytest 421 base_timeout = handler.get_test_timeout() 422 command.append(f'--base-timeout={base_timeout}') 423 424 if handler.type_str == 'device': 425 command.extend( 426 self._generate_parameters_for_hardware(handler) 427 ) 428 elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST: 429 command.append(f'--device-type={handler.type_str}') 430 elif handler.type_str == 'build': 431 command.append('--device-type=custom') 432 else: 433 raise PytestHarnessException( 434 f'Support for handler {handler.type_str} not implemented yet' 435 ) 436 437 if handler.type_str != 'device': 438 for fixture in handler.options.fixture: 439 command.append(f'--twister-fixture={fixture}') 440 441 if handler.options.extra_test_args and handler.type_str == 'native': 442 command.append(f'--extra-test-args={shlex.join(handler.options.extra_test_args)}') 443 444 command.extend(pytest_args_yaml) 445 446 if handler.options.pytest_args: 447 command.extend(handler.options.pytest_args) 448 449 return command 450 451 def _generate_parameters_for_hardware(self, handler: Handler): 452 command = ['--device-type=hardware'] 453 hardware = handler.get_hardware() 454 if not hardware: 455 raise PytestHarnessException('Hardware is not available') 456 # update the instance with the device id to have it in the summary report 457 self.instance.dut = hardware.id 458 459 self.reserved_dut = hardware 460 if hardware.serial_pty: 461 command.append(f'--device-serial-pty={hardware.serial_pty}') 462 else: 463 command.extend([ 464 f'--device-serial={hardware.serial}', 465 f'--device-serial-baud={hardware.baud}' 466 ]) 467 468 if hardware.flash_timeout: 469 command.append(f'--flash-timeout={hardware.flash_timeout}') 470 471 options = handler.options 472 if runner := hardware.runner or options.west_runner: 473 command.append(f'--runner={runner}') 474 475 if hardware.runner_params: 476 for param in hardware.runner_params: 477 command.append(f'--runner-params={param}') 478 479 if options.west_flash and options.west_flash != []: 480 command.append(f'--west-flash-extra-args={options.west_flash}') 481 482 if board_id := hardware.probe_id or hardware.id: 483 command.append(f'--device-id={board_id}') 484 485 if hardware.product: 486 command.append(f'--device-product={hardware.product}') 487 488 if hardware.pre_script: 489 command.append(f'--pre-script={hardware.pre_script}') 490 491 if hardware.post_flash_script: 492 command.append(f'--post-flash-script={hardware.post_flash_script}') 493 494 if hardware.post_script: 495 command.append(f'--post-script={hardware.post_script}') 496 497 if hardware.flash_before: 498 command.append(f'--flash-before={hardware.flash_before}') 499 500 for fixture in hardware.fixtures: 501 command.append(f'--twister-fixture={fixture}') 502 503 return command 504 505 def run_command(self, cmd, timeout): 506 cmd, env = self._update_command_with_env_dependencies(cmd) 507 with subprocess.Popen( 508 cmd, 509 stdout=subprocess.PIPE, 510 stderr=subprocess.STDOUT, 511 env=env 512 ) as proc: 513 try: 514 reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True) 515 reader_t.start() 516 reader_t.join(timeout) 517 if reader_t.is_alive(): 518 terminate_process(proc) 519 logger.warning('Timeout has occurred. Can be extended in testspec file. ' 520 f'Currently set to {timeout} seconds.') 521 self.instance.reason = 'Pytest timeout' 522 self.status = TwisterStatus.FAIL 523 proc.wait(timeout) 524 except subprocess.TimeoutExpired: 525 self.status = TwisterStatus.FAIL 526 proc.kill() 527 528 if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR): 529 self.status = TwisterStatus.ERROR 530 self.instance.reason = f'Pytest error - return code {proc.returncode}' 531 with open(self.pytest_log_file_path, 'w') as log_file: 532 log_file.write(shlex.join(cmd) + '\n\n') 533 log_file.write('\n'.join(self._output)) 534 535 @staticmethod 536 def _update_command_with_env_dependencies(cmd): 537 ''' 538 If python plugin wasn't installed by pip, then try to indicate it to 539 pytest by update PYTHONPATH and append -p argument to pytest command. 540 ''' 541 env = os.environ.copy() 542 if not PYTEST_PLUGIN_INSTALLED: 543 cmd.extend(['-p', 'twister_harness.plugin']) 544 pytest_plugin_path = os.path.join( 545 ZEPHYR_BASE, 546 'scripts', 547 'pylib', 548 'pytest-twister-harness', 549 'src' 550 ) 551 env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '') 552 if _WINDOWS: 553 cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && ' 554 else: 555 cmd_append_python_path = ( 556 f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && ' 557 ) 558 else: 559 cmd_append_python_path = '' 560 cmd_to_print = cmd_append_python_path + shlex.join(cmd) 561 logger.debug(f'Running pytest command: {cmd_to_print}') 562 563 return cmd, env 564 565 def _output_reader(self, proc): 566 self._output = [] 567 while proc.stdout.readable() and proc.poll() is None: 568 line = proc.stdout.readline().decode().strip() 569 if not line: 570 continue 571 self._output.append(line) 572 logger.debug(f'PYTEST: {line}') 573 self.parse_record(line) 574 proc.communicate() 575 576 def _update_test_status(self): 577 if self.status == TwisterStatus.NONE: 578 self.instance.testcases = [] 579 try: 580 self._parse_report_file(self.report_file) 581 except Exception as e: 582 logger.error(f'Error when parsing file {self.report_file}: {e}') 583 self.status = TwisterStatus.FAIL 584 finally: 585 if not self.instance.testcases: 586 self.instance.init_cases() 587 588 self.instance.status = self.status if self.status != TwisterStatus.NONE else \ 589 TwisterStatus.FAIL 590 if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: 591 self.instance.reason = self.instance.reason or 'Pytest failed' 592 self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason) 593 594 def _parse_report_file(self, report): 595 tree = ET.parse(report) 596 root = tree.getroot() 597 598 if (elem_ts := root.find('testsuite')) is not None: 599 if elem_ts.get('failures') != '0': 600 self.status = TwisterStatus.FAIL 601 self.instance.reason = ( 602 f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed" 603 ) 604 elif elem_ts.get('errors') != '0': 605 self.status = TwisterStatus.ERROR 606 self.instance.reason = 'Error during pytest execution' 607 elif elem_ts.get('skipped') == elem_ts.get('tests'): 608 self.status = TwisterStatus.SKIP 609 else: 610 self.status = TwisterStatus.PASS 611 self.instance.execution_time = float(elem_ts.get('time')) 612 613 for elem_tc in elem_ts.findall('testcase'): 614 tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}") 615 tc.duration = float(elem_tc.get('time')) 616 elem = elem_tc.find('*') 617 if elem is None: 618 tc.status = TwisterStatus.PASS 619 else: 620 if elem.tag == ReportStatus.SKIP: 621 tc.status = TwisterStatus.SKIP 622 elif elem.tag == ReportStatus.FAIL: 623 tc.status = TwisterStatus.FAIL 624 else: 625 tc.status = TwisterStatus.ERROR 626 tc.reason = elem.get('message') 627 tc.output = elem.text 628 else: 629 self.status = TwisterStatus.SKIP 630 self.instance.reason = 'No tests collected' 631 632 633class Shell(Pytest): 634 def generate_command(self): 635 config = self.instance.testsuite.harness_config 636 pytest_root = [os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'shell-twister-harness')] 637 config['pytest_root'] = pytest_root 638 639 command = super().generate_command() 640 if test_shell_file := self._get_shell_commands_file(config): 641 command.append(f'--testdata={test_shell_file}') 642 else: 643 logger.warning('No shell commands provided') 644 return command 645 646 def _get_shell_commands_file(self, harness_config): 647 if shell_commands := harness_config.get('shell_commands'): 648 test_shell_file = os.path.join(self.running_dir, 'test_shell.yml') 649 with open(test_shell_file, 'w') as f: 650 yaml.dump(shell_commands, f) 651 return test_shell_file 652 653 test_shell_file = harness_config.get('shell_commands_file', 'test_shell.yml') 654 test_shell_file = os.path.join( 655 self.source_dir, os.path.expanduser(os.path.expandvars(test_shell_file)) 656 ) 657 if os.path.exists(test_shell_file): 658 return test_shell_file 659 return None 660 661 662class Gtest(Harness): 663 ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') 664 _NAME_PATTERN = "[a-zA-Z_][a-zA-Z0-9_]*" 665 _SUITE_TEST_NAME_PATTERN = f"(?P<suite_name>{_NAME_PATTERN})\\.(?P<test_name>{_NAME_PATTERN})" 666 TEST_START_PATTERN = f".*\\[ RUN \\] {_SUITE_TEST_NAME_PATTERN}" 667 TEST_PASS_PATTERN = f".*\\[ OK \\] {_SUITE_TEST_NAME_PATTERN}" 668 TEST_SKIP_PATTERN = f".*\\[ DISABLED \\] {_SUITE_TEST_NAME_PATTERN}" 669 TEST_FAIL_PATTERN = f".*\\[ FAILED \\] {_SUITE_TEST_NAME_PATTERN}" 670 FINISHED_PATTERN = ( 671 ".*(?:\\[==========\\] Done running all tests\\.|" 672 + "\\[----------\\] Global test environment tear-down)" 673 ) 674 675 def __init__(self): 676 super().__init__() 677 self.tc = None 678 self.has_failures = False 679 680 def handle(self, line): 681 # Strip the ANSI characters, they mess up the patterns 682 non_ansi_line = self.ANSI_ESCAPE.sub('', line) 683 684 if self.status != TwisterStatus.NONE: 685 return 686 687 # Check if we started running a new test 688 test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line) 689 if test_start_match: 690 # Add the suite name 691 suite_name = test_start_match.group("suite_name") 692 if suite_name not in self.detected_suite_names: 693 self.detected_suite_names.append(suite_name) 694 695 # Generate the internal name of the test 696 name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name")) 697 698 # Assert that we don't already have a running test 699 assert ( 700 self.tc is None 701 ), f"gTest error, {self.tc} didn't finish" 702 703 # Check that the instance doesn't exist yet (prevents re-running) 704 tc = self.instance.get_case_by_name(name) 705 assert tc is None, f"gTest error, {tc} running twice" 706 707 # Create the test instance and set the context 708 tc = self.instance.get_case_or_create(name) 709 self.tc = tc 710 self.tc.status = TwisterStatus.STARTED 711 self.testcase_output += line + "\n" 712 self._match = True 713 714 # Check if the test run finished 715 finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line) 716 if finished_match: 717 tc = self.instance.get_case_or_create(self.id) 718 if self.has_failures or self.tc is not None: 719 self.status = TwisterStatus.FAIL 720 tc.status = TwisterStatus.FAIL 721 else: 722 self.status = TwisterStatus.PASS 723 tc.status = TwisterStatus.PASS 724 return 725 726 # Check if the individual test finished 727 state, name = self._check_result(non_ansi_line) 728 if state == TwisterStatus.NONE or name is None: 729 # Nothing finished, keep processing lines 730 return 731 732 # Get the matching test and make sure it's the same as the current context 733 tc = self.instance.get_case_by_name(name) 734 assert ( 735 tc is not None and tc == self.tc 736 ), f"gTest error, mismatched tests. Expected {self.tc} but got {tc}" 737 738 # Test finished, clear the context 739 self.tc = None 740 741 # Update the status of the test 742 tc.status = state 743 if tc.status == TwisterStatus.FAIL: 744 self.has_failures = True 745 tc.output = self.testcase_output 746 self.testcase_output = "" 747 self._match = False 748 749 def _check_result(self, line): 750 test_pass_match = re.search(self.TEST_PASS_PATTERN, line) 751 if test_pass_match: 752 return TwisterStatus.PASS, \ 753 "{}.{}.{}".format( 754 self.id, test_pass_match.group("suite_name"), 755 test_pass_match.group("test_name") 756 ) 757 test_skip_match = re.search(self.TEST_SKIP_PATTERN, line) 758 if test_skip_match: 759 return TwisterStatus.SKIP, \ 760 "{}.{}.{}".format( 761 self.id, test_skip_match.group("suite_name"), 762 test_skip_match.group("test_name") 763 ) 764 test_fail_match = re.search(self.TEST_FAIL_PATTERN, line) 765 if test_fail_match: 766 return TwisterStatus.FAIL, \ 767 "{}.{}.{}".format( 768 self.id, test_fail_match.group("suite_name"), 769 test_fail_match.group("test_name") 770 ) 771 return None, None 772 773 774class Test(Harness): 775 __test__ = False # for pytest to skip this class when collects tests 776 777 test_suite_start_pattern = re.compile(r"Running TESTSUITE (?P<suite_name>\S*)") 778 test_suite_end_pattern = re.compile( 779 r"TESTSUITE (?P<suite_name>\S*)\s+(?P<suite_status>succeeded|failed)" 780 ) 781 test_case_start_pattern = re.compile(r"START - (test_)?([a-zA-Z0-9_-]+)") 782 test_case_end_pattern = re.compile( 783 r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds" 784 ) 785 test_suite_summary_pattern = re.compile( 786 r"SUITE (?P<suite_status>\S*) - .* \[(?P<suite_name>\S*)\]:" 787 r" .* duration = (\d*[.,]?\d*) seconds" 788 ) 789 test_case_summary_pattern = re.compile( 790 r" - (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds" 791 ) 792 793 794 def get_testcase(self, tc_name, phase, ts_name=None): 795 """ Search a Ztest case among detected in the test image binary 796 expecting the same test names as already known from the ELF. 797 Track suites and cases unexpectedly found in the log. 798 """ 799 ts_names = self.started_suites.keys() 800 if ts_name: 801 if self.trace and ts_name not in self.instance.testsuite.ztest_suite_names: 802 # This can happen if a ZTEST_SUITE name is macro-generated 803 # in the test source files, e.g. based on DT information. 804 logger.debug(f"{phase}: unexpected Ztest suite '{ts_name}' is " 805 f"not present among: {self.instance.testsuite.ztest_suite_names}") 806 if ts_name not in self.detected_suite_names: 807 if self.trace: 808 logger.debug(f"{phase}: detected new Ztest suite '{ts_name}'") 809 self.detected_suite_names.append(ts_name) 810 ts_names = [ ts_name ] if ts_name in ts_names else [] 811 812 # First, try to match the test case ID to the first running Ztest suite with this test name. 813 for ts_name_ in ts_names: 814 if self.started_suites[ts_name_]['count'] < (0 if phase == 'TS_SUM' else 1): 815 continue 816 tc_fq_id = self.instance.compose_case_name(f"{ts_name_}.{tc_name}") 817 if tc := self.instance.get_case_by_name(tc_fq_id): 818 if self.trace: 819 logger.debug(f"{phase}: Ztest case '{tc_name}' matched to '{tc_fq_id}") 820 return tc 821 logger.debug( 822 f"{phase}: Ztest case '{tc_name}' is not known" 823 f" in {self.started_suites} running suite(s)." 824 ) 825 tc_id = self.instance.compose_case_name(tc_name) 826 return self.instance.get_case_or_create(tc_id) 827 828 def start_suite(self, suite_name, phase='TS_START'): 829 if suite_name not in self.detected_suite_names: 830 self.detected_suite_names.append(suite_name) 831 if self.trace and suite_name not in self.instance.testsuite.ztest_suite_names: 832 # This can happen if a ZTEST_SUITE name is macro-generated 833 # in the test source files, e.g. based on DT information. 834 logger.debug(f"{phase}: unexpected Ztest suite '{suite_name}' is " 835 f"not present among: {self.instance.testsuite.ztest_suite_names}") 836 if suite_name in self.started_suites: 837 if self.started_suites[suite_name]['count'] > 0: 838 # Either the suite restarts itself or unexpected state transition. 839 logger.warning(f"{phase}: already STARTED '{suite_name}':" 840 f"{self.started_suites[suite_name]}") 841 elif self.trace: 842 logger.debug(f"{phase}: START suite '{suite_name}'") 843 self.started_suites[suite_name]['count'] += 1 844 self.started_suites[suite_name]['repeat'] += 1 845 else: 846 self.started_suites[suite_name] = { 'count': 1, 'repeat': 0 } 847 848 def end_suite(self, suite_name, phase='TS_END', suite_status=None): 849 if suite_name in self.started_suites: 850 if phase == 'TS_SUM' and self.started_suites[suite_name]['count'] == 0: 851 return 852 if self.started_suites[suite_name]['count'] < 1: 853 logger.error( 854 f"{phase}: already ENDED suite '{suite_name}':{self.started_suites[suite_name]}" 855 ) 856 elif self.trace: 857 logger.debug(f"{phase}: END suite '{suite_name}':{self.started_suites[suite_name]}") 858 self.started_suites[suite_name]['count'] -= 1 859 elif suite_status == 'SKIP': 860 self.start_suite(suite_name, phase) # register skipped suites at their summary end 861 self.started_suites[suite_name]['count'] -= 1 862 else: 863 logger.warning(f"{phase}: END suite '{suite_name}' without START detected") 864 865 def start_case(self, tc_name, phase='TC_START'): 866 if tc_name in self.started_cases: 867 if self.started_cases[tc_name]['count'] > 0: 868 logger.warning(f"{phase}: already STARTED case " 869 f"'{tc_name}':{self.started_cases[tc_name]}") 870 self.started_cases[tc_name]['count'] += 1 871 else: 872 self.started_cases[tc_name] = { 'count': 1 } 873 874 def end_case(self, tc_name, phase='TC_END'): 875 if tc_name in self.started_cases: 876 if phase == 'TS_SUM' and self.started_cases[tc_name]['count'] == 0: 877 return 878 if self.started_cases[tc_name]['count'] < 1: 879 logger.error( 880 f"{phase}: already ENDED case '{tc_name}':{self.started_cases[tc_name]}" 881 ) 882 elif self.trace: 883 logger.debug(f"{phase}: END case '{tc_name}':{self.started_cases[tc_name]}") 884 self.started_cases[tc_name]['count'] -= 1 885 elif phase != 'TS_SUM': 886 logger.warning(f"{phase}: END case '{tc_name}' without START detected") 887 888 889 def handle(self, line): 890 testcase_match = None 891 if self._match: 892 self.testcase_output += line + "\n" 893 894 if test_suite_start_match := re.search(self.test_suite_start_pattern, line): 895 self.start_suite(test_suite_start_match.group("suite_name")) 896 elif test_suite_end_match := re.search(self.test_suite_end_pattern, line): 897 suite_name=test_suite_end_match.group("suite_name") 898 self.end_suite(suite_name) 899 elif testcase_match := re.search(self.test_case_start_pattern, line): 900 tc_name = testcase_match.group(2) 901 tc = self.get_testcase(tc_name, 'TC_START') 902 self.start_case(tc.name) 903 # Mark the test as started, if something happens here, it is mostly 904 # due to this tests, for example timeout. This should in this case 905 # be marked as failed and not blocked (not run). 906 tc.status = TwisterStatus.STARTED 907 if not self._match: 908 self.testcase_output += line + "\n" 909 self._match = True 910 # some testcases are skipped based on predicates and do not show up 911 # during test execution, however they are listed in the summary. Parse 912 # the summary for status and use that status instead. 913 elif result_match := self.test_case_end_pattern.match(line): 914 matched_status = result_match.group(1) 915 tc_name = result_match.group(3) 916 tc = self.get_testcase(tc_name, 'TC_END') 917 self.end_case(tc.name) 918 tc.status = TwisterStatus[matched_status] 919 if tc.status == TwisterStatus.SKIP: 920 tc.reason = "ztest skip" 921 tc.duration = float(result_match.group(4)) 922 if tc.status == TwisterStatus.FAIL: 923 tc.output = self.testcase_output 924 self.testcase_output = "" 925 self._match = False 926 self.ztest = True 927 elif test_suite_summary_match := self.test_suite_summary_pattern.match(line): 928 suite_name=test_suite_summary_match.group("suite_name") 929 suite_status=test_suite_summary_match.group("suite_status") 930 self._match = False 931 self.ztest = True 932 self.end_suite(suite_name, 'TS_SUM', suite_status=suite_status) 933 elif test_case_summary_match := self.test_case_summary_pattern.match(line): 934 matched_status = test_case_summary_match.group(1) 935 suite_name = test_case_summary_match.group(2) 936 tc_name = test_case_summary_match.group(4) 937 tc = self.get_testcase(tc_name, 'TS_SUM', suite_name) 938 self.end_case(tc.name, 'TS_SUM') 939 tc.status = TwisterStatus[matched_status] 940 if tc.status == TwisterStatus.SKIP: 941 tc.reason = "ztest skip" 942 tc.duration = float(test_case_summary_match.group(5)) 943 if tc.status == TwisterStatus.FAIL: 944 tc.output = self.testcase_output 945 self.testcase_output = "" 946 self._match = False 947 self.ztest = True 948 949 self.process_test(line) 950 951 if not self.ztest and self.status != TwisterStatus.NONE: 952 logger.debug(f"not a ztest and no state for {self.id}") 953 tc = self.instance.get_case_or_create(self.id) 954 if self.status == TwisterStatus.PASS: 955 tc.status = TwisterStatus.PASS 956 else: 957 tc.status = TwisterStatus.FAIL 958 tc.reason = "Test failure" 959 960 961class Ztest(Test): 962 pass 963 964 965class Bsim(Harness): 966 967 def build(self): 968 """ 969 Copying the application executable to BabbleSim's bin directory enables 970 running multidevice bsim tests after twister has built them. 971 """ 972 973 if self.instance is None: 974 return 975 976 original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe') 977 if not os.path.exists(original_exe_path): 978 logger.warning('Cannot copy bsim exe - cannot find original executable.') 979 return 980 981 bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '') 982 if not bsim_out_path: 983 logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.') 984 return 985 986 new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '') 987 if new_exe_name: 988 new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}' 989 else: 990 new_exe_name = self.instance.name 991 new_exe_name = f'bs_{new_exe_name}' 992 993 new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_') 994 995 new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name) 996 logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}') 997 shutil.copy(original_exe_path, new_exe_path) 998 999class Ctest(Harness): 1000 def configure(self, instance: TestInstance): 1001 super().configure(instance) 1002 self.running_dir = instance.build_dir 1003 self.report_file = os.path.join(self.running_dir, 'report.xml') 1004 self.ctest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log') 1005 self._output = [] 1006 1007 def ctest_run(self, timeout): 1008 assert self.instance is not None 1009 try: 1010 cmd = self.generate_command() 1011 self.run_command(cmd, timeout) 1012 except Exception as err: 1013 logger.error(str(err)) 1014 self.status = TwisterStatus.FAIL 1015 self.instance.reason = str(err) 1016 finally: 1017 self.instance.record(self.recording) 1018 self._update_test_status() 1019 1020 def generate_command(self): 1021 config = self.instance.testsuite.harness_config 1022 handler: Handler = self.instance.handler 1023 ctest_args_yaml = config.get('ctest_args', []) if config else [] 1024 command = [ 1025 'ctest', 1026 '--build-nocmake', 1027 '--test-dir', 1028 self.running_dir, 1029 '--output-junit', 1030 self.report_file, 1031 '--output-log', 1032 self.ctest_log_file_path, 1033 '--output-on-failure', 1034 ] 1035 base_timeout = handler.get_test_timeout() 1036 command.extend(['--timeout', str(base_timeout)]) 1037 command.extend(ctest_args_yaml) 1038 1039 if handler.options.ctest_args: 1040 command.extend(handler.options.ctest_args) 1041 1042 return command 1043 1044 def run_command(self, cmd, timeout): 1045 with subprocess.Popen( 1046 cmd, 1047 stdout=subprocess.PIPE, 1048 stderr=subprocess.STDOUT, 1049 ) as proc: 1050 try: 1051 reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True) 1052 reader_t.start() 1053 reader_t.join(timeout) 1054 if reader_t.is_alive(): 1055 terminate_process(proc) 1056 logger.warning('Timeout has occurred. Can be extended in testspec file. ' 1057 f'Currently set to {timeout} seconds.') 1058 self.instance.reason = 'Ctest timeout' 1059 self.status = TwisterStatus.FAIL 1060 proc.wait(timeout) 1061 except subprocess.TimeoutExpired: 1062 self.status = TwisterStatus.FAIL 1063 proc.kill() 1064 1065 if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR): 1066 self.status = TwisterStatus.ERROR 1067 self.instance.reason = f'Ctest error - return code {proc.returncode}' 1068 with open(self.ctest_log_file_path, 'w') as log_file: 1069 log_file.write(shlex.join(cmd) + '\n\n') 1070 log_file.write('\n'.join(self._output)) 1071 1072 def _output_reader(self, proc): 1073 self._output = [] 1074 while proc.stdout.readable() and proc.poll() is None: 1075 line = proc.stdout.readline().decode().strip() 1076 if not line: 1077 continue 1078 self._output.append(line) 1079 logger.debug(f'CTEST: {line}') 1080 self.parse_record(line) 1081 proc.communicate() 1082 1083 def _update_test_status(self): 1084 if self.status == TwisterStatus.NONE: 1085 self.instance.testcases = [] 1086 try: 1087 self._parse_report_file(self.report_file) 1088 except Exception as e: 1089 logger.error(f'Error when parsing file {self.report_file}: {e}') 1090 self.status = TwisterStatus.FAIL 1091 finally: 1092 if not self.instance.testcases: 1093 self.instance.init_cases() 1094 1095 self.instance.status = self.status if self.status != TwisterStatus.NONE else \ 1096 TwisterStatus.FAIL 1097 if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: 1098 self.instance.reason = self.instance.reason or 'Ctest failed' 1099 self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason) 1100 1101 def _parse_report_file(self, report): 1102 suite = junit.JUnitXml.fromfile(report) 1103 if suite is None: 1104 self.status = TwisterStatus.SKIP 1105 self.instance.reason = 'No tests collected' 1106 return 1107 1108 assert isinstance(suite, junit.TestSuite) 1109 1110 if suite.failures and suite.failures > 0: 1111 self.status = TwisterStatus.FAIL 1112 self.instance.reason = f"{suite.failures}/{suite.tests} ctest scenario(s) failed" 1113 elif suite.errors and suite.errors > 0: 1114 self.status = TwisterStatus.ERROR 1115 self.instance.reason = 'Error during ctest execution' 1116 elif suite.skipped and suite.skipped > 0: 1117 self.status = TwisterStatus.SKIP 1118 else: 1119 self.status = TwisterStatus.PASS 1120 self.instance.execution_time = suite.time 1121 1122 for case in suite: 1123 tc = self.instance.add_testcase(f"{self.id}.{case.name}") 1124 tc.duration = case.time 1125 if any(isinstance(r, junit.Failure) for r in case.result): 1126 tc.status = TwisterStatus.FAIL 1127 tc.output = case.system_out 1128 elif any(isinstance(r, junit.Error) for r in case.result): 1129 tc.status = TwisterStatus.ERROR 1130 tc.output = case.system_out 1131 elif any(isinstance(r, junit.Skipped) for r in case.result): 1132 tc.status = TwisterStatus.SKIP 1133 else: 1134 tc.status = TwisterStatus.PASS 1135 1136class HarnessImporter: 1137 1138 @staticmethod 1139 def get_harness(harness_name): 1140 thismodule = sys.modules[__name__] 1141 try: 1142 if harness_name: 1143 harness_class = getattr(thismodule, harness_name) 1144 else: 1145 harness_class = thismodule.Test 1146 return harness_class() 1147 except AttributeError as e: 1148 logger.debug(f"harness {harness_name} not implemented: {e}") 1149 return None 1150