# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2022 Intel Corporation
# Copyright 2022 NXP
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import os
import hashlib
import random
import logging
import shutil
import glob
import csv

from twisterlib.testsuite import TestCase, TestSuite
from twisterlib.platform import Platform
from twisterlib.error import BuildError
from twisterlib.size_calc import SizeCalculator
from twisterlib.handlers import (
    Handler,
    SimulationHandler,
    BinaryHandler,
    QEMUHandler,
    QEMUWinHandler,
    DeviceHandler,
    SUPPORTED_SIMS,
    SUPPORTED_SIMS_IN_PYTEST,
)

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

class TestInstance:
    """Class representing the execution of a particular TestSuite on a platform.

    @param testsuite The TestSuite object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param outdir Base directory for all test results. The actual
        output directory used is <outdir>/<platform>/<test suite name>
    """

    __test__ = False

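    # A minimal usage sketch (`suite` and `plat` stand for already-configured
    # TestSuite and Platform objects; the names and the outdir value are
    # illustrative, not defined in this module):
    #
    #     instance = TestInstance(suite, plat, outdir="twister-out")
    #     instance.build_dir  # e.g. twister-out/<platform>/<suite name>
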
    def __init__(self, testsuite, platform, outdir):

        self.testsuite: TestSuite = testsuite
        self.platform: Platform = platform

        self.status = None
        self.reason = "Unknown"
        self.metrics = dict()
        self.handler = None
        self.recording = None
        self.outdir = outdir
        self.execution_time = 0
        self.build_time = 0
        self.retries = 0

        self.name = os.path.join(platform.name, testsuite.name)
        self.dut = None

        if testsuite.detailed_test_id:
            self.build_dir = os.path.join(outdir, platform.normalized_name, testsuite.name)
        else:
            # If the suite is not in the zephyr tree, keep only the part after
            # ".." in the reconstructed directory structure.
            source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1]
            self.build_dir = os.path.join(outdir, platform.normalized_name, source_dir_rel, testsuite.name)
        self.run_id = self._get_run_id()
        self.domains = None
        # The instance needs to use sysbuild if the test suite or the platform
        # requires it.
        self.sysbuild = testsuite.sysbuild or platform.sysbuild

        self.run = False
        self.testcases: list[TestCase] = []
        self.init_cases()
        self.filters = []
        self.filter_type = None
    def record(self, recording, fname_csv="recording.csv"):
        if recording:
            if self.recording is None:
                self.recording = recording.copy()
            else:
                self.recording.extend(recording)

            filename = os.path.join(self.build_dir, fname_csv)
            with open(filename, "wt") as csvfile:
                cw = csv.DictWriter(csvfile,
                                    fieldnames = self.recording[0].keys(),
                                    lineterminator = os.linesep,
                                    quoting = csv.QUOTE_NONNUMERIC)
                cw.writeheader()
                cw.writerows(self.recording)
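
    # Usage sketch for record(): each entry is a dict of column -> value
    # (the keys shown are assumptions; the real ones come from the harness):
    #
    #     instance.record([{"type": "cycles", "value": "1234"}])
    #     # extends self.recording and rewrites <build_dir>/recording.csv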

    def add_filter(self, reason, filter_type):
        self.filters.append({'type': filter_type, 'reason': reason})
        self.status = "filtered"
        self.reason = reason
        self.filter_type = filter_type

    # Workaround for an issue with copying objects from the testsuite;
    # needs a better solution.
    def init_cases(self):
        for c in self.testsuite.testcases:
            self.add_testcase(c.name, freeform=c.freeform)
    def _get_run_id(self):
        """Generate a run id from the instance's unique identifier and a
        random number.

        If a cached run id from a previous run exists, return it instead."""
        run_id = ""
        run_id_file = os.path.join(self.build_dir, "run_id.txt")
        if os.path.exists(run_id_file):
            with open(run_id_file, "r") as fp:
                run_id = fp.read()
        else:
            hash_object = hashlib.md5(self.name.encode())
            random_str = f"{random.getrandbits(64)}".encode()
            hash_object.update(random_str)
            run_id = hash_object.hexdigest()
            os.makedirs(self.build_dir, exist_ok=True)
            with open(run_id_file, 'w+') as fp:
                fp.write(run_id)
        return run_id
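
    # The generated id is equivalent to (name and salt illustrative):
    #
    #     hashlib.md5(b"native_sim/sample.basic.helloworld" + b"<64-bit salt>").hexdigest()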

    def add_missing_case_status(self, status, reason=None):
        for case in self.testcases:
            if case.status == 'started':
                case.status = "failed"
            elif not case.status:
                case.status = status
                if reason:
                    case.reason = reason
                else:
                    case.reason = self.reason

    def __getstate__(self):
        d = self.__dict__.copy()
        return d

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __lt__(self, other):
        return self.name < other.name

    def set_case_status_by_name(self, name, status, reason=None):
        tc = self.get_case_or_create(name)
        tc.status = status
        if reason:
            tc.reason = reason
        return tc

    def add_testcase(self, name, freeform=False):
        tc = TestCase(name=name)
        tc.freeform = freeform
        self.testcases.append(tc)
        return tc

    def get_case_by_name(self, name):
        for c in self.testcases:
            if c.name == name:
                return c
        return None

    def get_case_or_create(self, name):
        for c in self.testcases:
            if c.name == name:
                return c

        logger.debug(f"Could not find a matching testcase for {name}")
        tc = TestCase(name=name)
        self.testcases.append(tc)
        return tc

    @staticmethod
    def testsuite_runnable(testsuite, fixtures):
        can_run = False
        # The console harness allows us to run the test and capture data.
        if testsuite.harness in ['console', 'ztest', 'pytest', 'test', 'gtest', 'robot']:
            can_run = True
            # If we have a fixture that is also being supplied on the
            # command line, then we need to run the test, not just build it.
            fixture = testsuite.harness_config.get('fixture')
            if fixture:
                can_run = fixture in map(lambda f: f.split(sep=':')[0], fixtures)
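                # e.g. a required fixture "gpio_loopback" matches command-line
                # entries "gpio_loopback" or "gpio_loopback:P1.2"; anything
                # after the first ':' is ignored (fixture names illustrative).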

        return can_run

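    # Handler selection below, summarized:
    #   options.device_testing        -> DeviceHandler
    #   simulation == "qemu"          -> QEMUHandler (POSIX) or QEMUWinHandler (Windows)
    #   other simulation != "na"      -> SimulationHandler
    #   testsuite.type == "unit"      -> BinaryHandler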
    def setup_handler(self, env):
        if self.handler:
            return

        options = env.options
        handler = Handler(self, "")
        if options.device_testing:
            handler = DeviceHandler(self, "device")
            handler.call_make_run = False
            handler.ready = True
        elif self.platform.simulation != "na":
            if self.platform.simulation == "qemu":
                if os.name != "nt":
                    handler = QEMUHandler(self, "qemu")
                else:
                    handler = QEMUWinHandler(self, "qemu")
                handler.args.append(f"QEMU_PIPE={handler.get_fifo()}")
                handler.ready = True
            else:
                handler = SimulationHandler(self, self.platform.simulation)

            if self.platform.simulation_exec and shutil.which(self.platform.simulation_exec):
                handler.ready = True
        elif self.testsuite.type == "unit":
            handler = BinaryHandler(self, "unit")
            handler.binary = os.path.join(self.build_dir, "testbinary")
            if options.enable_coverage:
                handler.args.append("COVERAGE=1")
            handler.call_make_run = False
            handler.ready = True

        if handler:
            handler.options = options
            handler.generator_cmd = env.generator_cmd
            handler.suite_name_check = not options.disable_suite_name_check
        self.handler = handler

    # Global testsuite parameters
    def check_runnable(self, enable_slow=False, filter='buildable', fixtures=[], hardware_map=None):

        if os.name == 'nt':
            # Running on simulators is currently supported only for QEMU on Windows.
            if self.platform.simulation not in ('na', 'qemu'):
                return False

            # Check the presence of QEMU on Windows.
            if self.platform.simulation == 'qemu' and 'QEMU_BIN_PATH' not in os.environ:
                return False

        # We asked for build-only on the command line.
        if self.testsuite.build_only:
            return False

        # Do not run slow tests:
        skip_slow = self.testsuite.slow and not enable_slow
        if skip_slow:
            return False

        target_ready = bool(self.testsuite.type == "unit" or \
                        self.platform.type == "native" or \
                        (self.platform.simulation in SUPPORTED_SIMS and \
                         self.platform.simulation not in self.testsuite.simulation_exclude) or \
                        filter == 'runnable')

        # Check if the test is runnable in pytest.
        if self.testsuite.harness == 'pytest':
            target_ready = bool(filter == 'runnable' or self.platform.simulation in SUPPORTED_SIMS_IN_PYTEST)

        SUPPORTED_SIMS_WITH_EXEC = ['nsim', 'mdb-nsim', 'renode', 'tsim', 'native']
        if filter != 'runnable' and \
                self.platform.simulation in SUPPORTED_SIMS_WITH_EXEC and \
                self.platform.simulation_exec:
            if not shutil.which(self.platform.simulation_exec):
                target_ready = False

        testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)

        if hardware_map:
            for h in hardware_map.duts:
                if (h.platform == self.platform.name and
                        self.testsuite_runnable(self.testsuite, h.fixtures)):
                    testsuite_runnable = True
                    break

        return testsuite_runnable and target_ready
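
    # A plausible call pattern (an assumption; the caller lives outside this
    # module, and the option names are illustrative):
    #
    #     instance.run = instance.check_runnable(
    #         options.enable_slow, "runnable", options.fixture, hardware_map)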

    def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=[]):
        # Create this in a "twister/" subdirectory, otherwise the overlay
        # would be passed to kconfig.py *twice* and kconfig.cmake would
        # silently give that second copy precedence over any
        # --extra-args=CONFIG_*
        subdir = os.path.join(self.build_dir, "twister")

        content = ""

        if self.testsuite.extra_configs:
            new_config_list = []
            # Some configs might be conditional on arch or platform: see if we
            # have a namespace defined and apply the config only if the
            # namespace matches. We currently support both arch: and platform:
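            # Example entries (values illustrative):
            #     "CONFIG_FOO=y"                     -> applied unconditionally
            #     "arch:x86:CONFIG_FOO=y"            -> only when platform.arch == "x86"
            #     "platform:native_sim:CONFIG_FOO=y" -> only on the "native_sim" platform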
            for config in self.testsuite.extra_configs:
                cond_config = config.split(":")
                if cond_config[0] == "arch" and len(cond_config) == 3:
                    if self.platform.arch == cond_config[1]:
                        new_config_list.append(cond_config[2])
                elif cond_config[0] == "platform" and len(cond_config) == 3:
                    if self.platform.name == cond_config[1]:
                        new_config_list.append(cond_config[2])
                else:
                    new_config_list.append(config)

            content = "\n".join(new_config_list)

        if enable_coverage:
            if platform.name in coverage_platform:
                content = content + "\nCONFIG_COVERAGE=y"
                content = content + "\nCONFIG_COVERAGE_DUMP=y"

        if enable_asan:
            if platform.type == "native":
                content = content + "\nCONFIG_ASAN=y"

        if enable_ubsan:
            if platform.type == "native":
                content = content + "\nCONFIG_UBSAN=y"

        if content:
            os.makedirs(subdir, exist_ok=True)
            file = os.path.join(subdir, "testsuite_extra.conf")
            with open(file, "w", encoding='utf-8') as f:
                f.write(content)

        return content
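
    # For a native platform listed in coverage_platform, with coverage, ASan
    # and UBSan all enabled, the generated twister/testsuite_extra.conf would
    # end with (after any extra_configs entries):
    #
    #     CONFIG_COVERAGE=y
    #     CONFIG_COVERAGE_DUMP=y
    #     CONFIG_ASAN=y
    #     CONFIG_UBSAN=y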

    def calculate_sizes(self, from_buildlog: bool = False, generate_warning: bool = True) -> SizeCalculator:
        """Get the RAM/ROM sizes of a test case.

        This can only be run after the instance has been executed by
        MakeGenerator, otherwise there won't be any binaries to measure.

        @return A SizeCalculator object
        """
        elf_filepath = self.get_elf_file()
        buildlog_filepath = self.get_buildlog_file() if from_buildlog else ''
        return SizeCalculator(elf_filename=elf_filepath,
                            extra_sections=self.testsuite.extra_sections,
                            buildlog_filepath=buildlog_filepath,
                            generate_warning=generate_warning)

    def get_elf_file(self) -> str:

        if self.sysbuild:
            build_dir = self.domains.get_default_domain().build_dir
        else:
            build_dir = self.build_dir

        fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf"))
        fns.extend(glob.glob(os.path.join(build_dir, "testbinary")))
        blocklist = [
                'remapped', # used for xtensa platforms
                'zefi', # EFI for Zephyr
                'qemu', # ELF files generated after running in QEMU
                '_pre']
        fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)]
        if not fns:
            raise BuildError("Missing output binary")
        elif len(fns) > 1:
            logger.warning(f"multiple ELF files detected: {', '.join(fns)}")
        return fns[0]

    def get_buildlog_file(self) -> str:
        """Get the path to the build.log file.

        @raises BuildError: Incorrect number (!=1) of build logs.
        @return: Path to build.log (str).
        """
        buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log"))
        if len(buildlog_paths) != 1:
            raise BuildError("Missing/multiple build.log file.")
        return buildlog_paths[0]

    def __repr__(self):
        return "<TestInstance %s on %s>" % (self.testsuite.name, self.platform.name)