1# SPDX-License-Identifier: Apache-2.0
2from __future__ import annotations
3
4import json
5import logging
6import os
7import platform
8import re
9import shlex
10import shutil
11import subprocess
12import sys
13import threading
14import time
15import xml.etree.ElementTree as ET
16from collections import OrderedDict
17from enum import Enum
18
19from pytest import ExitCode
20from twisterlib.constants import SUPPORTED_SIMS_IN_PYTEST
21from twisterlib.environment import PYTEST_PLUGIN_INSTALLED, ZEPHYR_BASE
22from twisterlib.error import ConfigurationError, StatusAttributeError
23from twisterlib.handlers import Handler, terminate_process
24from twisterlib.reports import ReportStatus
25from twisterlib.statuses import TwisterStatus
26from twisterlib.testinstance import TestInstance
27
# Module-level logger: the 'twister' logger, with DEBUG records enabled on it.
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

# True when running on Windows. Used below to choose the shell syntax of the
# PYTHONPATH hint that is logged together with the pytest command line.
_WINDOWS = platform.system() == 'Windows'
32
33
class Harness:
    """Base class for harnesses that evaluate the output of a test image.

    It keeps the overall harness status, the optional "recording" of output
    lines matching a configured regular expression, and recognizes the
    common markers (run ID, pass/fail banners, fault and coverage dump
    markers) in process_test().
    """

    GCOV_START = "GCOV_COVERAGE_DUMP_START"
    GCOV_END = "GCOV_COVERAGE_DUMP_END"
    FAULT = "ZEPHYR FATAL ERROR"
    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
    RUN_FAILED = "PROJECT EXECUTION FAILED"
    run_id_pattern = r"RunID: (?P<run_id>.*)"

    def __init__(self):
        # Overall verdict and identification.
        self._status = TwisterStatus.NONE
        self.reason = None
        self.id = None
        self.run_id = None
        self.instance: TestInstance | None = None

        # Values taken from the test suite's harness_config in configure().
        self.type = None
        self.regex = []
        self.ordered = True
        self.record = None
        self.record_pattern = None
        self.record_as_json = None

        # Pattern matching progress.
        self.matches = OrderedDict()
        self.next_pattern = 0
        self.recording = []

        # Fault and coverage tracking.
        self.fail_on_fault = True
        self.fault = False
        self.capture_coverage = False

        # Suite / case bookkeeping used by the derived harnesses.
        self.ztest = False
        self.detected_suite_names = []
        self.started_suites = {}
        self.started_cases = {}
        self.matched_run_id = False
        self.run_id_exists = False
        self.testcase_output = ""
        self._match = False

    @property
    def trace(self) -> bool:
        """True when the handler options request a verbosity level above 2."""
        return self.instance.handler.options.verbose > 2

    @property
    def status(self) -> TwisterStatus:
        """Current harness status."""
        return self._status

    @status.setter
    def status(self, value : TwisterStatus) -> None:
        """Store ``value`` as a TwisterStatus member looked up by name.

        Raises:
            StatusAttributeError: when no TwisterStatus member has that name.
        """
        member_name = value.name if isinstance(value, Enum) else value
        try:
            self._status = TwisterStatus[member_name]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err

    def configure(self, instance):
        """Bind the harness to ``instance`` and read its harness_config."""
        self.instance = instance
        testsuite = instance.testsuite
        config = testsuite.harness_config
        self.id = testsuite.id
        self.run_id = instance.run_id
        if testsuite.ignore_faults:
            self.fail_on_fault = False

        if not config:
            return

        self.type = config.get('type', None)
        self.regex = config.get('regex', [])
        self.ordered = config.get('ordered', True)
        self.record = config.get('record', {})
        if self.record:
            self.record_pattern = re.compile(self.record.get("regex", ""))
            self.record_as_json = self.record.get("as_json")

    def build(self):
        """Do nothing in the base harness."""
        pass

    def get_testcase_name(self):
        """
        Get current TestCase name.
        """
        return self.id

    def translate_record(self, record: dict) -> dict:
        """Decode in place the ``record`` fields listed in ``record_as_json``.

        An empty field becomes ``{}``. A field that is not valid JSON is
        replaced with an ``{'ERROR': {'msg': ..., 'doc': ...}}`` entry and a
        warning is logged; the harness status is left untouched.

        Returns:
            The same ``record`` dictionary.
        """
        for key in self.record_as_json or ():
            if key not in record:
                continue
            raw = record[key]
            try:
                record[key] = json.loads(raw) if raw else {}
            except json.JSONDecodeError as parse_error:
                logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:"
                               f" {parse_error} for '{key}':'{raw}'")
                # Don't set the Harness state to failed for recordings.
                record[key] = { 'ERROR': { 'msg': str(parse_error), 'doc': raw } }
        return record

    def parse_record(self, line) -> re.Match:
        """Append a record to ``recording`` when ``line`` matches record_pattern.

        The record is built from the named groups of the match, each value
        stripped of surrounding whitespace (unmatched groups become ""), and
        passed through translate_record().

        Returns:
            The match object, or None when nothing matched or no record
            pattern is configured.
        """
        if not self.record_pattern:
            return None

        match = self.record_pattern.search(line)
        if match:
            fields = {}
            for name, text in match.groupdict(default="").items():
                fields[name] = text.strip()
            self.recording.append(self.translate_record(fields))
        return match

    def process_test(self, line):
        """Update the harness state from one line of test output."""
        self.parse_record(line)

        if runid_match := re.search(self.run_id_pattern, line):
            self.run_id_exists = True
            if runid_match.group("run_id") == str(self.run_id):
                self.matched_run_id = True

        if self.RUN_PASSED in line:
            if not self.fault:
                self.status = TwisterStatus.PASS
            else:
                self.status = TwisterStatus.FAIL
                self.reason = "Fault detected while running test"

        if self.RUN_FAILED in line:
            self.status = TwisterStatus.FAIL
            self.reason = "Testsuite failed"

        # NOTE(review): this is an exact comparison with the whole line, while
        # Console.handle() tests `self.FAULT in line` - confirm which of the
        # two is intended.
        if self.fail_on_fault and line == self.FAULT:
            self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

