1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2024 Intel Corporation
4# Copyright 2022 NXP
5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
6#
7# SPDX-License-Identifier: Apache-2.0
8from __future__ import annotations
9
10import csv
11import glob
12import hashlib
13import logging
14import os
15import random
16from enum import Enum
17
18from twisterlib.constants import (
19    SUPPORTED_SIMS,
20    SUPPORTED_SIMS_IN_PYTEST,
21    SUPPORTED_SIMS_WITH_EXEC,
22)
23from twisterlib.environment import TwisterEnv
24from twisterlib.error import BuildError, StatusAttributeError
25from twisterlib.handlers import (
26    BinaryHandler,
27    DeviceHandler,
28    Handler,
29    QEMUHandler,
30    QEMUWinHandler,
31    SimulationHandler,
32)
33from twisterlib.platform import Platform
34from twisterlib.size_calc import SizeCalculator
35from twisterlib.statuses import TwisterStatus
36from twisterlib.testsuite import TestCase, TestSuite
37
# Module-level logger obtained under the name 'twister'; its level is set
# to DEBUG here.
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
40
41class TestInstance:
    """Class representing the execution of a particular TestSuite on a platform

    @param testsuite The TestSuite object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param outdir Base directory for all test results. The actual
        build directory used is <outdir>/<platform>/<testsuite name>, with
        the suite's relative source directory inserted before the name when
        the suite does not use detailed test ids
    """
