# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import platform
import re
import os
import sys
import subprocess
import shlex
from collections import OrderedDict
import xml.etree.ElementTree as ET
import logging
import threading
import time
import shutil
import json

from twisterlib.error import ConfigurationError
from twisterlib.environment import ZEPHYR_BASE, PYTEST_PLUGIN_INSTALLED
from twisterlib.handlers import Handler, terminate_process, SUPPORTED_SIMS_IN_PYTEST
from twisterlib.testinstance import TestInstance


logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

_WINDOWS = platform.system() == 'Windows'


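# Matches ztest per-test result lines, e.g. "PASS - test_foo in 0.123 seconds".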
result_re = re.compile(r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds")


class Harness:
    GCOV_START = "GCOV_COVERAGE_DUMP_START"
    GCOV_END = "GCOV_COVERAGE_DUMP_END"
    FAULT = "ZEPHYR FATAL ERROR"
    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
    RUN_FAILED = "PROJECT EXECUTION FAILED"
    run_id_pattern = r"RunID: (?P<run_id>.*)"

    ztest_to_status = {
        'PASS': 'passed',
        'SKIP': 'skipped',
        'BLOCK': 'blocked',
        'FAIL': 'failed'
    }

    def __init__(self):
        self.state = None
        self.reason = None
        self.type = None
        self.regex = []
        self.matches = OrderedDict()
        self.ordered = True
        self.id = None
        self.fail_on_fault = True
        self.fault = False
        self.capture_coverage = False
        self.next_pattern = 0
        self.record = None
        self.record_pattern = None
        self.record_as_json = None
        self.recording = []
        self.ztest = False
        self.detected_suite_names = []
        self.run_id = None
        self.matched_run_id = False
        self.run_id_exists = False
        self.instance: TestInstance | None = None
        self.testcase_output = ""
        self._match = False

    def configure(self, instance):
        self.instance = instance
        config = instance.testsuite.harness_config
        self.id = instance.testsuite.id
        self.run_id = instance.run_id
        if instance.testsuite.ignore_faults:
            self.fail_on_fault = False

        if config:
            self.type = config.get('type', None)
            self.regex = config.get('regex', [])
            self.ordered = config.get('ordered', True)
            self.record = config.get('record', {})
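            # A 'record' harness_config sketch (hypothetical values) showing the
            # expected shape: a regex with named groups, plus optional group
            # names whose captured strings are parsed as JSON:
            #   record:
            #     regex: "RECORD:(?P<metric>.*?):(?P<value>.*)"
            #     as_json: ["value"]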
            if self.record:
                self.record_pattern = re.compile(self.record.get("regex", ""))
                self.record_as_json = self.record.get("as_json")

    def build(self):
        pass

    def get_testcase_name(self):
        """
        Get current TestCase name.
        """
        return self.id

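    # Decode selected record fields from JSON strings into Python objects.
    # A decode failure is logged and preserved as an {'ERROR': {...}} wrapper
    # in the record instead of failing the harness.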
    def translate_record(self, record: dict) -> dict:
        if self.record_as_json:
            for k in self.record_as_json:
                if k not in record:
                    continue
                try:
                    record[k] = json.loads(record[k]) if record[k] else {}
                except json.JSONDecodeError as parse_error:
                    logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:"
                                   f" {parse_error} for '{k}':'{record[k]}'")
                    # Don't set the Harness state to failed for recordings.
                    record[k] = { 'ERROR': { 'msg': str(parse_error), 'doc': record[k] } }
        return record

    def parse_record(self, line) -> re.Match:
        match = None
        if self.record_pattern:
            match = self.record_pattern.search(line)
            if match:
                rec = self.translate_record({ k:v.strip() for k,v in match.groupdict(default="").items() })
                self.recording.append(rec)
        return match

    def process_test(self, line):
        self.parse_record(line)

        runid_match = re.search(self.run_id_pattern, line)
        if runid_match:
            run_id = runid_match.group("run_id")
            self.run_id_exists = True
            if run_id == str(self.run_id):
                self.matched_run_id = True

        if self.RUN_PASSED in line:
            if self.fault:
                self.state = "failed"
                self.reason = "Fault detected while running test"
            else:
                self.state = "passed"

        if self.RUN_FAILED in line:
            self.state = "failed"
            self.reason = "Testsuite failed"

        if self.fail_on_fault:
            if self.FAULT in line:
                self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

class Robot(Harness):

    is_robot_test = True

    def configure(self, instance):
        super().configure(instance)

        self.path = None
        self.option = None
        config = instance.testsuite.harness_config
        if config:
            self.path = config.get('robot_testsuite', None)
            self.option = config.get('robot_option', None)

    def handle(self, line):
        ''' Test cases using this harness rely on the results reported by
            Robot Framework, which is invoked in run_robot_test(), so the only
            job of this handle() is to report a PASS to avoid a timeout;
            nothing is written into handler.log.
        '''
        self.instance.state = "passed"
        tc = self.instance.get_case_or_create(self.id)
        tc.status = "passed"

    def run_robot_test(self, command, handler):
        start_time = time.time()
        env = os.environ.copy()

        if self.option:
            if isinstance(self.option, list):
                for option in self.option:
                    for v in str(option).split():
                        command.append(f'{v}')
            else:
                for v in str(self.option).split():
                    command.append(f'{v}')

        if self.path is None:
            raise PytestHarnessException('The parameter robot_testsuite is mandatory')

        if isinstance(self.path, list):
            for suite in self.path:
                command.append(os.path.join(handler.sourcedir, suite))
        else:
            command.append(os.path.join(handler.sourcedir, self.path))

        with subprocess.Popen(command, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as renode_test_proc:
            out, _ = renode_test_proc.communicate()

            self.instance.execution_time = time.time() - start_time

            if renode_test_proc.returncode == 0:
                self.instance.status = "passed"
                # All tests in one Robot file are treated as a single test case,
                # so its status should be set according to the instance status.
                # Note that there should be only one testcase in the testcases list.
                self.instance.testcases[0].status = "passed"
            else:
                logger.error("Robot test failure: %s for %s",
                             handler.sourcedir, self.instance.platform.name)
                self.instance.status = "failed"
                self.instance.testcases[0].status = "failed"

            if out:
                with open(os.path.join(self.instance.build_dir, handler.log), "wt") as log:
                    log_msg = out.decode(sys.getdefaultencoding())
                    log.write(log_msg)

class Console(Harness):

    def get_testcase_name(self):
        '''
        Get current TestCase name.

        The Console harness id is only the TestSuite id, without a TestCase
        name suffix. When a Ztest suite with a single test case is configured
        to use this harness type for simplified output parsing (instead of the
        Ztest harness), the name of that single test case is used if available.
        '''
        if self.instance and len(self.instance.testcases) == 1:
            return self.instance.testcases[0].name
        return super().get_testcase_name()

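    # A typical harness_config sketch (hypothetical values) handled here:
    #   harness: console
    #   harness_config:
    #     type: multi_line
    #     ordered: true
    #     regex:
    #       - "Booting Zephyr OS"
    #       - "Hello World!"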
    def configure(self, instance):
        super().configure(instance)
        if self.regex is None or len(self.regex) == 0:
            self.state = "failed"
            tc = self.instance.set_case_status_by_name(
                self.get_testcase_name(),
                "failed",
                f"HARNESS:{self.__class__.__name__}:no regex patterns configured."
            )
            raise ConfigurationError(self.instance.name, tc.reason)
        if self.type == "one_line":
            self.pattern = re.compile(self.regex[0])
            self.patterns_expected = 1
        elif self.type == "multi_line":
            self.patterns = []
            for r in self.regex:
                self.patterns.append(re.compile(r))
            self.patterns_expected = len(self.patterns)
        else:
            self.state = "failed"
            tc = self.instance.set_case_status_by_name(
                self.get_testcase_name(),
                "failed",
                f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}"
            )
            raise ConfigurationError(self.instance.name, tc.reason)

    def handle(self, line):
        if self.type == "one_line":
            if self.pattern.search(line):
                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:"
                             f"'{self.pattern.pattern}'")
                self.next_pattern += 1
                self.state = "passed"
        elif self.type == "multi_line" and self.ordered:
            if (self.next_pattern < len(self.patterns) and
                self.patterns[self.next_pattern].search(line)):
                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
                             f"{self.next_pattern + 1}/{self.patterns_expected}):"
                             f"'{self.patterns[self.next_pattern].pattern}'")
                self.next_pattern += 1
                if self.next_pattern >= len(self.patterns):
                    self.state = "passed"
        elif self.type == "multi_line" and not self.ordered:
            for i, pattern in enumerate(self.patterns):
                r = self.regex[i]
                if pattern.search(line) and r not in self.matches:
                    self.matches[r] = line
                    logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
                                 f"{len(self.matches)}/{self.patterns_expected}):"
                                 f"'{pattern.pattern}'")
            if len(self.matches) == len(self.regex):
                self.state = "passed"
        else:
            logger.error("Unknown harness_config type")

        if self.fail_on_fault:
            if self.FAULT in line:
                self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

        self.process_test(line)
        # Reset the resulting test state to 'failed' when not all of the expected
        # patterns were found in the output, but only ztest's
        # 'PROJECT EXECUTION SUCCESSFUL' was. This can happen when the pattern
        # sequence diverges from the test code, the test platform has console
        # issues, or a different test image was executed.
        # TODO: Introduce an explicit match policy type to reject
        # unexpected console output, allow missing patterns, deny duplicates.
        if self.state == "passed" and self.ordered and self.next_pattern < self.patterns_expected:
            logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
                         f" {self.next_pattern} of {self.patterns_expected}"
                         f" expected ordered patterns.")
            self.state = "failed"
            self.reason = "patterns did not match (ordered)"
        if self.state == "passed" and not self.ordered and len(self.matches) < self.patterns_expected:
            logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
                         f" {len(self.matches)} of {self.patterns_expected}"
                         f" expected unordered patterns.")
            self.state = "failed"
            self.reason = "patterns did not match (unordered)"

        tc = self.instance.get_case_or_create(self.get_testcase_name())
        if self.state == "passed":
            tc.status = "passed"
        else:
            tc.status = "failed"


class PytestHarnessException(Exception):
    """General exception for pytest."""


class Pytest(Harness):

    def configure(self, instance: TestInstance):
        super().configure(instance)
        self.running_dir = instance.build_dir
        self.source_dir = instance.testsuite.source_dir
        self.report_file = os.path.join(self.running_dir, 'report.xml')
        self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
        self.reserved_serial = None

    def pytest_run(self, timeout):
        try:
            cmd = self.generate_command()
            self.run_command(cmd, timeout)
        except PytestHarnessException as pytest_exception:
            logger.error(str(pytest_exception))
            self.state = 'failed'
            self.instance.reason = str(pytest_exception)
        finally:
            if self.reserved_serial:
                self.instance.handler.make_device_available(self.reserved_serial)
        self.instance.record(self.recording)
        self._update_test_status()

    def generate_command(self):
        config = self.instance.testsuite.harness_config
        handler: Handler = self.instance.handler
        pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest']
        pytest_args_yaml = config.get('pytest_args', []) if config else []
        pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None
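        # Base twister-harness pytest invocation; handler- and suite-specific
        # arguments are appended below.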
        command = [
            'pytest',
            '--twister-harness',
            '-s', '-v',
            f'--build-dir={self.running_dir}',
            f'--junit-xml={self.report_file}',
            '--log-file-level=DEBUG',
            '--log-file-format=%(asctime)s.%(msecs)d:%(levelname)s:%(name)s: %(message)s',
            f'--log-file={self.pytest_log_file_path}'
        ]
        command.extend([os.path.normpath(os.path.join(
            self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root])

        if pytest_dut_scope:
            command.append(f'--dut-scope={pytest_dut_scope}')

        # Always pass output from the pytest test and the test image up to Twister log.
        command.extend([
            '--log-cli-level=DEBUG',
            '--log-cli-format=%(levelname)s: %(message)s'
        ])

        # Use the test timeout as the base timeout for pytest
        base_timeout = handler.get_test_timeout()
        command.append(f'--base-timeout={base_timeout}')

        if handler.type_str == 'device':
            command.extend(
                self._generate_parameters_for_hardware(handler)
            )
        elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
            command.append(f'--device-type={handler.type_str}')
        elif handler.type_str == 'build':
            command.append('--device-type=custom')
        else:
            raise PytestHarnessException(f'Support for handler {handler.type_str} not implemented yet')

        if handler.type_str != 'device':
            for fixture in handler.options.fixture:
                command.append(f'--twister-fixture={fixture}')

        if handler.options.pytest_args:
            command.extend(handler.options.pytest_args)
            if pytest_args_yaml:
                logger.warning(f'The pytest_args ({handler.options.pytest_args}) specified '
                               'on the command line will override the pytest_args defined '
                               f'in the YAML file {pytest_args_yaml}')
        else:
            command.extend(pytest_args_yaml)

        return command

    def _generate_parameters_for_hardware(self, handler: Handler):
        command = ['--device-type=hardware']
        hardware = handler.get_hardware()
        if not hardware:
            raise PytestHarnessException('Hardware is not available')
        # Update the instance with the device id to have it in the summary report.
        self.instance.dut = hardware.id

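        # Remember which serial (or pty) was reserved so pytest_run() can
        # release the DUT in its finally block via make_device_available().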
        self.reserved_serial = hardware.serial_pty or hardware.serial
        if hardware.serial_pty:
            command.append(f'--device-serial-pty={hardware.serial_pty}')
        else:
            command.extend([
                f'--device-serial={hardware.serial}',
                f'--device-serial-baud={hardware.baud}'
            ])

        options = handler.options
        if runner := hardware.runner or options.west_runner:
            command.append(f'--runner={runner}')

        if hardware.runner_params:
            for param in hardware.runner_params:
                command.append(f'--runner-params={param}')

        if options.west_flash and options.west_flash != []:
            command.append(f'--west-flash-extra-args={options.west_flash}')

        if board_id := hardware.probe_id or hardware.id:
            command.append(f'--device-id={board_id}')

        if hardware.product:
            command.append(f'--device-product={hardware.product}')

        if hardware.pre_script:
            command.append(f'--pre-script={hardware.pre_script}')

        if hardware.post_flash_script:
            command.append(f'--post-flash-script={hardware.post_flash_script}')

        if hardware.post_script:
            command.append(f'--post-script={hardware.post_script}')

        if hardware.flash_before:
            command.append(f'--flash-before={hardware.flash_before}')

        for fixture in hardware.fixtures:
            command.append(f'--twister-fixture={fixture}')

        return command

    def run_command(self, cmd, timeout):
        cmd, env = self._update_command_with_env_dependencies(cmd)
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env
        ) as proc:
            try:
                reader_t = threading.Thread(target=self._output_reader, args=(proc, self), daemon=True)
                reader_t.start()
                reader_t.join(timeout)
                if reader_t.is_alive():
                    terminate_process(proc)
                    logger.warning('Timeout has occurred. It can be extended in the test spec file. '
                                   f'Currently set to {timeout} seconds.')
                    self.instance.reason = 'Pytest timeout'
                    self.state = 'failed'
                proc.wait(timeout)
            except subprocess.TimeoutExpired:
                self.state = 'failed'
                proc.kill()

    @staticmethod
    def _update_command_with_env_dependencies(cmd):
        '''
        If the pytest-twister-harness plugin wasn't installed by pip, point
        pytest at it by updating PYTHONPATH and appending the '-p' argument
        to the pytest command.
        '''
        env = os.environ.copy()
        if not PYTEST_PLUGIN_INSTALLED:
            cmd.extend(['-p', 'twister_harness.plugin'])
            pytest_plugin_path = os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'pytest-twister-harness', 'src')
            env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '')
            if _WINDOWS:
                cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && '
            else:
                cmd_append_python_path = f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && '
        else:
            cmd_append_python_path = ''
        cmd_to_print = cmd_append_python_path + shlex.join(cmd)
        logger.debug('Running pytest command: %s', cmd_to_print)

        return cmd, env

    @staticmethod
    def _output_reader(proc, harness):
        while proc.stdout.readable() and proc.poll() is None:
            line = proc.stdout.readline().decode().strip()
            if not line:
                continue
            logger.debug('PYTEST: %s', line)
            harness.parse_record(line)
        proc.communicate()

    def _update_test_status(self):
        if not self.state:
            self.instance.testcases = []
            try:
                self._parse_report_file(self.report_file)
            except Exception as e:
                logger.error(f'Error when parsing file {self.report_file}: {e}')
                self.state = 'failed'
            finally:
                if not self.instance.testcases:
                    self.instance.init_cases()

        self.instance.status = self.state or 'failed'
        if self.instance.status in ['error', 'failed']:
            self.instance.reason = self.instance.reason or 'Pytest failed'
            self.instance.add_missing_case_status('blocked', self.instance.reason)

    def _parse_report_file(self, report):
        tree = ET.parse(report)
        root = tree.getroot()
        if elem_ts := root.find('testsuite'):
            if elem_ts.get('failures') != '0':
                self.state = 'failed'
                self.instance.reason = f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed"
            elif elem_ts.get('errors') != '0':
                self.state = 'error'
                self.instance.reason = 'Error during pytest execution'
            elif elem_ts.get('skipped') == elem_ts.get('tests'):
                self.state = 'skipped'
            else:
                self.state = 'passed'
            self.instance.execution_time = float(elem_ts.get('time'))

            for elem_tc in elem_ts.findall('testcase'):
                tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}")
                tc.duration = float(elem_tc.get('time'))
                elem = elem_tc.find('*')
                if elem is None:
                    tc.status = 'passed'
                else:
                    if elem.tag == 'skipped':
                        tc.status = 'skipped'
                    elif elem.tag == 'failure':
                        tc.status = 'failed'
                    else:
                        tc.status = 'error'
                    tc.reason = elem.get('message')
                    tc.output = elem.text
        else:
            self.state = 'skipped'
            self.instance.reason = 'No tests collected'


class Gtest(Harness):
    ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    TEST_START_PATTERN = r".*\[ RUN      \] (?P<suite_name>[a-zA-Z_][a-zA-Z0-9_]*)\.(?P<test_name>[a-zA-Z_][a-zA-Z0-9_]*)"
    TEST_PASS_PATTERN = r".*\[       OK \] (?P<suite_name>[a-zA-Z_][a-zA-Z0-9_]*)\.(?P<test_name>[a-zA-Z_][a-zA-Z0-9_]*)"
    TEST_SKIP_PATTERN = r".*\[ DISABLED \] (?P<suite_name>[a-zA-Z_][a-zA-Z0-9_]*)\.(?P<test_name>[a-zA-Z_][a-zA-Z0-9_]*)"
    TEST_FAIL_PATTERN = r".*\[  FAILED  \] (?P<suite_name>[a-zA-Z_][a-zA-Z0-9_]*)\.(?P<test_name>[a-zA-Z_][a-zA-Z0-9_]*)"
    FINISHED_PATTERN = r".*\[==========\] Done running all tests\."
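    # Example gTest console lines these patterns are written against:
    #   [ RUN      ] suite_name.test_name
    #   [       OK ] suite_name.test_name
    #   [==========] Done running all tests.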

    def __init__(self):
        super().__init__()
        self.tc = None
        self.has_failures = False

    def handle(self, line):
        # Strip the ANSI characters; they mess up the patterns.
        non_ansi_line = self.ANSI_ESCAPE.sub('', line)

        if self.state:
            return

        # Check if we started running a new test
        test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line)
        if test_start_match:
            # Add the suite name
            suite_name = test_start_match.group("suite_name")
            if suite_name not in self.detected_suite_names:
                self.detected_suite_names.append(suite_name)

            # Generate the internal name of the test
            name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name"))

            # Assert that we don't already have a running test
            assert (
                self.tc is None
            ), "gTest error, {} didn't finish".format(self.tc)

            # Check that the instance doesn't exist yet (prevents re-running)
            tc = self.instance.get_case_by_name(name)
            assert tc is None, "gTest error, {} running twice".format(tc)

            # Create the test instance and set the context
            tc = self.instance.get_case_or_create(name)
            self.tc = tc
            self.tc.status = "started"
            self.testcase_output += line + "\n"
            self._match = True

        # Check if the test run finished
        finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line)
        if finished_match:
            tc = self.instance.get_case_or_create(self.id)
            if self.has_failures or self.tc is not None:
                self.state = "failed"
                tc.status = "failed"
            else:
                self.state = "passed"
                tc.status = "passed"
            return

        # Check if the individual test finished
        state, name = self._check_result(non_ansi_line)
        if state is None or name is None:
            # Nothing finished, keep processing lines
            return

        # Get the matching test and make sure it's the same as the current context
        tc = self.instance.get_case_by_name(name)
        assert (
            tc is not None and tc == self.tc
        ), "gTest error, mismatched tests. Expected {} but got {}".format(self.tc, tc)

        # Test finished, clear the context
        self.tc = None

        # Update the status of the test
        tc.status = state
        if tc.status == "failed":
            self.has_failures = True
            tc.output = self.testcase_output
        self.testcase_output = ""
        self._match = False

    def _check_result(self, line):
        test_pass_match = re.search(self.TEST_PASS_PATTERN, line)
        if test_pass_match:
            return "passed", "{}.{}.{}".format(self.id, test_pass_match.group("suite_name"), test_pass_match.group("test_name"))
        test_skip_match = re.search(self.TEST_SKIP_PATTERN, line)
        if test_skip_match:
            return "skipped", "{}.{}.{}".format(self.id, test_skip_match.group("suite_name"), test_skip_match.group("test_name"))
        test_fail_match = re.search(self.TEST_FAIL_PATTERN, line)
        if test_fail_match:
            return "failed", "{}.{}.{}".format(self.id, test_fail_match.group("suite_name"), test_fail_match.group("test_name"))
        return None, None


class Test(Harness):
    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
    RUN_FAILED = "PROJECT EXECUTION FAILED"
    test_suite_start_pattern = r"Running TESTSUITE (?P<suite_name>.*)"
    ZTEST_START_PATTERN = r"START - (test_)?([a-zA-Z0-9_-]+)"
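    # Example ztest console lines these patterns are written against:
    #   Running TESTSUITE suite_name
    #   START - test_name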

    def handle(self, line):
        test_suite_match = re.search(self.test_suite_start_pattern, line)
        if test_suite_match:
            suite_name = test_suite_match.group("suite_name")
            self.detected_suite_names.append(suite_name)

        testcase_match = re.search(self.ZTEST_START_PATTERN, line)
        if testcase_match:
            name = "{}.{}".format(self.id, testcase_match.group(2))
            tc = self.instance.get_case_or_create(name)
            # Mark the test as started; if something goes wrong here (for
            # example a timeout), it is most likely caused by this test, so it
            # should be reported as failed and not as blocked (not run).
            tc.status = "started"

        if testcase_match or self._match:
            self.testcase_output += line + "\n"
            self._match = True

        result_match = result_re.match(line)
        # Some testcases are skipped based on predicates and do not show up
        # during test execution, however they are listed in the summary. Parse
        # the summary for status and use that status instead.

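        # e.g. "- PASS - [suite_name.test_foo] duration = 0.010 seconds"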
        summary_re = re.compile(r"- (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds")
        summary_match = summary_re.match(line)

        if result_match:
            matched_status = result_match.group(1)
            name = "{}.{}".format(self.id, result_match.group(3))
            tc = self.instance.get_case_or_create(name)
            tc.status = self.ztest_to_status[matched_status]
            if tc.status == "skipped":
                tc.reason = "ztest skip"
            tc.duration = float(result_match.group(4))
            if tc.status == "failed":
                tc.output = self.testcase_output
            self.testcase_output = ""
            self._match = False
            self.ztest = True
        elif summary_match:
            matched_status = summary_match.group(1)
            self.detected_suite_names.append(summary_match.group(2))
            name = "{}.{}".format(self.id, summary_match.group(4))
            tc = self.instance.get_case_or_create(name)
            tc.status = self.ztest_to_status[matched_status]
            if tc.status == "skipped":
                tc.reason = "ztest skip"
            tc.duration = float(summary_match.group(5))
            if tc.status == "failed":
                tc.output = self.testcase_output
            self.testcase_output = ""
            self._match = False
            self.ztest = True

        self.process_test(line)

        if not self.ztest and self.state:
            logger.debug(f"{self.id}: not a ztest suite, using harness state '{self.state}'")
            tc = self.instance.get_case_or_create(self.id)
            if self.state == "passed":
                tc.status = "passed"
            else:
                tc.status = "failed"
                tc.reason = "Test failure"


class Ztest(Test):
    pass


class Bsim(Harness):

    def build(self):
        """
        Copying the application executable to BabbleSim's bin directory enables
        running multidevice bsim tests after twister has built them.
        """

        if self.instance is None:
            return

        original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe')
        if not os.path.exists(original_exe_path):
            logger.warning('Cannot copy bsim exe - cannot find original executable.')
            return

        bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '')
        if not bsim_out_path:
            logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')
            return

        new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '')
        if new_exe_name:
            new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}'
        else:
            new_exe_name = f'bs_{self.instance.name}'

        new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_')
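        # The replacements above flatten the name (path separators, '.' and '@'
        # become '_') so it maps to a single file under ${BSIM_OUT_PATH}/bin.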

        new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name)
        logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}')
        shutil.copy(original_exe_path, new_exe_path)


class HarnessImporter:

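    # Resolve a harness class defined in this module by name (e.g. 'Console',
    # 'Pytest'); when no name is given, fall back to the generic Test harness.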
    @staticmethod
    def get_harness(harness_name):
        thismodule = sys.modules[__name__]
        try:
            if harness_name:
                harness_class = getattr(thismodule, harness_name)
            else:
                harness_class = getattr(thismodule, 'Test')
            return harness_class()
        except AttributeError as e:
            logger.debug(f"harness {harness_name} not implemented: {e}")
            return None