class Robot(Harness):
    """Harness whose verdict comes from an external Robot Framework run.

    The test suite's harness_config provides ``robot_testsuite`` (one path or
    a list of paths, relative to the handler's source directory) and the
    optional ``robot_option`` (one string or a list of strings with extra
    command line arguments).
    """

    is_robot_test = True

    # Defaults for test suites without a harness_config section, so that
    # run_robot_test() reports the missing 'robot_testsuite' parameter instead
    # of failing with AttributeError.
    path = None
    option = None

    def configure(self, instance):
        """Bind the harness to ``instance`` and read the Robot settings."""
        super().configure(instance)

        config = instance.testsuite.harness_config
        if config:
            self.path = config.get('robot_testsuite', None)
            self.option = config.get('robot_option', None)

    def handle(self, line):
        ''' Test cases that make use of this harness care about the results
            given by Robot Framework, which is called in run_robot_test().
            This handler only reports PASS to avoid a timeout; nothing is
            written into handler.log here.
        '''
        self.instance.status = TwisterStatus.PASS
        tc = self.instance.get_case_or_create(self.id)
        tc.status = TwisterStatus.PASS

    def run_robot_test(self, command, handler):
        """Run ``command`` extended with the configured options and suites.

        ``command`` is extended in place. The process runs in the instance's
        build directory; its exit code decides the status of the instance and
        of its first test case, and its combined stdout/stderr is written to
        ``handler.log`` in the build directory.

        Raises:
            PytestHarnessException: when ``robot_testsuite`` is not configured.
        """
        start_time = time.time()
        env = os.environ.copy()

        # Validate before touching `command`, so a failure leaves it intact.
        if self.path is None:
            raise PytestHarnessException('The parameter robot_testsuite is mandatory')

        if self.option:
            options = self.option if isinstance(self.option, list) else [self.option]
            for option in options:
                # Each option may hold several whitespace-separated arguments.
                command.extend(str(option).split())

        suites = self.path if isinstance(self.path, list) else [self.path]
        command.extend(os.path.join(handler.sourcedir, suite) for suite in suites)

        with subprocess.Popen(command, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as robot_proc:
            out, _ = robot_proc.communicate()

            self.instance.execution_time = time.time() - start_time

            if robot_proc.returncode == 0:
                self.instance.status = TwisterStatus.PASS
                # all tests in one Robot file are treated as a single test case,
                # so its status should be set accordingly to the instance status
                # please note that there should be only one testcase in testcases list
                self.instance.testcases[0].status = TwisterStatus.PASS
            else:
                logger.error(
                    f"Robot test failure: {handler.sourcedir} for {self.instance.platform.name}"
                )
                self.instance.status = TwisterStatus.FAIL
                self.instance.testcases[0].status = TwisterStatus.FAIL

            if out:
                with open(os.path.join(self.instance.build_dir, handler.log), 'w') as log:
                    # Undecodable bytes must not turn a finished run into a crash.
                    log_msg = out.decode(sys.getdefaultencoding(), errors='replace')
                    log.write(log_msg)

class Console(Harness):
    """Harness matching console output against configured regular expressions.

    With type "one_line" only the first configured regex is used; with
    "multi_line" all of them must be found, either in the configured order
    (``ordered``) or in any order.
    """

    def get_testcase_name(self):
        '''
        Get current TestCase name.

        Console Harness id has only TestSuite id without TestCase name suffix.
        Only the first TestCase name might be taken if available when a Ztest with
        a single test case is configured to use this harness type for simplified
        output parsing instead of the Ztest harness as Ztest suite should do.
        '''
        instance = self.instance
        if instance and len(instance.testcases) == 1:
            return instance.testcases[0].name
        return super().get_testcase_name()

    def _reject_configuration(self, details):
        """Mark harness and test case as failed and raise ConfigurationError."""
        self.status = TwisterStatus.FAIL
        tc = self.instance.set_case_status_by_name(
            self.get_testcase_name(),
            TwisterStatus.FAIL,
            f"HARNESS:{self.__class__.__name__}:{details}"
        )
        raise ConfigurationError(self.instance.name, tc.reason)

    def configure(self, instance):
        """Compile the configured patterns.

        Raises:
            ConfigurationError: when no regex is configured or the harness
                type is neither "one_line" nor "multi_line".
        """
        super().configure(instance)
        if self.regex is None or len(self.regex) == 0:
            self._reject_configuration("no regex patterns configured.")

        if self.type == "one_line":
            self.pattern = re.compile(self.regex[0])
            self.patterns_expected = 1
        elif self.type == "multi_line":
            self.patterns = [re.compile(expression) for expression in self.regex]
            self.patterns_expected = len(self.patterns)
        else:
            self._reject_configuration(f"incorrect type={self.type}")

    def handle(self, line):
        """Match one output line and update harness and test case status."""
        harness_name = self.__class__.__name__

        if self.type == "one_line":
            if self.pattern.search(line):
                logger.debug(f"HARNESS:{harness_name}:EXPECTED:"
                             f"'{self.pattern.pattern}'")
                self.next_pattern += 1
                self.status = TwisterStatus.PASS
        elif self.type == "multi_line":
            if self.ordered:
                # Only the next pattern in the sequence may match.
                if self.next_pattern < len(self.patterns):
                    expected = self.patterns[self.next_pattern]
                    if expected.search(line):
                        logger.debug(f"HARNESS:{harness_name}:EXPECTED("
                                     f"{self.next_pattern + 1}/{self.patterns_expected}):"
                                     f"'{expected.pattern}'")
                        self.next_pattern += 1
                        if self.next_pattern >= len(self.patterns):
                            self.status = TwisterStatus.PASS
            else:
                # Any pattern not seen so far may match; matches are keyed by
                # the regex source text.
                for expression, pattern in zip(self.regex, self.patterns):
                    if pattern.search(line) and expression not in self.matches:
                        self.matches[expression] = line
                        logger.debug(f"HARNESS:{harness_name}:EXPECTED("
                                     f"{len(self.matches)}/{self.patterns_expected}):"
                                     f"'{pattern.pattern}'")
                if len(self.matches) == len(self.regex):
                    self.status = TwisterStatus.PASS
        else:
            logger.error("Unknown harness_config type")

        if self.fail_on_fault and self.FAULT in line:
            self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

        self.process_test(line)
        # Reset the resulting test state to FAIL when not all of the patterns were
        # found in the output, but just ztest's 'PROJECT EXECUTION SUCCESSFUL'.
        # It might happen because of the pattern sequence diverged from the
        # test code, the test platform has console issues, or even some other
        # test image was executed.
        # TODO: Introduce explicit match policy type to reject
        # unexpected console output, allow missing patterns, deny duplicates.
        if self.status == TwisterStatus.PASS:
            if self.ordered:
                found, policy = self.next_pattern, "ordered"
            else:
                found, policy = len(self.matches), "unordered"
            if found < self.patterns_expected:
                logger.error(f"HARNESS:{harness_name}: failed with"
                             f" {found} of {self.patterns_expected}"
                             f" expected {policy} patterns.")
                self.status = TwisterStatus.FAIL
                self.reason = f"patterns did not match ({policy})"

        tc = self.instance.get_case_or_create(self.get_testcase_name())
        passed = self.status == TwisterStatus.PASS
        tc.status = TwisterStatus.PASS if passed else TwisterStatus.FAIL

349
class PytestHarnessException(Exception):
    """Error raised by a harness while preparing or running an external test tool.

    Pytest.pytest_run() catches it and reports the message as the failure
    reason of the test instance.
    """

353
class Pytest(Harness):
    """Harness running a pytest session with the ``--twister-harness`` option.

    pytest_run() builds the pytest command line, runs it as a subprocess while
    collecting its output, and converts the JUnit XML report written by pytest
    into the statuses of the test instance and of its test cases.
    """

    # Children of a JUnit <testcase> element that only carry auxiliary data.
    # They must not be mistaken for an outcome element such as <skipped> or
    # <failure>.
    _JUNIT_INFO_TAGS = ('properties', 'system-out', 'system-err')

    def configure(self, instance: TestInstance):
        """Bind the harness to ``instance`` and derive the working paths."""
        super().configure(instance)
        self.running_dir = instance.build_dir
        self.source_dir = instance.testsuite.source_dir
        self.report_file = os.path.join(self.running_dir, 'report.xml')
        self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
        self.reserved_dut = None
        self._output = []

    def pytest_run(self, timeout):
        """Run pytest and update the instance status.

        A PytestHarnessException raised while building or running the command
        is logged and turned into a FAIL status. The recording is stored, the
        status is updated and a reserved DUT is released in every case.

        Args:
            timeout: time limit in seconds, passed on to run_command().
        """
        try:
            cmd = self.generate_command()
            self.run_command(cmd, timeout)
        except PytestHarnessException as pytest_exception:
            logger.error(str(pytest_exception))
            self.status = TwisterStatus.FAIL
            self.instance.reason = str(pytest_exception)
        finally:
            self.instance.record(self.recording)
            self._update_test_status()
            if self.reserved_dut:
                self.instance.handler.make_dut_available(self.reserved_dut)

    def generate_command(self):
        """Build the pytest command line for this test instance.

        Returns:
            The command as a list of arguments.

        Raises:
            PytestHarnessException: when the handler type is not supported or
                no hardware is available for a 'device' handler.
        """
        config = self.instance.testsuite.harness_config
        handler: Handler = self.instance.handler
        pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest']
        pytest_args_yaml = config.get('pytest_args', []) if config else []
        pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None
        command = [
            'pytest',
            '--twister-harness',
            '-s', '-v',
            f'--build-dir={self.running_dir}',
            f'--junit-xml={self.report_file}',
            '--log-file-level=DEBUG',
            '--log-file-format=%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s: %(message)s',
            f'--log-file={self.pytest_log_file_path}',
            f'--platform={self.instance.platform.name}'
        ]
        # Test roots are resolved against the test suite's source directory,
        # after expanding environment variables and '~'.
        command.extend([os.path.normpath(os.path.join(
            self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root])

        if pytest_dut_scope:
            command.append(f'--dut-scope={pytest_dut_scope}')

        # Always pass output from the pytest test and the test image up to Twister log.
        command.extend([
            '--log-cli-level=DEBUG',
            '--log-cli-format=%(levelname)s: %(message)s'
        ])

        # Use the test timeout as the base timeout for pytest
        base_timeout = handler.get_test_timeout()
        command.append(f'--base-timeout={base_timeout}')

        if handler.type_str == 'device':
            command.extend(
                self._generate_parameters_for_hardware(handler)
            )
        elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
            command.append(f'--device-type={handler.type_str}')
        elif handler.type_str == 'build':
            command.append('--device-type=custom')
        else:
            raise PytestHarnessException(
                f'Support for handler {handler.type_str} not implemented yet'
            )

        # For 'device' handlers the fixtures come from the hardware entry, see
        # _generate_parameters_for_hardware().
        if handler.type_str != 'device':
            for fixture in handler.options.fixture:
                command.append(f'--twister-fixture={fixture}')

        if handler.options.extra_test_args and handler.type_str == 'native':
            command.append(f'--extra-test-args={shlex.join(handler.options.extra_test_args)}')

        command.extend(pytest_args_yaml)

        if handler.options.pytest_args:
            command.extend(handler.options.pytest_args)

        return command

    def _generate_parameters_for_hardware(self, handler: Handler):
        """Reserve a hardware entry and return the pytest arguments describing it.

        The reserved entry is remembered in ``self.reserved_dut`` so that
        pytest_run() can release it.

        Raises:
            PytestHarnessException: when the handler provides no hardware.
        """
        command = ['--device-type=hardware']
        hardware = handler.get_hardware()
        if not hardware:
            raise PytestHarnessException('Hardware is not available')
        # update the instance with the device id to have it in the summary report
        self.instance.dut = hardware.id

        self.reserved_dut = hardware
        if hardware.serial_pty:
            command.append(f'--device-serial-pty={hardware.serial_pty}')
        else:
            command.extend([
                f'--device-serial={hardware.serial}',
                f'--device-serial-baud={hardware.baud}'
            ])

        if hardware.flash_timeout:
            command.append(f'--flash-timeout={hardware.flash_timeout}')

        options = handler.options
        if runner := hardware.runner or options.west_runner:
            command.append(f'--runner={runner}')

        if hardware.runner_params:
            for param in hardware.runner_params:
                command.append(f'--runner-params={param}')

        if options.west_flash:
            command.append(f'--west-flash-extra-args={options.west_flash}')

        if board_id := hardware.probe_id or hardware.id:
            command.append(f'--device-id={board_id}')

        if hardware.product:
            command.append(f'--device-product={hardware.product}')

        if hardware.pre_script:
            command.append(f'--pre-script={hardware.pre_script}')

        if hardware.post_flash_script:
            command.append(f'--post-flash-script={hardware.post_flash_script}')

        if hardware.post_script:
            command.append(f'--post-script={hardware.post_script}')

        if hardware.flash_before:
            command.append(f'--flash-before={hardware.flash_before}')

        for fixture in hardware.fixtures:
            command.append(f'--twister-fixture={fixture}')

        return command

    def run_command(self, cmd, timeout):
        """Run ``cmd`` and collect its output, enforcing ``timeout`` seconds.

        The output is consumed by a reader thread. When that thread is still
        alive after ``timeout`` seconds the process is terminated and the
        status is set to FAIL. An INTERRUPTED, USAGE_ERROR or INTERNAL_ERROR
        pytest exit code sets the status to ERROR and writes the command line
        and the collected output to the pytest log file.
        """
        cmd, env = self._update_command_with_env_dependencies(cmd)
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env
        ) as proc:
            try:
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(timeout)
                if reader_t.is_alive():
                    terminate_process(proc)
                    logger.warning('Timeout has occurred. Can be extended in testspec file. '
                                   f'Currently set to {timeout} seconds.')
                    self.instance.reason = 'Pytest timeout'
                    self.status = TwisterStatus.FAIL
                proc.wait(timeout)
            except subprocess.TimeoutExpired:
                self.status = TwisterStatus.FAIL
                proc.kill()

        if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
            self.status = TwisterStatus.ERROR
            self.instance.reason = f'Pytest error - return code {proc.returncode}'
            with open(self.pytest_log_file_path, 'w') as log_file:
                log_file.write(shlex.join(cmd) + '\n\n')
                log_file.write('\n'.join(self._output))

    @staticmethod
    def _update_command_with_env_dependencies(cmd):
        '''
        If python plugin wasn't installed by pip, then try to indicate it to
        pytest by update PYTHONPATH and append -p argument to pytest command.

        ``cmd`` is extended in place. Returns the command and the environment
        to run it with.
        '''
        env = os.environ.copy()
        if not PYTEST_PLUGIN_INSTALLED:
            cmd.extend(['-p', 'twister_harness.plugin'])
            pytest_plugin_path = os.path.join(
                ZEPHYR_BASE,
                'scripts',
                'pylib',
                'pytest-twister-harness',
                'src'
            )
            env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '')
            # The prefix below is only part of the logged hint; the subprocess
            # receives PYTHONPATH through `env`.
            if _WINDOWS:
                cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && '
            else:
                cmd_append_python_path = (
                    f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && '
                )
        else:
            cmd_append_python_path = ''
        cmd_to_print = cmd_append_python_path + shlex.join(cmd)
        logger.debug(f'Running pytest command: {cmd_to_print}')

        return cmd, env

    def _output_reader(self, proc):
        """Collect, log and scan for records every non-empty output line of ``proc``.

        Runs in the reader thread started by run_command().
        """
        self._output = []
        while proc.stdout.readable() and proc.poll() is None:
            self._process_output_line(proc.stdout.readline())
        # The process may exit while output is still buffered in the pipe.
        # Handle that remainder as well instead of dropping it.
        leftover = proc.communicate()[0]
        if leftover:
            for raw_line in leftover.splitlines():
                self._process_output_line(raw_line)

    def _process_output_line(self, raw_line):
        """Decode one raw output line; store, log and record it unless empty."""
        # Undecodable bytes are replaced so that they cannot stop the reader.
        line = raw_line.decode(errors='replace').strip()
        if not line:
            return
        self._output.append(line)
        logger.debug(f'PYTEST: {line}')
        self.parse_record(line)

    def _update_test_status(self):
        """Derive the instance status from the harness status or the report.

        The JUnit report is parsed only when no status was set while running
        pytest. A harness status that is still NONE afterwards counts as FAIL.
        """
        if self.status == TwisterStatus.NONE:
            self.instance.testcases = []
            try:
                self._parse_report_file(self.report_file)
            except Exception as e:
                logger.error(f'Error when parsing file {self.report_file}: {e}')
                self.status = TwisterStatus.FAIL
            finally:
                if not self.instance.testcases:
                    self.instance.init_cases()

        self.instance.status = self.status if self.status != TwisterStatus.NONE else \
                               TwisterStatus.FAIL
        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
            self.instance.reason = self.instance.reason or 'Pytest failed'
            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)

    def _parse_report_file(self, report):
        """Set harness and test case statuses from the JUnit XML file ``report``.

        Only the first <testsuite> element is evaluated; without one the
        status is SKIP with the reason 'No tests collected'.
        """
        tree = ET.parse(report)
        root = tree.getroot()

        if (elem_ts := root.find('testsuite')) is not None:
            if elem_ts.get('failures') != '0':
                self.status = TwisterStatus.FAIL
                self.instance.reason = (
                    f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed"
                )
            elif elem_ts.get('errors') != '0':
                self.status = TwisterStatus.ERROR
                self.instance.reason = 'Error during pytest execution'
            elif elem_ts.get('skipped') == elem_ts.get('tests'):
                self.status = TwisterStatus.SKIP
            else:
                self.status = TwisterStatus.PASS
            self.instance.execution_time = float(elem_ts.get('time'))

            for elem_tc in elem_ts.findall('testcase'):
                tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}")
                tc.duration = float(elem_tc.get('time'))
                # The first child that is not auxiliary data describes the
                # outcome; a test case without such a child has passed.
                elem = next(
                    (child for child in elem_tc if child.tag not in self._JUNIT_INFO_TAGS),
                    None
                )
                if elem is None:
                    tc.status = TwisterStatus.PASS
                else:
                    if elem.tag == ReportStatus.SKIP:
                        tc.status = TwisterStatus.SKIP
                    elif elem.tag == ReportStatus.FAIL:
                        tc.status = TwisterStatus.FAIL
                    else:
                        tc.status = TwisterStatus.ERROR
                    tc.reason = elem.get('message')
                    tc.output = elem.text
        else:
            self.status = TwisterStatus.SKIP
            self.instance.reason = 'No tests collected'

