# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import json
import logging
import os
import platform
import re
import shlex
import shutil
import subprocess
import sys
import threading
import time
import xml.etree.ElementTree as ET
from collections import OrderedDict
from enum import Enum

import junitparser.junitparser as junit
import yaml
from pytest import ExitCode
from twisterlib.constants import SUPPORTED_SIMS_IN_PYTEST
from twisterlib.environment import PYTEST_PLUGIN_INSTALLED, ZEPHYR_BASE
from twisterlib.error import ConfigurationError, StatusAttributeError
from twisterlib.handlers import Handler, terminate_process
from twisterlib.reports import ReportStatus
from twisterlib.statuses import TwisterStatus
from twisterlib.testinstance import TestInstance

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

_WINDOWS = platform.system() == 'Windows'


class Harness:
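    """Base class for Twister output harnesses.

    A harness inspects console output from a test image line by line,
    captures configured recording patterns, and maps well-known markers
    (run id, pass/fail banners, fatal errors, coverage dumps) onto a
    TwisterStatus for the test instance and its test cases.
    """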
    GCOV_START = "GCOV_COVERAGE_DUMP_START"
    GCOV_END = "GCOV_COVERAGE_DUMP_END"
    FAULT = "ZEPHYR FATAL ERROR"
    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
    RUN_FAILED = "PROJECT EXECUTION FAILED"
    run_id_pattern = r"RunID: (?P<run_id>.*)"

    def __init__(self):
        self._status = TwisterStatus.NONE
        self.reason = None
        self.type = None
        self.regex = []
        self.matches = OrderedDict()
        self.ordered = True
        self.id = None
        self.fail_on_fault = True
        self.fault = False
        self.capture_coverage = False
        self.next_pattern = 0
        self.record = None
        self.record_patterns = []
        self.record_merge = False
        self.record_as_json = None
        self.recording = []
        self.ztest = False
        self.detected_suite_names = []
        self.run_id = None
        self.started_suites = {}
        self.started_cases = {}
        self.matched_run_id = False
        self.run_id_exists = False
        self.instance: TestInstance | None = None
        self.testcase_output = ""
        self._match = False


    @property
    def trace(self) -> bool:
        return self.instance.handler.options.verbose > 2

    @property
    def status(self) -> TwisterStatus:
        return self._status

    @status.setter
    def status(self, value: TwisterStatus) -> None:
        # Check for illegal assignments by value
        try:
            key = value.name if isinstance(value, Enum) else value
            self._status = TwisterStatus[key]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err

    def configure(self, instance):
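        """Bind this harness to a test instance and apply its harness_config.

        Reads the suite's 'type', 'regex', 'ordered', and 'record' options,
        compiling any recording patterns for later use by parse_record().
        """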
        self.instance = instance
        config = instance.testsuite.harness_config
        self.id = instance.testsuite.id
        self.run_id = instance.run_id
        if instance.testsuite.ignore_faults:
            self.fail_on_fault = False

        if config:
            self.type = config.get('type', None)
            self.regex = config.get('regex', [])
            self.ordered = config.get('ordered', True)
            self.record = config.get('record', {})
            if self.record:
                self.record_patterns = [re.compile(p) for p in self.record.get("regex", [])]
                self.record_merge = self.record.get("merge", False)
                self.record_as_json = self.record.get("as_json")

    def build(self):
        pass

    def get_testcase_name(self):
        """
        Get current TestCase name.
        """
        return self.id

    def translate_record(self, record: dict) -> dict:
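        """Decode fields listed in 'as_json' from JSON strings into objects.

        On a decode error the harness is not failed; the raw text is kept
        under an 'ERROR' key instead.
        """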
        if self.record_as_json:
            for k in self.record_as_json:
                if k not in record:
                    continue
                try:
                    record[k] = json.loads(record[k]) if record[k] else {}
                except json.JSONDecodeError as parse_error:
                    logger.warning(f"HARNESS:{self.__class__.__name__}: recording JSON failed:"
                                   f" {parse_error} for '{k}':'{record[k]}'")
                    # Don't set the Harness state to failed for recordings.
                    record[k] = { 'ERROR': { 'msg': str(parse_error), 'doc': record[k] } }
        return record

    def parse_record(self, line) -> int:
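        """Match the line against all recording patterns.

        Named groups of each match are stored (or merged into the first
        record when 'merge' is enabled). Returns the number of patterns
        that matched.
        """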
        match_cnt = 0
        for record_pattern in self.record_patterns:
            match = record_pattern.search(line)
            if match:
                match_cnt += 1
                rec = self.translate_record(
                    {k: v.strip() for k, v in match.groupdict(default="").items()}
                )
                if self.record_merge and len(self.recording) > 0:
                    for k, v in rec.items():
                        if k in self.recording[0]:
                            if isinstance(self.recording[0][k], list):
                                self.recording[0][k].append(v)
                            else:
                                self.recording[0][k] = [self.recording[0][k], v]
                        else:
                            self.recording[0][k] = v
                else:
                    self.recording.append(rec)
        return match_cnt

    def process_test(self, line):
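        """Process one line of generic test output.

        Feeds the recording patterns, tracks the reported RunID, and
        updates status on the pass/fail banners, fatal-error marker, and
        coverage dump delimiters.
        """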

        self.parse_record(line)

        runid_match = re.search(self.run_id_pattern, line)
        if runid_match:
            run_id = runid_match.group("run_id")
            self.run_id_exists = True
            if run_id == str(self.run_id):
                self.matched_run_id = True

        if self.RUN_PASSED in line:
            if self.fault:
                self.status = TwisterStatus.FAIL
                self.reason = "Fault detected while running test"
            else:
                self.status = TwisterStatus.PASS

        if self.RUN_FAILED in line:
            self.status = TwisterStatus.FAIL
            self.reason = "Testsuite failed"

        if self.fail_on_fault and line == self.FAULT:
            self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False


class Robot(Harness):
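    """Harness for tests driven by Robot Framework (e.g. run under Renode)."""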

    is_robot_test = True

    def configure(self, instance):
        super().configure(instance)
        self.instance = instance

        config = instance.testsuite.harness_config
        if config:
            self.path = config.get('robot_testsuite', None)
            self.option = config.get('robot_option', None)

    def handle(self, line):
        ''' Test cases using this harness rely on the results produced by
            Robot Framework, which is invoked in run_robot_test(). This
            handler therefore only reports PASS to avoid a timeout; nothing
            is written into handler.log.
        '''
        self.instance.status = TwisterStatus.PASS
        tc = self.instance.get_case_or_create(self.id)
        tc.status = TwisterStatus.PASS

    def run_robot_test(self, command, handler):
        start_time = time.time()
        env = os.environ.copy()

        if self.option:
            if isinstance(self.option, list):
                for option in self.option:
                    for v in str(option).split():
                        command.append(f'{v}')
            else:
                for v in str(self.option).split():
                    command.append(f'{v}')

        if self.path is None:
            raise PytestHarnessException('The parameter robot_testsuite is mandatory')

        if isinstance(self.path, list):
            for suite in self.path:
                command.append(os.path.join(handler.sourcedir, suite))
        else:
            command.append(os.path.join(handler.sourcedir, self.path))

        with subprocess.Popen(command, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT, cwd=self.instance.build_dir, env=env) as renode_test_proc:
            out, _ = renode_test_proc.communicate()

            self.instance.execution_time = time.time() - start_time

            if renode_test_proc.returncode == 0:
                self.instance.status = TwisterStatus.PASS
                # All tests in one Robot file are treated as a single test case,
                # so its status is set according to the instance status.
                # Note that there should be only one testcase in the testcases list.
                self.instance.testcases[0].status = TwisterStatus.PASS
            else:
                logger.error(
                    f"Robot test failure: {handler.sourcedir} for {self.instance.platform.name}"
                )
                self.instance.status = TwisterStatus.FAIL
                self.instance.testcases[0].status = TwisterStatus.FAIL

            if out:
                with open(os.path.join(self.instance.build_dir, handler.log), 'w') as log:
                    log_msg = out.decode(sys.getdefaultencoding())
                    log.write(log_msg)


class Console(Harness):
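    """Harness matching configured regex patterns against console output."""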

    def get_testcase_name(self):
        '''
        Get the current TestCase name.

        A Console harness id contains only the TestSuite id, without a
        TestCase name suffix. When a Ztest with a single test case is
        configured to use this harness type for simplified output parsing
        (instead of the Ztest harness), the name of that first TestCase is
        used if available.
        '''
        if self.instance and len(self.instance.testcases) == 1:
            return self.instance.testcases[0].name
        return super().get_testcase_name()

    def configure(self, instance):
        super().configure(instance)
        if self.regex is None or len(self.regex) == 0:
            self.status = TwisterStatus.FAIL
            tc = self.instance.set_case_status_by_name(
                self.get_testcase_name(),
                TwisterStatus.FAIL,
                f"HARNESS:{self.__class__.__name__}:no regex patterns configured."
            )
            raise ConfigurationError(self.instance.name, tc.reason)
        if self.type == "one_line":
            self.pattern = re.compile(self.regex[0])
            self.patterns_expected = 1
        elif self.type == "multi_line":
            self.patterns = []
            for r in self.regex:
                self.patterns.append(re.compile(r))
            self.patterns_expected = len(self.patterns)
        else:
            self.status = TwisterStatus.FAIL
            tc = self.instance.set_case_status_by_name(
                self.get_testcase_name(),
                TwisterStatus.FAIL,
                f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}"
            )
            raise ConfigurationError(self.instance.name, tc.reason)
    def handle(self, line):
        if self.type == "one_line":
            if self.pattern.search(line):
                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:"
                             f"'{self.pattern.pattern}'")
                self.next_pattern += 1
                self.status = TwisterStatus.PASS
        elif self.type == "multi_line" and self.ordered:
            if (self.next_pattern < len(self.patterns) and
                self.patterns[self.next_pattern].search(line)):
                logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
                             f"{self.next_pattern + 1}/{self.patterns_expected}):"
                             f"'{self.patterns[self.next_pattern].pattern}'")
                self.next_pattern += 1
                if self.next_pattern >= len(self.patterns):
                    self.status = TwisterStatus.PASS
        elif self.type == "multi_line" and not self.ordered:
            for i, pattern in enumerate(self.patterns):
                r = self.regex[i]
                if pattern.search(line) and r not in self.matches:
                    self.matches[r] = line
                    logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED("
                                 f"{len(self.matches)}/{self.patterns_expected}):"
                                 f"'{pattern.pattern}'")
            if len(self.matches) == len(self.regex):
                self.status = TwisterStatus.PASS
        else:
            logger.error("Unknown harness_config type")

        if self.fail_on_fault and self.FAULT in line:
            self.fault = True

        if self.GCOV_START in line:
            self.capture_coverage = True
        elif self.GCOV_END in line:
            self.capture_coverage = False

        self.process_test(line)
        # Reset the resulting test state to FAIL when not all of the expected
        # patterns were found in the output, even though ztest printed
        # 'PROJECT EXECUTION SUCCESSFUL'. This can happen when the pattern
        # sequence has diverged from the test code, the test platform has
        # console issues, or some other test image was executed.
        # TODO: Introduce an explicit match policy type to reject
        # unexpected console output, allow missing patterns, deny duplicates.
        if self.status == TwisterStatus.PASS and \
           self.ordered and \
           self.next_pattern < self.patterns_expected:
            logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
                         f" {self.next_pattern} of {self.patterns_expected}"
                         f" expected ordered patterns.")
            self.status = TwisterStatus.FAIL
            self.reason = "patterns did not match (ordered)"
        if self.status == TwisterStatus.PASS and \
           not self.ordered and \
           len(self.matches) < self.patterns_expected:
            logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
                         f" {len(self.matches)} of {self.patterns_expected}"
                         f" expected unordered patterns.")
            self.status = TwisterStatus.FAIL
            self.reason = "patterns did not match (unordered)"

        tc = self.instance.get_case_or_create(self.get_testcase_name())
        if self.status == TwisterStatus.PASS:
            tc.status = TwisterStatus.PASS
        else:
            tc.status = TwisterStatus.FAIL


class PytestHarnessException(Exception):
    """General exception for pytest."""


class Pytest(Harness):
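    """Harness delegating test execution to pytest with the twister-harness plugin.

    Builds the pytest command line, runs it with a timeout, and derives
    Twister statuses from the generated JUnit report.
    """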

    def configure(self, instance: TestInstance):
        super().configure(instance)
        self.running_dir = instance.build_dir
        self.source_dir = instance.testsuite.source_dir
        self.report_file = os.path.join(self.running_dir, 'report.xml')
        self.pytest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
        self.reserved_dut = None
        self._output = []

    def pytest_run(self, timeout):
        try:
            cmd = self.generate_command()
            self.run_command(cmd, timeout)
        except PytestHarnessException as pytest_exception:
            logger.error(str(pytest_exception))
            self.status = TwisterStatus.FAIL
            self.instance.reason = str(pytest_exception)
        finally:
            self.instance.record(self.recording)
            self._update_test_status()
            if self.reserved_dut:
                self.instance.handler.make_dut_available(self.reserved_dut)

    def generate_command(self):
        config = self.instance.testsuite.harness_config
        handler: Handler = self.instance.handler
        pytest_root = config.get('pytest_root', ['pytest']) if config else ['pytest']
        pytest_args_yaml = config.get('pytest_args', []) if config else []
        pytest_dut_scope = config.get('pytest_dut_scope', None) if config else None
        command = [
            'pytest',
            '--twister-harness',
            '-s', '-v',
            f'--build-dir={self.running_dir}',
            f'--junit-xml={self.report_file}',
            f'--platform={self.instance.platform.name}'
        ]

        command.extend([os.path.normpath(os.path.join(
            self.source_dir, os.path.expanduser(os.path.expandvars(src)))) for src in pytest_root])

        if pytest_dut_scope:
            command.append(f'--dut-scope={pytest_dut_scope}')

        # Always pass output from the pytest test and the test image up to the Twister log.
        command.extend([
            '--log-cli-level=DEBUG',
            '--log-cli-format=%(levelname)s: %(message)s'
        ])

        # Use the test timeout as the base timeout for pytest.
        base_timeout = handler.get_test_timeout()
        command.append(f'--base-timeout={base_timeout}')

        if handler.type_str == 'device':
            command.extend(
                self._generate_parameters_for_hardware(handler)
            )
        elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
            command.append(f'--device-type={handler.type_str}')
        elif handler.type_str == 'build':
            command.append('--device-type=custom')
        else:
            raise PytestHarnessException(
                f'Support for handler {handler.type_str} not implemented yet'
            )

        if handler.type_str != 'device':
            for fixture in handler.options.fixture:
                command.append(f'--twister-fixture={fixture}')

        if handler.options.extra_test_args and handler.type_str == 'native':
            command.append(f'--extra-test-args={shlex.join(handler.options.extra_test_args)}')

        command.extend(pytest_args_yaml)

        if handler.options.pytest_args:
            command.extend(handler.options.pytest_args)

        return command

    def _generate_parameters_for_hardware(self, handler: Handler):
        command = ['--device-type=hardware']
        hardware = handler.get_hardware()
        if not hardware:
            raise PytestHarnessException('Hardware is not available')
        # Update the instance with the device id to have it in the summary report.
        self.instance.dut = hardware.id

        self.reserved_dut = hardware
        if hardware.serial_pty:
            command.append(f'--device-serial-pty={hardware.serial_pty}')
        else:
            command.extend([
                f'--device-serial={hardware.serial}',
                f'--device-serial-baud={hardware.baud}'
            ])

        if hardware.flash_timeout:
            command.append(f'--flash-timeout={hardware.flash_timeout}')

        options = handler.options
        if runner := hardware.runner or options.west_runner:
            command.append(f'--runner={runner}')

        if hardware.runner_params:
            for param in hardware.runner_params:
                command.append(f'--runner-params={param}')

        if options.west_flash and options.west_flash != []:
            command.append(f'--west-flash-extra-args={options.west_flash}')

        if board_id := hardware.probe_id or hardware.id:
            command.append(f'--device-id={board_id}')

        if hardware.product:
            command.append(f'--device-product={hardware.product}')

        if hardware.pre_script:
            command.append(f'--pre-script={hardware.pre_script}')

        if hardware.post_flash_script:
            command.append(f'--post-flash-script={hardware.post_flash_script}')

        if hardware.post_script:
            command.append(f'--post-script={hardware.post_script}')

        if hardware.flash_before:
            command.append(f'--flash-before={hardware.flash_before}')

        for fixture in hardware.fixtures:
            command.append(f'--twister-fixture={fixture}')

        return command

    def run_command(self, cmd, timeout):
        cmd, env = self._update_command_with_env_dependencies(cmd)
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env
        ) as proc:
            try:
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(timeout)
                if reader_t.is_alive():
                    terminate_process(proc)
                    logger.warning('Timeout has occurred. Can be extended in testspec file. '
                                   f'Currently set to {timeout} seconds.')
                    self.instance.reason = 'Pytest timeout'
                    self.status = TwisterStatus.FAIL
                proc.wait(timeout)
            except subprocess.TimeoutExpired:
                self.status = TwisterStatus.FAIL
                proc.kill()

        if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
            self.status = TwisterStatus.ERROR
            self.instance.reason = f'Pytest error - return code {proc.returncode}'
        with open(self.pytest_log_file_path, 'w') as log_file:
            log_file.write(shlex.join(cmd) + '\n\n')
            log_file.write('\n'.join(self._output))

    @staticmethod
    def _update_command_with_env_dependencies(cmd):
        '''
        If the pytest plugin wasn't installed by pip, point pytest at it by
        updating PYTHONPATH and appending the -p argument to the pytest command.
        '''
        env = os.environ.copy()
        if not PYTEST_PLUGIN_INSTALLED:
            cmd.extend(['-p', 'twister_harness.plugin'])
            pytest_plugin_path = os.path.join(
                ZEPHYR_BASE,
                'scripts',
                'pylib',
                'pytest-twister-harness',
                'src'
            )
            env['PYTHONPATH'] = pytest_plugin_path + os.pathsep + env.get('PYTHONPATH', '')
            if _WINDOWS:
                cmd_append_python_path = f'set PYTHONPATH={pytest_plugin_path};%PYTHONPATH% && '
            else:
                cmd_append_python_path = (
                    f'export PYTHONPATH={pytest_plugin_path}:${{PYTHONPATH}} && '
                )
        else:
            cmd_append_python_path = ''
        cmd_to_print = cmd_append_python_path + shlex.join(cmd)
        logger.debug(f'Running pytest command: {cmd_to_print}')

        return cmd, env

    def _output_reader(self, proc):
        self._output = []
        while proc.stdout.readable() and proc.poll() is None:
            line = proc.stdout.readline().decode().strip()
            if not line:
                continue
            self._output.append(line)
            logger.debug(f'PYTEST: {line}')
            self.parse_record(line)
        proc.communicate()

    def _update_test_status(self):
        if self.status == TwisterStatus.NONE:
            self.instance.testcases = []
            try:
                self._parse_report_file(self.report_file)
            except Exception as e:
                logger.error(f'Error when parsing file {self.report_file}: {e}')
                self.status = TwisterStatus.FAIL
            finally:
                if not self.instance.testcases:
                    self.instance.init_cases()

        self.instance.status = self.status if self.status != TwisterStatus.NONE else \
                               TwisterStatus.FAIL
        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
            self.instance.reason = self.instance.reason or 'Pytest failed'
            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)

    def _parse_report_file(self, report):
        tree = ET.parse(report)
        root = tree.getroot()

        if (elem_ts := root.find('testsuite')) is not None:
            if elem_ts.get('failures') != '0':
                self.status = TwisterStatus.FAIL
                self.instance.reason = (
                    f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed"
                )
            elif elem_ts.get('errors') != '0':
                self.status = TwisterStatus.ERROR
                self.instance.reason = 'Error during pytest execution'
            elif elem_ts.get('skipped') == elem_ts.get('tests'):
                self.status = TwisterStatus.SKIP
            else:
                self.status = TwisterStatus.PASS
            self.instance.execution_time = float(elem_ts.get('time'))

            for elem_tc in elem_ts.findall('testcase'):
                tc = self.instance.add_testcase(f"{self.id}.{elem_tc.get('name')}")
                tc.duration = float(elem_tc.get('time'))
                elem = elem_tc.find('*')
                if elem is None:
                    tc.status = TwisterStatus.PASS
                else:
                    if elem.tag == ReportStatus.SKIP:
                        tc.status = TwisterStatus.SKIP
                    elif elem.tag == ReportStatus.FAIL:
                        tc.status = TwisterStatus.FAIL
                    else:
                        tc.status = TwisterStatus.ERROR
                    tc.reason = elem.get('message')
                    tc.output = elem.text
        else:
            self.status = TwisterStatus.SKIP
            self.instance.reason = 'No tests collected'


class Shell(Pytest):
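    """Pytest-based harness driving the bundled shell-twister-harness tests.

    Shell commands come either inline from 'shell_commands' in the
    harness_config or from a 'shell_commands_file' in the source directory.
    """
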
    def generate_command(self):
        config = self.instance.testsuite.harness_config
        pytest_root = [os.path.join(ZEPHYR_BASE, 'scripts', 'pylib', 'shell-twister-harness')]
        config['pytest_root'] = pytest_root

        command = super().generate_command()
        if test_shell_file := self._get_shell_commands_file(config):
            command.append(f'--testdata={test_shell_file}')
        else:
            logger.warning('No shell commands provided')
        return command

    def _get_shell_commands_file(self, harness_config):
        if shell_commands := harness_config.get('shell_commands'):
            test_shell_file = os.path.join(self.running_dir, 'test_shell.yml')
            with open(test_shell_file, 'w') as f:
                yaml.dump(shell_commands, f)
            return test_shell_file

        test_shell_file = harness_config.get('shell_commands_file', 'test_shell.yml')
        test_shell_file = os.path.join(
            self.source_dir, os.path.expanduser(os.path.expandvars(test_shell_file))
        )
        if os.path.exists(test_shell_file):
            return test_shell_file
        return None

class Gtest(Harness):
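    """Harness parsing GoogleTest console output into Twister test cases."""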
    ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    _NAME_PATTERN = "[a-zA-Z_][a-zA-Z0-9_]*"
    _SUITE_TEST_NAME_PATTERN = f"(?P<suite_name>{_NAME_PATTERN})\\.(?P<test_name>{_NAME_PATTERN})"
    TEST_START_PATTERN = f".*\\[ RUN      \\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_PASS_PATTERN = f".*\\[       OK \\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_SKIP_PATTERN = f".*\\[ DISABLED \\] {_SUITE_TEST_NAME_PATTERN}"
    TEST_FAIL_PATTERN = f".*\\[  FAILED  \\] {_SUITE_TEST_NAME_PATTERN}"
    FINISHED_PATTERN = (
        ".*(?:\\[==========\\] Done running all tests\\.|"
        + "\\[----------\\] Global test environment tear-down)"
    )

    def __init__(self):
        super().__init__()
        self.tc = None
        self.has_failures = False

    def handle(self, line):
        # Strip the ANSI characters, they mess up the patterns
        non_ansi_line = self.ANSI_ESCAPE.sub('', line)

        if self.status != TwisterStatus.NONE:
            return

        # Check if we started running a new test
        test_start_match = re.search(self.TEST_START_PATTERN, non_ansi_line)
        if test_start_match:
            # Add the suite name
            suite_name = test_start_match.group("suite_name")
            if suite_name not in self.detected_suite_names:
                self.detected_suite_names.append(suite_name)

            # Generate the internal name of the test
            name = "{}.{}.{}".format(self.id, suite_name, test_start_match.group("test_name"))

            # Assert that we don't already have a running test
            assert (
                self.tc is None
            ), f"gTest error, {self.tc} didn't finish"

            # Check that the instance doesn't exist yet (prevents re-running)
            tc = self.instance.get_case_by_name(name)
            assert tc is None, f"gTest error, {tc} running twice"

            # Create the test instance and set the context
            tc = self.instance.get_case_or_create(name)
            self.tc = tc
            self.tc.status = TwisterStatus.STARTED
            self.testcase_output += line + "\n"
            self._match = True

        # Check if the test run finished
        finished_match = re.search(self.FINISHED_PATTERN, non_ansi_line)
        if finished_match:
            tc = self.instance.get_case_or_create(self.id)
            if self.has_failures or self.tc is not None:
                self.status = TwisterStatus.FAIL
                tc.status = TwisterStatus.FAIL
            else:
                self.status = TwisterStatus.PASS
                tc.status = TwisterStatus.PASS
            return

        # Check if the individual test finished
        state, name = self._check_result(non_ansi_line)
        if state == TwisterStatus.NONE or name is None:
            # Nothing finished, keep processing lines
            return

        # Get the matching test and make sure it's the same as the current context
        tc = self.instance.get_case_by_name(name)
        assert (
            tc is not None and tc == self.tc
        ), f"gTest error, mismatched tests. Expected {self.tc} but got {tc}"

        # Test finished, clear the context
        self.tc = None

        # Update the status of the test
        tc.status = state
        if tc.status == TwisterStatus.FAIL:
            self.has_failures = True
            tc.output = self.testcase_output
        self.testcase_output = ""
        self._match = False

    def _check_result(self, line):
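        """Return (status, fully-qualified test name) for a result line, or (None, None)."""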
        test_pass_match = re.search(self.TEST_PASS_PATTERN, line)
        if test_pass_match:
            return TwisterStatus.PASS, \
                   "{}.{}.{}".format(
                        self.id, test_pass_match.group("suite_name"),
                        test_pass_match.group("test_name")
                    )
        test_skip_match = re.search(self.TEST_SKIP_PATTERN, line)
        if test_skip_match:
            return TwisterStatus.SKIP, \
                   "{}.{}.{}".format(
                       self.id, test_skip_match.group("suite_name"),
                       test_skip_match.group("test_name")
                    )
        test_fail_match = re.search(self.TEST_FAIL_PATTERN, line)
        if test_fail_match:
            return TwisterStatus.FAIL, \
                   "{}.{}.{}".format(
                       self.id, test_fail_match.group("suite_name"),
                       test_fail_match.group("test_name")
                    )
        return None, None


class Test(Harness):
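    """Harness parsing Ztest console output (suite/case start, end, and summary lines)."""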
    __test__ = False  # for pytest to skip this class when collecting tests

    test_suite_start_pattern = re.compile(r"Running TESTSUITE (?P<suite_name>\S*)")
    test_suite_end_pattern = re.compile(
        r"TESTSUITE (?P<suite_name>\S*)\s+(?P<suite_status>succeeded|failed)"
    )
    test_case_start_pattern = re.compile(r"START - (test_)?([a-zA-Z0-9_-]+)")
    test_case_end_pattern = re.compile(
        r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds"
    )
    test_suite_summary_pattern = re.compile(
        r"SUITE (?P<suite_status>\S*) - .* \[(?P<suite_name>\S*)\]:"
        r" .* duration = (\d*[.,]?\d*) seconds"
    )
    test_case_summary_pattern = re.compile(
        r" - (PASS|FAIL|SKIP) - \[([^\.]*).(test_)?(\S*)\] duration = (\d*[.,]?\d*) seconds"
    )


    def get_testcase(self, tc_name, phase, ts_name=None):
        """ Search for a Ztest case among those detected in the test image
            binary, expecting the same test names as already known from the
            ELF. Track suites and cases unexpectedly found in the log.
        """
        ts_names = self.started_suites.keys()
        if ts_name:
            if self.trace and ts_name not in self.instance.testsuite.ztest_suite_names:
                # This can happen if a ZTEST_SUITE name is macro-generated
                # in the test source files, e.g. based on DT information.
                logger.debug(f"{phase}: unexpected Ztest suite '{ts_name}' is "
                             f"not present among: {self.instance.testsuite.ztest_suite_names}")
            if ts_name not in self.detected_suite_names:
                if self.trace:
                    logger.debug(f"{phase}: detected new Ztest suite '{ts_name}'")
                self.detected_suite_names.append(ts_name)
            ts_names = [ts_name] if ts_name in ts_names else []

        # First, try to match the test case ID to the first running Ztest suite
        # with this test name.
        for ts_name_ in ts_names:
            if self.started_suites[ts_name_]['count'] < (0 if phase == 'TS_SUM' else 1):
                continue
            tc_fq_id = self.instance.compose_case_name(f"{ts_name_}.{tc_name}")
            if tc := self.instance.get_case_by_name(tc_fq_id):
                if self.trace:
                    logger.debug(f"{phase}: Ztest case '{tc_name}' matched to '{tc_fq_id}'")
                return tc
        logger.debug(
            f"{phase}: Ztest case '{tc_name}' is not known"
            f" in {self.started_suites} running suite(s)."
        )
        tc_id = self.instance.compose_case_name(tc_name)
        return self.instance.get_case_or_create(tc_id)

    def start_suite(self, suite_name, phase='TS_START'):
        if suite_name not in self.detected_suite_names:
            self.detected_suite_names.append(suite_name)
        if self.trace and suite_name not in self.instance.testsuite.ztest_suite_names:
            # This can happen if a ZTEST_SUITE name is macro-generated
            # in the test source files, e.g. based on DT information.
            logger.debug(f"{phase}: unexpected Ztest suite '{suite_name}' is "
                         f"not present among: {self.instance.testsuite.ztest_suite_names}")
        if suite_name in self.started_suites:
            if self.started_suites[suite_name]['count'] > 0:
                # Either the suite restarts itself, or this is an unexpected
                # state transition.
                logger.warning(f"{phase}: already STARTED '{suite_name}':"
                               f"{self.started_suites[suite_name]}")
            elif self.trace:
                logger.debug(f"{phase}: START suite '{suite_name}'")
            self.started_suites[suite_name]['count'] += 1
            self.started_suites[suite_name]['repeat'] += 1
        else:
            self.started_suites[suite_name] = {'count': 1, 'repeat': 0}

    def end_suite(self, suite_name, phase='TS_END', suite_status=None):
        if suite_name in self.started_suites:
            if phase == 'TS_SUM' and self.started_suites[suite_name]['count'] == 0:
                return
            if self.started_suites[suite_name]['count'] < 1:
                logger.error(
                    f"{phase}: already ENDED suite '{suite_name}':{self.started_suites[suite_name]}"
                )
            elif self.trace:
                logger.debug(f"{phase}: END suite '{suite_name}':{self.started_suites[suite_name]}")
            self.started_suites[suite_name]['count'] -= 1
        elif suite_status == 'SKIP':
            self.start_suite(suite_name, phase)  # register skipped suites at their summary end
            self.started_suites[suite_name]['count'] -= 1
        else:
            logger.warning(f"{phase}: END suite '{suite_name}' without START detected")

    def start_case(self, tc_name, phase='TC_START'):
        if tc_name in self.started_cases:
            if self.started_cases[tc_name]['count'] > 0:
                logger.warning(f"{phase}: already STARTED case "
                               f"'{tc_name}':{self.started_cases[tc_name]}")
            self.started_cases[tc_name]['count'] += 1
        else:
            self.started_cases[tc_name] = {'count': 1}

    def end_case(self, tc_name, phase='TC_END'):
        if tc_name in self.started_cases:
            if phase == 'TS_SUM' and self.started_cases[tc_name]['count'] == 0:
                return
            if self.started_cases[tc_name]['count'] < 1:
                logger.error(
                    f"{phase}: already ENDED case '{tc_name}':{self.started_cases[tc_name]}"
                )
            elif self.trace:
                logger.debug(f"{phase}: END case '{tc_name}':{self.started_cases[tc_name]}")
            self.started_cases[tc_name]['count'] -= 1
        elif phase != 'TS_SUM':
            logger.warning(f"{phase}: END case '{tc_name}' without START detected")


    def handle(self, line):
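        """Dispatch one console line to the matching Ztest suite/case pattern."""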
        testcase_match = None
        if self._match:
            self.testcase_output += line + "\n"

        if test_suite_start_match := re.search(self.test_suite_start_pattern, line):
            self.start_suite(test_suite_start_match.group("suite_name"))
        elif test_suite_end_match := re.search(self.test_suite_end_pattern, line):
            suite_name = test_suite_end_match.group("suite_name")
            self.end_suite(suite_name)
        elif testcase_match := re.search(self.test_case_start_pattern, line):
            tc_name = testcase_match.group(2)
            tc = self.get_testcase(tc_name, 'TC_START')
            self.start_case(tc.name)
            # Mark the test as started: if something goes wrong from here on,
            # e.g. a timeout, it is most likely caused by this test, so it
            # should be marked as failed and not as blocked (not run).
            tc.status = TwisterStatus.STARTED
            if not self._match:
                self.testcase_output += line + "\n"
                self._match = True
        # Some testcases are skipped based on predicates and do not show up
        # during test execution; however, they are listed in the summary. Parse
        # the summary for status and use that status instead.
        elif result_match := self.test_case_end_pattern.match(line):
            matched_status = result_match.group(1)
            tc_name = result_match.group(3)
            tc = self.get_testcase(tc_name, 'TC_END')
            self.end_case(tc.name)
            tc.status = TwisterStatus[matched_status]
            if tc.status == TwisterStatus.SKIP:
                tc.reason = "ztest skip"
            tc.duration = float(result_match.group(4))
            if tc.status == TwisterStatus.FAIL:
                tc.output = self.testcase_output
            self.testcase_output = ""
            self._match = False
            self.ztest = True
        elif test_suite_summary_match := self.test_suite_summary_pattern.match(line):
            suite_name = test_suite_summary_match.group("suite_name")
            suite_status = test_suite_summary_match.group("suite_status")
            self._match = False
            self.ztest = True
            self.end_suite(suite_name, 'TS_SUM', suite_status=suite_status)
        elif test_case_summary_match := self.test_case_summary_pattern.match(line):
            matched_status = test_case_summary_match.group(1)
            suite_name = test_case_summary_match.group(2)
            tc_name = test_case_summary_match.group(4)
            tc = self.get_testcase(tc_name, 'TS_SUM', suite_name)
            self.end_case(tc.name, 'TS_SUM')
            tc.status = TwisterStatus[matched_status]
            if tc.status == TwisterStatus.SKIP:
                tc.reason = "ztest skip"
            tc.duration = float(test_case_summary_match.group(5))
            if tc.status == TwisterStatus.FAIL:
                tc.output = self.testcase_output
            self.testcase_output = ""
            self._match = False
            self.ztest = True

        self.process_test(line)

        if not self.ztest and self.status != TwisterStatus.NONE:
            logger.debug(f"not a ztest and no state for {self.id}")
            tc = self.instance.get_case_or_create(self.id)
            if self.status == TwisterStatus.PASS:
                tc.status = TwisterStatus.PASS
            else:
                tc.status = TwisterStatus.FAIL
                tc.reason = "Test failure"


class Ztest(Test):
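    """Ztest harness; behavior is identical to Test."""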
    pass


class Bsim(Harness):

    def build(self):
        """
        Copying the application executable to BabbleSim's bin directory enables
        running multidevice bsim tests after twister has built them.
        """

        if self.instance is None:
            return

        original_exe_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'zephyr.exe')
        if not os.path.exists(original_exe_path):
            logger.warning('Cannot copy bsim exe - cannot find original executable.')
            return

        bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '')
        if not bsim_out_path:
            logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')
            return

        new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '')
        if new_exe_name:
            new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}'
        else:
            new_exe_name = self.instance.name
            new_exe_name = f'bs_{new_exe_name}'

        new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_')

        new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name)
        logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}')
        shutil.copy(original_exe_path, new_exe_path)


class Ctest(Harness):
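    """Harness running CTest suites and parsing their JUnit report.

    Mirrors the Pytest harness flow: build a ctest command line, run it
    with a timeout, then map the junitparser results onto Twister statuses.
    """
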
    def configure(self, instance: TestInstance):
        super().configure(instance)
        self.running_dir = instance.build_dir
        self.report_file = os.path.join(self.running_dir, 'report.xml')
        self.ctest_log_file_path = os.path.join(self.running_dir, 'twister_harness.log')
        self._output = []

    def ctest_run(self, timeout):
        assert self.instance is not None
        try:
            cmd = self.generate_command()
            self.run_command(cmd, timeout)
        except Exception as err:
            logger.error(str(err))
            self.status = TwisterStatus.FAIL
            self.instance.reason = str(err)
        finally:
            self.instance.record(self.recording)
            self._update_test_status()

    def generate_command(self):
        config = self.instance.testsuite.harness_config
        handler: Handler = self.instance.handler
        ctest_args_yaml = config.get('ctest_args', []) if config else []
        command = [
            'ctest',
            '--build-nocmake',
            '--test-dir',
            self.running_dir,
            '--output-junit',
            self.report_file,
            '--output-log',
            self.ctest_log_file_path,
            '--output-on-failure',
        ]
        base_timeout = handler.get_test_timeout()
        command.extend(['--timeout', str(base_timeout)])
        command.extend(ctest_args_yaml)

        if handler.options.ctest_args:
            command.extend(handler.options.ctest_args)

        return command

    def run_command(self, cmd, timeout):
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        ) as proc:
            try:
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(timeout)
                if reader_t.is_alive():
                    terminate_process(proc)
                    logger.warning('Timeout has occurred. Can be extended in testspec file. '
                                   f'Currently set to {timeout} seconds.')
                    self.instance.reason = 'Ctest timeout'
                    self.status = TwisterStatus.FAIL
                proc.wait(timeout)
            except subprocess.TimeoutExpired:
                self.status = TwisterStatus.FAIL
                proc.kill()

        if proc.returncode in (ExitCode.INTERRUPTED, ExitCode.USAGE_ERROR, ExitCode.INTERNAL_ERROR):
            self.status = TwisterStatus.ERROR
            self.instance.reason = f'Ctest error - return code {proc.returncode}'
            with open(self.ctest_log_file_path, 'w') as log_file:
                log_file.write(shlex.join(cmd) + '\n\n')
                log_file.write('\n'.join(self._output))

    def _output_reader(self, proc):
        self._output = []
        while proc.stdout.readable() and proc.poll() is None:
            line = proc.stdout.readline().decode().strip()
            if not line:
                continue
            self._output.append(line)
            logger.debug(f'CTEST: {line}')
            self.parse_record(line)
        proc.communicate()

    def _update_test_status(self):
        if self.status == TwisterStatus.NONE:
            self.instance.testcases = []
            try:
                self._parse_report_file(self.report_file)
            except Exception as e:
                logger.error(f'Error when parsing file {self.report_file}: {e}')
                self.status = TwisterStatus.FAIL
            finally:
                if not self.instance.testcases:
                    self.instance.init_cases()

        self.instance.status = self.status if self.status != TwisterStatus.NONE else \
                               TwisterStatus.FAIL
        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
            self.instance.reason = self.instance.reason or 'Ctest failed'
            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)

    def _parse_report_file(self, report):
        suite = junit.JUnitXml.fromfile(report)
        if suite is None:
            self.status = TwisterStatus.SKIP
            self.instance.reason = 'No tests collected'
            return

        assert isinstance(suite, junit.TestSuite)

        if suite.failures and suite.failures > 0:
            self.status = TwisterStatus.FAIL
            self.instance.reason = f"{suite.failures}/{suite.tests} ctest scenario(s) failed"
        elif suite.errors and suite.errors > 0:
            self.status = TwisterStatus.ERROR
            self.instance.reason = 'Error during ctest execution'
        elif suite.skipped and suite.skipped > 0:
            self.status = TwisterStatus.SKIP
        else:
            self.status = TwisterStatus.PASS
        self.instance.execution_time = suite.time

        for case in suite:
            tc = self.instance.add_testcase(f"{self.id}.{case.name}")
            tc.duration = case.time
            if any(isinstance(r, junit.Failure) for r in case.result):
                tc.status = TwisterStatus.FAIL
                tc.output = case.system_out
            elif any(isinstance(r, junit.Error) for r in case.result):
                tc.status = TwisterStatus.ERROR
                tc.output = case.system_out
            elif any(isinstance(r, junit.Skipped) for r in case.result):
                tc.status = TwisterStatus.SKIP
            else:
                tc.status = TwisterStatus.PASS


class HarnessImporter:

    @staticmethod
    def get_harness(harness_name):
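        """Instantiate the harness class named in the testsuite; default to Test.

        Returns None when no class with that name exists in this module.
        """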
        thismodule = sys.modules[__name__]
        try:
            if harness_name:
                harness_class = getattr(thismodule, harness_name)
            else:
                harness_class = thismodule.Test
            return harness_class()
        except AttributeError as e:
            logger.debug(f"harness {harness_name} not implemented: {e}")
            return None