# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2022 Intel Corporation
# Copyright 2022 NXP
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import os
import hashlib
import random
import logging
import shutil
import glob

from twisterlib.testsuite import TestCase, TestSuite
from twisterlib.platform import Platform
from twisterlib.error import BuildError
from twisterlib.size_calc import SizeCalculator
from twisterlib.handlers import (
    Handler,
    SimulationHandler,
    BinaryHandler,
    QEMUHandler,
    DeviceHandler,
    SUPPORTED_SIMS,
    SUPPORTED_SIMS_IN_PYTEST,
)

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
class TestInstance:
    """Class representing the execution of a particular TestSuite on a platform.

    @param testsuite The TestSuite object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param outdir Base directory for all test results. The actual
        output directory used is <outdir>/<platform>/<test suite name>
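
    Hypothetical usage sketch ('suite', 'plat' and the "twister-out"
    directory are placeholders, not values defined in this module):

        instance = TestInstance(suite, plat, "twister-out")
        instance.create_overlay(plat)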
    """

    __test__ = False

    def __init__(self, testsuite, platform, outdir):

        self.testsuite: TestSuite = testsuite
        self.platform: Platform = platform

        self.status = None
        self.reason = "Unknown"
        self.metrics = dict()
        self.handler = None
        self.recording = None
        self.outdir = outdir
        self.execution_time = 0
        self.build_time = 0
        self.retries = 0

        self.name = os.path.join(platform.name, testsuite.name)
        self.dut = None
        if testsuite.detailed_test_id:
            self.build_dir = os.path.join(outdir, platform.name, testsuite.name)
        else:
            # If the suite is not in the zephyr tree, keep only the part after
            # ".." in the reconstructed directory structure.
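            # e.g. "../../modules/tests/foo" -> "modules/tests/foo"
            # (illustrative path, not from a real test suite)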
            source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir + os.path.sep, 1)[-1]
            self.build_dir = os.path.join(outdir, platform.name, source_dir_rel, testsuite.name)

        self.run_id = self._get_run_id()
        self.domains = None

        self.run = False
        self.testcases: list[TestCase] = []
        self.init_cases()
        self.filters = []
        self.filter_type = None

    def add_filter(self, reason, filter_type):
        self.filters.append({'type': filter_type, 'reason': reason})
        self.status = "filtered"
        self.reason = reason
        self.filter_type = filter_type

    # Workaround for an issue with copying testcase objects from the
    # testsuite; this needs a better solution.
    def init_cases(self):
        for c in self.testsuite.testcases:
            self.add_testcase(c.name, freeform=c.freeform)

    def _get_run_id(self):
        """Generate a run ID from the instance's unique name and a random
        number, or reuse the cached run ID from a previous run if one
        exists."""
        run_id = ""
        run_id_file = os.path.join(self.build_dir, "run_id.txt")
        if os.path.exists(run_id_file):
            with open(run_id_file, "r") as fp:
                run_id = fp.read()
        else:
            # MD5 is used here as a cheap, non-cryptographic hash.
            hash_object = hashlib.md5(self.name.encode())
            random_str = f"{random.getrandbits(64)}".encode()
            hash_object.update(random_str)
            run_id = hash_object.hexdigest()
            os.makedirs(self.build_dir, exist_ok=True)
            with open(run_id_file, 'w+') as fp:
                fp.write(run_id)
        return run_id

    def add_missing_case_status(self, status, reason=None):
        for case in self.testcases:
            if case.status == 'started':
                # A case that started but never finished is treated as failed.
                case.status = "failed"
            elif not case.status:
                case.status = status
                if reason:
                    case.reason = reason
                else:
                    case.reason = self.reason

    def __getstate__(self):
        d = self.__dict__.copy()
        return d

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __lt__(self, other):
        return self.name < other.name

    def set_case_status_by_name(self, name, status, reason=None):
        tc = self.get_case_or_create(name)
        tc.status = status
        if reason:
            tc.reason = reason
        return tc

    def add_testcase(self, name, freeform=False):
        tc = TestCase(name=name)
        tc.freeform = freeform
        self.testcases.append(tc)
        return tc

    def get_case_by_name(self, name):
        for c in self.testcases:
            if c.name == name:
                return c
        return None

    def get_case_or_create(self, name):
        for c in self.testcases:
            if c.name == name:
                return c

        logger.debug(f"Could not find a matching testcase for {name}")
        tc = TestCase(name=name)
        self.testcases.append(tc)
        return tc

    @staticmethod
    def testsuite_runnable(testsuite, fixtures):
        can_run = False
        # console harness allows us to run the test and capture data.
        if testsuite.harness in ['console', 'ztest', 'pytest', 'test', 'gtest', 'robot']:
            can_run = True
            # if we have a fixture that is also being supplied on the
            # command-line, then we need to run the test, not just build it.
            fixture = testsuite.harness_config.get('fixture')
            if fixture:
                can_run = fixture in fixtures
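                # e.g. with fixtures == ["fixture_gpio_loopback"], a suite
                # whose harness_config requires that fixture becomes runnable
                # (hypothetical fixture name).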

        return can_run

    def setup_handler(self, env):
        if self.handler:
            return

        options = env.options
        handler = Handler(self, "")
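        # Default base handler; replaced below when device testing,
        # simulation, or unit testing applies.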
        if options.device_testing:
            handler = DeviceHandler(self, "device")
            handler.call_make_run = False
            handler.ready = True
        elif self.platform.simulation != "na":
            if self.platform.simulation == "qemu":
                handler = QEMUHandler(self, "qemu")
                handler.args.append(f"QEMU_PIPE={handler.get_fifo()}")
                handler.ready = True
            else:
                handler = SimulationHandler(self, self.platform.simulation)

            if self.platform.simulation_exec and shutil.which(self.platform.simulation_exec):
                handler.ready = True
        elif self.testsuite.type == "unit":
            handler = BinaryHandler(self, "unit")
            handler.binary = os.path.join(self.build_dir, "testbinary")
            if options.enable_coverage:
                handler.args.append("COVERAGE=1")
            handler.call_make_run = False
            handler.ready = True

        if handler:
            handler.options = options
            handler.generator_cmd = env.generator_cmd
            handler.generator = env.generator
            handler.suite_name_check = not options.disable_suite_name_check
        self.handler = handler

    # Global testsuite parameters
    def check_runnable(self, enable_slow=False, filter='buildable', fixtures=None, hardware_map=None):
        # Avoid a mutable default argument; treat None as "no fixtures".
        if fixtures is None:
            fixtures = []

        # running on simulators is currently not supported on Windows
        if os.name == 'nt' and self.platform.simulation != 'na':
            return False

        # we asked for build-only on the command line
        if self.testsuite.build_only:
            return False

        # Do not run slow tests unless explicitly enabled:
        skip_slow = self.testsuite.slow and not enable_slow
        if skip_slow:
            return False

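        # target_ready below is True when any of the following holds: the
        # suite is a unit test (runs on the host), the platform is native,
        # the platform's simulator is supported and not excluded by the
        # suite, or runnable-only filtering was requested.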
        target_ready = bool(self.testsuite.type == "unit" or
                        self.platform.type == "native" or
                        (self.platform.simulation in SUPPORTED_SIMS and
                         self.platform.simulation not in self.testsuite.simulation_exclude) or
                        filter == 'runnable')

        # check if test is runnable in pytest
        if self.testsuite.harness == 'pytest':
            target_ready = bool(filter == 'runnable' or self.platform.simulation in SUPPORTED_SIMS_IN_PYTEST)

        SUPPORTED_SIMS_WITH_EXEC = ['nsim', 'mdb-nsim', 'renode', 'tsim', 'native']
        if filter != 'runnable' and \
                self.platform.simulation in SUPPORTED_SIMS_WITH_EXEC and \
                self.platform.simulation_exec:
            if not shutil.which(self.platform.simulation_exec):
                target_ready = False

        testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)

        if hardware_map:
            for h in hardware_map.duts:
                if (h.platform == self.platform.name and
                        self.testsuite_runnable(self.testsuite, h.fixtures)):
                    testsuite_runnable = True
                    break

        return testsuite_runnable and target_ready

    def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=None):
        if coverage_platform is None:
            coverage_platform = []
        # Create this in a "twister/" subdirectory; otherwise the overlay
        # would be passed to kconfig.py *twice*, and kconfig.cmake would
        # silently give that second copy precedence over any
        # --extra-args=CONFIG_*
        subdir = os.path.join(self.build_dir, "twister")

        content = ""

        if self.testsuite.extra_configs:
            new_config_list = []
            # Some configs may be conditional on arch or platform: if a
            # namespace is defined, apply the config only when the namespace
            # matches. Both arch: and platform: namespaces are currently
            # supported.
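            # e.g. "arch:x86:CONFIG_FOO=y" would apply CONFIG_FOO=y only when
            # building for x86, and "platform:qemu_x86:CONFIG_BAR=y" only on
            # the qemu_x86 platform (illustrative values, not from a real
            # test suite).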
            for config in self.testsuite.extra_configs:
                cond_config = config.split(":")
                if cond_config[0] == "arch" and len(cond_config) == 3:
                    if self.platform.arch == cond_config[1]:
                        new_config_list.append(cond_config[2])
                elif cond_config[0] == "platform" and len(cond_config) == 3:
                    if self.platform.name == cond_config[1]:
                        new_config_list.append(cond_config[2])
                else:
                    new_config_list.append(config)

            content = "\n".join(new_config_list)

        if enable_coverage:
            if platform.name in coverage_platform:
                content = content + "\nCONFIG_COVERAGE=y"
                content = content + "\nCONFIG_COVERAGE_DUMP=y"

        if enable_asan:
            if platform.type == "native":
                content = content + "\nCONFIG_ASAN=y"

        if enable_ubsan:
            if platform.type == "native":
                content = content + "\nCONFIG_UBSAN=y"

        if content:
            os.makedirs(subdir, exist_ok=True)
            file = os.path.join(subdir, "testsuite_extra.conf")
            with open(file, "w", encoding='utf-8') as f:
                f.write(content)

        return content

    def calculate_sizes(self, from_buildlog: bool = False, generate_warning: bool = True) -> SizeCalculator:
        """Get the RAM/ROM sizes of a test case.

        This can only be run after the instance has been executed by
        MakeGenerator, otherwise there won't be any binaries to measure.

        @return A SizeCalculator object
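
        Hypothetical usage sketch (assumes the instance has already been
        built, so the ELF file exists):

            calc = instance.calculate_sizes(from_buildlog=True)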
        """
        elf_filepath = self.get_elf_file()
        buildlog_filepath = self.get_buildlog_file() if from_buildlog else ''
        return SizeCalculator(elf_filename=elf_filepath,
                              extra_sections=self.testsuite.extra_sections,
                              buildlog_filepath=buildlog_filepath,
                              generate_warning=generate_warning)

    def get_elf_file(self) -> str:

        if self.testsuite.sysbuild:
            build_dir = self.domains.get_default_domain().build_dir
        else:
            build_dir = self.build_dir

        fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf"))
        fns.extend(glob.glob(os.path.join(build_dir, "testbinary")))
        blocklist = [
                'remapped', # used for xtensa platforms
                'zefi', # EFI for Zephyr
                'qemu', # ELF files generated after running in QEMU
                '_pre']
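        # e.g. keep "zephyr/zephyr.elf" but drop "zephyr/zephyr_pre0.elf"
        # (illustrative filenames)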
        fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)]
        if not fns:
            raise BuildError("Missing output binary")
        elif len(fns) > 1:
            logger.warning(f"multiple ELF files detected: {', '.join(fns)}")
        return fns[0]

    def get_buildlog_file(self) -> str:
        """Get the path to the build.log file.

        @raises BuildError: Incorrect number (!= 1) of build logs.
        @return: Path to build.log (str).
        """
        buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log"))
        if len(buildlog_paths) != 1:
            raise BuildError("Missing or multiple build.log files.")
        return buildlog_paths[0]

    def __repr__(self):
        return "<TestInstance %s on %s>" % (self.testsuite.name, self.platform.name)
