1# vim: set syntax=python ts=4 : 2# 3# Copyright (c) 2018-2022 Intel Corporation 4# Copyright 2022 NXP 5# SPDX-License-Identifier: Apache-2.0 6from __future__ import annotations 7import os 8import hashlib 9import random 10import logging 11import shutil 12import glob 13import csv 14 15from twisterlib.testsuite import TestCase, TestSuite 16from twisterlib.platform import Platform 17from twisterlib.error import BuildError 18from twisterlib.size_calc import SizeCalculator 19from twisterlib.handlers import ( 20 Handler, 21 SimulationHandler, 22 BinaryHandler, 23 QEMUHandler, 24 QEMUWinHandler, 25 DeviceHandler, 26 SUPPORTED_SIMS, 27 SUPPORTED_SIMS_IN_PYTEST, 28) 29 30logger = logging.getLogger('twister') 31logger.setLevel(logging.DEBUG) 32 33class TestInstance: 34 """Class representing the execution of a particular TestSuite on a platform 35 36 @param test The TestSuite object we want to build/execute 37 @param platform Platform object that we want to build and run against 38 @param base_outdir Base directory for all test results. The actual 39 out directory used is <outdir>/<platform>/<test case name> 40 """ 41 42 __test__ = False 43 44 def __init__(self, testsuite, platform, outdir): 45 46 self.testsuite: TestSuite = testsuite 47 self.platform: Platform = platform 48 49 self.status = None 50 self.reason = "Unknown" 51 self.metrics = dict() 52 self.handler = None 53 self.recording = None 54 self.outdir = outdir 55 self.execution_time = 0 56 self.build_time = 0 57 self.retries = 0 58 59 self.name = os.path.join(platform.name, testsuite.name) 60 self.dut = None 61 62 if testsuite.detailed_test_id: 63 self.build_dir = os.path.join(outdir, platform.normalized_name, testsuite.name) 64 else: 65 # if suite is not in zephyr, keep only the part after ".." in reconstructed dir structure 66 source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1] 67 self.build_dir = os.path.join(outdir, platform.normalized_name, source_dir_rel, testsuite.name) 68 self.run_id = self._get_run_id() 69 self.domains = None 70 # Instance need to use sysbuild if a given suite or a platform requires it 71 self.sysbuild = testsuite.sysbuild or platform.sysbuild 72 73 self.run = False 74 self.testcases: list[TestCase] = [] 75 self.init_cases() 76 self.filters = [] 77 self.filter_type = None 78 79 def record(self, recording, fname_csv="recording.csv"): 80 if recording: 81 if self.recording is None: 82 self.recording = recording.copy() 83 else: 84 self.recording.extend(recording) 85 86 filename = os.path.join(self.build_dir, fname_csv) 87 with open(filename, "wt") as csvfile: 88 cw = csv.DictWriter(csvfile, 89 fieldnames = self.recording[0].keys(), 90 lineterminator = os.linesep, 91 quoting = csv.QUOTE_NONNUMERIC) 92 cw.writeheader() 93 cw.writerows(self.recording) 94 95 def add_filter(self, reason, filter_type): 96 self.filters.append({'type': filter_type, 'reason': reason }) 97 self.status = "filtered" 98 self.reason = reason 99 self.filter_type = filter_type 100 101 # Fix an issue with copying objects from testsuite, need better solution. 102 def init_cases(self): 103 for c in self.testsuite.testcases: 104 self.add_testcase(c.name, freeform=c.freeform) 105 106 def _get_run_id(self): 107 """ generate run id from instance unique identifier and a random 108 number 109 If exist, get cached run id from previous run.""" 110 run_id = "" 111 run_id_file = os.path.join(self.build_dir, "run_id.txt") 112 if os.path.exists(run_id_file): 113 with open(run_id_file, "r") as fp: 114 run_id = fp.read() 115 else: 116 hash_object = hashlib.md5(self.name.encode()) 117 random_str = f"{random.getrandbits(64)}".encode() 118 hash_object.update(random_str) 119 run_id = hash_object.hexdigest() 120 os.makedirs(self.build_dir, exist_ok=True) 121 with open(run_id_file, 'w+') as fp: 122 fp.write(run_id) 123 return run_id 124 125 def add_missing_case_status(self, status, reason=None): 126 for case in self.testcases: 127 if case.status == 'started': 128 case.status = "failed" 129 elif not case.status: 130 case.status = status 131 if reason: 132 case.reason = reason 133 else: 134 case.reason = self.reason 135 136 def __getstate__(self): 137 d = self.__dict__.copy() 138 return d 139 140 def __setstate__(self, d): 141 self.__dict__.update(d) 142 143 def __lt__(self, other): 144 return self.name < other.name 145 146 def set_case_status_by_name(self, name, status, reason=None): 147 tc = self.get_case_or_create(name) 148 tc.status = status 149 if reason: 150 tc.reason = reason 151 return tc 152 153 def add_testcase(self, name, freeform=False): 154 tc = TestCase(name=name) 155 tc.freeform = freeform 156 self.testcases.append(tc) 157 return tc 158 159 def get_case_by_name(self, name): 160 for c in self.testcases: 161 if c.name == name: 162 return c 163 return None 164 165 def get_case_or_create(self, name): 166 for c in self.testcases: 167 if c.name == name: 168 return c 169 170 logger.debug(f"Could not find a matching testcase for {name}") 171 tc = TestCase(name=name) 172 self.testcases.append(tc) 173 return tc 174 175 @staticmethod 176 def testsuite_runnable(testsuite, fixtures): 177 can_run = False 178 # console harness allows us to run the test and capture data. 179 if testsuite.harness in [ 'console', 'ztest', 'pytest', 'test', 'gtest', 'robot']: 180 can_run = True 181 # if we have a fixture that is also being supplied on the 182 # command-line, then we need to run the test, not just build it. 183 fixture = testsuite.harness_config.get('fixture') 184 if fixture: 185 can_run = fixture in map(lambda f: f.split(sep=':')[0], fixtures) 186 187 return can_run 188 189 def setup_handler(self, env): 190 if self.handler: 191 return 192 193 options = env.options 194 handler = Handler(self, "") 195 if options.device_testing: 196 handler = DeviceHandler(self, "device") 197 handler.call_make_run = False 198 handler.ready = True 199 elif self.platform.simulation != "na": 200 if self.platform.simulation == "qemu": 201 if os.name != "nt": 202 handler = QEMUHandler(self, "qemu") 203 else: 204 handler = QEMUWinHandler(self, "qemu") 205 handler.args.append(f"QEMU_PIPE={handler.get_fifo()}") 206 handler.ready = True 207 else: 208 handler = SimulationHandler(self, self.platform.simulation) 209 210 if self.platform.simulation_exec and shutil.which(self.platform.simulation_exec): 211 handler.ready = True 212 elif self.testsuite.type == "unit": 213 handler = BinaryHandler(self, "unit") 214 handler.binary = os.path.join(self.build_dir, "testbinary") 215 if options.enable_coverage: 216 handler.args.append("COVERAGE=1") 217 handler.call_make_run = False 218 handler.ready = True 219 220 if handler: 221 handler.options = options 222 handler.generator_cmd = env.generator_cmd 223 handler.suite_name_check = not options.disable_suite_name_check 224 self.handler = handler 225 226 # Global testsuite parameters 227 def check_runnable(self, enable_slow=False, filter='buildable', fixtures=[], hardware_map=None): 228 229 if os.name == 'nt': 230 # running on simulators is currently supported only for QEMU on Windows 231 if self.platform.simulation not in ('na', 'qemu'): 232 return False 233 234 # check presence of QEMU on Windows 235 if self.platform.simulation == 'qemu' and 'QEMU_BIN_PATH' not in os.environ: 236 return False 237 238 # we asked for build-only on the command line 239 if self.testsuite.build_only: 240 return False 241 242 # Do not run slow tests: 243 skip_slow = self.testsuite.slow and not enable_slow 244 if skip_slow: 245 return False 246 247 target_ready = bool(self.testsuite.type == "unit" or \ 248 self.platform.type == "native" or \ 249 (self.platform.simulation in SUPPORTED_SIMS and \ 250 self.platform.simulation not in self.testsuite.simulation_exclude) or \ 251 filter == 'runnable') 252 253 # check if test is runnable in pytest 254 if self.testsuite.harness == 'pytest': 255 target_ready = bool(filter == 'runnable' or self.platform.simulation in SUPPORTED_SIMS_IN_PYTEST) 256 257 SUPPORTED_SIMS_WITH_EXEC = ['nsim', 'mdb-nsim', 'renode', 'tsim', 'native'] 258 if filter != 'runnable' and \ 259 self.platform.simulation in SUPPORTED_SIMS_WITH_EXEC and \ 260 self.platform.simulation_exec: 261 if not shutil.which(self.platform.simulation_exec): 262 target_ready = False 263 264 testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures) 265 266 if hardware_map: 267 for h in hardware_map.duts: 268 if (h.platform == self.platform.name and 269 self.testsuite_runnable(self.testsuite, h.fixtures)): 270 testsuite_runnable = True 271 break 272 273 return testsuite_runnable and target_ready 274 275 def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=[]): 276 # Create this in a "twister/" subdirectory otherwise this 277 # will pass this overlay to kconfig.py *twice* and kconfig.cmake 278 # will silently give that second time precedence over any 279 # --extra-args=CONFIG_* 280 subdir = os.path.join(self.build_dir, "twister") 281 282 content = "" 283 284 if self.testsuite.extra_configs: 285 new_config_list = [] 286 # some configs might be conditional on arch or platform, see if we 287 # have a namespace defined and apply only if the namespace matches. 288 # we currently support both arch: and platform: 289 for config in self.testsuite.extra_configs: 290 cond_config = config.split(":") 291 if cond_config[0] == "arch" and len(cond_config) == 3: 292 if self.platform.arch == cond_config[1]: 293 new_config_list.append(cond_config[2]) 294 elif cond_config[0] == "platform" and len(cond_config) == 3: 295 if self.platform.name == cond_config[1]: 296 new_config_list.append(cond_config[2]) 297 else: 298 new_config_list.append(config) 299 300 content = "\n".join(new_config_list) 301 302 if enable_coverage: 303 if platform.name in coverage_platform: 304 content = content + "\nCONFIG_COVERAGE=y" 305 content = content + "\nCONFIG_COVERAGE_DUMP=y" 306 307 if enable_asan: 308 if platform.type == "native": 309 content = content + "\nCONFIG_ASAN=y" 310 311 if enable_ubsan: 312 if platform.type == "native": 313 content = content + "\nCONFIG_UBSAN=y" 314 315 if content: 316 os.makedirs(subdir, exist_ok=True) 317 file = os.path.join(subdir, "testsuite_extra.conf") 318 with open(file, "w", encoding='utf-8') as f: 319 f.write(content) 320 321 return content 322 323 def calculate_sizes(self, from_buildlog: bool = False, generate_warning: bool = True) -> SizeCalculator: 324 """Get the RAM/ROM sizes of a test case. 325 326 This can only be run after the instance has been executed by 327 MakeGenerator, otherwise there won't be any binaries to measure. 328 329 @return A SizeCalculator object 330 """ 331 elf_filepath = self.get_elf_file() 332 buildlog_filepath = self.get_buildlog_file() if from_buildlog else '' 333 return SizeCalculator(elf_filename=elf_filepath, 334 extra_sections=self.testsuite.extra_sections, 335 buildlog_filepath=buildlog_filepath, 336 generate_warning=generate_warning) 337 338 def get_elf_file(self) -> str: 339 340 if self.sysbuild: 341 build_dir = self.domains.get_default_domain().build_dir 342 else: 343 build_dir = self.build_dir 344 345 fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf")) 346 fns.extend(glob.glob(os.path.join(build_dir, "testbinary"))) 347 blocklist = [ 348 'remapped', # used for xtensa plaforms 349 'zefi', # EFI for Zephyr 350 'qemu', # elf files generated after running in qemu 351 '_pre'] 352 fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)] 353 if not fns: 354 raise BuildError("Missing output binary") 355 elif len(fns) > 1: 356 logger.warning(f"multiple ELF files detected: {', '.join(fns)}") 357 return fns[0] 358 359 def get_buildlog_file(self) -> str: 360 """Get path to build.log file. 361 362 @raises BuildError: Incorrect amount (!=1) of build logs. 363 @return: Path to build.log (str). 364 """ 365 buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log")) 366 if len(buildlog_paths) != 1: 367 raise BuildError("Missing/multiple build.log file.") 368 return buildlog_paths[0] 369 370 def __repr__(self): 371 return "<TestSuite %s on %s>" % (self.testsuite.name, self.platform.name) 372