620
621class Gtest(Harness):
622    ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
623    _NAME_PATTERN = "[a-zA-Z_][a-zA-Z0-9_]*"
624    _SUITE_TEST_NAME_PATTERN = f"(?P<suite_name>{_NAME_PATTERN})\\.(?P<test_name>{_NAME_PATTERN})"
625    TEST_START_PATTERN = f".*\\[ RUN      \\] {_SUITE_TEST_NAME_PATTERN}"
626    TEST_PASS_PATTERN = f".*\\[       OK \\] {_SUITE_TEST_NAME_PATTERN}"
627    TEST_SKIP_PATTERN = f".*\\[ DISABLED \\] {_SUITE_TEST_NAME_PATTERN}"
628    TEST_FAIL_PATTERN = f".*\\[  FAILED  \\] {_SUITE_TEST_NAME_PATTERN}"
629    FINISHED_PATTERN = (
630        ".*(?:\\[==========\\] Done running all tests\\.|"
631        + "\\[----------\\] Global test environment tear-down)"
632    )
633
634    def __init__(self):
635        super().__init__()
636        self.tc = None
637        self.has_failures = False
638
639    def handle(self, line):
640        # Strip the ANSI characters, they mess up the patterns
641        non_ansi_line = self.ANSI_ESCAPE.sub('', line)
642
643        if self.status != TwisterStatus.NONE:
644            return
645
646        # Check if we started running a new test
647        test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line)
648        if test_start_match:
649            # Add the suite name
650            suite_name = test_start_match.group("suite_name")
651            if suite_name not in self.detected_suite_names:
652                self.detected_suite_names.append(suite_name)
653
654            # Generate the internal name of the test
655            name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name"))
656
657            # Assert that we don't already have a running test
658            assert (
659                self.tc is None
660            ), f"gTest error, {self.tc} didn't finish"
661
662            # Check that the instance doesn't exist yet (prevents re-running)
663            tc = self.instance.get_case_by_name(name)
664            assert tc is None, f"gTest error, {tc} running twice"
665
666            # Create the test instance and set the context
667            tc = self.instance.get_case_or_create(name)
668            self.tc = tc
669            self.tc.status = TwisterStatus.STARTED
670            self.testcase_output += line + "\n"
671            self._match = True
672
673        # Check if the test run finished
674        finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line)
675        if finished_match:
676            tc = self.instance.get_case_or_create(self.id)
677            if self.has_failures or self.tc is not None:
678                self.status = TwisterStatus.FAIL
679                tc.status = TwisterStatus.FAIL
680            else:
681                self.status = TwisterStatus.PASS
682                tc.status = TwisterStatus.PASS
683            return
684
685        # Check if the individual test finished
686        state, name = self._check_result(non_ansi_line)
687        if state == TwisterStatus.NONE or name is None:
688            # Nothing finished, keep processing lines
689            return
690
691        # Get the matching test and make sure it's the same as the current context
692        tc = self.instance.get_case_by_name(name)
693        assert (
694            tc is not None and tc == self.tc
695        ), f"gTest error, mismatched tests. Expected {self.tc} but got {tc}"
696
697        # Test finished, clear the context
698        self.tc = None
699
700        # Update the status of the test
701        tc.status = state
702        if tc.status == TwisterStatus.FAIL:
703            self.has_failures = True
704            tc.output = self.testcase_output
705        self.testcase_output = ""
706        self._match = False
707
708    def _check_result(self, line):
709        test_pass_match = re.search(self.TEST_PASS_PATTERN, line)
710        if test_pass_match:
711            return TwisterStatus.PASS, \
712                   "{}.{}.{}".format(
713                        self.id, test_pass_match.group("suite_name"),
714                        test_pass_match.group("test_name")
715                    )
716        test_skip_match = re.search(self.TEST_SKIP_PATTERN, line)
717        if test_skip_match:
718            return TwisterStatus.SKIP, \
719                   "{}.{}.{}".format(
720                       self.id, test_skip_match.group("suite_name"),
721                       test_skip_match.group("test_name")
722                    )
723        test_fail_match = re.search(self.TEST_FAIL_PATTERN, line)
724        if test_fail_match:
725            return TwisterStatus.FAIL, \
726                   "{}.{}.{}".format(
727                       self.id, test_fail_match.group("suite_name"),
728                       test_fail_match.group("test_name")
729                    )
730        return None, None
731
732
733class Test(Harness):
734    __test__ = False  # for pytest to skip this class when collects tests
735
    # Suite start line: "Running TESTSUITE <suite_name>".
    test_suite_start_pattern = re.compile(r"Running TESTSUITE (?P<suite_name>\S*)")
    # Suite end line: "TESTSUITE <suite_name> succeeded" or "... failed".
    test_suite_end_pattern = re.compile(
        r"TESTSUITE (?P<suite_name>\S*)\s+(?P<suite_status>succeeded|failed)"
    )
    # Case start line: "START - <name>"; an optional "test_" prefix is captured
    # separately from the rest of the name.
    test_case_start_pattern = re.compile(r"START - (test_)?([a-zA-Z0-9_-]+)")
    # Case end line: "<PASS|FAIL|SKIP> - <name> in <number> seconds"; the number
    # may use either '.' or ',' as the decimal separator.
    test_case_end_pattern = re.compile(
        r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds"
    )
    # Summary line of a suite: "SUITE <status> - ... [<suite_name>]: ...
    # duration = <number> seconds".
    test_suite_summary_pattern = re.compile(
        r"SUITE (?P<suite_status>\S*) - .* \[(?P<suite_name>\S*)\]:"
        r" .* duration = (\d*[.,]?\d*) seconds"
    )
    # Summary line of a case: " - <PASS|FAIL|SKIP> - [<suite>.<name>] duration =
    # <number> seconds".
    test_case_summary_pattern = re.compile(
        r" - (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds"
    )
