# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2025 Intel Corporation
# Copyright 2022 NXP
# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import csv
import glob
import hashlib
import logging
import os
import random
from enum import Enum

from twisterlib.constants import (
    SUPPORTED_SIMS,
    SUPPORTED_SIMS_IN_PYTEST,
    SUPPORTED_SIMS_WITH_EXEC,
)
from twisterlib.environment import TwisterEnv
from twisterlib.error import BuildError, StatusAttributeError
from twisterlib.handlers import (
    BinaryHandler,
    DeviceHandler,
    Handler,
    QEMUHandler,
    QEMUWinHandler,
    SimulationHandler,
)
from twisterlib.platform import Platform
from twisterlib.size_calc import SizeCalculator
from twisterlib.statuses import TwisterStatus
from twisterlib.testsuite import TestCase, TestSuite

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

class TestInstance:
    """Class representing the execution of a particular TestSuite on a platform.

    @param testsuite The TestSuite object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param toolchain Toolchain used for the build
    @param outdir Base directory for all test results. The actual
        output directory used is <outdir>/<platform>/<toolchain>/<test suite name>
    """

    __test__ = False

    def __init__(self, testsuite, platform, toolchain, outdir):

        self.testsuite: TestSuite = testsuite
        self.platform: Platform = platform

        self._status = TwisterStatus.NONE
        self.reason = "Unknown"
        self.metrics = dict()
        self.handler = None
        self.recording = None
        self.coverage = None
        self.coverage_status = None
        self.outdir = outdir
        self.execution_time = 0
        self.build_time = 0
        self.retries = 0
        self.toolchain = toolchain

        self.name = os.path.join(platform.name, toolchain, testsuite.name)
        self.dut = None

        if testsuite.detailed_test_id:
            self.build_dir = os.path.join(
                outdir, platform.normalized_name, self.toolchain, testsuite.name
            )
        else:
            # If the suite is not in the zephyr tree, keep only the part after
            # ".." in the reconstructed directory structure.
            source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1]
            self.build_dir = os.path.join(
                outdir,
                platform.normalized_name,
                self.toolchain,
                source_dir_rel,
                testsuite.name
            )
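        # Illustrative resulting build_dir (placeholder names), relative to
        # outdir: <platform>/<toolchain>/<suite name> with detailed_test_id,
        # or <platform>/<toolchain>/<suite source dir>/<suite name> otherwise.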
        self.run_id = None
        self.domains = None
        # The instance needs to use sysbuild if the suite or the platform requires it.
        self.sysbuild = testsuite.sysbuild or platform.sysbuild

        self.run = False
        self.testcases: list[TestCase] = []
        self.init_cases()
        self.filters = []
        self.filter_type = None

    def setup_run_id(self):
        self.run_id = self._get_run_id()

    def record(self, recording, fname_csv="recording.csv"):
        if recording:
            if self.recording is None:
                self.recording = recording.copy()
            else:
                self.recording.extend(recording)

            filename = os.path.join(self.build_dir, fname_csv)
            fieldnames = set()
            for r in self.recording:
                fieldnames.update(r)
            with open(filename, 'w') as csvfile:
                cw = csv.DictWriter(csvfile,
                                    fieldnames=sorted(fieldnames),
                                    lineterminator=os.linesep,
                                    quoting=csv.QUOTE_NONNUMERIC)
                cw.writeheader()
                cw.writerows(self.recording)
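        # Illustrative output: recording rows [{"a": 1}, {"b": "x"}] produce a
        # header of the sorted union of keys ("a", "b"); rows missing a key get
        # an empty cell, and non-numeric values are quoted (QUOTE_NONNUMERIC).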

    @property
    def status(self) -> TwisterStatus:
        return self._status

    @status.setter
    def status(self, value: TwisterStatus) -> None:
        # Check for illegal assignments by value
        try:
            key = value.name if isinstance(value, Enum) else value
            self._status = TwisterStatus[key]
        except KeyError as err:
            raise StatusAttributeError(self.__class__, value) from err
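    # Note (illustrative): the setter accepts either a TwisterStatus member or
    # a string equal to a member *name*, e.g. instance.status =
    # TwisterStatus.FILTER or instance.status = "FILTER"; any other value
    # raises StatusAttributeError.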

    def add_filter(self, reason, filter_type):
        self.filters.append({'type': filter_type, 'reason': reason})
        self.status = TwisterStatus.FILTER
        self.reason = reason
        self.filter_type = filter_type

    # Workaround for an issue with copying objects from the testsuite; needs a
    # better solution.
    def init_cases(self):
        for c in self.testsuite.testcases:
            self.add_testcase(c.name, freeform=c.freeform)

    def _get_run_id(self):
        """Generate a run ID from the instance's unique identifier and a
        random number.

        If one exists, return the cached run ID from a previous run.
        """
        run_id = ""
        run_id_file = os.path.join(self.build_dir, "run_id.txt")
        if os.path.exists(run_id_file):
            with open(run_id_file) as fp:
                run_id = fp.read()
        else:
            hash_object = hashlib.md5(self.name.encode(), usedforsecurity=False)
            random_str = f"{random.getrandbits(64)}".encode()
            hash_object.update(random_str)
            run_id = hash_object.hexdigest()
            os.makedirs(self.build_dir, exist_ok=True)
            with open(run_id_file, 'w+') as fp:
                fp.write(run_id)
        return run_id
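    # Illustrative: the first build writes <build_dir>/run_id.txt containing a
    # 32-character md5 hexdigest; subsequent runs read the cached value back.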

    def add_missing_case_status(self, status, reason=None):
        for case in self.testcases:
            if case.status == TwisterStatus.STARTED:
                case.status = TwisterStatus.FAIL
            elif case.status == TwisterStatus.NONE:
                case.status = status
                if reason:
                    case.reason = reason
                else:
                    case.reason = self.reason

    def __getstate__(self):
        d = self.__dict__.copy()
        return d

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __lt__(self, other):
        return self.name < other.name

    def compose_case_name(self, tc_name) -> str:
        return self.testsuite.compose_case_name(tc_name)

    def set_case_status_by_name(self, name, status, reason=None):
        tc = self.get_case_or_create(name)
        tc.status = status
        if reason:
            tc.reason = reason
        return tc

    def add_testcase(self, name, freeform=False):
        tc = TestCase(name=name)
        tc.freeform = freeform
        self.testcases.append(tc)
        return tc

    def get_case_by_name(self, name):
        for c in self.testcases:
            if c.name == name:
                return c
        return None

    def get_case_or_create(self, name):
        for c in self.testcases:
            if c.name == name:
                return c

        logger.debug(f"Could not find a matching testcase for {name}")
        tc = TestCase(name=name)
        self.testcases.append(tc)
        return tc

    @staticmethod
    def testsuite_runnable(testsuite, fixtures):
        can_run = False
        # console harness allows us to run the test and capture data.
        if testsuite.harness in [
            'console',
            'ztest',
            'pytest',
            'test',
            'gtest',
            'robot',
            'ctest',
            'shell'
        ]:
            can_run = True
            # if we have a fixture that is also being supplied on the
            # command-line, then we need to run the test, not just build it.
            fixture = testsuite.harness_config.get('fixture')
            if fixture:
                can_run = fixture in map(lambda f: f.split(sep=':')[0], fixtures)

        return can_run
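    # Illustrative fixture matching (hypothetical fixture name): a suite that
    # declares fixture "gpio_loopback" is runnable when the command line
    # supplies "gpio_loopback" or "gpio_loopback:some_option" -- only the part
    # before the first ':' is compared.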

    def setup_handler(self, env: TwisterEnv):
        # Only set this up once.
        if self.handler:
            return

        options = env.options
        common_args = (options, env.generator_cmd, not options.disable_suite_name_check)
        simulator = self.platform.simulator_by_name(options.sim_name)
        if options.device_testing:
            handler = DeviceHandler(self, "device", *common_args)
            handler.call_make_run = False
            handler.ready = True
        elif simulator:
            if simulator.name == "qemu":
                if os.name != "nt":
                    handler = QEMUHandler(self, "qemu", *common_args)
                else:
                    handler = QEMUWinHandler(self, "qemu", *common_args)
                handler.args.append(f"QEMU_PIPE={handler.get_fifo()}")
                handler.ready = True
            else:
                handler = SimulationHandler(self, simulator.name, *common_args)
                handler.ready = simulator.is_runnable()

        elif self.testsuite.type == "unit":
            handler = BinaryHandler(self, "unit", *common_args)
            handler.binary = os.path.join(self.build_dir, "testbinary")
            if options.enable_coverage:
                handler.args.append("COVERAGE=1")
            handler.call_make_run = False
            handler.ready = True
        else:
            handler = Handler(self, "", *common_args)
            if self.testsuite.harness == "ctest":
                handler.ready = True

        self.handler = handler
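        # Illustrative summary: --device-testing always yields a DeviceHandler;
        # a "qemu" simulator yields a QEMUHandler (QEMUWinHandler on Windows);
        # other simulators yield a SimulationHandler whose readiness depends on
        # the simulator binary being available; unit tests get a BinaryHandler.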

    # Global testsuite parameters
    def check_runnable(self,
                       options: TwisterEnv,
                       hardware_map=None):

        enable_slow = options.enable_slow
        filter = options.filter
        fixtures = options.fixture
        device_testing = options.device_testing
        simulation = options.sim_name

        simulator = self.platform.simulator_by_name(simulation)
        if os.name == 'nt' and simulator:
            # running on simulators is currently supported only for QEMU on Windows
            if simulator.name not in ('na', 'qemu'):
                return False

            # check presence of QEMU on Windows
            if simulator.name == 'qemu' and 'QEMU_BIN_PATH' not in os.environ:
                return False

        # we asked for build-only on the command line
        if self.testsuite.build_only:
            return False

        # Do not run slow tests:
        skip_slow = self.testsuite.slow and not enable_slow
        if skip_slow:
            return False

        target_ready = bool(self.testsuite.type == "unit" or
                            self.platform.type == "native" or
                            self.testsuite.harness == "ctest" or
                            (simulator and simulator.name in SUPPORTED_SIMS and
                             simulator.name not in self.testsuite.simulation_exclude) or
                            device_testing)

        # check if test is runnable in pytest
        if self.testsuite.harness in ['pytest', 'shell']:
            target_ready = bool(
                filter == 'runnable' or (simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST)
            )

        if (filter != 'runnable' and
                simulator and
                simulator.name in SUPPORTED_SIMS_WITH_EXEC and
                not simulator.is_runnable()):
            target_ready = False

        testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)

        if hardware_map:
            for h in hardware_map.duts:
                if (h.platform in self.platform.aliases and
                        self.testsuite_runnable(self.testsuite, h.fixtures)):
                    testsuite_runnable = True
                    break

        return testsuite_runnable and target_ready
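    # Example (illustrative): a suite with harness "pytest" on a simulated
    # platform is runnable only if the simulator is listed in
    # SUPPORTED_SIMS_IN_PYTEST or the run was invoked with --filter runnable.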

    def create_overlay(
        self,
        platform,
        enable_asan=False,
        enable_ubsan=False,
        enable_coverage=False,
        coverage_platform=None
    ):
        if coverage_platform is None:
            coverage_platform = []
        # Create this in a "twister/" subdirectory; otherwise the overlay would
        # be passed to kconfig.py *twice*, and kconfig.cmake would silently give
        # that second pass precedence over any --extra-args=CONFIG_*
        subdir = os.path.join(self.build_dir, "twister")

        content = ""

        if self.testsuite.extra_configs:
            new_config_list = []
            # Some configs may be conditional on arch or platform: if a
            # namespace is defined, apply the config only when the namespace
            # matches. Both "arch:" and "platform:" are currently supported.
            for config in self.testsuite.extra_configs:
                cond_config = config.split(":")
                if cond_config[0] == "arch" and len(cond_config) == 3:
                    if self.platform.arch == cond_config[1]:
                        new_config_list.append(cond_config[2])
                elif cond_config[0] == "platform" and len(cond_config) == 3:
                    if self.platform.name == cond_config[1]:
                        new_config_list.append(cond_config[2])
                else:
                    new_config_list.append(config)

            content = "\n".join(new_config_list)
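            # Illustrative entries (CONFIG_FOO/CONFIG_BAR/CONFIG_BAZ are
            # placeholders):
            #   "arch:x86:CONFIG_FOO=y"            -> applied only when the
            #                                         platform arch is "x86"
            #   "platform:native_sim:CONFIG_BAR=y" -> applied only on the
            #                                         native_sim platform
            #   "CONFIG_BAZ=y"                     -> applied unconditionally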

        if enable_coverage:
            for cp in coverage_platform:
                if cp in platform.aliases:
                    content = content + "\nCONFIG_COVERAGE=y"
                    content = content + "\nCONFIG_COVERAGE_DUMP=y"

        if platform.type == "native":
            if enable_asan:
                content = content + "\nCONFIG_ASAN=y"
            if enable_ubsan:
                content = content + "\nCONFIG_UBSAN=y"

        if content:
            os.makedirs(subdir, exist_ok=True)
            file = os.path.join(subdir, "testsuite_extra.conf")
            with open(file, "w", encoding='utf-8') as f:
                f.write(content)

        return content

    def calculate_sizes(
        self,
        from_buildlog: bool = False,
        generate_warning: bool = True
    ) -> SizeCalculator:
        """Get the RAM/ROM sizes of a test case.

        This can only be run after the instance has been executed by
        MakeGenerator, otherwise there won't be any binaries to measure.

        @return A SizeCalculator object
        """
        elf_filepath = self.get_elf_file()
        buildlog_filepath = self.get_buildlog_file() if from_buildlog else ''
        return SizeCalculator(elf_filename=elf_filepath,
                              extra_sections=self.testsuite.extra_sections,
                              buildlog_filepath=buildlog_filepath,
                              generate_warning=generate_warning)
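    # Illustrative usage: instance.calculate_sizes(from_buildlog=True) also
    # consults build.log; it assumes the build has completed, otherwise
    # get_elf_file() raises BuildError("Missing output binary").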

    def get_elf_file(self) -> str:
        if self.sysbuild:
            build_dir = self.domains.get_default_domain().build_dir
        else:
            build_dir = self.build_dir

        fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf"))
        fns.extend(glob.glob(os.path.join(build_dir, "testbinary")))
        blocklist = [
            'remapped',  # used for xtensa platforms
            'zefi',      # EFI for Zephyr
            'qemu',      # ELF files generated after running in QEMU
            '_pre']
        fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)]
        if not fns:
            raise BuildError("Missing output binary")
        elif len(fns) > 1:
            logger.warning(f"multiple ELF files detected: {', '.join(fns)}")
        return fns[0]

    def get_buildlog_file(self) -> str:
        """Get the path to the build.log file.

        @raises BuildError: Incorrect number (!= 1) of build logs.
        @return: Path to build.log (str).
        """
        buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log"))
        if len(buildlog_paths) != 1:
            raise BuildError("Missing or multiple build.log files.")
        return buildlog_paths[0]

    def __repr__(self):
        return f"<TestInstance {self.testsuite.name} on {self.platform.name}>"