1# SPDX-License-Identifier: Apache-2.0
2from __future__ import annotations
4import json
5import logging
6import os
7import platform
8import re
9import shlex
10import shutil
11import subprocess
12import sys
13import threading
14import time
15import xml.etree.ElementTree as ET
16from collections import OrderedDict
17from enum import Enum
19import junitparser.junitparser as junit
20import yaml
21from pytest import ExitCode
22from twisterlib.constants import SUPPORTED_SIMS_IN_PYTEST
23from twisterlib.environment import PYTEST_PLUGIN_INSTALLED, ZEPHYR_BASE
24from twisterlib.error import ConfigurationError, StatusAttributeError
25from twisterlib.handlers import Handler, terminate_process
26from twisterlib.reports import ReportStatus
27from twisterlib.statuses import TwisterStatus
28from twisterlib.testinstance import TestInstance
30logger = logging.getLogger('twister')
33_WINDOWS = platform.system() == 'Windows'
36class Harness:
42    run_id_pattern = r"RunID: (?P<run_id>.*)"
44    def __init__(self):
45        self._status = TwisterStatus.NONE
46        self.reason = None
47        self.type = None
48        self.regex = []
49        self.matches = OrderedDict()
50        self.ordered = True
51        self.id = None
52        self.fail_on_fault = True
53        self.fault = False
54        self.capture_coverage = False
55        self.next_pattern = 0
56        self.record = None
57        self.record_patterns = []
58        self.record_merge = False
59        self.record_as_json = None
60        self.recording = []
61        self.ztest = False
62        self.detected_suite_names = []
63        self.run_id = None
64        self.started_suites = {}
65        self.started_cases = {}
66        self.matched_run_id = False
67        self.run_id_exists = False
68        self.instance: TestInstance | None = None
69        self.testcase_output = ""
70        self._match = False
73    @property
74    def trace(self) -> bool:
75        return self.instance.handler.options.verbose > 2
77    @property
78    def status(self) -> TwisterStatus:
79        return self._status
81    @status.setter
82    def status(self, value : TwisterStatus) -> None:
83        # Check for illegal assignments by value
84        try:
85            key = value.name if isinstance(value, Enum) else value
86            self._status = TwisterStatus[key]
87        except KeyError as err:
88            raise StatusAttributeError(self.__class__, value) from err
90    def configure(self, instance):
91        self.instance = instance
92        config = instance.testsuite.harness_config
93        self.id = instance.testsuite.id
94        self.run_id = instance.run_id
95        if instance.testsuite.ignore_faults:
96            self.fail_on_fault = False
98        if config:
99            self.type = config.get('type', None)
100            self.regex = config.get('regex', [])
101            self.ordered = config.get('ordered', True)
102            self.record = config.get('record', {})
103            if self.record:
104                self.record_patterns = [re.compile(p) for p in self.record.get("regex", [])]
105                self.record_merge = self.record.get("merge", False)
106                self.record_as_json = self.record.get("as_json")
108    def build(self):
109        pass
111    def get_testcase_name(self):
112        """
113        Get current TestCase name.
114        """
115        return self.id
117    def translate_record(self, record: dict) -> dict:
118        if self.record_as_json:
119            for k in self.record_as_json:
120                if k not in record:
121                    continue
122                try:
123                    record[k] = json.loads(record[k]) if record[k] else {}
124                except json.JSONDecodeError as parse_error:
125                    logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:"
126                                   f" {parse_error} for '{k}':'{record[k]}'")
127                    # Don't set the Harness state to failed for recordings.
128                    record[k] = { 'ERROR': { 'msg': str(parse_error), 'doc': record[k] } }
129        return record
131    def parse_record(self, line) -> int:
132        match_cnt = 0
133        for record_pattern in self.record_patterns:
134            match = record_pattern.search(line)
135            if match:
136                match_cnt += 1
137                rec = self.translate_record(
138                    { k:v.strip() for k,v in match.groupdict(default="").items() }
139                )
140                if self.record_merge and len(self.recording) > 0:
141                    for k,v in rec.items():
142                        if k in self.recording[0]:
143                            if isinstance(self.recording[0][k], list):
144                                self.recording[0][k].append(v)
145                            else:
146                                self.recording[0][k] = [self.recording[0][k], v]
147                        else:
148                            self.recording[0][k] = v
149                else:
150                    self.recording.append(rec)
151        return match_cnt
153    def process_test(self, line):
155        self.parse_record(line)
157        runid_match = re.search(self.run_id_pattern, line)
158        if runid_match:
159            run_id = runid_match.group("run_id")
160            self.run_id_exists = True
161            if run_id == str(self.run_id):
162                self.matched_run_id = True
164        if self.RUN_PASSED in line:
165            if self.fault:
166                self.status = TwisterStatus.FAIL
167                self.reason = "Fault detected while running test"
168            else:
169                self.status = TwisterStatus.PASS
171        if self.RUN_FAILED in line:
172            self.status = TwisterStatus.FAIL
173            self.reason = "Testsuite failed"
175        if self.fail_on_fault and line == self.FAULT:
176            self.fault = True
178        if self.GCOV_START in line:
179            self.capture_coverage = True
180        elif self.GCOV_END in line:
181            self.capture_coverage = False
183class Robot(Harness):
185    is_robot_test = True
187    def configure(self, instance):
188        super().configure(instance)
189        self.instance = instance
191        config = instance.testsuite.harness_config
192        if config:
193            self.path = config.get('robot_testsuite', None)
194            self.option = config.get('robot_option', None)
196    def handle(self, line):
197        ''' Test cases that make use of this harness care about results given
198            by Robot Framework which is called in run_robot_test(), so works of this
199            handle is trying to give a PASS or FAIL to avoid timeout, nothing
200            is written into handler.log
201        '''
202        self.instance.status = TwisterStatus.PASS
203        tc = self.instance.get_case_or_create(self.id)
204        tc.status = TwisterStatus.PASS
206    def run_robot_test(self, command, handler):
207        start_time = time.time()
208        env = os.environ.copy()
210        if self.option:
211            if isinstance(self.option, list):
212                for option in self.option:
213                    for v in str(option).split():
214                        command.append(f'{v}')
215            else:
216                for v in str(self.option).split():
217                    command.append(f'{v}')
219        if self.path is None:
220            raise PytestHarnessException('The parameter robot_testsuite is mandatory')
222        if isinstance(self.path, list):
223            for suite in self.path:
224                command.append(os.path.join(handler.sourcedir, suite))
225        else:
226            command.append(os.path.join(handler.sourcedir, self.path))
228        with subprocess.Popen(command, stdout=subprocess.PIPE,
229                stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as renode_test_proc:
230            out, _ = renode_test_proc.communicate()
232            self.instance.execution_time = time.time() - start_time
234            if renode_test_proc.returncode == 0:
235                self.instance.status = TwisterStatus.PASS
236                # all tests in one Robot file are treated as a single test case,
237                # so its status should be set accordingly to the instance status
238                # please note that there should be only one testcase in testcases list
239                self.instance.testcases[0].status = TwisterStatus.PASS
240            else:
241                logger.error(
242                    f"Robot test failure: {handler.sourcedir} for {self.instance.platform.name}"
243                )
244                self.instance.status = TwisterStatus.FAIL
245                self.instance.testcases[0].status = TwisterStatus.FAIL
247            if out:
248                with open(os.path.join(self.instance.build_dir, handler.log), 'w') as log:
249                    log_msg = out.decode(sys.getdefaultencoding())
250                    log.write(log_msg)
252class Console(Harness):
254    def get_testcase_name(self):
255        '''
256        Get current TestCase name.
258        Console Harness id has only TestSuite id without TestCase name suffix.
259        Only the first TestCase name might be taken if available when a Ztest with
260        a single test case is configured to use this harness type for simplified
261        output parsing instead of the Ztest harness as Ztest suite should do.
262        '''
263        if self.instance and len(self.instance.testcases) == 1:
264            return self.instance.testcases[0].name
265        return super().get_testcase_name()
267    def configure(self, instance):
268        super().configure(instance)
269        if self.regex is None or len(self.regex) == 0:
270            self.status = TwisterStatus.FAIL
271            tc = self.instance.set_case_status_by_name(
272                self.get_testcase_name(),
273                TwisterStatus.FAIL,
274                f"HARNESS:{self.__class__.__name__}:no regex patterns configured."
275            )
276            raise ConfigurationError(self.instance.name, tc.reason)
277        if self.type == "one_line":
278            self.pattern = re.compile(self.regex[0])
279            self.patterns_expected = 1
280        elif self.type == "multi_line":
281            self.patterns = []
282            for r in self.regex:
283                self.patterns.append(re.compile(r))
284            self.patterns_expected = len(self.patterns)
285        else:
286            self.status = TwisterStatus.FAIL
287            tc = self.instance.set_case_status_by_name(
288                self.get_testcase_name(),
289                TwisterStatus.FAIL,
290                f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}"
291            )
292            raise ConfigurationError(self.instance.name, tc.reason)
293        #
295    def handle(self, line):
296        if self.type == "one_line":
297            if self.pattern.search(line):
298                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:"
299                             f"'{self.pattern.pattern}'")
300                self.next_pattern += 1
301                self.status = TwisterStatus.PASS
302        elif self.type == "multi_line" and self.ordered:
303            if (self.next_pattern < len(self.patterns) and
304                self.patterns[self.next_pattern].search(line)):
305                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
306                             f"{self.next_pattern + 1}/{self.patterns_expected}):"
307                             f"'{self.patterns[self.next_pattern].pattern}'")
308                self.next_pattern += 1
309                if self.next_pattern >= len(self.patterns):
310                    self.status = TwisterStatus.PASS
311        elif self.type == "multi_line" and not self.ordered:
312            for i, pattern in enumerate(self.patterns):
313                r = self.regex[i]
314                if pattern.search(line) and r not in self.matches:
315                    self.matches[r] = line
316                    logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
317                                 f"{len(self.matches)}/{self.patterns_expected}):"
318                                 f"'{pattern.pattern}'")
319            if len(self.matches) == len(self.regex):
320                self.status = TwisterStatus.PASS
321        else:
322            logger.error("Unknown harness_config type")
324        if self.fail_on_fault and self.FAULT in line:
325            self.fault = True
327        if self.GCOV_START in line:
328            self.capture_coverage = True
329        elif self.GCOV_END in line:
330            self.capture_coverage = False
332        self.process_test(line)
333        # Reset the resulting test state to FAIL when not all of the patterns were
334        # found in the output, but just ztest's 'PROJECT EXECUTION SUCCESSFUL'.
335        # It might happen because of the pattern sequence diverged from the
336        # test code, the test platform has console issues, or even some other
337        # test image was executed.
338        # TODO: Introduce explicit match policy type to reject
339        # unexpected console output, allow missing patterns, deny duplicates.
340        if self.status == TwisterStatus.PASS and \
341           self.ordered and \
342           self.next_pattern < self.patterns_expected:
343            logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
344                         f" {self.next_pattern} of {self.patterns_expected}"
345                         f" expected ordered patterns.")
346            self.status = TwisterStatus.FAIL
347            self.reason = "patterns did not match (ordered)"
348        if self.status == TwisterStatus.PASS and \
349           not self.ordered and \
350           len(self.matches) < self.patterns_expected:
351            logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
352                         f" {len(self.matches)} of {self.patterns_expected}"
353                         f" expected unordered patterns.")
354            self.status = TwisterStatus.FAIL
355            self.reason = "patterns did not match (unordered)"
357        tc = self.instance.get_case_or_create(self.get_testcase_name())
358        if self.status == TwisterStatus.PASS:
359            tc.status = TwisterStatus.PASS
360        else:
361            tc.status = TwisterStatus.FAIL
364class PytestHarnessException(Exception):
365    """General exception for pytest."""
368class Pytest(Harness):
370    def configure(self, instance: TestInstance):
371        super().configure(instance)
372        self.running_dir = instance.build_dir
373        self.source_dir = instance.testsuite.source_dir
374        self.report_file = os.path.join(self.running_dir, 'report.xml')
375        self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
376        self.reserved_dut = None
377        self._output = []
379    def pytest_run(self, timeout):
380        try:
381            cmd = self.generate_command()
382            self.run_command(cmd, timeout)
383        except PytestHarnessException as pytest_exception:
384            logger.error(str(pytest_exception))
385            self.status = TwisterStatus.FAIL
386            self.instance.reason = str(pytest_exception)
387        finally:
388            self.instance.record(self.recording)
389            self._update_test_status()
390            if self.reserved_dut:
391                self.instance.handler.make_dut_available(self.reserved_dut)
393    def generate_command(self):
394        config = self.instance.testsuite.harness_config
395        handler: Handler = self.instance.handler
396        pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest']
397        pytest_args_yaml = config.get('pytest_args', []) if config else []
398        pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None
399        command = [
400            'pytest',
401            '--twister-harness',
402            '-s', '-v',
403            f'--build-dir={self.running_dir}',
404            f'--junit-xml={self.report_file}',
405            f'--platform={self.instance.platform.name}'
406        ]
408        command.extend([os.path.normpath(os.path.join(
409            self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root])
411        if pytest_dut_scope:
412            command.append(f'--dut-scope={pytest_dut_scope}')
414        # Always pass output from the pytest test and the test image up to Twister log.
415        command.extend([
416            '--log-cli-level=DEBUG',
417            '--log-cli-format=%(levelname)s: %(message)s'
418        ])
420        # Use the test timeout as the base timeout for pytest
421        base_timeout = handler.get_test_timeout()
422        command.append(f'--base-timeout={base_timeout}')
424        if handler.type_str == 'device':
425            command.extend(
426                self._generate_parameters_for_hardware(handler)
427            )
428        elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
429            command.append(f'--device-type={handler.type_str}')
430        elif handler.type_str == 'build':
431            command.append('--device-type=custom')
432        else:
433            raise PytestHarnessException(
434                f'Support for handler {handler.type_str} not implemented yet'
435            )
437        if handler.type_str != 'device':
438            for fixture in handler.options.fixture:
439                command.append(f'--twister-fixture={fixture}')
441        if handler.options.extra_test_args and handler.type_str == 'native':
442            command.append(f'--extra-test-args={shlex.join(handler.options.extra_test_args)}')
444        command.extend(pytest_args_yaml)
446        if handler.options.pytest_args:
447            command.extend(handler.options.pytest_args)
449        return command
451    def _generate_parameters_for_hardware(self, handler: Handler):
452        command = ['--device-type=hardware']
453        hardware = handler.get_hardware()
454        if not hardware:
455            raise PytestHarnessException('Hardware is not available')
456        # update the instance with the device id to have it in the summary report
457        self.instance.dut = hardware.id
459        self.reserved_dut = hardware
460        if hardware.serial_pty:
461            command.append(f'--device-serial-pty={hardware.serial_pty}')
462        else:
463            command.extend([
464                f'--device-serial={hardware.serial}',
465                f'--device-serial-baud={hardware.baud}'
466            ])
468        if hardware.flash_timeout:
469            command.append(f'--flash-timeout={hardware.flash_timeout}')
471        options = handler.options
472        if runner := hardware.runner or options.west_runner:
473            command.append(f'--runner={runner}')
475        if hardware.runner_params:
476            for param in hardware.runner_params:
477                command.append(f'--runner-params={param}')
479        if options.west_flash and options.west_flash != []:
480            command.append(f'--west-flash-extra-args={options.west_flash}')
482        if board_id := hardware.probe_id or hardware.id:
483            command.append(f'--device-id={board_id}')
485        if hardware.product:
486            command.append(f'--device-product={hardware.product}')
488        if hardware.pre_script:
489            command.append(f'--pre-script={hardware.pre_script}')
491        if hardware.post_flash_script:
492            command.append(f'--post-flash-script={hardware.post_flash_script}')
494        if hardware.post_script:
495            command.append(f'--post-script={hardware.post_script}')
497        if hardware.flash_before:
498            command.append(f'--flash-before={hardware.flash_before}')
500        for fixture in hardware.fixtures:
501            command.append(f'--twister-fixture={fixture}')
503        return command
505    def run_command(self, cmd, timeout):
506        cmd, env = self._update_command_with_env_dependencies(cmd)
507        with subprocess.Popen(
508            cmd,
509            stdout=subprocess.PIPE,
510            stderr=subprocess.STDOUT,
511            env=env
512        ) as proc:
513            try:
514                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
515                reader_t.start()
516                reader_t.join(timeout)
517                if reader_t.is_alive():
518                    terminate_process(proc)
519                    logger.warning('Timeout has occurred. Can be extended in testspec file. '
520                                   f'Currently set to {timeout} seconds.')
521                    self.instance.reason = 'Pytest timeout'
522                    self.status = TwisterStatus.FAIL
523                proc.wait(timeout)
524            except subprocess.TimeoutExpired:
525                self.status = TwisterStatus.FAIL
526                proc.kill()
528        if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
529            self.status = TwisterStatus.ERROR
530            self.instance.reason = f'Pytest error - return code {proc.returncode}'
531        with open(self.pytest_log_file_path, 'w') as log_file:
532            log_file.write(shlex.join(cmd) + '\n\n')
533            log_file.write('\n'.join(self._output))
535    @staticmethod
536    def _update_command_with_env_dependencies(cmd):
537        '''
538        If python plugin wasn't installed by pip, then try to indicate it to
539        pytest by update PYTHONPATH and append -p argument to pytest command.
540        '''
541        env = os.environ.copy()
542        if not PYTEST_PLUGIN_INSTALLED:
543            cmd.extend(['-p', 'twister_harness.plugin'])
544            pytest_plugin_path = os.path.join(
545                ZEPHYR_BASE,
546                'scripts',
547                'pylib',
548                'pytest-twister-harness',
549                'src'
550            )
551            env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '')
552            if _WINDOWS:
553                cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && '
554            else:
555                cmd_append_python_path = (
556                    f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && '
557                )
558        else:
559            cmd_append_python_path = ''
560        cmd_to_print = cmd_append_python_path + shlex.join(cmd)
561        logger.debug(f'Running pytest command: {cmd_to_print}')
563        return cmd, env
565    def _output_reader(self, proc):
566        self._output = []
567        while proc.stdout.readable() and proc.poll() is None:
568            line = proc.stdout.readline().decode().strip()
569            if not line:
570                continue
571            self._output.append(line)
572            logger.debug(f'PYTEST: {line}')
573            self.parse_record(line)
574        proc.communicate()
576    def _update_test_status(self):
577        if self.status == TwisterStatus.NONE:
578            self.instance.testcases = []
579            try:
580                self._parse_report_file(self.report_file)
581            except Exception as e:
582                logger.error(f'Error when parsing file {self.report_file}: {e}')
583                self.status = TwisterStatus.FAIL
584            finally:
585                if not self.instance.testcases:
586                    self.instance.init_cases()
588        self.instance.status = self.status if self.status != TwisterStatus.NONE else \
589                               TwisterStatus.FAIL
590        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
591            self.instance.reason = self.instance.reason or 'Pytest failed'
592            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)
594    def _parse_report_file(self, report):
595        tree = ET.parse(report)
596        root = tree.getroot()
598        if (elem_ts := root.find('testsuite')) is not None:
599            if elem_ts.get('failures') != '0':
600                self.status = TwisterStatus.FAIL
601                self.instance.reason = (
602                    f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed"
603                )
604            elif elem_ts.get('errors') != '0':
605                self.status = TwisterStatus.ERROR
606                self.instance.reason = 'Error during pytest execution'
607            elif elem_ts.get('skipped') == elem_ts.get('tests'):
608                self.status = TwisterStatus.SKIP
609            else:
610                self.status = TwisterStatus.PASS
611            self.instance.execution_time = float(elem_ts.get('time'))
613            for elem_tc in elem_ts.findall('testcase'):
614                tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}")
615                tc.duration = float(elem_tc.get('time'))
616                elem = elem_tc.find('*')
617                if elem is None:
618                    tc.status = TwisterStatus.PASS
619                else:
620                    if elem.tag == ReportStatus.SKIP:
621                        tc.status = TwisterStatus.SKIP
622                    elif elem.tag == ReportStatus.FAIL:
623                        tc.status = TwisterStatus.FAIL
624                    else:
625                        tc.status = TwisterStatus.ERROR
626                    tc.reason = elem.get('message')
627                    tc.output = elem.text
628        else:
629            self.status = TwisterStatus.SKIP
630            self.instance.reason = 'No tests collected'
633class Shell(Pytest):
634    def generate_command(self):
635        config = self.instance.testsuite.harness_config
636        pytest_root = [os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'shell-twister-harness')]
637        config['pytest_root'] = pytest_root
639        command = super().generate_command()
640        if test_shell_file := self._get_shell_commands_file(config):
641            command.append(f'--testdata={test_shell_file}')
642        else:
643            logger.warning('No shell commands provided')
644        return command
646    def _get_shell_commands_file(self, harness_config):
647        if shell_commands := harness_config.get('shell_commands'):
648            test_shell_file = os.path.join(self.running_dir, 'test_shell.yml')
649            with open(test_shell_file, 'w') as f:
650                yaml.dump(shell_commands, f)
651            return test_shell_file
653        test_shell_file = harness_config.get('shell_commands_file', 'test_shell.yml')
654        test_shell_file = os.path.join(
655            self.source_dir, os.path.expanduser(os.path.expandvars(test_shell_file))
656        )
657        if os.path.exists(test_shell_file):
658            return test_shell_file
659        return None
662class Gtest(Harness):
663    ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
664    _NAME_PATTERN = "[a-zA-Z_][a-zA-Z0-9_]*"
665    _SUITE_TEST_NAME_PATTERN = f"(?P<suite_name>{_NAME_PATTERN})\\.(?P<test_name>{_NAME_PATTERN})"
667    TEST_PASS_PATTERN = f".*\\[       OK \\] {_SUITE_TEST_NAME_PATTERN}"
671        ".*(?:\\[==========\\] Done running all tests\\.|"
672        + "\\[----------\\] Global test environment tear-down)"
673    )
675    def __init__(self):
676        super().__init__()
677        self.tc = None
678        self.has_failures = False
680    def handle(self, line):
681        # Strip the ANSI characters, they mess up the patterns
682        non_ansi_line = self.ANSI_ESCAPE.sub('', line)
684        if self.status != TwisterStatus.NONE:
685            return
687        # Check if we started running a new test
688        test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line)
689        if test_start_match:
690            # Add the suite name
691            suite_name = test_start_match.group("suite_name")
692            if suite_name not in self.detected_suite_names:
693                self.detected_suite_names.append(suite_name)
695            # Generate the internal name of the test
696            name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name"))
698            # Assert that we don't already have a running test
699            assert (
700                self.tc is None
701            ), f"gTest error, {self.tc} didn't finish"
703            # Check that the instance doesn't exist yet (prevents re-running)
704            tc = self.instance.get_case_by_name(name)
705            assert tc is None, f"gTest error, {tc} running twice"
707            # Create the test instance and set the context
708            tc = self.instance.get_case_or_create(name)
709            self.tc = tc
710            self.tc.status = TwisterStatus.STARTED
711            self.testcase_output += line + "\n"
712            self._match = True
714        # Check if the test run finished
715        finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line)
716        if finished_match:
717            tc = self.instance.get_case_or_create(self.id)
718            if self.has_failures or self.tc is not None:
719                self.status = TwisterStatus.FAIL
720                tc.status = TwisterStatus.FAIL
721            else:
722                self.status = TwisterStatus.PASS
723                tc.status = TwisterStatus.PASS
724            return
726        # Check if the individual test finished
727        state, name = self._check_result(non_ansi_line)
728        if state == TwisterStatus.NONE or name is None:
729            # Nothing finished, keep processing lines
730            return
732        # Get the matching test and make sure it's the same as the current context
733        tc = self.instance.get_case_by_name(name)
734        assert (
735            tc is not None and tc == self.tc
736        ), f"gTest error, mismatched tests. Expected {self.tc} but got {tc}"
738        # Test finished, clear the context
739        self.tc = None
741        # Update the status of the test
742        tc.status = state
743        if tc.status == TwisterStatus.FAIL:
744            self.has_failures = True
745            tc.output = self.testcase_output
746        self.testcase_output = ""
747        self._match = False
749    def _check_result(self, line):
750        test_pass_match = re.search(self.TEST_PASS_PATTERN, line)
751        if test_pass_match:
752            return TwisterStatus.PASS, \
753                   "{}.{}.{}".format(
754                        self.id, test_pass_match.group("suite_name"),
755                        test_pass_match.group("test_name")
756                    )
757        test_skip_match = re.search(self.TEST_SKIP_PATTERN, line)
758        if test_skip_match:
759            return TwisterStatus.SKIP, \
760                   "{}.{}.{}".format(
761                       self.id, test_skip_match.group("suite_name"),
762                       test_skip_match.group("test_name")
763                    )
764        test_fail_match = re.search(self.TEST_FAIL_PATTERN, line)
765        if test_fail_match:
766            return TwisterStatus.FAIL, \
767                   "{}.{}.{}".format(
768                       self.id, test_fail_match.group("suite_name"),
769                       test_fail_match.group("test_name")
770                    )
771        return None, None
774class Test(Harness):
775    __test__ = False  # for pytest to skip this class when collects tests
777    test_suite_start_pattern = re.compile(r"Running TESTSUITE (?P<suite_name>\S*)")
778    test_suite_end_pattern = re.compile(
779        r"TESTSUITE (?P<suite_name>\S*)\s+(?P<suite_status>succeeded|failed)"
780    )
781    test_case_start_pattern = re.compile(r"START - (test_)?([a-zA-Z0-9_-]+)")
782    test_case_end_pattern = re.compile(
783        r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds"
784    )
785    test_suite_summary_pattern = re.compile(
786        r"SUITE (?P<suite_status>\S*) - .* \[(?P<suite_name>\S*)\]:"
787        r" .* duration = (\d*[.,]?\d*) seconds"
788    )
789    test_case_summary_pattern = re.compile(
790        r" - (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds"
791    )
794    def get_testcase(self, tc_name, phase, ts_name=None):
795        """ Search a Ztest case among detected in the test image binary
796            expecting the same test names as already known from the ELF.
797            Track suites and cases unexpectedly found in the log.
798        """
799        ts_names = self.started_suites.keys()
800        if ts_name:
801            if self.trace and ts_name not in self.instance.testsuite.ztest_suite_names:
802                # This can happen if a ZTEST_SUITE name is macro-generated
803                # in the test source files, e.g. based on DT information.
804                logger.debug(f"{phase}: unexpected Ztest suite '{ts_name}' is "
805                             f"not present among: {self.instance.testsuite.ztest_suite_names}")
806            if ts_name not in self.detected_suite_names:
807                if self.trace:
808                    logger.debug(f"{phase}: detected new Ztest suite '{ts_name}'")
809                self.detected_suite_names.append(ts_name)
810            ts_names = [ ts_name ] if ts_name in ts_names else []
812        # First, try to match the test case ID to the first running Ztest suite with this test name.
813        for ts_name_ in ts_names:
814            if self.started_suites[ts_name_]['count'] < (0 if phase == 'TS_SUM' else 1):
815                continue
816            tc_fq_id = self.instance.compose_case_name(f"{ts_name_}.{tc_name}")
817            if tc := self.instance.get_case_by_name(tc_fq_id):
818                if self.trace:
819                    logger.debug(f"{phase}: Ztest case '{tc_name}' matched to '{tc_fq_id}")
820                return tc
821        logger.debug(
822            f"{phase}: Ztest case '{tc_name}' is not known"
823            f" in {self.started_suites} running suite(s)."
824        )
825        tc_id = self.instance.compose_case_name(tc_name)
826        return self.instance.get_case_or_create(tc_id)
828    def start_suite(self, suite_name, phase='TS_START'):
829        if suite_name not in self.detected_suite_names:
830            self.detected_suite_names.append(suite_name)
831        if self.trace and suite_name not in self.instance.testsuite.ztest_suite_names:
832            # This can happen if a ZTEST_SUITE name is macro-generated
833            # in the test source files, e.g. based on DT information.
834            logger.debug(f"{phase}: unexpected Ztest suite '{suite_name}' is "
835                         f"not present among: {self.instance.testsuite.ztest_suite_names}")
836        if suite_name in self.started_suites:
837            if self.started_suites[suite_name]['count'] > 0:
838                # Either the suite restarts itself or unexpected state transition.
839                logger.warning(f"{phase}: already STARTED '{suite_name}':"
840                               f"{self.started_suites[suite_name]}")
841            elif self.trace:
842                logger.debug(f"{phase}: START suite '{suite_name}'")
843            self.started_suites[suite_name]['count'] += 1
844            self.started_suites[suite_name]['repeat'] += 1
845        else:
846            self.started_suites[suite_name] = { 'count': 1, 'repeat': 0 }
848    def end_suite(self, suite_name, phase='TS_END', suite_status=None):
849        if suite_name in self.started_suites:
850            if phase == 'TS_SUM' and self.started_suites[suite_name]['count'] == 0:
851                return
852            if self.started_suites[suite_name]['count'] < 1:
853                logger.error(
854                    f"{phase}: already ENDED suite '{suite_name}':{self.started_suites[suite_name]}"
855                )
856            elif self.trace:
857                logger.debug(f"{phase}: END suite '{suite_name}':{self.started_suites[suite_name]}")
858            self.started_suites[suite_name]['count'] -= 1
859        elif suite_status == 'SKIP':
860            self.start_suite(suite_name, phase)  # register skipped suites at their summary end
861            self.started_suites[suite_name]['count'] -= 1
862        else:
863            logger.warning(f"{phase}: END suite '{suite_name}' without START detected")
865    def start_case(self, tc_name, phase='TC_START'):
866        if tc_name in self.started_cases:
867            if self.started_cases[tc_name]['count'] > 0:
868                logger.warning(f"{phase}: already STARTED case "
869                               f"'{tc_name}':{self.started_cases[tc_name]}")
870            self.started_cases[tc_name]['count'] += 1
871        else:
872            self.started_cases[tc_name] = { 'count': 1 }
874    def end_case(self, tc_name, phase='TC_END'):
875        if tc_name in self.started_cases:
876            if phase == 'TS_SUM' and self.started_cases[tc_name]['count'] == 0:
877                return
878            if self.started_cases[tc_name]['count'] < 1:
879                logger.error(
880                    f"{phase}: already ENDED case '{tc_name}':{self.started_cases[tc_name]}"
881                )
882            elif self.trace:
883                logger.debug(f"{phase}: END case '{tc_name}':{self.started_cases[tc_name]}")
884            self.started_cases[tc_name]['count'] -= 1
885        elif phase != 'TS_SUM':
886            logger.warning(f"{phase}: END case '{tc_name}' without START detected")
889    def handle(self, line):
890        testcase_match = None
891        if self._match:
892            self.testcase_output += line + "\n"
894        if test_suite_start_match := re.search(self.test_suite_start_pattern, line):
895            self.start_suite(test_suite_start_match.group("suite_name"))
896        elif test_suite_end_match := re.search(self.test_suite_end_pattern, line):
897            suite_name=test_suite_end_match.group("suite_name")
898            self.end_suite(suite_name)
899        elif testcase_match := re.search(self.test_case_start_pattern, line):
900            tc_name = testcase_match.group(2)
901            tc = self.get_testcase(tc_name, 'TC_START')
902            self.start_case(tc.name)
903            # Mark the test as started, if something happens here, it is mostly
904            # due to this tests, for example timeout. This should in this case
905            # be marked as failed and not blocked (not run).
906            tc.status = TwisterStatus.STARTED
907            if not self._match:
908                self.testcase_output += line + "\n"
909                self._match = True
910        # some testcases are skipped based on predicates and do not show up
911        # during test execution, however they are listed in the summary. Parse
912        # the summary for status and use that status instead.
913        elif result_match := self.test_case_end_pattern.match(line):
914            matched_status = result_match.group(1)
915            tc_name = result_match.group(3)
916            tc = self.get_testcase(tc_name, 'TC_END')
917            self.end_case(tc.name)
918            tc.status = TwisterStatus[matched_status]
919            if tc.status == TwisterStatus.SKIP:
920                tc.reason = "ztest skip"
921            tc.duration = float(result_match.group(4))
922            if tc.status == TwisterStatus.FAIL:
923                tc.output = self.testcase_output
924            self.testcase_output = ""
925            self._match = False
926            self.ztest = True
927        elif test_suite_summary_match := self.test_suite_summary_pattern.match(line):
928            suite_name=test_suite_summary_match.group("suite_name")
929            suite_status=test_suite_summary_match.group("suite_status")
930            self._match = False
931            self.ztest = True
932            self.end_suite(suite_name, 'TS_SUM', suite_status=suite_status)
933        elif test_case_summary_match := self.test_case_summary_pattern.match(line):
934            matched_status = test_case_summary_match.group(1)
935            suite_name = test_case_summary_match.group(2)
936            tc_name = test_case_summary_match.group(4)
937            tc = self.get_testcase(tc_name, 'TS_SUM', suite_name)
938            self.end_case(tc.name, 'TS_SUM')
939            tc.status = TwisterStatus[matched_status]
940            if tc.status == TwisterStatus.SKIP:
941                tc.reason = "ztest skip"
942            tc.duration = float(test_case_summary_match.group(5))
943            if tc.status == TwisterStatus.FAIL:
944                tc.output = self.testcase_output
945            self.testcase_output = ""
946            self._match = False
947            self.ztest = True
949        self.process_test(line)
951        if not self.ztest and self.status != TwisterStatus.NONE:
952            logger.debug(f"not a ztest and no state for {self.id}")
953            tc = self.instance.get_case_or_create(self.id)
954            if self.status == TwisterStatus.PASS:
955                tc.status = TwisterStatus.PASS
956            else:
957                tc.status = TwisterStatus.FAIL
958                tc.reason = "Test failure"
961class Ztest(Test):
962    pass
965class Bsim(Harness):
967    def build(self):
968        """
969        Copying the application executable to BabbleSim's bin directory enables
970        running multidevice bsim tests after twister has built them.
971        """
973        if self.instance is None:
974            return
976        original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe')
977        if not os.path.exists(original_exe_path):
978            logger.warning('Cannot copy bsim exe - cannot find original executable.')
979            return
981        bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '')
982        if not bsim_out_path:
983            logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')
984            return
986        new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '')
987        if new_exe_name:
988            new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}'
989        else:
990            new_exe_name = self.instance.name
991            new_exe_name = f'bs_{new_exe_name}'
993        new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_')
995        new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name)
996        logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}')
997        shutil.copy(original_exe_path, new_exe_path)
999class Ctest(Harness):
1000    def configure(self, instance: TestInstance):
1001        super().configure(instance)
1002        self.running_dir = instance.build_dir
1003        self.report_file = os.path.join(self.running_dir, 'report.xml')
1004        self.ctest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
1005        self._output = []
1007    def ctest_run(self, timeout):
1008        assert self.instance is not None
1009        try:
1010            cmd = self.generate_command()
1011            self.run_command(cmd, timeout)
1012        except Exception as err:
1013            logger.error(str(err))
1014            self.status = TwisterStatus.FAIL
1015            self.instance.reason = str(err)
1016        finally:
1017            self.instance.record(self.recording)
1018            self._update_test_status()
1020    def generate_command(self):
1021        config = self.instance.testsuite.harness_config
1022        handler: Handler = self.instance.handler
1023        ctest_args_yaml = config.get('ctest_args', []) if config else []
1024        command = [
1025            'ctest',
1026            '--build-nocmake',
1027            '--test-dir',
1028            self.running_dir,
1029            '--output-junit',
1030            self.report_file,
1031            '--output-log',
1032            self.ctest_log_file_path,
1033            '--output-on-failure',
1034        ]
1035        base_timeout = handler.get_test_timeout()
1036        command.extend(['--timeout', str(base_timeout)])
1037        command.extend(ctest_args_yaml)
1039        if handler.options.ctest_args:
1040            command.extend(handler.options.ctest_args)
1042        return command
1044    def run_command(self, cmd, timeout):
1045        with subprocess.Popen(
1046            cmd,
1047            stdout=subprocess.PIPE,
1048            stderr=subprocess.STDOUT,
1049        ) as proc:
1050            try:
1051                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
1052                reader_t.start()
1053                reader_t.join(timeout)
1054                if reader_t.is_alive():
1055                    terminate_process(proc)
1056                    logger.warning('Timeout has occurred. Can be extended in testspec file. '
1057                                   f'Currently set to {timeout} seconds.')
1058                    self.instance.reason = 'Ctest timeout'
1059                    self.status = TwisterStatus.FAIL
1060                proc.wait(timeout)
1061            except subprocess.TimeoutExpired:
1062                self.status = TwisterStatus.FAIL
1063                proc.kill()
1065        if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
1066            self.status = TwisterStatus.ERROR
1067            self.instance.reason = f'Ctest error - return code {proc.returncode}'
1068            with open(self.ctest_log_file_path, 'w') as log_file:
1069                log_file.write(shlex.join(cmd) + '\n\n')
1070                log_file.write('\n'.join(self._output))
1072    def _output_reader(self, proc):
1073        self._output = []
1074        while proc.stdout.readable() and proc.poll() is None:
1075            line = proc.stdout.readline().decode().strip()
1076            if not line:
1077                continue
1078            self._output.append(line)
1079            logger.debug(f'CTEST: {line}')
1080            self.parse_record(line)
1081        proc.communicate()
1083    def _update_test_status(self):
1084        if self.status == TwisterStatus.NONE:
1085            self.instance.testcases = []
1086            try:
1087                self._parse_report_file(self.report_file)
1088            except Exception as e:
1089                logger.error(f'Error when parsing file {self.report_file}: {e}')
1090                self.status = TwisterStatus.FAIL
1091            finally:
1092                if not self.instance.testcases:
1093                    self.instance.init_cases()
1095        self.instance.status = self.status if self.status != TwisterStatus.NONE else \
1096                               TwisterStatus.FAIL
1097        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
1098            self.instance.reason = self.instance.reason or 'Ctest failed'
1099            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)
1101    def _parse_report_file(self, report):
1102        suite = junit.JUnitXml.fromfile(report)
1103        if suite is None:
1104            self.status = TwisterStatus.SKIP
1105            self.instance.reason = 'No tests collected'
1106            return
1108        assert isinstance(suite, junit.TestSuite)
1110        if suite.failures and suite.failures > 0:
1111            self.status = TwisterStatus.FAIL
1112            self.instance.reason = f"{suite.failures}/{suite.tests} ctest scenario(s) failed"
1113        elif suite.errors and suite.errors > 0:
1114            self.status = TwisterStatus.ERROR
1115            self.instance.reason = 'Error during ctest execution'
1116        elif suite.skipped and suite.skipped > 0:
1117            self.status = TwisterStatus.SKIP
1118        else:
1119            self.status = TwisterStatus.PASS
1120        self.instance.execution_time = suite.time
1122        for case in suite:
1123            tc = self.instance.add_testcase(f"{self.id}.{case.name}")
1124            tc.duration = case.time
1125            if any(isinstance(r, junit.Failure) for r in case.result):
1126                tc.status = TwisterStatus.FAIL
1127                tc.output = case.system_out
1128            elif any(isinstance(r, junit.Error) for r in case.result):
1129                tc.status = TwisterStatus.ERROR
1130                tc.output = case.system_out
1131            elif any(isinstance(r, junit.Skipped) for r in case.result):
1132                tc.status = TwisterStatus.SKIP
1133            else:
1134                tc.status = TwisterStatus.PASS
1136class HarnessImporter:
1138    @staticmethod
1139    def get_harness(harness_name):
1140        thismodule = sys.modules[__name__]
1141        try:
1142            if harness_name:
1143                harness_class = getattr(thismodule, harness_name)
1144            else:
1145                harness_class = thismodule.Test
1146            return harness_class()
1147        except AttributeError as e:
1148            logger.debug(f"harness {harness_name} not implemented: {e}")
1149            return None