1# vim: set syntax=python ts=4 : 2# 3# Copyright (c) 2018-2025 Intel Corporation 4# Copyright 2022 NXP 5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved. 6# 7# SPDX-License-Identifier: Apache-2.0 8from __future__ import annotations 9 10import csv 11import glob 12import hashlib 13import logging 14import os 15import random 16from enum import Enum 17 18from twisterlib.constants import ( 19 SUPPORTED_SIMS, 20 SUPPORTED_SIMS_IN_PYTEST, 21 SUPPORTED_SIMS_WITH_EXEC, 22) 23from twisterlib.environment import TwisterEnv 24from twisterlib.error import BuildError, StatusAttributeError 25from twisterlib.handlers import ( 26 BinaryHandler, 27 DeviceHandler, 28 Handler, 29 QEMUHandler, 30 QEMUWinHandler, 31 SimulationHandler, 32) 33from twisterlib.platform import Platform 34from twisterlib.size_calc import SizeCalculator 35from twisterlib.statuses import TwisterStatus 36from twisterlib.testsuite import TestCase, TestSuite 37 38logger = logging.getLogger('twister') 39logger.setLevel(logging.DEBUG) 40 41class TestInstance: 42 """Class representing the execution of a particular TestSuite on a platform 43 44 @param test The TestSuite object we want to build/execute 45 @param platform Platform object that we want to build and run against 46 @param base_outdir Base directory for all test results. The actual 47 out directory used is <outdir>/<platform>/<test case name> 48 """ 49 50 __test__ = False 51 52 def __init__(self, testsuite, platform, toolchain, outdir): 53 54 self.testsuite: TestSuite = testsuite 55 self.platform: Platform = platform 56 57 self._status = TwisterStatus.NONE 58 self.reason = "Unknown" 59 self.metrics = dict() 60 self.handler = None 61 self.recording = None 62 self.coverage = None 63 self.coverage_status = None 64 self.outdir = outdir 65 self.execution_time = 0 66 self.build_time = 0 67 self.retries = 0 68 self.toolchain = toolchain 69 70 self.name = os.path.join(platform.name, toolchain, testsuite.name) 71 self.dut = None 72 73 if testsuite.detailed_test_id: 74 self.build_dir = os.path.join( 75 outdir, platform.normalized_name, self.toolchain, testsuite.name 76 ) 77 else: 78 # if suite is not in zephyr, 79 # keep only the part after ".." in reconstructed dir structure 80 source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1] 81 self.build_dir = os.path.join( 82 outdir, 83 platform.normalized_name, 84 self.toolchain, 85 source_dir_rel, 86 testsuite.name 87 ) 88 self.run_id = None 89 self.domains = None 90 # Instance need to use sysbuild if a given suite or a platform requires it 91 self.sysbuild = testsuite.sysbuild or platform.sysbuild 92 93 self.run = False 94 self.testcases: list[TestCase] = [] 95 self.init_cases() 96 self.filters = [] 97 self.filter_type = None 98 99 def setup_run_id(self): 100 self.run_id = self._get_run_id() 101 102 def record(self, recording, fname_csv="recording.csv"): 103 if recording: 104 if self.recording is None: 105 self.recording = recording.copy() 106 else: 107 self.recording.extend(recording) 108 109 filename = os.path.join(self.build_dir, fname_csv) 110 fieldnames = set() 111 for r in self.recording: 112 fieldnames.update(r) 113 with open(filename, 'w') as csvfile: 114 cw = csv.DictWriter(csvfile, 115 fieldnames = sorted(list(fieldnames)), 116 lineterminator = os.linesep, 117 quoting = csv.QUOTE_NONNUMERIC) 118 cw.writeheader() 119 cw.writerows(self.recording) 120 121 @property 122 def status(self) -> TwisterStatus: 123 return self._status 124 125 @status.setter 126 def status(self, value : TwisterStatus) -> None: 127 # Check for illegal assignments by value 128 try: 129 key = value.name if isinstance(value, Enum) else value 130 self._status = TwisterStatus[key] 131 except KeyError as err: 132 raise StatusAttributeError(self.__class__, value) from err 133 134 def add_filter(self, reason, filter_type): 135 self.filters.append({'type': filter_type, 'reason': reason }) 136 self.status = TwisterStatus.FILTER 137 self.reason = reason 138 self.filter_type = filter_type 139 140 # Fix an issue with copying objects from testsuite, need better solution. 141 def init_cases(self): 142 for c in self.testsuite.testcases: 143 self.add_testcase(c.name, freeform=c.freeform) 144 145 def _get_run_id(self): 146 """ generate run id from instance unique identifier and a random 147 number 148 If exist, get cached run id from previous run.""" 149 run_id = "" 150 run_id_file = os.path.join(self.build_dir, "run_id.txt") 151 if os.path.exists(run_id_file): 152 with open(run_id_file) as fp: 153 run_id = fp.read() 154 else: 155 hash_object = hashlib.md5(self.name.encode(), usedforsecurity=False) 156 random_str = f"{random.getrandbits(64)}".encode() 157 hash_object.update(random_str) 158 run_id = hash_object.hexdigest() 159 os.makedirs(self.build_dir, exist_ok=True) 160 with open(run_id_file, 'w+') as fp: 161 fp.write(run_id) 162 return run_id 163 164 def add_missing_case_status(self, status, reason=None): 165 for case in self.testcases: 166 if case.status == TwisterStatus.STARTED: 167 case.status = TwisterStatus.FAIL 168 elif case.status == TwisterStatus.NONE: 169 case.status = status 170 if reason: 171 case.reason = reason 172 else: 173 case.reason = self.reason 174 175 def __getstate__(self): 176 d = self.__dict__.copy() 177 return d 178 179 def __setstate__(self, d): 180 self.__dict__.update(d) 181 182 def __lt__(self, other): 183 return self.name < other.name 184 185 def compose_case_name(self, tc_name) -> str: 186 return self.testsuite.compose_case_name(tc_name) 187 188 def set_case_status_by_name(self, name, status, reason=None): 189 tc = self.get_case_or_create(name) 190 tc.status = status 191 if reason: 192 tc.reason = reason 193 return tc 194 195 def add_testcase(self, name, freeform=False): 196 tc = TestCase(name=name) 197 tc.freeform = freeform 198 self.testcases.append(tc) 199 return tc 200 201 def get_case_by_name(self, name): 202 for c in self.testcases: 203 if c.name == name: 204 return c 205 return None 206 207 def get_case_or_create(self, name): 208 for c in self.testcases: 209 if c.name == name: 210 return c 211 212 logger.debug(f"Could not find a matching testcase for {name}") 213 tc = TestCase(name=name) 214 self.testcases.append(tc) 215 return tc 216 217 @staticmethod 218 def testsuite_runnable(testsuite, fixtures): 219 can_run = False 220 # console harness allows us to run the test and capture data. 221 if testsuite.harness in [ 222 'console', 223 'ztest', 224 'pytest', 225 'test', 226 'gtest', 227 'robot', 228 'ctest', 229 'shell' 230 ]: 231 can_run = True 232 # if we have a fixture that is also being supplied on the 233 # command-line, then we need to run the test, not just build it. 234 fixture = testsuite.harness_config.get('fixture') 235 if fixture: 236 can_run = fixture in map(lambda f: f.split(sep=':')[0], fixtures) 237 238 return can_run 239 240 def setup_handler(self, env: TwisterEnv): 241 # only setup once. 242 if self.handler: 243 return 244 245 options = env.options 246 common_args = (options, env.generator_cmd, not options.disable_suite_name_check) 247 simulator = self.platform.simulator_by_name(options.sim_name) 248 if options.device_testing: 249 handler = DeviceHandler(self, "device", *common_args) 250 handler.call_make_run = False 251 handler.ready = True 252 elif simulator: 253 if simulator.name == "qemu": 254 if os.name != "nt": 255 handler = QEMUHandler(self, "qemu", *common_args) 256 else: 257 handler = QEMUWinHandler(self, "qemu", *common_args) 258 handler.args.append(f"QEMU_PIPE={handler.get_fifo()}") 259 handler.ready = True 260 else: 261 handler = SimulationHandler(self, simulator.name, *common_args) 262 handler.ready = simulator.is_runnable() 263 264 elif self.testsuite.type == "unit": 265 handler = BinaryHandler(self, "unit", *common_args) 266 handler.binary = os.path.join(self.build_dir, "testbinary") 267 if options.enable_coverage: 268 handler.args.append("COVERAGE=1") 269 handler.call_make_run = False 270 handler.ready = True 271 else: 272 handler = Handler(self, "", *common_args) 273 if self.testsuite.harness == "ctest": 274 handler.ready = True 275 276 self.handler = handler 277 278 # Global testsuite parameters 279 def check_runnable(self, 280 options: TwisterEnv, 281 hardware_map=None): 282 283 enable_slow = options.enable_slow 284 filter = options.filter 285 fixtures = options.fixture 286 device_testing = options.device_testing 287 simulation = options.sim_name 288 289 simulator = self.platform.simulator_by_name(simulation) 290 if os.name == 'nt' and simulator: 291 # running on simulators is currently supported only for QEMU on Windows 292 if simulator.name not in ('na', 'qemu'): 293 return False 294 295 # check presence of QEMU on Windows 296 if simulator.name == 'qemu' and 'QEMU_BIN_PATH' not in os.environ: 297 return False 298 299 # we asked for build-only on the command line 300 if self.testsuite.build_only: 301 return False 302 303 # Do not run slow tests: 304 skip_slow = self.testsuite.slow and not enable_slow 305 if skip_slow: 306 return False 307 308 target_ready = bool(self.testsuite.type == "unit" or \ 309 self.platform.type == "native" or \ 310 self.testsuite.harness == "ctest" or \ 311 (simulator and simulator.name in SUPPORTED_SIMS and \ 312 simulator.name not in self.testsuite.simulation_exclude) or \ 313 device_testing) 314 315 # check if test is runnable in pytest 316 if self.testsuite.harness in ['pytest', 'shell']: 317 target_ready = bool( 318 filter == 'runnable' or simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST 319 ) 320 321 if filter != 'runnable' and \ 322 simulator and \ 323 simulator.name in SUPPORTED_SIMS_WITH_EXEC and \ 324 not simulator.is_runnable(): 325 target_ready = False 326 327 testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures) 328 329 if hardware_map: 330 for h in hardware_map.duts: 331 if (h.platform in self.platform.aliases and 332 self.testsuite_runnable(self.testsuite, h.fixtures)): 333 testsuite_runnable = True 334 break 335 336 return testsuite_runnable and target_ready 337 338 def create_overlay( 339 self, 340 platform, 341 enable_asan=False, 342 enable_ubsan=False, 343 enable_coverage=False, 344 coverage_platform=None 345 ): 346 if coverage_platform is None: 347 coverage_platform = [] 348 # Create this in a "twister/" subdirectory otherwise this 349 # will pass this overlay to kconfig.py *twice* and kconfig.cmake 350 # will silently give that second time precedence over any 351 # --extra-args=CONFIG_* 352 subdir = os.path.join(self.build_dir, "twister") 353 354 content = "" 355 356 if self.testsuite.extra_configs: 357 new_config_list = [] 358 # some configs might be conditional on arch or platform, see if we 359 # have a namespace defined and apply only if the namespace matches. 360 # we currently support both arch: and platform: 361 for config in self.testsuite.extra_configs: 362 cond_config = config.split(":") 363 if cond_config[0] == "arch" and len(cond_config) == 3: 364 if self.platform.arch == cond_config[1]: 365 new_config_list.append(cond_config[2]) 366 elif cond_config[0] == "platform" and len(cond_config) == 3: 367 if self.platform.name == cond_config[1]: 368 new_config_list.append(cond_config[2]) 369 else: 370 new_config_list.append(config) 371 372 content = "\n".join(new_config_list) 373 374 if enable_coverage: 375 for cp in coverage_platform: 376 if cp in platform.aliases: 377 content = content + "\nCONFIG_COVERAGE=y" 378 content = content + "\nCONFIG_COVERAGE_DUMP=y" 379 380 if platform.type == "native": 381 if enable_asan: 382 content = content + "\nCONFIG_ASAN=y" 383 if enable_ubsan: 384 content = content + "\nCONFIG_UBSAN=y" 385 386 if content: 387 os.makedirs(subdir, exist_ok=True) 388 file = os.path.join(subdir, "testsuite_extra.conf") 389 with open(file, "w", encoding='utf-8') as f: 390 f.write(content) 391 392 return content 393 394 def calculate_sizes( 395 self, 396 from_buildlog: bool = False, 397 generate_warning: bool = True 398 ) -> SizeCalculator: 399 """Get the RAM/ROM sizes of a test case. 400 401 This can only be run after the instance has been executed by 402 MakeGenerator, otherwise there won't be any binaries to measure. 403 404 @return A SizeCalculator object 405 """ 406 elf_filepath = self.get_elf_file() 407 buildlog_filepath = self.get_buildlog_file() if from_buildlog else '' 408 return SizeCalculator(elf_filename=elf_filepath, 409 extra_sections=self.testsuite.extra_sections, 410 buildlog_filepath=buildlog_filepath, 411 generate_warning=generate_warning) 412 413 def get_elf_file(self) -> str: 414 415 if self.sysbuild: 416 build_dir = self.domains.get_default_domain().build_dir 417 else: 418 build_dir = self.build_dir 419 420 fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf")) 421 fns.extend(glob.glob(os.path.join(build_dir, "testbinary"))) 422 blocklist = [ 423 'remapped', # used for xtensa plaforms 424 'zefi', # EFI for Zephyr 425 'qemu', # elf files generated after running in qemu 426 '_pre'] 427 fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)] 428 if not fns: 429 raise BuildError("Missing output binary") 430 elif len(fns) > 1: 431 logger.warning(f"multiple ELF files detected: {', '.join(fns)}") 432 return fns[0] 433 434 def get_buildlog_file(self) -> str: 435 """Get path to build.log file. 436 437 @raises BuildError: Incorrect amount (!=1) of build logs. 438 @return: Path to build.log (str). 439 """ 440 buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log")) 441 if len(buildlog_paths) != 1: 442 raise BuildError("Missing/multiple build.log file.") 443 return buildlog_paths[0] 444 445 def __repr__(self): 446 return f"<TestSuite {self.testsuite.name} on {self.platform.name}>" 447