49
50    __test__ = False
51
52    def __init__(self, testsuite, platform, outdir):
53
54        self.testsuite: TestSuite = testsuite
55        self.platform: Platform = platform
56
57        self._status = TwisterStatus.NONE
58        self.reason = "Unknown"
59        self.metrics = dict()
60        self.handler = None
61        self.recording = None
62        self.outdir = outdir
63        self.execution_time = 0
64        self.build_time = 0
65        self.retries = 0
66
67        self.name = os.path.join(platform.name, testsuite.name)
68        self.dut = None
69
70        if testsuite.detailed_test_id:
71            self.build_dir = os.path.join(outdir, platform.normalized_name, testsuite.name)
72        else:
73            # if suite is not in zephyr,
74            # keep only the part after ".." in reconstructed dir structure
75            source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1]
76            self.build_dir = os.path.join(
77                outdir,
78                platform.normalized_name,
79                source_dir_rel,
80                testsuite.name
81            )
82        self.run_id = None
83        self.domains = None
84        # Instance need to use sysbuild if a given suite or a platform requires it
85        self.sysbuild = testsuite.sysbuild or platform.sysbuild
86
87        self.run = False
88        self.testcases: list[TestCase] = []
89        self.init_cases()
90        self.filters = []
91        self.filter_type = None
92
93    def setup_run_id(self):
94        self.run_id = self._get_run_id()
95
96    def record(self, recording, fname_csv="recording.csv"):
97        if recording:
98            if self.recording is None:
99                self.recording = recording.copy()
100            else:
101                self.recording.extend(recording)
102
103            filename = os.path.join(self.build_dir, fname_csv)
104            with open(filename, 'w') as csvfile:
105                cw = csv.DictWriter(csvfile,
106                                    fieldnames = self.recording[0].keys(),
107                                    lineterminator = os.linesep,
108                                    quoting = csv.QUOTE_NONNUMERIC)
109                cw.writeheader()
110                cw.writerows(self.recording)
111
112    @property
113    def status(self) -> TwisterStatus:
114        return self._status
115
116    @status.setter
117    def status(self, value : TwisterStatus) -> None:
118        # Check for illegal assignments by value
119        try:
120            key = value.name if isinstance(value, Enum) else value
121            self._status = TwisterStatus[key]
122        except KeyError as err:
123            raise StatusAttributeError(self.__class__, value) from err
124
125    def add_filter(self, reason, filter_type):
126        self.filters.append({'type': filter_type, 'reason': reason })
127        self.status = TwisterStatus.FILTER
128        self.reason = reason
129        self.filter_type = filter_type
130
131    # Fix an issue with copying objects from testsuite, need better solution.
132    def init_cases(self):
133        for c in self.testsuite.testcases:
134            self.add_testcase(c.name, freeform=c.freeform)
135
136    def _get_run_id(self):
137        """ generate run id from instance unique identifier and a random
138        number
139        If exist, get cached run id from previous run."""
140        run_id = ""
141        run_id_file = os.path.join(self.build_dir, "run_id.txt")
142        if os.path.exists(run_id_file):
143            with open(run_id_file) as fp:
144                run_id = fp.read()
145        else:
146            hash_object = hashlib.md5(self.name.encode())
147            random_str = f"{random.getrandbits(64)}".encode()
148            hash_object.update(random_str)
149            run_id = hash_object.hexdigest()
150            os.makedirs(self.build_dir, exist_ok=True)
151            with open(run_id_file, 'w+') as fp:
152                fp.write(run_id)
153        return run_id
154
155    def add_missing_case_status(self, status, reason=None):
156        for case in self.testcases:
157            if case.status == TwisterStatus.STARTED:
158                case.status = TwisterStatus.FAIL
159            elif case.status == TwisterStatus.NONE:
160                case.status = status
161                if reason:
162                    case.reason = reason
163                else:
164                    case.reason = self.reason
165
166    def __getstate__(self):
167        d = self.__dict__.copy()
168        return d
169
170    def __setstate__(self, d):
171        self.__dict__.update(d)
172
173    def __lt__(self, other):
174        return self.name < other.name
175
176    def compose_case_name(self, tc_name) -> str:
177        return self.testsuite.compose_case_name(tc_name)
178
179    def set_case_status_by_name(self, name, status, reason=None):
180        tc = self.get_case_or_create(name)
181        tc.status = status
182        if reason:
183            tc.reason = reason
184        return tc
185
186    def add_testcase(self, name, freeform=False):
187        tc = TestCase(name=name)
188        tc.freeform = freeform
189        self.testcases.append(tc)
190        return tc
191
192    def get_case_by_name(self, name):
193        for c in self.testcases:
194            if c.name == name:
195                return c
196        return None
197
198    def get_case_or_create(self, name):
199        for c in self.testcases:
200            if c.name == name:
201                return c
202
203        logger.debug(f"Could not find a matching testcase for {name}")
204        tc = TestCase(name=name)
205        self.testcases.append(tc)
206        return tc
207
208    @staticmethod
209    def testsuite_runnable(testsuite, fixtures):
210        can_run = False
211        # console harness allows us to run the test and capture data.
212        if testsuite.harness in [ 'console', 'ztest', 'pytest', 'test', 'gtest', 'robot']:
213            can_run = True
214            # if we have a fixture that is also being supplied on the
215            # command-line, then we need to run the test, not just build it.
216            fixture = testsuite.harness_config.get('fixture')
217            if fixture:
218                can_run = fixture in map(lambda f: f.split(sep=':')[0], fixtures)
219
220        return can_run
221
222    def setup_handler(self, env: TwisterEnv):
223        # only setup once.
224        if self.handler:
225            return
226
227        options = env.options
228        common_args = (options, env.generator_cmd, not options.disable_suite_name_check)
229        simulator = self.platform.simulator_by_name(options.sim_name)
230        if options.device_testing:
231            handler = DeviceHandler(self, "device", *common_args)
232            handler.call_make_run = False
233            handler.ready = True
234        elif simulator:
235            if simulator.name == "qemu":
236                if os.name != "nt":
237                    handler = QEMUHandler(self, "qemu", *common_args)
238                else:
239                    handler = QEMUWinHandler(self, "qemu", *common_args)
240                handler.args.append(f"QEMU_PIPE={handler.get_fifo()}")
241                handler.ready = True
242            else:
243                handler = SimulationHandler(self, simulator.name, *common_args)
244                handler.ready = simulator.is_runnable()
245
246        elif self.testsuite.type == "unit":
247            handler = BinaryHandler(self, "unit", *common_args)
248            handler.binary = os.path.join(self.build_dir, "testbinary")
249            if options.enable_coverage:
250                handler.args.append("COVERAGE=1")
251            handler.call_make_run = False
252            handler.ready = True
253        else:
254            handler = Handler(self, "", *common_args)
255
256        self.handler = handler
257
258    # Global testsuite parameters
259    def check_runnable(self,
260                       options: TwisterEnv,
261                       hardware_map=None):
262
263        enable_slow = options.enable_slow
264        filter = options.filter
265        fixtures = options.fixture
266        device_testing = options.device_testing
267        simulation = options.sim_name
268
269        simulator = self.platform.simulator_by_name(simulation)
270        if os.name == 'nt' and simulator:
271            # running on simulators is currently supported only for QEMU on Windows
272            if simulator.name not in ('na', 'qemu'):
273                return False
274
275            # check presence of QEMU on Windows
276            if simulator.name == 'qemu' and 'QEMU_BIN_PATH' not in os.environ:
277                return False
278
279        # we asked for build-only on the command line
280        if self.testsuite.build_only:
281            return False
282
283        # Do not run slow tests:
284        skip_slow = self.testsuite.slow and not enable_slow
285        if skip_slow:
286            return False
287
288        target_ready = bool(self.testsuite.type == "unit" or \
289                            self.platform.type == "native" or \
290                            (simulator and simulator.name in SUPPORTED_SIMS and \
291                             simulator.name not in self.testsuite.simulation_exclude) or \
292                            device_testing)
293
294        # check if test is runnable in pytest
295        if self.testsuite.harness == 'pytest':
296            target_ready = bool(
297                filter == 'runnable' or simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST
298            )
299
300        if filter != 'runnable' and \
301                simulator and \
302                simulator.name in SUPPORTED_SIMS_WITH_EXEC and \
303                not simulator.is_runnable():
304            target_ready = False
305
306        testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)
307
308        if hardware_map:
309            for h in hardware_map.duts:
310                if (h.platform == self.platform.name and
311                        self.testsuite_runnable(self.testsuite, h.fixtures)):
312                    testsuite_runnable = True
313                    break
314
315        return testsuite_runnable and target_ready
316
317    def create_overlay(
318        self,
319        platform,
320        enable_asan=False,
321        enable_ubsan=False,
322        enable_coverage=False,
323        coverage_platform=None
324    ):
325        if coverage_platform is None:
326            coverage_platform = []
327        # Create this in a "twister/" subdirectory otherwise this
328        # will pass this overlay to kconfig.py *twice* and kconfig.cmake
329        # will silently give that second time precedence over any
330        # --extra-args=CONFIG_*
331        subdir = os.path.join(self.build_dir, "twister")
332
333        content = ""
334
335        if self.testsuite.extra_configs:
336            new_config_list = []
337            # some configs might be conditional on arch or platform, see if we
338            # have a namespace defined and apply only if the namespace matches.
339            # we currently support both arch: and platform:
340            for config in self.testsuite.extra_configs:
341                cond_config = config.split(":")
342                if cond_config[0] == "arch" and len(cond_config) == 3:
343                    if self.platform.arch == cond_config[1]:
344                        new_config_list.append(cond_config[2])
345                elif cond_config[0] == "platform" and len(cond_config) == 3:
346                    if self.platform.name == cond_config[1]:
347                        new_config_list.append(cond_config[2])
348                else:
349                    new_config_list.append(config)
350
351            content = "\n".join(new_config_list)
352
353        if enable_coverage:
354            for cp in coverage_platform:
355                if cp in platform.aliases:
356                    content = content + "\nCONFIG_COVERAGE=y"
357                    content = content + "\nCONFIG_COVERAGE_DUMP=y"
358
359        if platform.type == "native":
360            if enable_asan:
361                content = content + "\nCONFIG_ASAN=y"
362            if enable_ubsan:
363                content = content + "\nCONFIG_UBSAN=y"
364
365        if content:
366            os.makedirs(subdir, exist_ok=True)
367            file = os.path.join(subdir, "testsuite_extra.conf")
368            with open(file, "w", encoding='utf-8') as f:
369                f.write(content)
370
371        return content
372
373    def calculate_sizes(
374        self,
375        from_buildlog: bool = False,
376        generate_warning: bool = True
377    ) -> SizeCalculator:
378        """Get the RAM/ROM sizes of a test case.
379
380        This can only be run after the instance has been executed by
381        MakeGenerator, otherwise there won't be any binaries to measure.
382
383        @return A SizeCalculator object
384        """
385        elf_filepath = self.get_elf_file()
386        buildlog_filepath = self.get_buildlog_file() if from_buildlog else ''
387        return SizeCalculator(elf_filename=elf_filepath,
388                            extra_sections=self.testsuite.extra_sections,
389                            buildlog_filepath=buildlog_filepath,
390                            generate_warning=generate_warning)
391
392    def get_elf_file(self) -> str:
393
394        if self.sysbuild:
395            build_dir = self.domains.get_default_domain().build_dir
396        else:
397            build_dir = self.build_dir
398
399        fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf"))
400        fns.extend(glob.glob(os.path.join(build_dir, "testbinary")))
401        blocklist = [
402                'remapped', # used for xtensa plaforms
403                'zefi', # EFI for Zephyr
404                'qemu', # elf files generated after running in qemu
405                '_pre']
406        fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)]
407        if not fns:
408            raise BuildError("Missing output binary")
409        elif len(fns) > 1:
410            logger.warning(f"multiple ELF files detected: {', '.join(fns)}")
411        return fns[0]
412
413    def get_buildlog_file(self) -> str:
414        """Get path to build.log file.
415
416        @raises BuildError: Incorrect amount (!=1) of build logs.
417        @return: Path to build.log (str).
418        """
419        buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log"))
420        if len(buildlog_paths) != 1:
421            raise BuildError("Missing/multiple build.log file.")
422        return buildlog_paths[0]
423
424    def __repr__(self):
425        return f"<TestSuite {self.testsuite.name} on {self.platform.name}>"
426