1# SPDX-License-Identifier: Apache-2.0 2from __future__ import annotations 3 4import json 5import logging 6import os 7import platform 8import re 9import shlex 10import shutil 11import subprocess 12import sys 13import threading 14import time 15import xml.etree.ElementTree as ET 16from collections import OrderedDict 17from enum import Enum 18 19from pytest import ExitCode 20from twisterlib.constants import SUPPORTED_SIMS_IN_PYTEST 21from twisterlib.environment import PYTEST_PLUGIN_INSTALLED, ZEPHYR_BASE 22from twisterlib.error import ConfigurationError, StatusAttributeError 23from twisterlib.handlers import Handler, terminate_process 24from twisterlib.reports import ReportStatus 25from twisterlib.statuses import TwisterStatus 26from twisterlib.testinstance import TestInstance 27 28logger = logging.getLogger('twister') 29logger.setLevel(logging.DEBUG) 30 31_WINDOWS = platform.system() == 'Windows' 32 33 34class Harness: 35 GCOV_START = "GCOV_COVERAGE_DUMP_START" 36 GCOV_END = "GCOV_COVERAGE_DUMP_END" 37 FAULT = "ZEPHYR FATAL ERROR" 38 RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL" 39 RUN_FAILED = "PROJECT EXECUTION FAILED" 40 run_id_pattern = r"RunID: (?P<run_id>.*)" 41 42 def __init__(self): 43 self._status = TwisterStatus.NONE 44 self.reason = None 45 self.type = None 46 self.regex = [] 47 self.matches = OrderedDict() 48 self.ordered = True 49 self.id = None 50 self.fail_on_fault = True 51 self.fault = False 52 self.capture_coverage = False 53 self.next_pattern = 0 54 self.record = None 55 self.record_pattern = None 56 self.record_as_json = None 57 self.recording = [] 58 self.ztest = False 59 self.detected_suite_names = [] 60 self.run_id = None 61 self.started_suites = {} 62 self.started_cases = {} 63 self.matched_run_id = False 64 self.run_id_exists = False 65 self.instance: TestInstance | None = None 66 self.testcase_output = "" 67 self._match = False 68 69 70 @property 71 def trace(self) -> bool: 72 return self.instance.handler.options.verbose > 2 73 74 @property 75 def status(self) -> TwisterStatus: 76 return self._status 77 78 @status.setter 79 def status(self, value : TwisterStatus) -> None: 80 # Check for illegal assignments by value 81 try: 82 key = value.name if isinstance(value, Enum) else value 83 self._status = TwisterStatus[key] 84 except KeyError as err: 85 raise StatusAttributeError(self.__class__, value) from err 86 87 def configure(self, instance): 88 self.instance = instance 89 config = instance.testsuite.harness_config 90 self.id = instance.testsuite.id 91 self.run_id = instance.run_id 92 if instance.testsuite.ignore_faults: 93 self.fail_on_fault = False 94 95 if config: 96 self.type = config.get('type', None) 97 self.regex = config.get('regex', []) 98 self.ordered = config.get('ordered', True) 99 self.record = config.get('record', {}) 100 if self.record: 101 self.record_pattern = re.compile(self.record.get("regex", "")) 102 self.record_as_json = self.record.get("as_json") 103 104 def build(self): 105 pass 106 107 def get_testcase_name(self): 108 """ 109 Get current TestCase name. 110 """ 111 return self.id 112 113 def translate_record(self, record: dict) -> dict: 114 if self.record_as_json: 115 for k in self.record_as_json: 116 if k not in record: 117 continue 118 try: 119 record[k] = json.loads(record[k]) if record[k] else {} 120 except json.JSONDecodeError as parse_error: 121 logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:" 122 f" {parse_error} for '{k}':'{record[k]}'") 123 # Don't set the Harness state to failed for recordings. 124 record[k] = { 'ERROR': { 'msg': str(parse_error), 'doc': record[k] } } 125 return record 126 127 def parse_record(self, line) -> re.Match: 128 match = None 129 if self.record_pattern: 130 match = self.record_pattern.search(line) 131 if match: 132 rec = self.translate_record( 133 { k:v.strip() for k,v in match.groupdict(default="").items() } 134 ) 135 self.recording.append(rec) 136 return match 137 # 138 139 def process_test(self, line): 140 141 self.parse_record(line) 142 143 runid_match = re.search(self.run_id_pattern, line) 144 if runid_match: 145 run_id = runid_match.group("run_id") 146 self.run_id_exists = True 147 if run_id == str(self.run_id): 148 self.matched_run_id = True 149 150 if self.RUN_PASSED in line: 151 if self.fault: 152 self.status = TwisterStatus.FAIL 153 self.reason = "Fault detected while running test" 154 else: 155 self.status = TwisterStatus.PASS 156 157 if self.RUN_FAILED in line: 158 self.status = TwisterStatus.FAIL 159 self.reason = "Testsuite failed" 160 161 if self.fail_on_fault and line == self.FAULT: 162 self.fault = True 163 164 if self.GCOV_START in line: 165 self.capture_coverage = True 166 elif self.GCOV_END in line: 167 self.capture_coverage = False 168 169class Robot(Harness): 170 171 is_robot_test = True 172 173 def configure(self, instance): 174 super().configure(instance) 175 self.instance = instance 176 177 config = instance.testsuite.harness_config 178 if config: 179 self.path = config.get('robot_testsuite', None) 180 self.option = config.get('robot_option', None) 181 182 def handle(self, line): 183 ''' Test cases that make use of this harness care about results given 184 by Robot Framework which is called in run_robot_test(), so works of this 185 handle is trying to give a PASS or FAIL to avoid timeout, nothing 186 is writen into handler.log 187 ''' 188 self.instance.status = TwisterStatus.PASS 189 tc = self.instance.get_case_or_create(self.id) 190 tc.status = TwisterStatus.PASS 191 192 def run_robot_test(self, command, handler): 193 start_time = time.time() 194 env = os.environ.copy() 195 196 if self.option: 197 if isinstance(self.option, list): 198 for option in self.option: 199 for v in str(option).split(): 200 command.append(f'{v}') 201 else: 202 for v in str(self.option).split(): 203 command.append(f'{v}') 204 205 if self.path is None: 206 raise PytestHarnessException('The parameter robot_testsuite is mandatory') 207 208 if isinstance(self.path, list): 209 for suite in self.path: 210 command.append(os.path.join(handler.sourcedir, suite)) 211 else: 212 command.append(os.path.join(handler.sourcedir, self.path)) 213 214 with subprocess.Popen(command, stdout=subprocess.PIPE, 215 stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as renode_test_proc: 216 out, _ = renode_test_proc.communicate() 217 218 self.instance.execution_time = time.time() - start_time 219 220 if renode_test_proc.returncode == 0: 221 self.instance.status = TwisterStatus.PASS 222 # all tests in one Robot file are treated as a single test case, 223 # so its status should be set accordingly to the instance status 224 # please note that there should be only one testcase in testcases list 225 self.instance.testcases[0].status = TwisterStatus.PASS 226 else: 227 logger.error( 228 f"Robot test failure: {handler.sourcedir} for {self.instance.platform.name}" 229 ) 230 self.instance.status = TwisterStatus.FAIL 231 self.instance.testcases[0].status = TwisterStatus.FAIL 232 233 if out: 234 with open(os.path.join(self.instance.build_dir, handler.log), 'w') as log: 235 log_msg = out.decode(sys.getdefaultencoding()) 236 log.write(log_msg) 237 238class Console(Harness): 239 240 def get_testcase_name(self): 241 ''' 242 Get current TestCase name. 243 244 Console Harness id has only TestSuite id without TestCase name suffix. 245 Only the first TestCase name might be taken if available when a Ztest with 246 a single test case is configured to use this harness type for simplified 247 output parsing instead of the Ztest harness as Ztest suite should do. 248 ''' 249 if self.instance and len(self.instance.testcases) == 1: 250 return self.instance.testcases[0].name 251 return super().get_testcase_name() 252 253 def configure(self, instance): 254 super().configure(instance) 255 if self.regex is None or len(self.regex) == 0: 256 self.status = TwisterStatus.FAIL 257 tc = self.instance.set_case_status_by_name( 258 self.get_testcase_name(), 259 TwisterStatus.FAIL, 260 f"HARNESS:{self.__class__.__name__}:no regex patterns configured." 261 ) 262 raise ConfigurationError(self.instance.name, tc.reason) 263 if self.type == "one_line": 264 self.pattern = re.compile(self.regex[0]) 265 self.patterns_expected = 1 266 elif self.type == "multi_line": 267 self.patterns = [] 268 for r in self.regex: 269 self.patterns.append(re.compile(r)) 270 self.patterns_expected = len(self.patterns) 271 else: 272 self.status = TwisterStatus.FAIL 273 tc = self.instance.set_case_status_by_name( 274 self.get_testcase_name(), 275 TwisterStatus.FAIL, 276 f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}" 277 ) 278 raise ConfigurationError(self.instance.name, tc.reason) 279 # 280 281 def handle(self, line): 282 if self.type == "one_line": 283 if self.pattern.search(line): 284 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:" 285 f"'{self.pattern.pattern}'") 286 self.next_pattern += 1 287 self.status = TwisterStatus.PASS 288 elif self.type == "multi_line" and self.ordered: 289 if (self.next_pattern < len(self.patterns) and 290 self.patterns[self.next_pattern].search(line)): 291 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED(" 292 f"{self.next_pattern + 1}/{self.patterns_expected}):" 293 f"'{self.patterns[self.next_pattern].pattern}'") 294 self.next_pattern += 1 295 if self.next_pattern >= len(self.patterns): 296 self.status = TwisterStatus.PASS 297 elif self.type == "multi_line" and not self.ordered: 298 for i, pattern in enumerate(self.patterns): 299 r = self.regex[i] 300 if pattern.search(line) and r not in self.matches: 301 self.matches[r] = line 302 logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED(" 303 f"{len(self.matches)}/{self.patterns_expected}):" 304 f"'{pattern.pattern}'") 305 if len(self.matches) == len(self.regex): 306 self.status = TwisterStatus.PASS 307 else: 308 logger.error("Unknown harness_config type") 309 310 if self.fail_on_fault and self.FAULT in line: 311 self.fault = True 312 313 if self.GCOV_START in line: 314 self.capture_coverage = True 315 elif self.GCOV_END in line: 316 self.capture_coverage = False 317 318 self.process_test(line) 319 # Reset the resulting test state to FAIL when not all of the patterns were 320 # found in the output, but just ztest's 'PROJECT EXECUTION SUCCESSFUL'. 321 # It might happen because of the pattern sequence diverged from the 322 # test code, the test platform has console issues, or even some other 323 # test image was executed. 324 # TODO: Introduce explicit match policy type to reject 325 # unexpected console output, allow missing patterns, deny duplicates. 326 if self.status == TwisterStatus.PASS and \ 327 self.ordered and \ 328 self.next_pattern < self.patterns_expected: 329 logger.error(f"HARNESS:{self.__class__.__name__}: failed with" 330 f" {self.next_pattern} of {self.patterns_expected}" 331 f" expected ordered patterns.") 332 self.status = TwisterStatus.FAIL 333 self.reason = "patterns did not match (ordered)" 334 if self.status == TwisterStatus.PASS and \ 335 not self.ordered and \ 336 len(self.matches) < self.patterns_expected: 337 logger.error(f"HARNESS:{self.__class__.__name__}: failed with" 338 f" {len(self.matches)} of {self.patterns_expected}" 339 f" expected unordered patterns.") 340 self.status = TwisterStatus.FAIL 341 self.reason = "patterns did not match (unordered)" 342 343 tc = self.instance.get_case_or_create(self.get_testcase_name()) 344 if self.status == TwisterStatus.PASS: 345 tc.status = TwisterStatus.PASS 346 else: 347 tc.status = TwisterStatus.FAIL 348 349 350class PytestHarnessException(Exception): 351 """General exception for pytest.""" 352 353 354class Pytest(Harness): 355 356 def configure(self, instance: TestInstance): 357 super().configure(instance) 358 self.running_dir = instance.build_dir 359 self.source_dir = instance.testsuite.source_dir 360 self.report_file = os.path.join(self.running_dir, 'report.xml') 361 self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log') 362 self.reserved_dut = None 363 self._output = [] 364 365 def pytest_run(self, timeout): 366 try: 367 cmd = self.generate_command() 368 self.run_command(cmd, timeout) 369 except PytestHarnessException as pytest_exception: 370 logger.error(str(pytest_exception)) 371 self.status = TwisterStatus.FAIL 372 self.instance.reason = str(pytest_exception) 373 finally: 374 self.instance.record(self.recording) 375 self._update_test_status() 376 if self.reserved_dut: 377 self.instance.handler.make_dut_available(self.reserved_dut) 378 379 def generate_command(self): 380 config = self.instance.testsuite.harness_config 381 handler: Handler = self.instance.handler 382 pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest'] 383 pytest_args_yaml = config.get('pytest_args', []) if config else [] 384 pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None 385 command = [ 386 'pytest', 387 '--twister-harness', 388 '-s', '-v', 389 f'--build-dir={self.running_dir}', 390 f'--junit-xml={self.report_file}', 391 '--log-file-level=DEBUG', 392 '--log-file-format=%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s: %(message)s', 393 f'--log-file={self.pytest_log_file_path}', 394 f'--platform={self.instance.platform.name}' 395 ] 396 command.extend([os.path.normpath(os.path.join( 397 self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root]) 398 399 if pytest_dut_scope: 400 command.append(f'--dut-scope={pytest_dut_scope}') 401 402 # Always pass output from the pytest test and the test image up to Twister log. 403 command.extend([ 404 '--log-cli-level=DEBUG', 405 '--log-cli-format=%(levelname)s: %(message)s' 406 ]) 407 408 # Use the test timeout as the base timeout for pytest 409 base_timeout = handler.get_test_timeout() 410 command.append(f'--base-timeout={base_timeout}') 411 412 if handler.type_str == 'device': 413 command.extend( 414 self._generate_parameters_for_hardware(handler) 415 ) 416 elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST: 417 command.append(f'--device-type={handler.type_str}') 418 elif handler.type_str == 'build': 419 command.append('--device-type=custom') 420 else: 421 raise PytestHarnessException( 422 f'Support for handler {handler.type_str} not implemented yet' 423 ) 424 425 if handler.type_str != 'device': 426 for fixture in handler.options.fixture: 427 command.append(f'--twister-fixture={fixture}') 428 429 if handler.options.extra_test_args and handler.type_str == 'native': 430 command.append(f'--extra-test-args={shlex.join(handler.options.extra_test_args)}') 431 432 command.extend(pytest_args_yaml) 433 434 if handler.options.pytest_args: 435 command.extend(handler.options.pytest_args) 436 437 return command 438 439 def _generate_parameters_for_hardware(self, handler: Handler): 440 command = ['--device-type=hardware'] 441 hardware = handler.get_hardware() 442 if not hardware: 443 raise PytestHarnessException('Hardware is not available') 444 # update the instance with the device id to have it in the summary report 445 self.instance.dut = hardware.id 446 447 self.reserved_dut = hardware 448 if hardware.serial_pty: 449 command.append(f'--device-serial-pty={hardware.serial_pty}') 450 else: 451 command.extend([ 452 f'--device-serial={hardware.serial}', 453 f'--device-serial-baud={hardware.baud}' 454 ]) 455 456 if hardware.flash_timeout: 457 command.append(f'--flash-timeout={hardware.flash_timeout}') 458 459 options = handler.options 460 if runner := hardware.runner or options.west_runner: 461 command.append(f'--runner={runner}') 462 463 if hardware.runner_params: 464 for param in hardware.runner_params: 465 command.append(f'--runner-params={param}') 466 467 if options.west_flash and options.west_flash != []: 468 command.append(f'--west-flash-extra-args={options.west_flash}') 469 470 if board_id := hardware.probe_id or hardware.id: 471 command.append(f'--device-id={board_id}') 472 473 if hardware.product: 474 command.append(f'--device-product={hardware.product}') 475 476 if hardware.pre_script: 477 command.append(f'--pre-script={hardware.pre_script}') 478 479 if hardware.post_flash_script: 480 command.append(f'--post-flash-script={hardware.post_flash_script}') 481 482 if hardware.post_script: 483 command.append(f'--post-script={hardware.post_script}') 484 485 if hardware.flash_before: 486 command.append(f'--flash-before={hardware.flash_before}') 487 488 for fixture in hardware.fixtures: 489 command.append(f'--twister-fixture={fixture}') 490 491 return command 492 493 def run_command(self, cmd, timeout): 494 cmd, env = self._update_command_with_env_dependencies(cmd) 495 with subprocess.Popen( 496 cmd, 497 stdout=subprocess.PIPE, 498 stderr=subprocess.STDOUT, 499 env=env 500 ) as proc: 501 try: 502 reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True) 503 reader_t.start() 504 reader_t.join(timeout) 505 if reader_t.is_alive(): 506 terminate_process(proc) 507 logger.warning('Timeout has occurred. Can be extended in testspec file. ' 508 f'Currently set to {timeout} seconds.') 509 self.instance.reason = 'Pytest timeout' 510 self.status = TwisterStatus.FAIL 511 proc.wait(timeout) 512 except subprocess.TimeoutExpired: 513 self.status = TwisterStatus.FAIL 514 proc.kill() 515 516 if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR): 517 self.status = TwisterStatus.ERROR 518 self.instance.reason = f'Pytest error - return code {proc.returncode}' 519 with open(self.pytest_log_file_path, 'w') as log_file: 520 log_file.write(shlex.join(cmd) + '\n\n') 521 log_file.write('\n'.join(self._output)) 522 523 @staticmethod 524 def _update_command_with_env_dependencies(cmd): 525 ''' 526 If python plugin wasn't installed by pip, then try to indicate it to 527 pytest by update PYTHONPATH and append -p argument to pytest command. 528 ''' 529 env = os.environ.copy() 530 if not PYTEST_PLUGIN_INSTALLED: 531 cmd.extend(['-p', 'twister_harness.plugin']) 532 pytest_plugin_path = os.path.join( 533 ZEPHYR_BASE, 534 'scripts', 535 'pylib', 536 'pytest-twister-harness', 537 'src' 538 ) 539 env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '') 540 if _WINDOWS: 541 cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && ' 542 else: 543 cmd_append_python_path = ( 544 f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && ' 545 ) 546 else: 547 cmd_append_python_path = '' 548 cmd_to_print = cmd_append_python_path + shlex.join(cmd) 549 logger.debug(f'Running pytest command: {cmd_to_print}') 550 551 return cmd, env 552 553 def _output_reader(self, proc): 554 self._output = [] 555 while proc.stdout.readable() and proc.poll() is None: 556 line = proc.stdout.readline().decode().strip() 557 if not line: 558 continue 559 self._output.append(line) 560 logger.debug(f'PYTEST: {line}') 561 self.parse_record(line) 562 proc.communicate() 563 564 def _update_test_status(self): 565 if self.status == TwisterStatus.NONE: 566 self.instance.testcases = [] 567 try: 568 self._parse_report_file(self.report_file) 569 except Exception as e: 570 logger.error(f'Error when parsing file {self.report_file}: {e}') 571 self.status = TwisterStatus.FAIL 572 finally: 573 if not self.instance.testcases: 574 self.instance.init_cases() 575 576 self.instance.status = self.status if self.status != TwisterStatus.NONE else \ 577 TwisterStatus.FAIL 578 if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: 579 self.instance.reason = self.instance.reason or 'Pytest failed' 580 self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason) 581 582 def _parse_report_file(self, report): 583 tree = ET.parse(report) 584 root = tree.getroot() 585 586 if (elem_ts := root.find('testsuite')) is not None: 587 if elem_ts.get('failures') != '0': 588 self.status = TwisterStatus.FAIL 589 self.instance.reason = ( 590 f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed" 591 ) 592 elif elem_ts.get('errors') != '0': 593 self.status = TwisterStatus.ERROR 594 self.instance.reason = 'Error during pytest execution' 595 elif elem_ts.get('skipped') == elem_ts.get('tests'): 596 self.status = TwisterStatus.SKIP 597 else: 598 self.status = TwisterStatus.PASS 599 self.instance.execution_time = float(elem_ts.get('time')) 600 601 for elem_tc in elem_ts.findall('testcase'): 602 tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}") 603 tc.duration = float(elem_tc.get('time')) 604 elem = elem_tc.find('*') 605 if elem is None: 606 tc.status = TwisterStatus.PASS 607 else: 608 if elem.tag == ReportStatus.SKIP: 609 tc.status = TwisterStatus.SKIP 610 elif elem.tag == ReportStatus.FAIL: 611 tc.status = TwisterStatus.FAIL 612 else: 613 tc.status = TwisterStatus.ERROR 614 tc.reason = elem.get('message') 615 tc.output = elem.text 616 else: 617 self.status = TwisterStatus.SKIP 618 self.instance.reason = 'No tests collected' 619 620 621class Gtest(Harness): 622 ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') 623 _NAME_PATTERN = "[a-zA-Z_][a-zA-Z0-9_]*" 624 _SUITE_TEST_NAME_PATTERN = f"(?P<suite_name>{_NAME_PATTERN})\\.(?P<test_name>{_NAME_PATTERN})" 625 TEST_START_PATTERN = f".*\\[ RUN \\] {_SUITE_TEST_NAME_PATTERN}" 626 TEST_PASS_PATTERN = f".*\\[ OK \\] {_SUITE_TEST_NAME_PATTERN}" 627 TEST_SKIP_PATTERN = f".*\\[ DISABLED \\] {_SUITE_TEST_NAME_PATTERN}" 628 TEST_FAIL_PATTERN = f".*\\[ FAILED \\] {_SUITE_TEST_NAME_PATTERN}" 629 FINISHED_PATTERN = ( 630 ".*(?:\\[==========\\] Done running all tests\\.|" 631 + "\\[----------\\] Global test environment tear-down)" 632 ) 633 634 def __init__(self): 635 super().__init__() 636 self.tc = None 637 self.has_failures = False 638 639 def handle(self, line): 640 # Strip the ANSI characters, they mess up the patterns 641 non_ansi_line = self.ANSI_ESCAPE.sub('', line) 642 643 if self.status != TwisterStatus.NONE: 644 return 645 646 # Check if we started running a new test 647 test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line) 648 if test_start_match: 649 # Add the suite name 650 suite_name = test_start_match.group("suite_name") 651 if suite_name not in self.detected_suite_names: 652 self.detected_suite_names.append(suite_name) 653 654 # Generate the internal name of the test 655 name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name")) 656 657 # Assert that we don't already have a running test 658 assert ( 659 self.tc is None 660 ), f"gTest error, {self.tc} didn't finish" 661 662 # Check that the instance doesn't exist yet (prevents re-running) 663 tc = self.instance.get_case_by_name(name) 664 assert tc is None, f"gTest error, {tc} running twice" 665 666 # Create the test instance and set the context 667 tc = self.instance.get_case_or_create(name) 668 self.tc = tc 669 self.tc.status = TwisterStatus.STARTED 670 self.testcase_output += line + "\n" 671 self._match = True 672 673 # Check if the test run finished 674 finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line) 675 if finished_match: 676 tc = self.instance.get_case_or_create(self.id) 677 if self.has_failures or self.tc is not None: 678 self.status = TwisterStatus.FAIL 679 tc.status = TwisterStatus.FAIL 680 else: 681 self.status = TwisterStatus.PASS 682 tc.status = TwisterStatus.PASS 683 return 684 685 # Check if the individual test finished 686 state, name = self._check_result(non_ansi_line) 687 if state == TwisterStatus.NONE or name is None: 688 # Nothing finished, keep processing lines 689 return 690 691 # Get the matching test and make sure it's the same as the current context 692 tc = self.instance.get_case_by_name(name) 693 assert ( 694 tc is not None and tc == self.tc 695 ), f"gTest error, mismatched tests. Expected {self.tc} but got {tc}" 696 697 # Test finished, clear the context 698 self.tc = None 699 700 # Update the status of the test 701 tc.status = state 702 if tc.status == TwisterStatus.FAIL: 703 self.has_failures = True 704 tc.output = self.testcase_output 705 self.testcase_output = "" 706 self._match = False 707 708 def _check_result(self, line): 709 test_pass_match = re.search(self.TEST_PASS_PATTERN, line) 710 if test_pass_match: 711 return TwisterStatus.PASS, \ 712 "{}.{}.{}".format( 713 self.id, test_pass_match.group("suite_name"), 714 test_pass_match.group("test_name") 715 ) 716 test_skip_match = re.search(self.TEST_SKIP_PATTERN, line) 717 if test_skip_match: 718 return TwisterStatus.SKIP, \ 719 "{}.{}.{}".format( 720 self.id, test_skip_match.group("suite_name"), 721 test_skip_match.group("test_name") 722 ) 723 test_fail_match = re.search(self.TEST_FAIL_PATTERN, line) 724 if test_fail_match: 725 return TwisterStatus.FAIL, \ 726 "{}.{}.{}".format( 727 self.id, test_fail_match.group("suite_name"), 728 test_fail_match.group("test_name") 729 ) 730 return None, None 731 732 733class Test(Harness): 734 __test__ = False # for pytest to skip this class when collects tests 735 736 test_suite_start_pattern = re.compile(r"Running TESTSUITE (?P<suite_name>\S*)") 737 test_suite_end_pattern = re.compile( 738 r"TESTSUITE (?P<suite_name>\S*)\s+(?P<suite_status>succeeded|failed)" 739 ) 740 test_case_start_pattern = re.compile(r"START - (test_)?([a-zA-Z0-9_-]+)") 741 test_case_end_pattern = re.compile( 742 r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds" 743 ) 744 test_suite_summary_pattern = re.compile( 745 r"SUITE (?P<suite_status>\S*) - .* \[(?P<suite_name>\S*)\]:" 746 r" .* duration = (\d*[.,]?\d*) seconds" 747 ) 748 test_case_summary_pattern = re.compile( 749 r" - (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds" 750 ) 751 752 753 def get_testcase(self, tc_name, phase, ts_name=None): 754 """ Search a Ztest case among detected in the test image binary 755 expecting the same test names as already known from the ELF. 756 Track suites and cases unexpectedly found in the log. 757 """ 758 ts_names = self.started_suites.keys() 759 if ts_name: 760 if ts_name not in self.instance.testsuite.ztest_suite_names: 761 logger.warning(f"On {phase}: unexpected Ztest suite '{ts_name}' " 762 f"not present among: {self.instance.testsuite.ztest_suite_names}") 763 if ts_name not in self.detected_suite_names: 764 if self.trace: 765 logger.debug(f"On {phase}: detected new Ztest suite '{ts_name}'") 766 self.detected_suite_names.append(ts_name) 767 ts_names = [ ts_name ] if ts_name in ts_names else [] 768 769 # First, try to match the test case ID to the first running Ztest suite with this test name. 770 for ts_name_ in ts_names: 771 if self.started_suites[ts_name_]['count'] < (0 if phase == 'TS_SUM' else 1): 772 continue 773 tc_fq_id = self.instance.compose_case_name(f"{ts_name_}.{tc_name}") 774 if tc := self.instance.get_case_by_name(tc_fq_id): 775 if self.trace: 776 logger.debug(f"On {phase}: Ztest case '{tc_name}' matched to '{tc_fq_id}") 777 return tc 778 logger.debug( 779 f"On {phase}: Ztest case '{tc_name}' is not known" 780 f" in {self.started_suites} running suite(s)." 781 ) 782 tc_id = self.instance.compose_case_name(tc_name) 783 return self.instance.get_case_or_create(tc_id) 784 785 def start_suite(self, suite_name): 786 if suite_name not in self.detected_suite_names: 787 self.detected_suite_names.append(suite_name) 788 if suite_name not in self.instance.testsuite.ztest_suite_names: 789 logger.warning(f"Unexpected Ztest suite '{suite_name}'") 790 if suite_name in self.started_suites: 791 if self.started_suites[suite_name]['count'] > 0: 792 logger.warning(f"Already STARTED '{suite_name}':{self.started_suites[suite_name]}") 793 elif self.trace: 794 logger.debug(f"START suite '{suite_name}'") 795 self.started_suites[suite_name]['count'] += 1 796 self.started_suites[suite_name]['repeat'] += 1 797 else: 798 self.started_suites[suite_name] = { 'count': 1, 'repeat': 0 } 799 800 def end_suite(self, suite_name, phase='', suite_status=None): 801 if suite_name in self.started_suites: 802 if phase == 'TS_SUM' and self.started_suites[suite_name]['count'] == 0: 803 return 804 if self.started_suites[suite_name]['count'] < 1: 805 logger.error( 806 f"Already ENDED {phase} suite '{suite_name}':{self.started_suites[suite_name]}" 807 ) 808 elif self.trace: 809 logger.debug(f"END {phase} suite '{suite_name}':{self.started_suites[suite_name]}") 810 self.started_suites[suite_name]['count'] -= 1 811 elif suite_status == 'SKIP': 812 self.start_suite(suite_name) # register skipped suites at their summary end 813 self.started_suites[suite_name]['count'] -= 1 814 else: 815 logger.warning(f"END {phase} suite '{suite_name}' without START detected") 816 817 def start_case(self, tc_name): 818 if tc_name in self.started_cases: 819 if self.started_cases[tc_name]['count'] > 0: 820 logger.warning(f"Already STARTED '{tc_name}':{self.started_cases[tc_name]}") 821 self.started_cases[tc_name]['count'] += 1 822 else: 823 self.started_cases[tc_name] = { 'count': 1 } 824 825 def end_case(self, tc_name, phase=''): 826 if tc_name in self.started_cases: 827 if phase == 'TS_SUM' and self.started_cases[tc_name]['count'] == 0: 828 return 829 if self.started_cases[tc_name]['count'] < 1: 830 logger.error( 831 f"Already ENDED {phase} case '{tc_name}':{self.started_cases[tc_name]}" 832 ) 833 elif self.trace: 834 logger.debug(f"END {phase} case '{tc_name}':{self.started_cases[tc_name]}") 835 self.started_cases[tc_name]['count'] -= 1 836 elif phase != 'TS_SUM': 837 logger.warning(f"END {phase} case '{tc_name}' without START detected") 838 839 840 def handle(self, line): 841 testcase_match = None 842 if self._match: 843 self.testcase_output += line + "\n" 844 845 if test_suite_start_match := re.search(self.test_suite_start_pattern, line): 846 self.start_suite(test_suite_start_match.group("suite_name")) 847 elif test_suite_end_match := re.search(self.test_suite_end_pattern, line): 848 suite_name=test_suite_end_match.group("suite_name") 849 self.end_suite(suite_name, 'TS_END') 850 elif testcase_match := re.search(self.test_case_start_pattern, line): 851 tc_name = testcase_match.group(2) 852 tc = self.get_testcase(tc_name, 'TC_START') 853 self.start_case(tc.name) 854 # Mark the test as started, if something happens here, it is mostly 855 # due to this tests, for example timeout. This should in this case 856 # be marked as failed and not blocked (not run). 857 tc.status = TwisterStatus.STARTED 858 if not self._match: 859 self.testcase_output += line + "\n" 860 self._match = True 861 # some testcases are skipped based on predicates and do not show up 862 # during test execution, however they are listed in the summary. Parse 863 # the summary for status and use that status instead. 864 elif result_match := self.test_case_end_pattern.match(line): 865 matched_status = result_match.group(1) 866 tc_name = result_match.group(3) 867 tc = self.get_testcase(tc_name, 'TC_END') 868 self.end_case(tc.name) 869 tc.status = TwisterStatus[matched_status] 870 if tc.status == TwisterStatus.SKIP: 871 tc.reason = "ztest skip" 872 tc.duration = float(result_match.group(4)) 873 if tc.status == TwisterStatus.FAIL: 874 tc.output = self.testcase_output 875 self.testcase_output = "" 876 self._match = False 877 self.ztest = True 878 elif test_suite_summary_match := self.test_suite_summary_pattern.match(line): 879 suite_name=test_suite_summary_match.group("suite_name") 880 suite_status=test_suite_summary_match.group("suite_status") 881 self._match = False 882 self.ztest = True 883 self.end_suite(suite_name, 'TS_SUM', suite_status=suite_status) 884 elif test_case_summary_match := self.test_case_summary_pattern.match(line): 885 matched_status = test_case_summary_match.group(1) 886 suite_name = test_case_summary_match.group(2) 887 tc_name = test_case_summary_match.group(4) 888 tc = self.get_testcase(tc_name, 'TS_SUM', suite_name) 889 self.end_case(tc.name, 'TS_SUM') 890 tc.status = TwisterStatus[matched_status] 891 if tc.status == TwisterStatus.SKIP: 892 tc.reason = "ztest skip" 893 tc.duration = float(test_case_summary_match.group(5)) 894 if tc.status == TwisterStatus.FAIL: 895 tc.output = self.testcase_output 896 self.testcase_output = "" 897 self._match = False 898 self.ztest = True 899 900 self.process_test(line) 901 902 if not self.ztest and self.status != TwisterStatus.NONE: 903 logger.debug(f"not a ztest and no state for {self.id}") 904 tc = self.instance.get_case_or_create(self.id) 905 if self.status == TwisterStatus.PASS: 906 tc.status = TwisterStatus.PASS 907 else: 908 tc.status = TwisterStatus.FAIL 909 tc.reason = "Test failure" 910 911 912class Ztest(Test): 913 pass 914 915 916class Bsim(Harness): 917 918 def build(self): 919 """ 920 Copying the application executable to BabbleSim's bin directory enables 921 running multidevice bsim tests after twister has built them. 922 """ 923 924 if self.instance is None: 925 return 926 927 original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe') 928 if not os.path.exists(original_exe_path): 929 logger.warning('Cannot copy bsim exe - cannot find original executable.') 930 return 931 932 bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '') 933 if not bsim_out_path: 934 logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.') 935 return 936 937 new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '') 938 if new_exe_name: 939 new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}' 940 else: 941 new_exe_name = self.instance.name 942 new_exe_name = f'bs_{new_exe_name}' 943 944 new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_') 945 946 new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name) 947 logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}') 948 shutil.copy(original_exe_path, new_exe_path) 949 950 951class HarnessImporter: 952 953 @staticmethod 954 def get_harness(harness_name): 955 thismodule = sys.modules[__name__] 956 try: 957 if harness_name: 958 harness_class = getattr(thismodule, harness_name) 959 else: 960 harness_class = thismodule.Test 961 return harness_class() 962 except AttributeError as e: 963 logger.debug(f"harness {harness_name} not implemented: {e}") 964 return None 965