Lines matching "self" (full-text search)
88 def __init__(self, total=0): argument
89 self._done = Value('i', 0)
90 self._passed = Value('i', 0)
91 self._skipped_configs = Value('i', 0)
92 self._skipped_runtime = Value('i', 0)
93 self._skipped_cases = Value('i', 0)
94 self._error = Value('i', 0)
95 self._failed = Value('i', 0)
96 self._total = Value('i', total)
97 self._cases = Value('i', 0)
100 self.lock = Lock()
103 def cases(self): argument
104 with self._cases.get_lock():
105 return self._cases.value
108 def cases(self, value): argument
109 with self._cases.get_lock():
110 self._cases.value = value
113 def skipped_cases(self): argument
114 with self._skipped_cases.get_lock():
115 return self._skipped_cases.value
118 def skipped_cases(self, value): argument
119 with self._skipped_cases.get_lock():
120 self._skipped_cases.value = value
123 def error(self): argument
124 with self._error.get_lock():
125 return self._error.value
128 def error(self, value): argument
129 with self._error.get_lock():
130 self._error.value = value
133 def done(self): argument
134 with self._done.get_lock():
135 return self._done.value
138 def done(self, value): argument
139 with self._done.get_lock():
140 self._done.value = value
143 def passed(self): argument
144 with self._passed.get_lock():
145 return self._passed.value
148 def passed(self, value): argument
149 with self._passed.get_lock():
150 self._passed.value = value
153 def skipped_configs(self): argument
154 with self._skipped_configs.get_lock():
155 return self._skipped_configs.value
158 def skipped_configs(self, value): argument
159 with self._skipped_configs.get_lock():
160 self._skipped_configs.value = value
163 def skipped_runtime(self): argument
164 with self._skipped_runtime.get_lock():
165 return self._skipped_runtime.value
168 def skipped_runtime(self, value): argument
169 with self._skipped_runtime.get_lock():
170 self._skipped_runtime.value = value
173 def failed(self): argument
174 with self._failed.get_lock():
175 return self._failed.value
178 def failed(self, value): argument
179 with self._failed.get_lock():
180 self._failed.value = value
183 def total(self): argument
184 with self._total.get_lock():
185 return self._total.value
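
The block above (source lines 88-185) implements a counter object meant to be shared across worker processes: every field is a multiprocessing.Value('i', ...) and each property getter and setter takes the value's lock before touching it. A minimal standalone sketch of that pattern (class and function names here are illustrative, not the module's own):

    from multiprocessing import Process, Value

    class SharedCounter:
        """Integer counter that can be read and updated safely from several processes."""

        def __init__(self, initial=0):
            self._val = Value('i', initial)   # 'i' = C int; comes with its own lock

        @property
        def value(self):
            with self._val.get_lock():
                return self._val.value

        @value.setter
        def value(self, new):
            with self._val.get_lock():
                self._val.value = new

        def increment(self, by=1):
            with self._val.get_lock():        # read-modify-write must hold the lock
                self._val.value += by

    def _worker(counter):
        for _ in range(1000):
            counter.increment()

    if __name__ == "__main__":
        c = SharedCounter()
        procs = [Process(target=_worker, args=(c,)) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(c.value)   # 4000; without the lock, increments could be lost
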
272 def __init__(self, name, value): argument
273 self.name = name
274 self.value = value
276 def __str__(self): argument
278 return fmt.format(self.name, self.value)
288 def __init__(self, cache_file): argument
289 self.cache_file = cache_file
290 self.load(cache_file)
292 def load(self, cache_file): argument
299 self._entries = OrderedDict((e.name, e) for e in entries)
301 def get(self, name, default=None): argument
302 entry = self._entries.get(name)
308 def get_list(self, name, default=None): argument
311 entry = self._entries.get(name)
324 def __contains__(self, name): argument
325 return name in self._entries
327 def __getitem__(self, name): argument
328 return self._entries[name].value
330 def __setitem__(self, name, entry): argument
334 self._entries[name] = entry
336 def __delitem__(self, name): argument
337 del self._entries[name]
339 def __iter__(self): argument
340 return iter(self._entries.values())
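
The fragment at source lines 288-340 wraps what appears to be a CMake cache file: load() turns the entries into an OrderedDict keyed by name, and the dunder methods expose dict-style access. A rough, self-contained sketch of that parsing step, assuming the usual NAME:TYPE=VALUE layout of CMakeCache.txt (type handling and error reporting are simplified):

    from collections import OrderedDict

    def load_cmake_cache(path):
        """Parse CMakeCache.txt-style lines into an OrderedDict of name -> (type, value)."""
        entries = OrderedDict()
        with open(path) as f:
            for line in f:
                line = line.strip()
                # Skip blanks and the comment lines CMake writes into the cache.
                if not line or line.startswith(('#', '//')):
                    continue
                if ':' not in line or '=' not in line:
                    continue
                name, rest = line.split(':', 1)
                type_, value = rest.split('=', 1)
                entries[name] = (type_, value)
        return entries

    # cache = load_cmake_cache("build/CMakeCache.txt")   # hypothetical path
    # print(cache.get("CMAKE_BUILD_TYPE"))
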
352 def __init__(self, cfile, message): argument
353 TwisterException.__init__(self, cfile + ": " + message)
366 def __init__(self, name): argument
374 self.instance = my_class()
378 def __init__(self, instance, type_str="build"): argument
382 self.state = "waiting"
383 self.run = False
384 self.duration = 0
385 self.type_str = type_str
387 self.binary = None
388 self.pid_fn = None
389 self.call_make_run = False
391 self.name = instance.name
392 self.instance = instance
393 self.timeout = instance.testcase.timeout
394 self.sourcedir = instance.testcase.source_dir
395 self.build_dir = instance.build_dir
396 self.log = os.path.join(self.build_dir, "handler.log")
397 self.returncode = 0
398 self.set_state("running", self.duration)
399 self.generator = None
400 self.generator_cmd = None
402 self.args = []
403 self.terminated = False
405 def set_state(self, state, duration): argument
406 self.state = state
407 self.duration = duration
409 def get_state(self): argument
410 ret = (self.state, self.duration)
413 def record(self, harness): argument
415 filename = os.path.join(self.build_dir, "recording.csv")
422 def terminate(self, proc): argument
437 self.terminated = True
439 def add_missing_testscases(self, harness): argument
445 for c in self.instance.testcase.cases:
451 def __init__(self, instance, type_str): argument
458 self.call_west_flash = False
461 self.valgrind = False
462 self.lsan = False
463 self.asan = False
464 self.ubsan = False
465 self.coverage = False
467 def try_kill_process_by_pid(self): argument
468 if self.pid_fn:
469 pid = int(open(self.pid_fn).read())
470 os.unlink(self.pid_fn)
471 self.pid_fn = None # clear so we don't try to kill the binary twice
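
try_kill_process_by_pid() above reads an integer pid from self.pid_fn, unlinks the file, and clears the attribute so the binary is not killed twice; the actual signalling happens on source lines not matched here. A hedged sketch of the same idea (the signal choice is an assumption):

    import os
    import signal

    def kill_from_pid_file(pid_fn):
        """Terminate the process whose pid was written to pid_fn, then drop the file."""
        if not pid_fn or not os.path.exists(pid_fn):
            return                          # nothing recorded, or already cleaned up
        with open(pid_fn) as f:
            pid = int(f.read())
        os.unlink(pid_fn)                   # clear first so we never signal twice
        try:
            os.kill(pid, signal.SIGTERM)    # assumption: SIGTERM; a harsher path would use SIGKILL
        except ProcessLookupError:
            pass                            # the process already exited on its own
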
477 def _output_reader(self, proc): argument
478 self.line = proc.stdout.readline()
480 def _output_handler(self, proc, harness): argument
485 log_out_fp = open(self.log, "wt")
487 timeout_time = time.time() + self.timeout
492 reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
496 line = self.line
516 self.terminate(proc)
520 def handle(self): argument
522 harness_name = self.instance.testcase.harness.capitalize()
525 harness.configure(self.instance)
527 if self.call_make_run:
528 command = [self.generator_cmd, "run"]
529 elif self.call_west_flash:
530 command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
532 command = [self.binary]
535 if self.valgrind and shutil.which("valgrind"):
539 "--log-file=" + self.build_dir + "/valgrind.log"
545 "in directory: " + self.build_dir)
550 if self.asan:
553 if not self.lsan:
556 if self.ubsan:
561 stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
562 logger.debug("Spawning BinaryHandler Thread for %s" % self.name)
563 t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
567 self.terminate(proc)
570 self.returncode = proc.returncode
571 self.try_kill_process_by_pid()
575 if self.coverage:
576 subprocess.call(["GCOV_PREFIX=" + self.build_dir,
577 "gcov", self.sourcedir, "-b", "-s", self.build_dir], shell=True)
585 harness.pytest_run(self.log)
586 self.instance.results = harness.tests
588 if not self.terminated and self.returncode != 0:
591 self.set_state("failed", handler_time)
592 self.instance.reason = "Failed"
593 elif run_valgrind and self.returncode == 2:
594 self.set_state("failed", handler_time)
595 self.instance.reason = "Valgrind error"
597 self.set_state(harness.state, handler_time)
599 self.instance.reason = "Failed"
601 self.set_state("timeout", handler_time)
602 self.instance.reason = "Timeout"
603 self.add_missing_testscases(harness)
605 self.record(harness)
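
The BinaryHandler handle()/_output_handler() pair above runs the built binary under subprocess.Popen, drains its output through a reader thread, and enforces self.timeout before terminating the process. A stripped-down sketch of that run-with-timeout pattern (command, log path, and the exact termination strategy are placeholders):

    import subprocess
    import threading

    def run_with_timeout(command, log_path, timeout):
        """Run `command`, stream its output to log_path, and stop it after `timeout` seconds."""
        with open(log_path, "wt") as log_fp, \
             subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT) as proc:

            def _pump():
                # Daemon thread: a blocked readline can never hang the shutdown path.
                for line in proc.stdout:
                    log_fp.write(line.decode(errors="replace"))

            reader = threading.Thread(target=_pump, daemon=True)
            reader.start()
            try:
                proc.wait(timeout)
                timed_out = False
            except subprocess.TimeoutExpired:
                timed_out = True
                proc.kill()                 # the real handler escalates through the process group
                proc.wait()                 # reap so returncode is populated
            reader.join()
            return proc.returncode, timed_out

    # rc, timed_out = run_with_timeout(["./zephyr.exe"], "handler.log", 60)   # hypothetical binary
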
610 def __init__(self, instance, type_str): argument
617 self.suite = None
619 def monitor_serial(self, ser, halt_fileno, harness): argument
624 log_out_fp = open(self.log, "wt")
629 if self.coverage:
638 readable, _, _ = select.select(readlist, [], [], self.timeout)
673 def device_is_available(self, instance): argument
676 for d in self.suite.duts:
693 def make_device_available(self, serial): argument
694 for d in self.suite.duts:
710 def handle(self): argument
714 hardware = self.device_is_available(self.instance)
716 … logger.debug("Waiting for device {} to become available".format(self.instance.platform.name))
718 hardware = self.device_is_available(self.instance)
720 runner = hardware.runner or self.suite.west_runner
738 if (self.suite.west_flash is not None) or runner:
739 command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
749 if self.suite.west_flash and self.suite.west_flash != []:
750 command_extra_args.extend(self.suite.west_flash.split(','))
783 command = [self.generator_cmd, "-C", self.build_dir, "flash"]
790 self.run_custom_script(pre_script, 30)
799 timeout=self.timeout
802 self.set_state("failed", 0)
803 self.instance.reason = "Failed"
811 self.make_device_available(serial_device)
816 harness_name = self.instance.testcase.harness.capitalize()
819 harness.configure(self.instance)
823 t = threading.Thread(target=self.monitor_serial, daemon=True,
827 d_log = "{}/device.log".format(self.instance.build_dir)
837 self.instance.reason = "Device issue (Flash?)"
845 self.instance.reason = "Device issue (Timeout)"
854 self.run_custom_script(post_flash_script, 30)
856 t.join(self.timeout)
858 … logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))
875 self.add_missing_testscases(harness)
878 self.instance.reason = "Timeout"
880 self.instance.reason = "Flash error"
883 harness.pytest_run(self.log)
884 self.instance.results = harness.tests
889 if self.instance.results == {}:
890 for k in self.instance.testcase.cases:
891 self.instance.results[k] = 'BLOCK'
894 self.set_state(harness.state, handler_time)
896 self.instance.reason = "Failed"
898 self.set_state(out_state, handler_time)
901 self.run_custom_script(post_script, 30)
903 self.make_device_available(serial_device)
904 self.record(harness)
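
monitor_serial() above multiplexes the device's serial stream through select.select() with self.timeout and appends everything it reads to the handler log. A POSIX-only sketch of such a loop, assuming the pyserial package for the port object (baud rate and port name are illustrative):

    import select
    import time

    import serial  # pyserial, assumed to be installed

    def monitor_serial(port, log_path, timeout, baud=115200):
        """Copy bytes arriving on `port` into `log_path` until `timeout` seconds pass."""
        deadline = time.time() + timeout
        with serial.Serial(port, baudrate=baud, timeout=0) as ser, \
             open(log_path, "wb") as log_fp:
            while time.time() < deadline:
                # select() on the serial fd works on POSIX; on Windows you would
                # poll ser.in_waiting instead.
                readable, _, _ = select.select([ser.fileno()], [], [], 1.0)
                if not readable:
                    continue
                data = ser.read(ser.in_waiting or 1)
                if data:
                    log_fp.write(data)
                    log_fp.flush()

    # monitor_serial("/dev/ttyACM0", "device.log", timeout=60)   # hypothetical device node
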
916 def __init__(self, instance, type_str): argument
923 self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
925 self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
928 self.ignore_qemu_crash = True
929 self.ignore_unexpected_eof = True
931 self.ignore_qemu_crash = False
932 self.ignore_unexpected_eof = False
1087 def handle(self): argument
1088 self.results = {}
1089 self.run = True
1094 self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
1095 self.pid_fn = os.path.join(self.instance.build_dir, "qemu.pid")
1097 if os.path.exists(self.pid_fn):
1098 os.unlink(self.pid_fn)
1100 self.log_fn = self.log
1102 harness_import = HarnessImporter(self.instance.testcase.harness.capitalize())
1104 harness.configure(self.instance)
1106 self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
1107 args=(self, self.timeout, self.build_dir,
1108 self.log_fn, self.fifo_fn,
1109 self.pid_fn, self.results, harness,
1110 self.ignore_unexpected_eof))
1112 self.instance.results = harness.tests
1113 self.thread.daemon = True
1114 logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
1115 self.thread.start()
1119 logger.debug("Running %s (%s)" % (self.name, self.type_str))
1120 command = [self.generator_cmd]
1121 command += ["-C", self.build_dir, "run"]
1126 …with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir)…
1127 logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
1130 proc.wait(self.timeout)
1137 self.terminate(proc)
1139 self.returncode = 0
1141 self.returncode = proc.returncode
1143 if os.path.exists(self.pid_fn):
1144 qemu_pid = int(open(self.pid_fn).read())
1146 self.returncode = proc.returncode
1150 self.thread.join(0)
1151 if self.thread.is_alive():
1154 if os.path.exists(self.pid_fn):
1155 qemu_pid = int(open(self.pid_fn).read())
1156 os.unlink(self.pid_fn)
1158 logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
1160 if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness.state:
1161 self.set_state("failed", 0)
1163 self.instance.reason = "Timeout"
1165 self.instance.reason = "Exited with {}".format(self.returncode)
1166 self.add_missing_testscases(harness)
1168 def get_fifo(self): argument
1169 return self.fifo_fn
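
The QEMUHandler fragment above points QEMU's console output at a named pipe (qemu-fifo) and has a helper thread collect test output from it while the generator runs QEMU; the real thread also manages the pid file and harness state. A tiny illustration of reading lines from a FIFO with a deadline, using a writer thread as a stand-in for QEMU (POSIX-only; paths are temporary):

    import os
    import tempfile
    import threading
    import time

    def _fake_qemu(fifo_fn):
        # Stand-in for QEMU's character device: open the write end and emit some output.
        with open(fifo_fn, "w") as out:
            for i in range(3):
                out.write(f"console line {i}\n")
                out.flush()
                time.sleep(0.1)

    def read_fifo(fifo_fn, timeout):
        """Collect lines from a named pipe until EOF or until `timeout` seconds pass."""
        deadline = time.time() + timeout
        lines = []
        # Opening the read end blocks until a writer appears, much like waiting for QEMU to start.
        with open(fifo_fn) as fifo:
            for line in fifo:
                lines.append(line.rstrip())
                if time.time() > deadline:
                    break
        return lines

    if __name__ == "__main__":
        with tempfile.TemporaryDirectory() as d:
            fifo_fn = os.path.join(d, "qemu-fifo")
            os.mkfifo(fifo_fn)                     # POSIX-only
            threading.Thread(target=_fake_qemu, args=(fifo_fn,), daemon=True).start()
            print(read_fifo(fifo_fn, timeout=5))
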
1255 def __init__(self, filename, extra_sections): argument
1287 self.is_xip = (len(is_xip_output) != 0)
1289 self.filename = filename
1290 self.sections = []
1291 self.rom_size = 0
1292 self.ram_size = 0
1293 self.extra_sections = extra_sections
1295 self._calculate_sizes()
1297 def get_ram_size(self): argument
1302 return self.ram_size
1304 def get_rom_size(self): argument
1309 return self.rom_size
1311 def unrecognized_sections(self): argument
1317 for v in self.sections:
1322 def _calculate_sizes(self): argument
1324 objdump_command = "objdump -h " + self.filename
1356 self.ram_size += size
1359 self.ram_size += size
1360 self.rom_size += size
1363 self.rom_size += size
1364 if not self.is_xip:
1365 self.ram_size += size
1369 if name not in self.extra_sections:
1372 self.sections.append({"name": name, "load_addr": load_addr,
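
The size-calculation fragment above (source lines 1255-1372) shells out to `objdump -h` and folds each section into RAM/ROM totals, with XIP images handled specially. A rough sketch of pulling section sizes out of objdump output (the RAM grouping in the usage comment is illustrative, not the tool's actual classification tables):

    import subprocess

    def section_sizes(elf_path):
        """Return {section_name: size_in_bytes} parsed from `objdump -h`."""
        out = subprocess.check_output(["objdump", "-h", elf_path], text=True)
        sizes = {}
        for line in out.splitlines():
            fields = line.split()
            # Data rows look like: Idx  Name  Size  VMA  LMA  File-off  Algn
            if len(fields) < 7 or not fields[0].isdigit():
                continue
            sizes[fields[1]] = int(fields[2], 16)   # objdump prints sizes in hex
        return sizes

    # sizes = section_sizes("build/zephyr/zephyr.elf")                  # hypothetical path
    # ram = sum(s for n, s in sizes.items() if n in ("bss", "noinit"))  # illustrative grouping
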
1382 def __init__(self, filename, schema): argument
1387 self.data = {}
1388 self.schema = schema
1389 self.filename = filename
1390 self.tests = {}
1391 self.common = {}
1393 def load(self): argument
1394 self.data = scl.yaml_load_verify(self.filename, self.schema)
1396 if 'tests' in self.data:
1397 self.tests = self.data['tests']
1398 if 'common' in self.data:
1399 self.common = self.data['common']
1401 def _cast_value(self, value, typestr): argument
1421 return [self._cast_value(vsi, typestr[5:]) for vsi in vs]
1428 return {self._cast_value(vsi, typestr[4:]) for vsi in vs}
1436 self.filename, "unknown type '%s'" % value)
1438 def get_test(self, name, valid_keys): argument
1464 for k, v in self.common.items():
1467 for k, v in self.tests[name].items():
1490 self.filename,
1497 default = self._cast_value("", kinfo["type"])
1501 d[k] = self._cast_value(d[k], kinfo["type"])
1504 self.filename, "bad %s value '%s' for key '%s' in name '%s'" %
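
The YAML configuration parser above casts raw scalar values according to a small type vocabulary (_cast_value recurses for "list of ..." and "set of ..." entries), and get_test() merges the common: block into each named test before casting and validating it. A simplified cast helper in the same spirit (the exact type names accepted by the schema are an assumption):

    def cast_value(value, typestr):
        """Cast a raw YAML scalar to the type named by `typestr` (e.g. "int", "list of str")."""
        if isinstance(value, str):
            value = value.strip()
        if typestr == "str":
            return value
        if typestr == "int":
            return int(value)
        if typestr == "bool":
            return value if isinstance(value, bool) else str(value).lower() in ("true", "1", "yes")
        if typestr.startswith("list"):
            items = value.split() if isinstance(value, str) else list(value)
            return [cast_value(v, typestr[len("list of "):] or "str") for v in items]
        if typestr.startswith("set"):
            items = value.split() if isinstance(value, str) else list(value)
            return {cast_value(v, typestr[len("set of "):] or "str") for v in items}
        raise ValueError(f"unknown type '{typestr}'")

    # cast_value("tag_a tag_b", "list of str")  ->  ['tag_a', 'tag_b']
    # cast_value("60", "int")                   ->  60
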
1518 def __init__(self): argument
1523 self.name = ""
1524 self.twister = True
1526 self.ram = 128
1528 self.ignore_tags = []
1529 self.only_tags = []
1530 self.default = False
1532 self.flash = 512
1533 self.supported = set()
1535 self.arch = ""
1536 self.type = "na"
1537 self.simulation = "na"
1538 self.supported_toolchains = []
1539 self.env = []
1540 self.env_satisfied = True
1541 self.filter_data = dict()
1543 def load(self, platform_file): argument
1544 scp = TwisterConfigParser(platform_file, self.platform_schema)
1548 self.name = data['identifier']
1549 self.twister = data.get("twister", True)
1551 self.ram = data.get("ram", 128)
1553 self.ignore_tags = testing.get("ignore_tags", [])
1554 self.only_tags = testing.get("only_tags", [])
1555 self.default = testing.get("default", False)
1557 self.flash = data.get("flash", 512)
1558 self.supported = set()
1561 self.supported.add(item)
1563 self.arch = data['arch']
1564 self.type = data.get('type', "na")
1565 self.simulation = data.get('simulation', "na")
1566 self.supported_toolchains = data.get("toolchain", [])
1567 self.env = data.get("env", [])
1568 self.env_satisfied = True
1569 for env in self.env:
1571 self.env_satisfied = False
1573 def __repr__(self): argument
1574 return "<%s on %s>" % (self.name, self.arch)
1585 def __init__(self, testcase_root, workdir, name): argument
1607 self.source_dir = ""
1608 self.yamlfile = ""
1609 self.cases = []
1610 self.name = self.get_unique(testcase_root, workdir, name)
1611 self.id = name
1613 self.type = None
1614 self.tags = set()
1615 self.extra_args = None
1616 self.extra_configs = None
1617 self.arch_allow = None
1618 self.arch_exclude = None
1619 self.skip = False
1620 self.platform_exclude = None
1621 self.platform_allow = None
1622 self.toolchain_exclude = None
1623 self.toolchain_allow = None
1624 self.tc_filter = None
1625 self.timeout = 60
1626 self.harness = ""
1627 self.harness_config = {}
1628 self.build_only = True
1629 self.build_on_all = False
1630 self.slow = False
1631 self.min_ram = -1
1632 self.depends_on = None
1633 self.min_flash = -1
1634 self.extra_sections = None
1635 self.integration_platforms = []
1724 def scan_path(self, path): argument
1728 _subcases, warnings = self.scan_file(filename)
1739 _subcases, warnings = self.scan_file(filename)
1748 def parse_subcases(self, test_path): argument
1749 results = self.scan_path(test_path)
1751 name = "{}.{}".format(self.id, sub)
1752 self.cases.append(name)
1755 self.cases.append(self.id)
1757 def __str__(self): argument
1758 return self.name
1770 def __init__(self, testcase, platform, outdir): argument
1772 self.testcase = testcase
1773 self.platform = platform
1775 self.status = None
1776 self.reason = "Unknown"
1777 self.metrics = dict()
1778 self.handler = None
1779 self.outdir = outdir
1781 self.name = os.path.join(platform.name, testcase.name)
1782 self.build_dir = os.path.join(outdir, platform.name, testcase.name)
1784 self.run = False
1786 self.results = {}
1788 def __getstate__(self): argument
1789 d = self.__dict__.copy()
1792 def __setstate__(self, d): argument
1793 self.__dict__.update(d)
1795 def __lt__(self, other): argument
1796 return self.name < other.name
1820 def check_runnable(self, enable_slow=False, filter='buildable', fixtures=[]): argument
1828 if self.testcase.build_only:
1832 skip_slow = self.testcase.slow and not enable_slow
1836 target_ready = bool(self.testcase.type == "unit" or \
1837 self.platform.type == "native" or \
1838 … self.platform.simulation in ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp"] or \
1841 if self.platform.simulation == "nsim":
1845 if self.platform.simulation == "mdb-nsim":
1849 if self.platform.simulation == "renode":
1853 if self.platform.simulation == "tsim":
1857 testcase_runnable = self.testcase_runnable(self.testcase, fixtures)
1861 …def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, c… argument
1866 subdir = os.path.join(self.build_dir, "twister")
1870 if self.testcase.extra_configs:
1871 content = "\n".join(self.testcase.extra_configs)
1894 def calculate_sizes(self): argument
1902 fns = glob.glob(os.path.join(self.build_dir, "zephyr", "*.elf"))
1903 fns.extend(glob.glob(os.path.join(self.build_dir, "zephyr", "*.exe")))
1908 return SizeCalculator(fns[0], self.testcase.extra_sections)
1910 def fill_results_by_status(self): argument
1911 """Fills results according to self.status
1925 for k in self.results:
1926 self.results[k] = status_to_verdict[self.status]
1928 def __repr__(self): argument
1929 return "<TestCase %s on %s>" % (self.testcase.name, self.platform.name)
1936 def __init__(self, testcase, platform, source_dir, build_dir): argument
1938 self.cwd = None
1939 self.capture_output = True
1941 self.defconfig = {}
1942 self.cmake_cache = {}
1944 self.instance = None
1945 self.testcase = testcase
1946 self.platform = platform
1947 self.source_dir = source_dir
1948 self.build_dir = build_dir
1949 self.log = "build.log"
1950 self.generator = None
1951 self.generator_cmd = None
1953 def parse_generated(self): argument
1954 self.defconfig = {}
1957 def run_build(self, args=[]): argument
1959 logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))
1967 if self.capture_output:
1972 if self.cwd:
1973 kwargs['cwd'] = self.cwd
1980 msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
1982 self.instance.status = "passed"
1983 results = {'msg': msg, "returncode": p.returncode, "instance": self.instance}
1987 with open(os.path.join(self.build_dir, self.log), "a") as log:
1997 with open(os.path.join(self.build_dir, self.log), "a") as log:
2002 if res and not self.overflow_as_errors:
2004 self.instance.status = "skipped"
2005 self.instance.reason = "{} overflow".format(res[0])
2007 self.instance.status = "error"
2008 self.instance.reason = "Build failure"
2012 "instance": self.instance,
2017 def run_cmake(self, args=[]): argument
2019 if self.warnings_as_errors:
2026 logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
2028 f'-B{self.build_dir}',
2029 f'-S{self.source_dir}',
2032 f'-G{self.generator}'
2038 cmake_opts = ['-DBOARD={}'.format(self.platform.name)]
2047 if self.capture_output:
2052 if self.cwd:
2053 kwargs['cwd'] = self.cwd
2059 filter_results = self.parse_generated()
2060 msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
2065 self.instance.status = "error"
2066 self.instance.reason = "Cmake build failure"
2067 self.instance.fill_results_by_status()
2068 logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
2072 with open(os.path.join(self.build_dir, self.log), "a") as log:
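
run_cmake() above assembles a `cmake -B<build> -S<source> -G<generator>` invocation plus -D options such as -DBOARD=<platform> and captures the output into the build log. A hedged sketch of that configure step, assuming cmake and the chosen generator are on PATH (Zephyr-specific environment handling is omitted):

    import os
    import subprocess

    def run_cmake(source_dir, build_dir, board, generator="Ninja", extra_opts=()):
        """Configure a build tree roughly the way the cmake step above does."""
        cmd = [
            "cmake",
            f"-B{build_dir}",
            f"-S{source_dir}",
            f"-G{generator}",
            f"-DBOARD={board}",
            *extra_opts,                    # e.g. ("-DCONF_FILE=prj_single.conf",)
        ]
        os.makedirs(build_dir, exist_ok=True)
        with open(os.path.join(build_dir, "build.log"), "a") as log:
            proc = subprocess.run(cmd, stdout=log, stderr=subprocess.STDOUT)
        return proc.returncode == 0

    # run_cmake("samples/hello_world", "out/hello", board="qemu_x86")   # hypothetical paths
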
2123 def __init__(self, testcase, platform, source_dir, build_dir): argument
2126 self.log = "config-twister.log"
2128 def parse_generated(self): argument
2130 if self.platform.name == "unit_testing":
2133 cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
2134 defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
2139 m = self.config_re.match(line)
2146 self.defconfig = defconfig
2157 self.cmake_cache = cmake_conf
2160 "ARCH": self.platform.arch,
2161 "PLATFORM": self.platform.name
2164 filter_data.update(self.defconfig)
2165 filter_data.update(self.cmake_cache)
2167 edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")
2168 if self.testcase and self.testcase.tc_filter:
2175 res = expr_parser.parse(self.testcase.tc_filter, filter_data, edt)
2179 "Failed processing %s\n" % self.testcase.yamlfile)
2183 return {os.path.join(self.platform.name, self.testcase.name): True}
2185 return {os.path.join(self.platform.name, self.testcase.name): False}
2187 self.platform.filter_data = filter_data
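
parse_generated() above (source lines 2128-2187) reads the generated zephyr/.config and CMakeCache.txt, folds both into filter_data alongside ARCH/PLATFORM, and evaluates the testcase's filter expression against the result. The Kconfig side is essentially a regex over CONFIG_* lines; a minimal sketch (the exact regex is an assumption):

    import re

    # One CONFIG_<NAME>=<value> per line; comments and "# CONFIG_FOO is not set" are skipped.
    CONFIG_RE = re.compile(r'^(CONFIG_[A-Za-z0-9_]+)=(.*)$')

    def parse_dot_config(path):
        """Return {option: value} from a generated zephyr/.config file."""
        options = {}
        with open(path) as f:
            for line in f:
                m = CONFIG_RE.match(line.strip())
                if not m:
                    continue
                name, value = m.group(1), m.group(2)
                if value.startswith('"') and value.endswith('"'):
                    value = value[1:-1]     # strip the quotes Kconfig puts around strings
                options[name] = value
        return options

    # filter_data = {"ARCH": "x86", "PLATFORM": "qemu_x86"}
    # filter_data.update(parse_dot_config("build/zephyr/.config"))      # hypothetical path
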
2193 def __init__(self, suite, instance, **kwargs): argument
2196 self.log = "build.log"
2197 self.instance = instance
2198 self.suite = suite
2199 self.filtered_tests = 0
2201 self.lsan = kwargs.get('lsan', False)
2202 self.asan = kwargs.get('asan', False)
2203 self.ubsan = kwargs.get('ubsan', False)
2204 self.valgrind = kwargs.get('valgrind', False)
2205 self.extra_args = kwargs.get('extra_args', [])
2206 self.device_testing = kwargs.get('device_testing', False)
2207 self.cmake_only = kwargs.get('cmake_only', False)
2208 self.cleanup = kwargs.get('cleanup', False)
2209 self.coverage = kwargs.get('coverage', False)
2210 self.inline_logs = kwargs.get('inline_logs', False)
2211 self.generator = kwargs.get('generator', None)
2212 self.generator_cmd = kwargs.get('generator_cmd', None)
2213 self.verbose = kwargs.get('verbose', None)
2214 self.warnings_as_errors = kwargs.get('warnings_as_errors', True)
2215 self.overflow_as_errors = kwargs.get('overflow_as_errors', False)
2235 def log_info_file(self, inline_logs): argument
2236 build_dir = self.instance.build_dir
2242 if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
2243 self.log_info("{}".format(v_log), inline_logs)
2245 self.log_info("{}".format(h_log), inline_logs)
2247 self.log_info("{}".format(d_log), inline_logs)
2249 self.log_info("{}".format(b_log), inline_logs)
2251 def setup_handler(self): argument
2253 instance = self.instance
2264 if self.coverage:
2269 handler.asan = self.asan
2270 handler.valgrind = self.valgrind
2271 handler.lsan = self.lsan
2272 handler.ubsan = self.ubsan
2273 handler.coverage = self.coverage
2285 elif self.device_testing:
2287 instance.handler.coverage = self.coverage
2302 instance.handler.generator_cmd = self.generator_cmd
2303 instance.handler.generator = self.generator
2305 def process(self, pipeline, done, message, lock, results): argument
2308 if not self.instance.handler:
2309 self.setup_handler()
2313 res = self.cmake()
2314 if self.instance.status in ["failed", "error"]:
2315 pipeline.put({"op": "report", "test": self.instance})
2316 elif self.cmake_only:
2317 if self.instance.status is None:
2318 self.instance.status = "passed"
2319 pipeline.put({"op": "report", "test": self.instance})
2321 if self.instance.name in res['filter'] and res['filter'][self.instance.name]:
2322 logger.debug("filtering %s" % self.instance.name)
2323 self.instance.status = "skipped"
2324 self.instance.reason = "filter"
2326 for case in self.instance.testcase.cases:
2327 self.instance.results.update({case: 'SKIP'})
2328 pipeline.put({"op": "report", "test": self.instance})
2330 pipeline.put({"op": "build", "test": self.instance})
2333 logger.debug("build test: %s" % self.instance.name)
2334 res = self.build()
2337 self.instance.status = "error"
2338 self.instance.reason = "Build Failure"
2339 pipeline.put({"op": "report", "test": self.instance})
2348 pipeline.put({"op": "report", "test": self.instance})
2350 if self.instance.run and self.instance.handler:
2351 pipeline.put({"op": "run", "test": self.instance})
2353 pipeline.put({"op": "report", "test": self.instance})
2356 logger.debug("run test: %s" % self.instance.name)
2357 self.run()
2358 self.instance.status, _ = self.instance.handler.get_state()
2359 logger.debug(f"run status: {self.instance.name} {self.instance.status}")
2362 self.instance.handler.thread = None
2363 self.instance.handler.suite = None
2366 "test": self.instance,
2367 "status": self.instance.status,
2368 "reason": self.instance.reason
2375 done.put(self.instance)
2376 self.report_out(results)
2378 if self.cleanup and not self.coverage and self.instance.status == "passed":
2381 "test": self.instance
2385 if self.device_testing:
2386 self.cleanup_device_testing_artifacts()
2388 self.cleanup_artifacts()
2390 def cleanup_artifacts(self, additional_keep=[]): argument
2391 logger.debug("Cleaning up {}".format(self.instance.build_dir))
2402 allow = [os.path.join(self.instance.build_dir, file) for file in allow]
2404 for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
2417 def cleanup_device_testing_artifacts(self): argument
2418 logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
2432 self.cleanup_artifacts(keep)
2436 file = os.path.join(self.instance.build_dir, file)
2445 def report_out(self, results): argument
2449 instance = self.instance
2455 if self.verbose:
2466 if not self.verbose:
2467 self.log_info_file(self.inline_logs)
2476 if self.verbose:
2477 if self.cmake_only:
2495 self.log_info_file(self.inline_logs)
2518 def cmake(self): argument
2520 instance = self.instance
2521 args = self.testcase.extra_args[:]
2522 args += self.extra_args
2552 res = self.run_cmake(args)
2555 def build(self): argument
2556 res = self.run_build(['--build', self.build_dir])
2559 def run(self): argument
2561 instance = self.instance
2565 instance.handler.suite = self.suite
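
process() above (source lines 2305-2390) is a small state machine driven by messages on a pipeline queue: each {"op": ..., "test": ...} message ("cmake", "build", "run", "report") can enqueue the next stage, and finished instances land on a done queue, with one such worker running per process. A compact sketch of that queue-driven worker pattern (operations and payloads here are illustrative, not the module's exact protocol):

    import queue
    from multiprocessing import Process, Queue

    def worker(pipeline, done):
        """Pop {"op": ..., "test": ...} messages, enqueue follow-up stages, stop when idle."""
        while True:
            try:
                # A short timeout also papers over the small delay before queued
                # items become visible across processes.
                task = pipeline.get(timeout=0.5)
            except queue.Empty:
                break
            op, test = task["op"], task["test"]
            if op == "cmake":
                pipeline.put({"op": "build", "test": test})
            elif op == "build":
                pipeline.put({"op": "report", "test": test})
            elif op == "report":
                done.put(test)

    if __name__ == "__main__":
        pipeline, done = Queue(), Queue()
        for name in ("test_a", "test_b"):
            pipeline.put({"op": "cmake", "test": name})
        procs = [Process(target=worker, args=(pipeline, done)) for _ in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        while not done.empty():
            print("finished:", done.get())
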
2613 def __init__(self, board_root_list=[], testcase_roots=[], outdir=None): argument
2615 self.roots = testcase_roots
2617 self.board_roots = [board_root_list]
2619 self.board_roots = board_root_list
2622 self.coverage_platform = []
2623 self.build_only = False
2624 self.cmake_only = False
2625 self.cleanup = False
2626 self.enable_slow = False
2627 self.device_testing = False
2628 self.fixtures = []
2629 self.enable_coverage = False
2630 self.enable_ubsan = False
2631 self.enable_lsan = False
2632 self.enable_asan = False
2633 self.enable_valgrind = False
2634 self.extra_args = []
2635 self.inline_logs = False
2636 self.enable_sizes_report = False
2637 self.west_flash = None
2638 self.west_runner = None
2639 self.generator = None
2640 self.generator_cmd = None
2641 self.warnings_as_errors = True
2642 self.overflow_as_errors = False
2643 self.quarantine_verify = False
2646 self.testcases = {}
2647 self.quarantine = {}
2648 self.platforms = []
2649 self.selected_platforms = []
2650 self.filtered_platforms = []
2651 self.default_platforms = []
2652 self.outdir = os.path.abspath(outdir)
2653 self.discards = {}
2654 self.load_errors = 0
2655 self.instances = dict()
2657 self.total_platforms = 0
2658 self.start_time = 0
2659 self.duration = 0
2660 self.warnings = 0
2663 self.duts = []
2666 self.integration = False
2668 self.pipeline = None
2669 self.version = "NA"
2671 def check_zephyr_version(self): argument
2678 self.version = subproc.stdout.strip()
2679 logger.info(f"Zephyr version: {self.version}")
2683 def get_platform_instances(self, platform): argument
2684 filtered_dict = {k:v for k,v in self.instances.items() if k.startswith(platform + "/")}
2687 def config(self): argument
2688 logger.info("coverage platform: {}".format(self.coverage_platform))
2696 def update_counting(self, results=None, initial=False): argument
2699 for instance in self.instances.values():
2711 def compare_metrics(self, filename): argument
2730 for instance in self.instances.values():
2747 def footprint_reports(self, report, show_footprint, all_deltas, argument
2753 deltas = self.compare_metrics(report)
2778 def summary(self, results, unrecognized_sections): argument
2781 for instance in self.instances.values():
2809 Fore.YELLOW if self.warnings else Fore.RESET,
2810 self.warnings,
2812 self.duration))
2814 self.total_platforms = len(self.platforms)
2816 if self.platforms and not self.build_only:
2820 len(self.filtered_platforms),
2821 self.total_platforms,
2822 (100 * len(self.filtered_platforms) / len(self.platforms))
2828 …def save_reports(self, name, suffix, report_dir, no_update, release, only_failed, platform_reports… argument
2829 if not self.instances:
2843 filename = os.path.join(self.outdir, report_name)
2844 outdir = self.outdir
2850 self.xunit_report(filename + ".xml", full_report=False,
2851 append=only_failed, version=self.version)
2852 self.xunit_report(filename + "_report.xml", full_report=True,
2853 append=only_failed, version=self.version)
2854 self.csv_report(filename + ".csv")
2857 self.json_report(filename + ".json", append=only_failed, version=self.version)
2860 self.target_report(outdir, suffix, append=only_failed)
2861 if self.discards:
2862 self.discard_report(filename + "_discard.csv")
2865 self.csv_report(self.RELEASE_DATA)
2867 def add_configurations(self): argument
2869 for board_root in self.board_roots:
2879 if platform.name in [p.name for p in self.platforms]:
2883 self.platforms.append(platform)
2885 self.default_platforms.append(platform.name)
2889 self.load_errors += 1
2891 def get_all_tests(self): argument
2893 for _, tc in self.testcases.items():
2915 def add_testcases(self, testcase_filter=[]): argument
2916 for root in self.roots:
2922 if self.SAMPLE_FILENAME in filenames:
2923 filename = self.SAMPLE_FILENAME
2924 elif self.TESTCASE_FILENAME in filenames:
2925 filename = self.TESTCASE_FILENAME
2934 parsed_data = TwisterConfigParser(tc_path, self.tc_schema)
2943 tc_dict = parsed_data.get_test(name, self.testcase_valid_keys)
2978 self.testcases[tc.name] = tc
2980 self.testcases[tc.name] = tc
2984 self.load_errors += 1
2985 return len(self.testcases)
2987 def get_platform(self, name): argument
2989 for platform in self.platforms:
2995 def load_quarantine(self, file): argument
3003 quarantine_yaml = scl.yaml_load_verify(file, self.quarantine_schema)
3010 plat = [p.name for p in self.platforms]
3021 self.quarantine.update(d)
3023 def load_from_file(self, file, filter_status=[], filter_platform=[]): argument
3033 platform = self.get_platform(row["platform"])
3036 instance = TestInstance(self.testcases[test], platform, self.outdir)
3037 if self.device_testing:
3042 self.enable_slow,
3044 self.fixtures
3046 …instance.create_overlay(platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.…
3048 self.add_instances(instance_list)
3058 def apply_filters(self, **kwargs): argument
3060 toolchain = self.get_toolchain()
3096 platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
3098 platforms = list(filter(lambda p: p.simulation != 'na', self.platforms))
3100 platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms))
3102 platforms = list(filter(lambda p: p.default, self.platforms))
3104 platforms = self.platforms
3108 for tc_name, tc in self.testcases.items():
3111 platform_scope = self.platforms
3112 elif tc.integration_platforms and self.integration:
3114 self.platforms))
3118 integration = self.integration and tc.integration_platforms
3124 b = set(filter(lambda item: item.name in tc.platform_allow, self.platforms))
3128 self.platforms))
3133 instance = TestInstance(tc, plat, self.outdir)
3140 self.enable_slow,
3142 self.fixtures
3148 if runnable and self.duts:
3149 for h in self.duts:
3164 … if self.integration and tc.integration_platforms and plat.name not in tc.integration_platforms:
3234 if test_configuration in self.quarantine and not self.quarantine_verify:
3236 … f"Quarantine: {self.quarantine[test_configuration]}")
3238 if self.quarantine_verify and test_configuration not in self.quarantine:
3253 a = set(self.default_platforms)
3258 self.add_instances(aa)
3260 self.add_instances(instance_list)
3263 self.add_instances(instances)
3266 self.add_instances(instances)
3271 self.add_instances(instance_list)
3275 self.add_instances(instance_list)
3277 for _, case in self.instances.items():
3278 …case.create_overlay(case.platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self…
3280 self.discards = discards
3281 self.selected_platforms = set(p.platform.name for p in self.instances.values())
3283 for instance in self.discards:
3284 instance.reason = self.discards[instance]
3286 … if self.integration and instance.platform.name in instance.testcase.integration_platforms \
3291 self.instances[instance.name] = instance
3296 self.filtered_platforms = set(p.platform.name for p in self.instances.values()
3301 def add_instances(self, instance_list): argument
3303 self.instances[instance.name] = instance
3320 def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False): argument
3321 for instance in self.instances.values():
3336 def pipeline_mgr(self, pipeline, done_queue, lock, results): argument
3344 pb = ProjectBuilder(self,
3346 lsan=self.enable_lsan,
3347 asan=self.enable_asan,
3348 ubsan=self.enable_ubsan,
3349 coverage=self.enable_coverage,
3350 extra_args=self.extra_args,
3351 device_testing=self.device_testing,
3352 cmake_only=self.cmake_only,
3353 cleanup=self.cleanup,
3354 valgrind=self.enable_valgrind,
3355 inline_logs=self.inline_logs,
3356 generator=self.generator,
3357 generator_cmd=self.generator_cmd,
3358 verbose=self.verbose,
3359 warnings_as_errors=self.warnings_as_errors,
3360 overflow_as_errors=self.overflow_as_errors
3366 def execute(self, pipeline, done, results): argument
3369 self.add_tasks_to_queue(pipeline, self.build_only, self.test_only)
3373 for job in range(self.jobs):
3375 p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, ))
3388 if self.enable_size_report and not self.cmake_only:
3390 executor = concurrent.futures.ThreadPoolExecutor(self.jobs)
3391 futures = [executor.submit(self.calc_one_elf_size, instance)
3392 for instance in self.instances.values()]
3395 for instance in self.instances.values():
3403 def discard_report(self, filename): argument
3406 if not self.discards:
3415 for instance, reason in sorted(self.discards.items()):
3422 def target_report(self, outdir, suffix, append=False): argument
3423 platforms = {inst.platform.name for _, inst in self.instances.items()}
3429 self.xunit_report(filename, platform, full_report=True,
3430 append=append, version=self.version)
3444 def xunit_report(self, filename, platform=None, full_report=False, append=False, version="NA"): argument
3452 selected = self.selected_platforms
3461 inst = self.get_platform_instances(p)
3569 … log_root = os.path.join(self.outdir, instance.platform.name, instance.testcase.name)
3571 el.text = self.process_log(log_file)
3606 … log_root = ("%s/%s/%s" % (self.outdir, instance.platform.name, instance.testcase.name))
3616 failure.text = self.process_log(log_file)
3627 def csv_report(self, filename): argument
3634 for instance in self.instances.values():
3651 def json_report(self, filename, append=False, version="NA"): argument
3654 selected = self.selected_platforms
3657 "toolchain": self.get_toolchain()
3673 inst = self.get_platform_instances(p)
3704 testcase["test_output"] = self.process_log(handler_log)
3706 testcase["device_log"] = self.process_log(device_log)
3708 testcase["build_log"] = self.process_log(build_log)
3720 def get_testcase(self, identifier): argument
3722 for _, tc in self.testcases.items():
3732 def __init__(self): argument
3733 self.gcov_tool = None
3734 self.base_dir = None
3799 def generate(self, outdir): argument
3801 gcov_data = self.__class__.retrieve_gcov_data(filename)
3805 self.__class__.create_gcda_files(extracted_coverage_info)
3811 ret = self._generate(outdir, coveragelog)
3819 def __init__(self): argument
3821 self.ignores = []
3823 def add_ignore_file(self, pattern): argument
3824 self.ignores.append('*' + pattern + '*')
3826 def add_ignore_directory(self, pattern): argument
3827 self.ignores.append('*/' + pattern + '/*')
3829 def _generate(self, outdir, coveragelog): argument
3832 cmd = ["lcov", "--gcov-tool", self.gcov_tool,
3840 subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--extract",
3842 os.path.join(self.base_dir, "tests", "ztest", "*"),
3847 subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--remove",
3849 os.path.join(self.base_dir, "tests/ztest/test/*"),
3857 for i in self.ignores:
3859 ["lcov", "--gcov-tool", self.gcov_tool, "--remove",
3875 def __init__(self): argument
3877 self.ignores = []
3879 def add_ignore_file(self, pattern): argument
3880 self.ignores.append('.*' + pattern + '.*')
3882 def add_ignore_directory(self, pattern): argument
3883 self.ignores.append(".*/" + pattern + '/.*')
3890 def _generate(self, outdir, coveragelog): argument
3894 excludes = Gcovr._interleave_list("-e", self.ignores)
3897 cmd = ["gcovr", "-r", self.base_dir, "--gcov-executable",
3898 self.gcov_tool, "-e", "tests/*"] + excludes + ["--json", "-o",
3904 subprocess.call(["gcovr", "-r", self.base_dir, "--gcov-executable",
3905 self.gcov_tool, "-f", "tests/ztest", "-e",
3917 tracefiles = self._interleave_list("--add-tracefile", files)
3919 return subprocess.call(["gcovr", "-r", self.base_dir, "--html",
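
Both coverage back ends above build long exclusion lists: the lcov variant appends '*pattern*' globs and feeds them to repeated `lcov --remove` calls, while the gcovr variant interleaves each regex with a repeated -e flag via _interleave_list(). That interleaving helper is small enough to show in full (a sketch, not the class's exact code):

    def interleave_list(prefix, values):
        """["-e", v1, "-e", v2, ...]: repeat `prefix` in front of every value."""
        result = []
        for v in values:
            result.extend([prefix, v])
        return result

    # excludes = interleave_list("-e", [".*generated.*", ".*/test/.*"])
    # cmd = ["gcovr", "-r", base_dir, "--gcov-executable", gcov_tool] + excludes
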
3925 def __init__(self, argument
3937 self.serial = serial
3938 self.platform = platform
3939 self.serial_pty = serial_pty
3940 self._counter = Value("i", 0)
3941 self._available = Value("i", 1)
3942 self.connected = connected
3943 self.pre_script = pre_script
3944 self.id = id
3945 self.product = product
3946 self.runner = runner
3947 self.fixtures = []
3948 self.post_flash_script = post_flash_script
3949 self.post_script = post_script
3950 self.pre_script = pre_script
3951 self.probe_id = None
3952 self.notes = None
3953 self.lock = Lock()
3954 self.match = False
3958 def available(self): argument
3959 with self._available.get_lock():
3960 return self._available.value
3963 def available(self, value): argument
3964 with self._available.get_lock():
3965 self._available.value = value
3968 def counter(self): argument
3969 with self._counter.get_lock():
3970 return self._counter.value
3973 def counter(self, value): argument
3974 with self._counter.get_lock():
3975 self._counter.value = value
3977 def to_dict(self): argument
3980 v = vars(self)
3987 def __repr__(self): argument
3988 return f"<{self.platform} ({self.product}) on {self.serial}>"
4025 def __init__(self): argument
4026 self.detected = []
4027 self.duts = []
4029 def add_device(self, serial, platform, pre_script, is_pty): argument
4037 self.duts.append(device)
4039 def load(self, map_file): argument
4040 hwm_schema = scl.yaml_load(self.schema_path)
4063 self.duts.append(new_dut)
4065 def scan(self, persistent=False): argument
4093 if d.manufacturer in self.manufacturer:
4106 for runner, _ in self.runner_mapping.items():
4107 products = self.runner_mapping.get(runner)
4117 self.detected.append(s_dev)
4121 def save(self, hwm_file): argument
4123 self.detected.sort(key=lambda x: x.serial or '')
4135 for _detected in self.detected:
4142 new_duts = list(filter(lambda d: not d.match, self.detected))
4155 self.load(hwm_file)
4157 self.dump()
4162 for _connected in self.detected:
4180 self.dump(detected=True)
4182 def dump(self, filtered=[], header=[], connected_only=False, detected=False): argument
4186 to_show = self.detected
4188 to_show = self.duts
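
scan() above walks the serial devices reported by the operating system, filters them by manufacturer, maps products to runners via runner_mapping, and records each hit in self.detected. A hedged sketch of the discovery step using pyserial's list_ports (assumed dependency; the manufacturer filter is illustrative):

    from serial.tools import list_ports  # pyserial, assumed to be installed

    def scan_serial_devices(manufacturers=("SEGGER", "STMicroelectronics")):
        """Return basic info for connected serial adapters from known manufacturers."""
        found = []
        for d in list_ports.comports():
            if d.manufacturer and d.manufacturer not in manufacturers:
                continue
            found.append({
                "serial": d.device,            # e.g. /dev/ttyACM0 or COM3
                "product": d.product,
                "serial_number": d.serial_number,
            })
        return found

    # for dev in scan_serial_devices():
    #     print(dev)
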