751
752
753    def get_testcase(self, tc_name, phase, ts_name=None):
754        """ Search a Ztest case among detected in the test image binary
755            expecting the same test names as already known from the ELF.
756            Track suites and cases unexpectedly found in the log.
757        """
758        ts_names = self.started_suites.keys()
759        if ts_name:
760            if ts_name not in self.instance.testsuite.ztest_suite_names:
761                logger.warning(f"On {phase}: unexpected Ztest suite '{ts_name}' "
762                               f"not present among: {self.instance.testsuite.ztest_suite_names}")
763            if ts_name not in self.detected_suite_names:
764                if self.trace:
765                    logger.debug(f"On {phase}: detected new Ztest suite '{ts_name}'")
766                self.detected_suite_names.append(ts_name)
767            ts_names = [ ts_name ] if ts_name in ts_names else []
768
769        # First, try to match the test case ID to the first running Ztest suite with this test name.
770        for ts_name_ in ts_names:
771            if self.started_suites[ts_name_]['count'] < (0 if phase == 'TS_SUM' else 1):
772                continue
773            tc_fq_id = self.instance.compose_case_name(f"{ts_name_}.{tc_name}")
774            if tc := self.instance.get_case_by_name(tc_fq_id):
775                if self.trace:
776                    logger.debug(f"On {phase}: Ztest case '{tc_name}' matched to '{tc_fq_id}")
777                return tc
778        logger.debug(
779            f"On {phase}: Ztest case '{tc_name}' is not known"
780            f" in {self.started_suites} running suite(s)."
781        )
782        tc_id = self.instance.compose_case_name(tc_name)
783        return self.instance.get_case_or_create(tc_id)
784
785    def start_suite(self, suite_name):
786        if suite_name not in self.detected_suite_names:
787            self.detected_suite_names.append(suite_name)
788        if suite_name not in self.instance.testsuite.ztest_suite_names:
789            logger.warning(f"Unexpected Ztest suite '{suite_name}'")
790        if suite_name in self.started_suites:
791            if self.started_suites[suite_name]['count'] > 0:
792                logger.warning(f"Already STARTED '{suite_name}':{self.started_suites[suite_name]}")
793            elif self.trace:
794                logger.debug(f"START suite '{suite_name}'")
795            self.started_suites[suite_name]['count'] += 1
796            self.started_suites[suite_name]['repeat'] += 1
797        else:
798            self.started_suites[suite_name] = { 'count': 1, 'repeat': 0 }
799
800    def end_suite(self, suite_name, phase='', suite_status=None):
801        if suite_name in self.started_suites:
802            if phase == 'TS_SUM' and self.started_suites[suite_name]['count'] == 0:
803                return
804            if self.started_suites[suite_name]['count'] < 1:
805                logger.error(
806                    f"Already ENDED {phase} suite '{suite_name}':{self.started_suites[suite_name]}"
807                )
808            elif self.trace:
809                logger.debug(f"END {phase} suite '{suite_name}':{self.started_suites[suite_name]}")
810            self.started_suites[suite_name]['count'] -= 1
811        elif suite_status == 'SKIP':
812            self.start_suite(suite_name)  # register skipped suites at their summary end
813            self.started_suites[suite_name]['count'] -= 1
814        else:
815            logger.warning(f"END {phase} suite '{suite_name}' without START detected")
816
817    def start_case(self, tc_name):
818        if tc_name in self.started_cases:
819            if self.started_cases[tc_name]['count'] > 0:
820                logger.warning(f"Already STARTED '{tc_name}':{self.started_cases[tc_name]}")
821            self.started_cases[tc_name]['count'] += 1
822        else:
823            self.started_cases[tc_name] = { 'count': 1 }
824
825    def end_case(self, tc_name, phase=''):
826        if tc_name in self.started_cases:
827            if phase == 'TS_SUM' and self.started_cases[tc_name]['count'] == 0:
828                return
829            if self.started_cases[tc_name]['count'] < 1:
830                logger.error(
831                    f"Already ENDED {phase} case '{tc_name}':{self.started_cases[tc_name]}"
832                )
833            elif self.trace:
834                logger.debug(f"END {phase} case '{tc_name}':{self.started_cases[tc_name]}")
835            self.started_cases[tc_name]['count'] -= 1
836        elif phase != 'TS_SUM':
837            logger.warning(f"END {phase} case '{tc_name}' without START detected")
838
839
840    def handle(self, line):
841        testcase_match = None
842        if self._match:
843            self.testcase_output += line + "\n"
844
845        if test_suite_start_match := re.search(self.test_suite_start_pattern, line):
846            self.start_suite(test_suite_start_match.group("suite_name"))
847        elif test_suite_end_match := re.search(self.test_suite_end_pattern, line):
848            suite_name=test_suite_end_match.group("suite_name")
849            self.end_suite(suite_name, 'TS_END')
850        elif testcase_match := re.search(self.test_case_start_pattern, line):
851            tc_name = testcase_match.group(2)
852            tc = self.get_testcase(tc_name, 'TC_START')
853            self.start_case(tc.name)
854            # Mark the test as started, if something happens here, it is mostly
855            # due to this tests, for example timeout. This should in this case
856            # be marked as failed and not blocked (not run).
857            tc.status = TwisterStatus.STARTED
858            if not self._match:
859                self.testcase_output += line + "\n"
860                self._match = True
861        # some testcases are skipped based on predicates and do not show up
862        # during test execution, however they are listed in the summary. Parse
863        # the summary for status and use that status instead.
864        elif result_match := self.test_case_end_pattern.match(line):
865            matched_status = result_match.group(1)
866            tc_name = result_match.group(3)
867            tc = self.get_testcase(tc_name, 'TC_END')
868            self.end_case(tc.name)
869            tc.status = TwisterStatus[matched_status]
870            if tc.status == TwisterStatus.SKIP:
871                tc.reason = "ztest skip"
872            tc.duration = float(result_match.group(4))
873            if tc.status == TwisterStatus.FAIL:
874                tc.output = self.testcase_output
875            self.testcase_output = ""
876            self._match = False
877            self.ztest = True
878        elif test_suite_summary_match := self.test_suite_summary_pattern.match(line):
879            suite_name=test_suite_summary_match.group("suite_name")
880            suite_status=test_suite_summary_match.group("suite_status")
881            self._match = False
882            self.ztest = True
883            self.end_suite(suite_name, 'TS_SUM', suite_status=suite_status)
884        elif test_case_summary_match := self.test_case_summary_pattern.match(line):
885            matched_status = test_case_summary_match.group(1)
886            suite_name = test_case_summary_match.group(2)
887            tc_name = test_case_summary_match.group(4)
888            tc = self.get_testcase(tc_name, 'TS_SUM', suite_name)
889            self.end_case(tc.name, 'TS_SUM')
890            tc.status = TwisterStatus[matched_status]
891            if tc.status == TwisterStatus.SKIP:
892                tc.reason = "ztest skip"
893            tc.duration = float(test_case_summary_match.group(5))
894            if tc.status == TwisterStatus.FAIL:
895                tc.output = self.testcase_output
896            self.testcase_output = ""
897            self._match = False
898            self.ztest = True
899
900        self.process_test(line)
901
902        if not self.ztest and self.status != TwisterStatus.NONE:
903            logger.debug(f"not a ztest and no state for {self.id}")
904            tc = self.instance.get_case_or_create(self.id)
905            if self.status == TwisterStatus.PASS:
906                tc.status = TwisterStatus.PASS
907            else:
908                tc.status = TwisterStatus.FAIL
909                tc.reason = "Test failure"
910
911
class Ztest(Test):
    """Harness available under the name ``Ztest``; behaves exactly like ``Test``."""
