1# vim: set syntax=python ts=4 : 2# 3# Copyright (c) 2018-2024 Intel Corporation 4# Copyright 2022 NXP 5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved. 6# 7# SPDX-License-Identifier: Apache-2.0 8from __future__ import annotations 9 10import csv 11import glob 12import hashlib 13import logging 14import os 15import random 16from enum import Enum 17 18from twisterlib.constants import ( 19 SUPPORTED_SIMS, 20 SUPPORTED_SIMS_IN_PYTEST, 21 SUPPORTED_SIMS_WITH_EXEC, 22) 23from twisterlib.environment import TwisterEnv 24from twisterlib.error import BuildError, StatusAttributeError 25from twisterlib.handlers import ( 26 BinaryHandler, 27 DeviceHandler, 28 Handler, 29 QEMUHandler, 30 QEMUWinHandler, 31 SimulationHandler, 32) 33from twisterlib.platform import Platform 34from twisterlib.size_calc import SizeCalculator 35from twisterlib.statuses import TwisterStatus 36from twisterlib.testsuite import TestCase, TestSuite 37 38logger = logging.getLogger('twister') 39logger.setLevel(logging.DEBUG) 40 41class TestInstance: 42 """Class representing the execution of a particular TestSuite on a platform 43 44 @param test The TestSuite object we want to build/execute 45 @param platform Platform object that we want to build and run against 46 @param base_outdir Base directory for all test results. The actual 47 out directory used is <outdir>/<platform>/<test case name> 48 """ 49 50 __test__ = False 51 52 def __init__(self, testsuite, platform, outdir): 53 54 self.testsuite: TestSuite = testsuite 55 self.platform: Platform = platform 56 57 self._status = TwisterStatus.NONE 58 self.reason = "Unknown" 59 self.metrics = dict() 60 self.handler = None 61 self.recording = None 62 self.outdir = outdir 63 self.execution_time = 0 64 self.build_time = 0 65 self.retries = 0 66 67 self.name = os.path.join(platform.name, testsuite.name) 68 self.dut = None 69 70 if testsuite.detailed_test_id: 71 self.build_dir = os.path.join(outdir, platform.normalized_name, testsuite.name) 72 else: 73 # if suite is not in zephyr, 74 # keep only the part after ".." in reconstructed dir structure 75 source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1] 76 self.build_dir = os.path.join( 77 outdir, 78 platform.normalized_name, 79 source_dir_rel, 80 testsuite.name 81 ) 82 self.run_id = None 83 self.domains = None 84 # Instance need to use sysbuild if a given suite or a platform requires it 85 self.sysbuild = testsuite.sysbuild or platform.sysbuild 86 87 self.run = False 88 self.testcases: list[TestCase] = [] 89 self.init_cases() 90 self.filters = [] 91 self.filter_type = None 92 93 def setup_run_id(self): 94 self.run_id = self._get_run_id() 95 96 def record(self, recording, fname_csv="recording.csv"): 97 if recording: 98 if self.recording is None: 99 self.recording = recording.copy() 100 else: 101 self.recording.extend(recording) 102 103 filename = os.path.join(self.build_dir, fname_csv) 104 with open(filename, 'w') as csvfile: 105 cw = csv.DictWriter(csvfile, 106 fieldnames = self.recording[0].keys(), 107 lineterminator = os.linesep, 108 quoting = csv.QUOTE_NONNUMERIC) 109 cw.writeheader() 110 cw.writerows(self.recording) 111 112 @property 113 def status(self) -> TwisterStatus: 114 return self._status 115 116 @status.setter 117 def status(self, value : TwisterStatus) -> None: 118 # Check for illegal assignments by value 119 try: 120 key = value.name if isinstance(value, Enum) else value 121 self._status = TwisterStatus[key] 122 except KeyError as err: 123 raise StatusAttributeError(self.__class__, value) from err 124 125 def add_filter(self, reason, filter_type): 126 self.filters.append({'type': filter_type, 'reason': reason }) 127 self.status = TwisterStatus.FILTER 128 self.reason = reason 129 self.filter_type = filter_type 130 131 # Fix an issue with copying objects from testsuite, need better solution. 132 def init_cases(self): 133 for c in self.testsuite.testcases: 134 self.add_testcase(c.name, freeform=c.freeform) 135 136 def _get_run_id(self): 137 """ generate run id from instance unique identifier and a random 138 number 139 If exist, get cached run id from previous run.""" 140 run_id = "" 141 run_id_file = os.path.join(self.build_dir, "run_id.txt") 142 if os.path.exists(run_id_file): 143 with open(run_id_file) as fp: 144 run_id = fp.read() 145 else: 146 hash_object = hashlib.md5(self.name.encode()) 147 random_str = f"{random.getrandbits(64)}".encode() 148 hash_object.update(random_str) 149 run_id = hash_object.hexdigest() 150 os.makedirs(self.build_dir, exist_ok=True) 151 with open(run_id_file, 'w+') as fp: 152 fp.write(run_id) 153 return run_id 154 155 def add_missing_case_status(self, status, reason=None): 156 for case in self.testcases: 157 if case.status == TwisterStatus.STARTED: 158 case.status = TwisterStatus.FAIL 159 elif case.status == TwisterStatus.NONE: 160 case.status = status 161 if reason: 162 case.reason = reason 163 else: 164 case.reason = self.reason 165 166 def __getstate__(self): 167 d = self.__dict__.copy() 168 return d 169 170 def __setstate__(self, d): 171 self.__dict__.update(d) 172 173 def __lt__(self, other): 174 return self.name < other.name 175 176 def compose_case_name(self, tc_name) -> str: 177 return self.testsuite.compose_case_name(tc_name) 178 179 def set_case_status_by_name(self, name, status, reason=None): 180 tc = self.get_case_or_create(name) 181 tc.status = status 182 if reason: 183 tc.reason = reason 184 return tc 185 186 def add_testcase(self, name, freeform=False): 187 tc = TestCase(name=name) 188 tc.freeform = freeform 189 self.testcases.append(tc) 190 return tc 191 192 def get_case_by_name(self, name): 193 for c in self.testcases: 194 if c.name == name: 195 return c 196 return None 197 198 def get_case_or_create(self, name): 199 for c in self.testcases: 200 if c.name == name: 201 return c 202 203 logger.debug(f"Could not find a matching testcase for {name}") 204 tc = TestCase(name=name) 205 self.testcases.append(tc) 206 return tc 207 208 @staticmethod 209 def testsuite_runnable(testsuite, fixtures): 210 can_run = False 211 # console harness allows us to run the test and capture data. 212 if testsuite.harness in [ 'console', 'ztest', 'pytest', 'test', 'gtest', 'robot']: 213 can_run = True 214 # if we have a fixture that is also being supplied on the 215 # command-line, then we need to run the test, not just build it. 216 fixture = testsuite.harness_config.get('fixture') 217 if fixture: 218 can_run = fixture in map(lambda f: f.split(sep=':')[0], fixtures) 219 220 return can_run 221 222 def setup_handler(self, env: TwisterEnv): 223 # only setup once. 224 if self.handler: 225 return 226 227 options = env.options 228 common_args = (options, env.generator_cmd, not options.disable_suite_name_check) 229 simulator = self.platform.simulator_by_name(options.sim_name) 230 if options.device_testing: 231 handler = DeviceHandler(self, "device", *common_args) 232 handler.call_make_run = False 233 handler.ready = True 234 elif simulator: 235 if simulator.name == "qemu": 236 if os.name != "nt": 237 handler = QEMUHandler(self, "qemu", *common_args) 238 else: 239 handler = QEMUWinHandler(self, "qemu", *common_args) 240 handler.args.append(f"QEMU_PIPE={handler.get_fifo()}") 241 handler.ready = True 242 else: 243 handler = SimulationHandler(self, simulator.name, *common_args) 244 handler.ready = simulator.is_runnable() 245 246 elif self.testsuite.type == "unit": 247 handler = BinaryHandler(self, "unit", *common_args) 248 handler.binary = os.path.join(self.build_dir, "testbinary") 249 if options.enable_coverage: 250 handler.args.append("COVERAGE=1") 251 handler.call_make_run = False 252 handler.ready = True 253 else: 254 handler = Handler(self, "", *common_args) 255 256 self.handler = handler 257 258 # Global testsuite parameters 259 def check_runnable(self, 260 options: TwisterEnv, 261 hardware_map=None): 262 263 enable_slow = options.enable_slow 264 filter = options.filter 265 fixtures = options.fixture 266 device_testing = options.device_testing 267 simulation = options.sim_name 268 269 simulator = self.platform.simulator_by_name(simulation) 270 if os.name == 'nt' and simulator: 271 # running on simulators is currently supported only for QEMU on Windows 272 if simulator.name not in ('na', 'qemu'): 273 return False 274 275 # check presence of QEMU on Windows 276 if simulator.name == 'qemu' and 'QEMU_BIN_PATH' not in os.environ: 277 return False 278 279 # we asked for build-only on the command line 280 if self.testsuite.build_only: 281 return False 282 283 # Do not run slow tests: 284 skip_slow = self.testsuite.slow and not enable_slow 285 if skip_slow: 286 return False 287 288 target_ready = bool(self.testsuite.type == "unit" or \ 289 self.platform.type == "native" or \ 290 (simulator and simulator.name in SUPPORTED_SIMS and \ 291 simulator.name not in self.testsuite.simulation_exclude) or \ 292 device_testing) 293 294 # check if test is runnable in pytest 295 if self.testsuite.harness == 'pytest': 296 target_ready = bool( 297 filter == 'runnable' or simulator and simulator.name in SUPPORTED_SIMS_IN_PYTEST 298 ) 299 300 if filter != 'runnable' and \ 301 simulator and \ 302 simulator.name in SUPPORTED_SIMS_WITH_EXEC and \ 303 not simulator.is_runnable(): 304 target_ready = False 305 306 testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures) 307 308 if hardware_map: 309 for h in hardware_map.duts: 310 if (h.platform == self.platform.name and 311 self.testsuite_runnable(self.testsuite, h.fixtures)): 312 testsuite_runnable = True 313 break 314 315 return testsuite_runnable and target_ready 316 317 def create_overlay( 318 self, 319 platform, 320 enable_asan=False, 321 enable_ubsan=False, 322 enable_coverage=False, 323 coverage_platform=None 324 ): 325 if coverage_platform is None: 326 coverage_platform = [] 327 # Create this in a "twister/" subdirectory otherwise this 328 # will pass this overlay to kconfig.py *twice* and kconfig.cmake 329 # will silently give that second time precedence over any 330 # --extra-args=CONFIG_* 331 subdir = os.path.join(self.build_dir, "twister") 332 333 content = "" 334 335 if self.testsuite.extra_configs: 336 new_config_list = [] 337 # some configs might be conditional on arch or platform, see if we 338 # have a namespace defined and apply only if the namespace matches. 339 # we currently support both arch: and platform: 340 for config in self.testsuite.extra_configs: 341 cond_config = config.split(":") 342 if cond_config[0] == "arch" and len(cond_config) == 3: 343 if self.platform.arch == cond_config[1]: 344 new_config_list.append(cond_config[2]) 345 elif cond_config[0] == "platform" and len(cond_config) == 3: 346 if self.platform.name == cond_config[1]: 347 new_config_list.append(cond_config[2]) 348 else: 349 new_config_list.append(config) 350 351 content = "\n".join(new_config_list) 352 353 if enable_coverage: 354 for cp in coverage_platform: 355 if cp in platform.aliases: 356 content = content + "\nCONFIG_COVERAGE=y" 357 content = content + "\nCONFIG_COVERAGE_DUMP=y" 358 359 if platform.type == "native": 360 if enable_asan: 361 content = content + "\nCONFIG_ASAN=y" 362 if enable_ubsan: 363 content = content + "\nCONFIG_UBSAN=y" 364 365 if content: 366 os.makedirs(subdir, exist_ok=True) 367 file = os.path.join(subdir, "testsuite_extra.conf") 368 with open(file, "w", encoding='utf-8') as f: 369 f.write(content) 370 371 return content 372 373 def calculate_sizes( 374 self, 375 from_buildlog: bool = False, 376 generate_warning: bool = True 377 ) -> SizeCalculator: 378 """Get the RAM/ROM sizes of a test case. 379 380 This can only be run after the instance has been executed by 381 MakeGenerator, otherwise there won't be any binaries to measure. 382 383 @return A SizeCalculator object 384 """ 385 elf_filepath = self.get_elf_file() 386 buildlog_filepath = self.get_buildlog_file() if from_buildlog else '' 387 return SizeCalculator(elf_filename=elf_filepath, 388 extra_sections=self.testsuite.extra_sections, 389 buildlog_filepath=buildlog_filepath, 390 generate_warning=generate_warning) 391 392 def get_elf_file(self) -> str: 393 394 if self.sysbuild: 395 build_dir = self.domains.get_default_domain().build_dir 396 else: 397 build_dir = self.build_dir 398 399 fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf")) 400 fns.extend(glob.glob(os.path.join(build_dir, "testbinary"))) 401 blocklist = [ 402 'remapped', # used for xtensa plaforms 403 'zefi', # EFI for Zephyr 404 'qemu', # elf files generated after running in qemu 405 '_pre'] 406 fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)] 407 if not fns: 408 raise BuildError("Missing output binary") 409 elif len(fns) > 1: 410 logger.warning(f"multiple ELF files detected: {', '.join(fns)}") 411 return fns[0] 412 413 def get_buildlog_file(self) -> str: 414 """Get path to build.log file. 415 416 @raises BuildError: Incorrect amount (!=1) of build logs. 417 @return: Path to build.log (str). 418 """ 419 buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log")) 420 if len(buildlog_paths) != 1: 421 raise BuildError("Missing/multiple build.log file.") 422 return buildlog_paths[0] 423 424 def __repr__(self): 425 return f"<TestSuite {self.testsuite.name} on {self.platform.name}>" 426