914
915
class Bsim(Harness):
    """Harness for BabbleSim (bsim) test applications."""

    def build(self):
        """
        Publish the built application executable in BabbleSim's bin directory,
        which enables running multidevice bsim tests after twister has built
        them.

        Nothing is copied when no test instance is attached, when the built
        executable is missing, or when the BSIM_OUT_PATH environment variable
        is unset or empty; the latter two cases are reported with a warning.
        """
        instance = self.instance
        if instance is None:
            return

        source_exe: str = os.path.join(instance.build_dir, 'zephyr', 'zephyr.exe')
        if not os.path.exists(source_exe):
            logger.warning('Cannot copy bsim exe - cannot find original executable.')
            return

        bsim_root: str = os.getenv('BSIM_OUT_PATH', '')
        if not bsim_root:
            logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')
            return

        # A name configured for the harness is prefixed with the platform name,
        # otherwise the executable is named after the whole test instance.
        custom_name: str = instance.testsuite.harness_config.get('bsim_exe_name', '')
        if custom_name:
            target_name = f'bs_{instance.platform.name}_{custom_name}'
        else:
            target_name = f'bs_{instance.name}'

        # Flatten path separators, dots and '@' into underscores.
        for unwanted in (os.path.sep, '.', '@'):
            target_name = target_name.replace(unwanted, '_')

        target_exe: str = os.path.join(bsim_root, 'bin', target_name)
        logger.debug(f'Copying executable from {source_exe} to {target_exe}')
        shutil.copy(source_exe, target_exe)
949
950
class HarnessImporter:
    """Factory creating harness objects from the classes of this module."""

    @staticmethod
    def get_harness(harness_name):
        """
        Instantiate the class of this module called ``harness_name``.

        An empty or missing name selects the ``Test`` harness.  When the
        lookup or the construction raises AttributeError, the problem is
        logged at debug level and None is returned.
        """
        module = sys.modules[__name__]
        lookup_name = harness_name if harness_name else 'Test'
        try:
            return getattr(module, lookup_name)()
        except AttributeError as e:
            logger.debug(f"harness {harness_name} not implemented: {e}")
            return None
965