1#!/usr/bin/env python3 2# vim: set syntax=python ts=4 : 3# 4# Copyright (c) 2018 Intel Corporation 5# SPDX-License-Identifier: Apache-2.0 6 7import os 8import contextlib 9import string 10import mmap 11import sys 12import re 13import subprocess 14import select 15import shutil 16import shlex 17import signal 18import threading 19import concurrent.futures 20from collections import OrderedDict 21import queue 22import time 23import csv 24import glob 25import concurrent 26import xml.etree.ElementTree as ET 27import logging 28import pty 29from pathlib import Path 30from distutils.spawn import find_executable 31from colorama import Fore 32import pickle 33import platform 34import yaml 35import json 36from multiprocessing import Lock, Process, Value 37 38try: 39 # Use the C LibYAML parser if available, rather than the Python parser. 40 # It's much faster. 41 from yaml import CSafeLoader as SafeLoader 42 from yaml import CDumper as Dumper 43except ImportError: 44 from yaml import SafeLoader, Dumper 45 46try: 47 import serial 48except ImportError: 49 print("Install pyserial python module with pip to use --device-testing option.") 50 51try: 52 from tabulate import tabulate 53except ImportError: 54 print("Install tabulate python module with pip to use --device-testing option.") 55 56try: 57 import psutil 58except ImportError: 59 print("Install psutil python module with pip to run in Qemu.") 60 61ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") 62if not ZEPHYR_BASE: 63 sys.exit("$ZEPHYR_BASE environment variable undefined") 64 65# This is needed to load edt.pickle files. 66sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts", 67 "python-devicetree", "src")) 68from devicetree import edtlib # pylint: disable=unused-import 69 70# Use this for internal comparisons; that's what canonicalization is 71# for. Don't use it when invoking other components of the build system 72# to avoid confusing and hard to trace inconsistencies in error messages 73# and logs, generated Makefiles, etc. compared to when users invoke these 74# components directly. 75# Note "normalization" is different from canonicalization, see os.path. 76canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE) 77 78sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/")) 79 80import scl 81import expr_parser 82 83logger = logging.getLogger('twister') 84logger.setLevel(logging.DEBUG) 85 86 87class ExecutionCounter(object): 88 def __init__(self, total=0): 89 self._done = Value('i', 0) 90 self._passed = Value('i', 0) 91 self._skipped_configs = Value('i', 0) 92 self._skipped_runtime = Value('i', 0) 93 self._skipped_cases = Value('i', 0) 94 self._error = Value('i', 0) 95 self._failed = Value('i', 0) 96 self._total = Value('i', total) 97 self._cases = Value('i', 0) 98 99 100 self.lock = Lock() 101 102 @property 103 def cases(self): 104 with self._cases.get_lock(): 105 return self._cases.value 106 107 @cases.setter 108 def cases(self, value): 109 with self._cases.get_lock(): 110 self._cases.value = value 111 112 @property 113 def skipped_cases(self): 114 with self._skipped_cases.get_lock(): 115 return self._skipped_cases.value 116 117 @skipped_cases.setter 118 def skipped_cases(self, value): 119 with self._skipped_cases.get_lock(): 120 self._skipped_cases.value = value 121 122 @property 123 def error(self): 124 with self._error.get_lock(): 125 return self._error.value 126 127 @error.setter 128 def error(self, value): 129 with self._error.get_lock(): 130 self._error.value = value 131 132 @property 133 def done(self): 134 with self._done.get_lock(): 135 return self._done.value 136 137 @done.setter 138 def done(self, value): 139 with self._done.get_lock(): 140 self._done.value = value 141 142 @property 143 def passed(self): 144 with self._passed.get_lock(): 145 return self._passed.value 146 147 @passed.setter 148 def passed(self, value): 149 with self._passed.get_lock(): 150 self._passed.value = value 151 152 @property 153 def skipped_configs(self): 154 with self._skipped_configs.get_lock(): 155 return self._skipped_configs.value 156 157 @skipped_configs.setter 158 def skipped_configs(self, value): 159 with self._skipped_configs.get_lock(): 160 self._skipped_configs.value = value 161 162 @property 163 def skipped_runtime(self): 164 with self._skipped_runtime.get_lock(): 165 return self._skipped_runtime.value 166 167 @skipped_runtime.setter 168 def skipped_runtime(self, value): 169 with self._skipped_runtime.get_lock(): 170 self._skipped_runtime.value = value 171 172 @property 173 def failed(self): 174 with self._failed.get_lock(): 175 return self._failed.value 176 177 @failed.setter 178 def failed(self, value): 179 with self._failed.get_lock(): 180 self._failed.value = value 181 182 @property 183 def total(self): 184 with self._total.get_lock(): 185 return self._total.value 186 187class CMakeCacheEntry: 188 '''Represents a CMake cache entry. 189 190 This class understands the type system in a CMakeCache.txt, and 191 converts the following cache types to Python types: 192 193 Cache Type Python type 194 ---------- ------------------------------------------- 195 FILEPATH str 196 PATH str 197 STRING str OR list of str (if ';' is in the value) 198 BOOL bool 199 INTERNAL str OR list of str (if ';' is in the value) 200 ---------- ------------------------------------------- 201 ''' 202 203 # Regular expression for a cache entry. 204 # 205 # CMake variable names can include escape characters, allowing a 206 # wider set of names than is easy to match with a regular 207 # expression. To be permissive here, use a non-greedy match up to 208 # the first colon (':'). This breaks if the variable name has a 209 # colon inside, but it's good enough. 210 CACHE_ENTRY = re.compile( 211 r'''(?P<name>.*?) # name 212 :(?P<type>FILEPATH|PATH|STRING|BOOL|INTERNAL) # type 213 =(?P<value>.*) # value 214 ''', re.X) 215 216 @classmethod 217 def _to_bool(cls, val): 218 # Convert a CMake BOOL string into a Python bool. 219 # 220 # "True if the constant is 1, ON, YES, TRUE, Y, or a 221 # non-zero number. False if the constant is 0, OFF, NO, 222 # FALSE, N, IGNORE, NOTFOUND, the empty string, or ends in 223 # the suffix -NOTFOUND. Named boolean constants are 224 # case-insensitive. If the argument is not one of these 225 # constants, it is treated as a variable." 226 # 227 # https://cmake.org/cmake/help/v3.0/command/if.html 228 val = val.upper() 229 if val in ('ON', 'YES', 'TRUE', 'Y'): 230 return 1 231 elif val in ('OFF', 'NO', 'FALSE', 'N', 'IGNORE', 'NOTFOUND', ''): 232 return 0 233 elif val.endswith('-NOTFOUND'): 234 return 0 235 else: 236 try: 237 v = int(val) 238 return v != 0 239 except ValueError as exc: 240 raise ValueError('invalid bool {}'.format(val)) from exc 241 242 @classmethod 243 def from_line(cls, line, line_no): 244 # Comments can only occur at the beginning of a line. 245 # (The value of an entry could contain a comment character). 246 if line.startswith('//') or line.startswith('#'): 247 return None 248 249 # Whitespace-only lines do not contain cache entries. 250 if not line.strip(): 251 return None 252 253 m = cls.CACHE_ENTRY.match(line) 254 if not m: 255 return None 256 257 name, type_, value = (m.group(g) for g in ('name', 'type', 'value')) 258 if type_ == 'BOOL': 259 try: 260 value = cls._to_bool(value) 261 except ValueError as exc: 262 args = exc.args + ('on line {}: {}'.format(line_no, line),) 263 raise ValueError(args) from exc 264 elif type_ in ['STRING', 'INTERNAL']: 265 # If the value is a CMake list (i.e. is a string which 266 # contains a ';'), convert to a Python list. 267 if ';' in value: 268 value = value.split(';') 269 270 return CMakeCacheEntry(name, value) 271 272 def __init__(self, name, value): 273 self.name = name 274 self.value = value 275 276 def __str__(self): 277 fmt = 'CMakeCacheEntry(name={}, value={})' 278 return fmt.format(self.name, self.value) 279 280 281class CMakeCache: 282 '''Parses and represents a CMake cache file.''' 283 284 @staticmethod 285 def from_file(cache_file): 286 return CMakeCache(cache_file) 287 288 def __init__(self, cache_file): 289 self.cache_file = cache_file 290 self.load(cache_file) 291 292 def load(self, cache_file): 293 entries = [] 294 with open(cache_file, 'r') as cache: 295 for line_no, line in enumerate(cache): 296 entry = CMakeCacheEntry.from_line(line, line_no) 297 if entry: 298 entries.append(entry) 299 self._entries = OrderedDict((e.name, e) for e in entries) 300 301 def get(self, name, default=None): 302 entry = self._entries.get(name) 303 if entry is not None: 304 return entry.value 305 else: 306 return default 307 308 def get_list(self, name, default=None): 309 if default is None: 310 default = [] 311 entry = self._entries.get(name) 312 if entry is not None: 313 value = entry.value 314 if isinstance(value, list): 315 return value 316 elif isinstance(value, str): 317 return [value] if value else [] 318 else: 319 msg = 'invalid value {} type {}' 320 raise RuntimeError(msg.format(value, type(value))) 321 else: 322 return default 323 324 def __contains__(self, name): 325 return name in self._entries 326 327 def __getitem__(self, name): 328 return self._entries[name].value 329 330 def __setitem__(self, name, entry): 331 if not isinstance(entry, CMakeCacheEntry): 332 msg = 'improper type {} for value {}, expecting CMakeCacheEntry' 333 raise TypeError(msg.format(type(entry), entry)) 334 self._entries[name] = entry 335 336 def __delitem__(self, name): 337 del self._entries[name] 338 339 def __iter__(self): 340 return iter(self._entries.values()) 341 342 343class TwisterException(Exception): 344 pass 345 346 347class TwisterRuntimeError(TwisterException): 348 pass 349 350 351class ConfigurationError(TwisterException): 352 def __init__(self, cfile, message): 353 TwisterException.__init__(self, cfile + ": " + message) 354 355 356class BuildError(TwisterException): 357 pass 358 359 360class ExecutionError(TwisterException): 361 pass 362 363 364class HarnessImporter: 365 366 def __init__(self, name): 367 sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister")) 368 module = __import__("harness") 369 if name: 370 my_class = getattr(module, name) 371 else: 372 my_class = getattr(module, "Test") 373 374 self.instance = my_class() 375 376 377class Handler: 378 def __init__(self, instance, type_str="build"): 379 """Constructor 380 381 """ 382 self.state = "waiting" 383 self.run = False 384 self.duration = 0 385 self.type_str = type_str 386 387 self.binary = None 388 self.pid_fn = None 389 self.call_make_run = False 390 391 self.name = instance.name 392 self.instance = instance 393 self.timeout = instance.testcase.timeout 394 self.sourcedir = instance.testcase.source_dir 395 self.build_dir = instance.build_dir 396 self.log = os.path.join(self.build_dir, "handler.log") 397 self.returncode = 0 398 self.set_state("running", self.duration) 399 self.generator = None 400 self.generator_cmd = None 401 402 self.args = [] 403 self.terminated = False 404 405 def set_state(self, state, duration): 406 self.state = state 407 self.duration = duration 408 409 def get_state(self): 410 ret = (self.state, self.duration) 411 return ret 412 413 def record(self, harness): 414 if harness.recording: 415 filename = os.path.join(self.build_dir, "recording.csv") 416 with open(filename, "at") as csvfile: 417 cw = csv.writer(csvfile, harness.fieldnames, lineterminator=os.linesep) 418 cw.writerow(harness.fieldnames) 419 for instance in harness.recording: 420 cw.writerow(instance) 421 422 def terminate(self, proc): 423 # encapsulate terminate functionality so we do it consistently where ever 424 # we might want to terminate the proc. We need try_kill_process_by_pid 425 # because of both how newer ninja (1.6.0 or greater) and .NET / renode 426 # work. Newer ninja's don't seem to pass SIGTERM down to the children 427 # so we need to use try_kill_process_by_pid. 428 for child in psutil.Process(proc.pid).children(recursive=True): 429 try: 430 os.kill(child.pid, signal.SIGTERM) 431 except ProcessLookupError: 432 pass 433 proc.terminate() 434 # sleep for a while before attempting to kill 435 time.sleep(0.5) 436 proc.kill() 437 self.terminated = True 438 439 def add_missing_testscases(self, harness): 440 """ 441 If testsuite was broken by some error (e.g. timeout) it is necessary to 442 add information about next testcases, which were not be 443 performed due to this error. 444 """ 445 for c in self.instance.testcase.cases: 446 if c not in harness.tests: 447 harness.tests[c] = "BLOCK" 448 449 450class BinaryHandler(Handler): 451 def __init__(self, instance, type_str): 452 """Constructor 453 454 @param instance Test Instance 455 """ 456 super().__init__(instance, type_str) 457 458 self.call_west_flash = False 459 460 # Tool options 461 self.valgrind = False 462 self.lsan = False 463 self.asan = False 464 self.ubsan = False 465 self.coverage = False 466 467 def try_kill_process_by_pid(self): 468 if self.pid_fn: 469 pid = int(open(self.pid_fn).read()) 470 os.unlink(self.pid_fn) 471 self.pid_fn = None # clear so we don't try to kill the binary twice 472 try: 473 os.kill(pid, signal.SIGTERM) 474 except ProcessLookupError: 475 pass 476 477 def _output_reader(self, proc): 478 self.line = proc.stdout.readline() 479 480 def _output_handler(self, proc, harness): 481 if harness.is_pytest: 482 harness.handle(None) 483 return 484 485 log_out_fp = open(self.log, "wt") 486 timeout_extended = False 487 timeout_time = time.time() + self.timeout 488 while True: 489 this_timeout = timeout_time - time.time() 490 if this_timeout < 0: 491 break 492 reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True) 493 reader_t.start() 494 reader_t.join(this_timeout) 495 if not reader_t.is_alive(): 496 line = self.line 497 logger.debug("OUTPUT: {0}".format(line.decode('utf-8').rstrip())) 498 log_out_fp.write(line.decode('utf-8')) 499 log_out_fp.flush() 500 harness.handle(line.decode('utf-8').rstrip()) 501 if harness.state: 502 if not timeout_extended or harness.capture_coverage: 503 timeout_extended = True 504 if harness.capture_coverage: 505 timeout_time = time.time() + 30 506 else: 507 timeout_time = time.time() + 2 508 else: 509 reader_t.join(0) 510 break 511 try: 512 # POSIX arch based ztests end on their own, 513 # so let's give it up to 100ms to do so 514 proc.wait(0.1) 515 except subprocess.TimeoutExpired: 516 self.terminate(proc) 517 518 log_out_fp.close() 519 520 def handle(self): 521 522 harness_name = self.instance.testcase.harness.capitalize() 523 harness_import = HarnessImporter(harness_name) 524 harness = harness_import.instance 525 harness.configure(self.instance) 526 527 if self.call_make_run: 528 command = [self.generator_cmd, "run"] 529 elif self.call_west_flash: 530 command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir] 531 else: 532 command = [self.binary] 533 534 run_valgrind = False 535 if self.valgrind and shutil.which("valgrind"): 536 command = ["valgrind", "--error-exitcode=2", 537 "--leak-check=full", 538 "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp", 539 "--log-file=" + self.build_dir + "/valgrind.log" 540 ] + command 541 run_valgrind = True 542 543 logger.debug("Spawning process: " + 544 " ".join(shlex.quote(word) for word in command) + os.linesep + 545 "in directory: " + self.build_dir) 546 547 start_time = time.time() 548 549 env = os.environ.copy() 550 if self.asan: 551 env["ASAN_OPTIONS"] = "log_path=stdout:" + \ 552 env.get("ASAN_OPTIONS", "") 553 if not self.lsan: 554 env["ASAN_OPTIONS"] += "detect_leaks=0" 555 556 if self.ubsan: 557 env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \ 558 env.get("UBSAN_OPTIONS", "") 559 560 with subprocess.Popen(command, stdout=subprocess.PIPE, 561 stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc: 562 logger.debug("Spawning BinaryHandler Thread for %s" % self.name) 563 t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True) 564 t.start() 565 t.join() 566 if t.is_alive(): 567 self.terminate(proc) 568 t.join() 569 proc.wait() 570 self.returncode = proc.returncode 571 self.try_kill_process_by_pid() 572 573 handler_time = time.time() - start_time 574 575 if self.coverage: 576 subprocess.call(["GCOV_PREFIX=" + self.build_dir, 577 "gcov", self.sourcedir, "-b", "-s", self.build_dir], shell=True) 578 579 # FIXME: This is needed when killing the simulator, the console is 580 # garbled and needs to be reset. Did not find a better way to do that. 581 if sys.stdout.isatty(): 582 subprocess.call(["stty", "sane"]) 583 584 if harness.is_pytest: 585 harness.pytest_run(self.log) 586 self.instance.results = harness.tests 587 588 if not self.terminated and self.returncode != 0: 589 # When a process is killed, the default handler returns 128 + SIGTERM 590 # so in that case the return code itself is not meaningful 591 self.set_state("failed", handler_time) 592 self.instance.reason = "Failed" 593 elif run_valgrind and self.returncode == 2: 594 self.set_state("failed", handler_time) 595 self.instance.reason = "Valgrind error" 596 elif harness.state: 597 self.set_state(harness.state, handler_time) 598 if harness.state == "failed": 599 self.instance.reason = "Failed" 600 else: 601 self.set_state("timeout", handler_time) 602 self.instance.reason = "Timeout" 603 self.add_missing_testscases(harness) 604 605 self.record(harness) 606 607 608class DeviceHandler(Handler): 609 610 def __init__(self, instance, type_str): 611 """Constructor 612 613 @param instance Test Instance 614 """ 615 super().__init__(instance, type_str) 616 617 self.suite = None 618 619 def monitor_serial(self, ser, halt_fileno, harness): 620 if harness.is_pytest: 621 harness.handle(None) 622 return 623 624 log_out_fp = open(self.log, "wt") 625 626 ser_fileno = ser.fileno() 627 readlist = [halt_fileno, ser_fileno] 628 629 if self.coverage: 630 # Set capture_coverage to True to indicate that right after 631 # test results we should get coverage data, otherwise we exit 632 # from the test. 633 harness.capture_coverage = True 634 635 ser.flush() 636 637 while ser.isOpen(): 638 readable, _, _ = select.select(readlist, [], [], self.timeout) 639 640 if halt_fileno in readable: 641 logger.debug('halted') 642 ser.close() 643 break 644 if ser_fileno not in readable: 645 continue # Timeout. 646 647 serial_line = None 648 try: 649 serial_line = ser.readline() 650 except TypeError: 651 pass 652 except serial.SerialException: 653 ser.close() 654 break 655 656 # Just because ser_fileno has data doesn't mean an entire line 657 # is available yet. 658 if serial_line: 659 sl = serial_line.decode('utf-8', 'ignore').lstrip() 660 logger.debug("DEVICE: {0}".format(sl.rstrip())) 661 662 log_out_fp.write(sl) 663 log_out_fp.flush() 664 harness.handle(sl.rstrip()) 665 666 if harness.state: 667 if not harness.capture_coverage: 668 ser.close() 669 break 670 671 log_out_fp.close() 672 673 def device_is_available(self, instance): 674 device = instance.platform.name 675 fixture = instance.testcase.harness_config.get("fixture") 676 for d in self.suite.duts: 677 if fixture and fixture not in d.fixtures: 678 continue 679 if d.platform != device or not (d.serial or d.serial_pty): 680 continue 681 d.lock.acquire() 682 avail = False 683 if d.available: 684 d.available = 0 685 d.counter += 1 686 avail = True 687 d.lock.release() 688 if avail: 689 return d 690 691 return None 692 693 def make_device_available(self, serial): 694 for d in self.suite.duts: 695 if d.serial == serial or d.serial_pty: 696 d.available = 1 697 698 @staticmethod 699 def run_custom_script(script, timeout): 700 with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 701 try: 702 stdout, _ = proc.communicate(timeout=timeout) 703 logger.debug(stdout.decode()) 704 705 except subprocess.TimeoutExpired: 706 proc.kill() 707 proc.communicate() 708 logger.error("{} timed out".format(script)) 709 710 def handle(self): 711 out_state = "failed" 712 runner = None 713 714 hardware = self.device_is_available(self.instance) 715 while not hardware: 716 logger.debug("Waiting for device {} to become available".format(self.instance.platform.name)) 717 time.sleep(1) 718 hardware = self.device_is_available(self.instance) 719 720 runner = hardware.runner or self.suite.west_runner 721 serial_pty = hardware.serial_pty 722 723 ser_pty_process = None 724 if serial_pty: 725 master, slave = pty.openpty() 726 try: 727 ser_pty_process = subprocess.Popen(re.split(',| ', serial_pty), stdout=master, stdin=master, stderr=master) 728 except subprocess.CalledProcessError as error: 729 logger.error("Failed to run subprocess {}, error {}".format(serial_pty, error.output)) 730 return 731 732 serial_device = os.ttyname(slave) 733 else: 734 serial_device = hardware.serial 735 736 logger.debug("Using serial device {}".format(serial_device)) 737 738 if (self.suite.west_flash is not None) or runner: 739 command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir] 740 command_extra_args = [] 741 742 # There are three ways this option is used. 743 # 1) bare: --west-flash 744 # This results in options.west_flash == [] 745 # 2) with a value: --west-flash="--board-id=42" 746 # This results in options.west_flash == "--board-id=42" 747 # 3) Multiple values: --west-flash="--board-id=42,--erase" 748 # This results in options.west_flash == "--board-id=42 --erase" 749 if self.suite.west_flash and self.suite.west_flash != []: 750 command_extra_args.extend(self.suite.west_flash.split(',')) 751 752 if runner: 753 command.append("--runner") 754 command.append(runner) 755 756 board_id = hardware.probe_id or hardware.id 757 product = hardware.product 758 if board_id is not None: 759 if runner == "pyocd": 760 command_extra_args.append("--board-id") 761 command_extra_args.append(board_id) 762 elif runner == "nrfjprog": 763 command_extra_args.append("--snr") 764 command_extra_args.append(board_id) 765 elif runner == "openocd" and product == "STM32 STLink": 766 command_extra_args.append("--cmd-pre-init") 767 command_extra_args.append("hla_serial %s" % (board_id)) 768 elif runner == "openocd" and product == "STLINK-V3": 769 command_extra_args.append("--cmd-pre-init") 770 command_extra_args.append("hla_serial %s" % (board_id)) 771 elif runner == "openocd" and product == "EDBG CMSIS-DAP": 772 command_extra_args.append("--cmd-pre-init") 773 command_extra_args.append("cmsis_dap_serial %s" % (board_id)) 774 elif runner == "jlink": 775 command.append("--tool-opt=-SelectEmuBySN %s" % (board_id)) 776 elif runner == "stm32cubeprogrammer": 777 command.append("--tool-opt=sn=%s" % (board_id)) 778 779 if command_extra_args != []: 780 command.append('--') 781 command.extend(command_extra_args) 782 else: 783 command = [self.generator_cmd, "-C", self.build_dir, "flash"] 784 785 pre_script = hardware.pre_script 786 post_flash_script = hardware.post_flash_script 787 post_script = hardware.post_script 788 789 if pre_script: 790 self.run_custom_script(pre_script, 30) 791 792 try: 793 ser = serial.Serial( 794 serial_device, 795 baudrate=115200, 796 parity=serial.PARITY_NONE, 797 stopbits=serial.STOPBITS_ONE, 798 bytesize=serial.EIGHTBITS, 799 timeout=self.timeout 800 ) 801 except serial.SerialException as e: 802 self.set_state("failed", 0) 803 self.instance.reason = "Failed" 804 logger.error("Serial device error: %s" % (str(e))) 805 806 if serial_pty and ser_pty_process: 807 ser_pty_process.terminate() 808 outs, errs = ser_pty_process.communicate() 809 logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs)) 810 811 self.make_device_available(serial_device) 812 return 813 814 ser.flush() 815 816 harness_name = self.instance.testcase.harness.capitalize() 817 harness_import = HarnessImporter(harness_name) 818 harness = harness_import.instance 819 harness.configure(self.instance) 820 read_pipe, write_pipe = os.pipe() 821 start_time = time.time() 822 823 t = threading.Thread(target=self.monitor_serial, daemon=True, 824 args=(ser, read_pipe, harness)) 825 t.start() 826 827 d_log = "{}/device.log".format(self.instance.build_dir) 828 logger.debug('Flash command: %s', command) 829 try: 830 stdout = stderr = None 831 with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 832 try: 833 (stdout, stderr) = proc.communicate(timeout=30) 834 logger.debug(stdout.decode()) 835 836 if proc.returncode != 0: 837 self.instance.reason = "Device issue (Flash?)" 838 with open(d_log, "w") as dlog_fp: 839 dlog_fp.write(stderr.decode()) 840 os.write(write_pipe, b'x') # halt the thread 841 out_state = "flash_error" 842 except subprocess.TimeoutExpired: 843 proc.kill() 844 (stdout, stderr) = proc.communicate() 845 self.instance.reason = "Device issue (Timeout)" 846 847 with open(d_log, "w") as dlog_fp: 848 dlog_fp.write(stderr.decode()) 849 850 except subprocess.CalledProcessError: 851 os.write(write_pipe, b'x') # halt the thread 852 853 if post_flash_script: 854 self.run_custom_script(post_flash_script, 30) 855 856 t.join(self.timeout) 857 if t.is_alive(): 858 logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name)) 859 out_state = "timeout" 860 861 if ser.isOpen(): 862 ser.close() 863 864 if serial_pty: 865 ser_pty_process.terminate() 866 outs, errs = ser_pty_process.communicate() 867 logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs)) 868 869 os.close(write_pipe) 870 os.close(read_pipe) 871 872 handler_time = time.time() - start_time 873 874 if out_state in ["timeout", "flash_error"]: 875 self.add_missing_testscases(harness) 876 877 if out_state == "timeout": 878 self.instance.reason = "Timeout" 879 elif out_state == "flash_error": 880 self.instance.reason = "Flash error" 881 882 if harness.is_pytest: 883 harness.pytest_run(self.log) 884 self.instance.results = harness.tests 885 886 # sometimes a test instance hasn't been executed successfully with an 887 # empty dictionary results, in order to include it into final report, 888 # so fill the results as BLOCK 889 if self.instance.results == {}: 890 for k in self.instance.testcase.cases: 891 self.instance.results[k] = 'BLOCK' 892 893 if harness.state: 894 self.set_state(harness.state, handler_time) 895 if harness.state == "failed": 896 self.instance.reason = "Failed" 897 else: 898 self.set_state(out_state, handler_time) 899 900 if post_script: 901 self.run_custom_script(post_script, 30) 902 903 self.make_device_available(serial_device) 904 self.record(harness) 905 906 907class QEMUHandler(Handler): 908 """Spawns a thread to monitor QEMU output from pipes 909 910 We pass QEMU_PIPE to 'make run' and monitor the pipes for output. 911 We need to do this as once qemu starts, it runs forever until killed. 912 Test cases emit special messages to the console as they run, we check 913 for these to collect whether the test passed or failed. 914 """ 915 916 def __init__(self, instance, type_str): 917 """Constructor 918 919 @param instance Test instance 920 """ 921 922 super().__init__(instance, type_str) 923 self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo") 924 925 self.pid_fn = os.path.join(instance.build_dir, "qemu.pid") 926 927 if "ignore_qemu_crash" in instance.testcase.tags: 928 self.ignore_qemu_crash = True 929 self.ignore_unexpected_eof = True 930 else: 931 self.ignore_qemu_crash = False 932 self.ignore_unexpected_eof = False 933 934 @staticmethod 935 def _get_cpu_time(pid): 936 """get process CPU time. 937 938 The guest virtual time in QEMU icount mode isn't host time and 939 it's maintained by counting guest instructions, so we use QEMU 940 process exection time to mostly simulate the time of guest OS. 941 """ 942 proc = psutil.Process(pid) 943 cpu_time = proc.cpu_times() 944 return cpu_time.user + cpu_time.system 945 946 @staticmethod 947 def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness, 948 ignore_unexpected_eof=False): 949 fifo_in = fifo_fn + ".in" 950 fifo_out = fifo_fn + ".out" 951 952 # These in/out nodes are named from QEMU's perspective, not ours 953 if os.path.exists(fifo_in): 954 os.unlink(fifo_in) 955 os.mkfifo(fifo_in) 956 if os.path.exists(fifo_out): 957 os.unlink(fifo_out) 958 os.mkfifo(fifo_out) 959 960 # We don't do anything with out_fp but we need to open it for 961 # writing so that QEMU doesn't block, due to the way pipes work 962 out_fp = open(fifo_in, "wb") 963 # Disable internal buffering, we don't 964 # want read() or poll() to ever block if there is data in there 965 in_fp = open(fifo_out, "rb", buffering=0) 966 log_out_fp = open(logfile, "wt") 967 968 start_time = time.time() 969 timeout_time = start_time + timeout 970 p = select.poll() 971 p.register(in_fp, select.POLLIN) 972 out_state = None 973 974 line = "" 975 timeout_extended = False 976 977 pid = 0 978 if os.path.exists(pid_fn): 979 pid = int(open(pid_fn).read()) 980 981 while True: 982 this_timeout = int((timeout_time - time.time()) * 1000) 983 if this_timeout < 0 or not p.poll(this_timeout): 984 try: 985 if pid and this_timeout > 0: 986 #there's possibility we polled nothing because 987 #of not enough CPU time scheduled by host for 988 #QEMU process during p.poll(this_timeout) 989 cpu_time = QEMUHandler._get_cpu_time(pid) 990 if cpu_time < timeout and not out_state: 991 timeout_time = time.time() + (timeout - cpu_time) 992 continue 993 except ProcessLookupError: 994 out_state = "failed" 995 break 996 997 if not out_state: 998 out_state = "timeout" 999 break 1000 1001 if pid == 0 and os.path.exists(pid_fn): 1002 pid = int(open(pid_fn).read()) 1003 1004 if harness.is_pytest: 1005 harness.handle(None) 1006 out_state = harness.state 1007 break 1008 1009 try: 1010 c = in_fp.read(1).decode("utf-8") 1011 except UnicodeDecodeError: 1012 # Test is writing something weird, fail 1013 out_state = "unexpected byte" 1014 break 1015 1016 if c == "": 1017 # EOF, this shouldn't happen unless QEMU crashes 1018 if not ignore_unexpected_eof: 1019 out_state = "unexpected eof" 1020 break 1021 line = line + c 1022 if c != "\n": 1023 continue 1024 1025 # line contains a full line of data output from QEMU 1026 log_out_fp.write(line) 1027 log_out_fp.flush() 1028 line = line.strip() 1029 logger.debug(f"QEMU ({pid}): {line}") 1030 1031 harness.handle(line) 1032 if harness.state: 1033 # if we have registered a fail make sure the state is not 1034 # overridden by a false success message coming from the 1035 # testsuite 1036 if out_state not in ['failed', 'unexpected eof', 'unexpected byte']: 1037 out_state = harness.state 1038 1039 # if we get some state, that means test is doing well, we reset 1040 # the timeout and wait for 2 more seconds to catch anything 1041 # printed late. We wait much longer if code 1042 # coverage is enabled since dumping this information can 1043 # take some time. 1044 if not timeout_extended or harness.capture_coverage: 1045 timeout_extended = True 1046 if harness.capture_coverage: 1047 timeout_time = time.time() + 30 1048 else: 1049 timeout_time = time.time() + 2 1050 line = "" 1051 1052 if harness.is_pytest: 1053 harness.pytest_run(logfile) 1054 out_state = harness.state 1055 1056 handler.record(harness) 1057 1058 handler_time = time.time() - start_time 1059 logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds") 1060 1061 if out_state == "timeout": 1062 handler.instance.reason = "Timeout" 1063 handler.set_state("failed", handler_time) 1064 elif out_state == "failed": 1065 handler.instance.reason = "Failed" 1066 handler.set_state("failed", handler_time) 1067 elif out_state in ['unexpected eof', 'unexpected byte']: 1068 handler.instance.reason = out_state 1069 handler.set_state("failed", handler_time) 1070 else: 1071 handler.set_state(out_state, handler_time) 1072 1073 log_out_fp.close() 1074 out_fp.close() 1075 in_fp.close() 1076 if pid: 1077 try: 1078 if pid: 1079 os.kill(pid, signal.SIGTERM) 1080 except ProcessLookupError: 1081 # Oh well, as long as it's dead! User probably sent Ctrl-C 1082 pass 1083 1084 os.unlink(fifo_in) 1085 os.unlink(fifo_out) 1086 1087 def handle(self): 1088 self.results = {} 1089 self.run = True 1090 1091 # We pass this to QEMU which looks for fifos with .in and .out 1092 # suffixes. 1093 1094 self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo") 1095 self.pid_fn = os.path.join(self.instance.build_dir, "qemu.pid") 1096 1097 if os.path.exists(self.pid_fn): 1098 os.unlink(self.pid_fn) 1099 1100 self.log_fn = self.log 1101 1102 harness_import = HarnessImporter(self.instance.testcase.harness.capitalize()) 1103 harness = harness_import.instance 1104 harness.configure(self.instance) 1105 1106 self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread, 1107 args=(self, self.timeout, self.build_dir, 1108 self.log_fn, self.fifo_fn, 1109 self.pid_fn, self.results, harness, 1110 self.ignore_unexpected_eof)) 1111 1112 self.instance.results = harness.tests 1113 self.thread.daemon = True 1114 logger.debug("Spawning QEMUHandler Thread for %s" % self.name) 1115 self.thread.start() 1116 if sys.stdout.isatty(): 1117 subprocess.call(["stty", "sane"]) 1118 1119 logger.debug("Running %s (%s)" % (self.name, self.type_str)) 1120 command = [self.generator_cmd] 1121 command += ["-C", self.build_dir, "run"] 1122 1123 is_timeout = False 1124 qemu_pid = None 1125 1126 with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc: 1127 logger.debug("Spawning QEMUHandler Thread for %s" % self.name) 1128 1129 try: 1130 proc.wait(self.timeout) 1131 except subprocess.TimeoutExpired: 1132 # sometimes QEMU can't handle SIGTERM signal correctly 1133 # in that case kill -9 QEMU process directly and leave 1134 # twister to judge testing result by console output 1135 1136 is_timeout = True 1137 self.terminate(proc) 1138 if harness.state == "passed": 1139 self.returncode = 0 1140 else: 1141 self.returncode = proc.returncode 1142 else: 1143 if os.path.exists(self.pid_fn): 1144 qemu_pid = int(open(self.pid_fn).read()) 1145 logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}") 1146 self.returncode = proc.returncode 1147 # Need to wait for harness to finish processing 1148 # output from QEMU. Otherwise it might miss some 1149 # error messages. 1150 self.thread.join(0) 1151 if self.thread.is_alive(): 1152 logger.debug("Timed out while monitoring QEMU output") 1153 1154 if os.path.exists(self.pid_fn): 1155 qemu_pid = int(open(self.pid_fn).read()) 1156 os.unlink(self.pid_fn) 1157 1158 logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}") 1159 1160 if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness.state: 1161 self.set_state("failed", 0) 1162 if is_timeout: 1163 self.instance.reason = "Timeout" 1164 else: 1165 self.instance.reason = "Exited with {}".format(self.returncode) 1166 self.add_missing_testscases(harness) 1167 1168 def get_fifo(self): 1169 return self.fifo_fn 1170 1171 1172class SizeCalculator: 1173 alloc_sections = [ 1174 "bss", 1175 "noinit", 1176 "app_bss", 1177 "app_noinit", 1178 "ccm_bss", 1179 "ccm_noinit" 1180 ] 1181 1182 rw_sections = [ 1183 "datas", 1184 "initlevel", 1185 "exceptions", 1186 "initshell", 1187 "_static_thread_data_area", 1188 "k_timer_area", 1189 "k_mem_slab_area", 1190 "k_mem_pool_area", 1191 "sw_isr_table", 1192 "k_sem_area", 1193 "k_mutex_area", 1194 "app_shmem_regions", 1195 "_k_fifo_area", 1196 "_k_lifo_area", 1197 "k_stack_area", 1198 "k_msgq_area", 1199 "k_mbox_area", 1200 "k_pipe_area", 1201 "net_if_area", 1202 "net_if_dev_area", 1203 "net_l2_area", 1204 "net_l2_data", 1205 "k_queue_area", 1206 "_net_buf_pool_area", 1207 "app_datas", 1208 "kobject_data", 1209 "mmu_tables", 1210 "app_pad", 1211 "priv_stacks", 1212 "ccm_data", 1213 "usb_descriptor", 1214 "usb_data", "usb_bos_desc", 1215 "uart_mux", 1216 'log_backends_sections', 1217 'log_dynamic_sections', 1218 'log_const_sections', 1219 "app_smem", 1220 'shell_root_cmds_sections', 1221 'log_const_sections', 1222 "font_entry_sections", 1223 "priv_stacks_noinit", 1224 "_GCOV_BSS_SECTION_NAME", 1225 "gcov", 1226 "nocache", 1227 "devices", 1228 "k_heap_area", 1229 ] 1230 1231 # These get copied into RAM only on non-XIP 1232 ro_sections = [ 1233 "rom_start", 1234 "text", 1235 "ctors", 1236 "init_array", 1237 "reset", 1238 "z_object_assignment_area", 1239 "rodata", 1240 "net_l2", 1241 "vector", 1242 "sw_isr_table", 1243 "settings_handler_static_area", 1244 "bt_l2cap_fixed_chan_area", 1245 "bt_l2cap_br_fixed_chan_area", 1246 "bt_gatt_service_static_area", 1247 "vectors", 1248 "net_socket_register_area", 1249 "net_ppp_proto", 1250 "shell_area", 1251 "tracing_backend_area", 1252 "ppp_protocol_handler_area", 1253 ] 1254 1255 def __init__(self, filename, extra_sections): 1256 """Constructor 1257 1258 @param filename Path to the output binary 1259 The <filename> is parsed by objdump to determine section sizes 1260 """ 1261 # Make sure this is an ELF binary 1262 with open(filename, "rb") as f: 1263 magic = f.read(4) 1264 1265 try: 1266 if magic != b'\x7fELF': 1267 raise TwisterRuntimeError("%s is not an ELF binary" % filename) 1268 except Exception as e: 1269 print(str(e)) 1270 sys.exit(2) 1271 1272 # Search for CONFIG_XIP in the ELF's list of symbols using NM and AWK. 1273 # GREP can not be used as it returns an error if the symbol is not 1274 # found. 1275 is_xip_command = "nm " + filename + \ 1276 " | awk '/CONFIG_XIP/ { print $3 }'" 1277 is_xip_output = subprocess.check_output( 1278 is_xip_command, shell=True, stderr=subprocess.STDOUT).decode( 1279 "utf-8").strip() 1280 try: 1281 if is_xip_output.endswith("no symbols"): 1282 raise TwisterRuntimeError("%s has no symbol information" % filename) 1283 except Exception as e: 1284 print(str(e)) 1285 sys.exit(2) 1286 1287 self.is_xip = (len(is_xip_output) != 0) 1288 1289 self.filename = filename 1290 self.sections = [] 1291 self.rom_size = 0 1292 self.ram_size = 0 1293 self.extra_sections = extra_sections 1294 1295 self._calculate_sizes() 1296 1297 def get_ram_size(self): 1298 """Get the amount of RAM the application will use up on the device 1299 1300 @return amount of RAM, in bytes 1301 """ 1302 return self.ram_size 1303 1304 def get_rom_size(self): 1305 """Get the size of the data that this application uses on device's flash 1306 1307 @return amount of ROM, in bytes 1308 """ 1309 return self.rom_size 1310 1311 def unrecognized_sections(self): 1312 """Get a list of sections inside the binary that weren't recognized 1313 1314 @return list of unrecognized section names 1315 """ 1316 slist = [] 1317 for v in self.sections: 1318 if not v["recognized"]: 1319 slist.append(v["name"]) 1320 return slist 1321 1322 def _calculate_sizes(self): 1323 """ Calculate RAM and ROM usage by section """ 1324 objdump_command = "objdump -h " + self.filename 1325 objdump_output = subprocess.check_output( 1326 objdump_command, shell=True).decode("utf-8").splitlines() 1327 1328 for line in objdump_output: 1329 words = line.split() 1330 1331 if not words: # Skip lines that are too short 1332 continue 1333 1334 index = words[0] 1335 if not index[0].isdigit(): # Skip lines that do not start 1336 continue # with a digit 1337 1338 name = words[1] # Skip lines with section names 1339 if name[0] == '.': # starting with '.' 1340 continue 1341 1342 # TODO this doesn't actually reflect the size in flash or RAM as 1343 # it doesn't include linker-imposed padding between sections. 1344 # It is close though. 1345 size = int(words[2], 16) 1346 if size == 0: 1347 continue 1348 1349 load_addr = int(words[4], 16) 1350 virt_addr = int(words[3], 16) 1351 1352 # Add section to memory use totals (for both non-XIP and XIP scenarios) 1353 # Unrecognized section names are not included in the calculations. 1354 recognized = True 1355 if name in SizeCalculator.alloc_sections: 1356 self.ram_size += size 1357 stype = "alloc" 1358 elif name in SizeCalculator.rw_sections: 1359 self.ram_size += size 1360 self.rom_size += size 1361 stype = "rw" 1362 elif name in SizeCalculator.ro_sections: 1363 self.rom_size += size 1364 if not self.is_xip: 1365 self.ram_size += size 1366 stype = "ro" 1367 else: 1368 stype = "unknown" 1369 if name not in self.extra_sections: 1370 recognized = False 1371 1372 self.sections.append({"name": name, "load_addr": load_addr, 1373 "size": size, "virt_addr": virt_addr, 1374 "type": stype, "recognized": recognized}) 1375 1376 1377 1378class TwisterConfigParser: 1379 """Class to read test case files with semantic checking 1380 """ 1381 1382 def __init__(self, filename, schema): 1383 """Instantiate a new TwisterConfigParser object 1384 1385 @param filename Source .yaml file to read 1386 """ 1387 self.data = {} 1388 self.schema = schema 1389 self.filename = filename 1390 self.tests = {} 1391 self.common = {} 1392 1393 def load(self): 1394 self.data = scl.yaml_load_verify(self.filename, self.schema) 1395 1396 if 'tests' in self.data: 1397 self.tests = self.data['tests'] 1398 if 'common' in self.data: 1399 self.common = self.data['common'] 1400 1401 def _cast_value(self, value, typestr): 1402 if isinstance(value, str): 1403 v = value.strip() 1404 if typestr == "str": 1405 return v 1406 1407 elif typestr == "float": 1408 return float(value) 1409 1410 elif typestr == "int": 1411 return int(value) 1412 1413 elif typestr == "bool": 1414 return value 1415 1416 elif typestr.startswith("list") and isinstance(value, list): 1417 return value 1418 elif typestr.startswith("list") and isinstance(value, str): 1419 vs = v.split() 1420 if len(typestr) > 4 and typestr[4] == ":": 1421 return [self._cast_value(vsi, typestr[5:]) for vsi in vs] 1422 else: 1423 return vs 1424 1425 elif typestr.startswith("set"): 1426 vs = v.split() 1427 if len(typestr) > 3 and typestr[3] == ":": 1428 return {self._cast_value(vsi, typestr[4:]) for vsi in vs} 1429 else: 1430 return set(vs) 1431 1432 elif typestr.startswith("map"): 1433 return value 1434 else: 1435 raise ConfigurationError( 1436 self.filename, "unknown type '%s'" % value) 1437 1438 def get_test(self, name, valid_keys): 1439 """Get a dictionary representing the keys/values within a test 1440 1441 @param name The test in the .yaml file to retrieve data from 1442 @param valid_keys A dictionary representing the intended semantics 1443 for this test. Each key in this dictionary is a key that could 1444 be specified, if a key is given in the .yaml file which isn't in 1445 here, it will generate an error. Each value in this dictionary 1446 is another dictionary containing metadata: 1447 1448 "default" - Default value if not given 1449 "type" - Data type to convert the text value to. Simple types 1450 supported are "str", "float", "int", "bool" which will get 1451 converted to respective Python data types. "set" and "list" 1452 may also be specified which will split the value by 1453 whitespace (but keep the elements as strings). finally, 1454 "list:<type>" and "set:<type>" may be given which will 1455 perform a type conversion after splitting the value up. 1456 "required" - If true, raise an error if not defined. If false 1457 and "default" isn't specified, a type conversion will be 1458 done on an empty string 1459 @return A dictionary containing the test key-value pairs with 1460 type conversion and default values filled in per valid_keys 1461 """ 1462 1463 d = {} 1464 for k, v in self.common.items(): 1465 d[k] = v 1466 1467 for k, v in self.tests[name].items(): 1468 if k in d: 1469 if isinstance(d[k], str): 1470 # By default, we just concatenate string values of keys 1471 # which appear both in "common" and per-test sections, 1472 # but some keys are handled in adhoc way based on their 1473 # semantics. 1474 if k == "filter": 1475 d[k] = "(%s) and (%s)" % (d[k], v) 1476 else: 1477 d[k] += " " + v 1478 else: 1479 d[k] = v 1480 1481 for k, kinfo in valid_keys.items(): 1482 if k not in d: 1483 if "required" in kinfo: 1484 required = kinfo["required"] 1485 else: 1486 required = False 1487 1488 if required: 1489 raise ConfigurationError( 1490 self.filename, 1491 "missing required value for '%s' in test '%s'" % 1492 (k, name)) 1493 else: 1494 if "default" in kinfo: 1495 default = kinfo["default"] 1496 else: 1497 default = self._cast_value("", kinfo["type"]) 1498 d[k] = default 1499 else: 1500 try: 1501 d[k] = self._cast_value(d[k], kinfo["type"]) 1502 except ValueError: 1503 raise ConfigurationError( 1504 self.filename, "bad %s value '%s' for key '%s' in name '%s'" % 1505 (kinfo["type"], d[k], k, name)) 1506 1507 return d 1508 1509 1510class Platform: 1511 """Class representing metadata for a particular platform 1512 1513 Maps directly to BOARD when building""" 1514 1515 platform_schema = scl.yaml_load(os.path.join(ZEPHYR_BASE, 1516 "scripts", "schemas", "twister", "platform-schema.yaml")) 1517 1518 def __init__(self): 1519 """Constructor. 1520 1521 """ 1522 1523 self.name = "" 1524 self.twister = True 1525 # if no RAM size is specified by the board, take a default of 128K 1526 self.ram = 128 1527 1528 self.ignore_tags = [] 1529 self.only_tags = [] 1530 self.default = False 1531 # if no flash size is specified by the board, take a default of 512K 1532 self.flash = 512 1533 self.supported = set() 1534 1535 self.arch = "" 1536 self.type = "na" 1537 self.simulation = "na" 1538 self.supported_toolchains = [] 1539 self.env = [] 1540 self.env_satisfied = True 1541 self.filter_data = dict() 1542 1543 def load(self, platform_file): 1544 scp = TwisterConfigParser(platform_file, self.platform_schema) 1545 scp.load() 1546 data = scp.data 1547 1548 self.name = data['identifier'] 1549 self.twister = data.get("twister", True) 1550 # if no RAM size is specified by the board, take a default of 128K 1551 self.ram = data.get("ram", 128) 1552 testing = data.get("testing", {}) 1553 self.ignore_tags = testing.get("ignore_tags", []) 1554 self.only_tags = testing.get("only_tags", []) 1555 self.default = testing.get("default", False) 1556 # if no flash size is specified by the board, take a default of 512K 1557 self.flash = data.get("flash", 512) 1558 self.supported = set() 1559 for supp_feature in data.get("supported", []): 1560 for item in supp_feature.split(":"): 1561 self.supported.add(item) 1562 1563 self.arch = data['arch'] 1564 self.type = data.get('type', "na") 1565 self.simulation = data.get('simulation', "na") 1566 self.supported_toolchains = data.get("toolchain", []) 1567 self.env = data.get("env", []) 1568 self.env_satisfied = True 1569 for env in self.env: 1570 if not os.environ.get(env, None): 1571 self.env_satisfied = False 1572 1573 def __repr__(self): 1574 return "<%s on %s>" % (self.name, self.arch) 1575 1576 1577class DisablePyTestCollectionMixin(object): 1578 __test__ = False 1579 1580 1581class TestCase(DisablePyTestCollectionMixin): 1582 """Class representing a test application 1583 """ 1584 1585 def __init__(self, testcase_root, workdir, name): 1586 """TestCase constructor. 1587 1588 This gets called by TestSuite as it finds and reads test yaml files. 1589 Multiple TestCase instances may be generated from a single testcase.yaml, 1590 each one corresponds to an entry within that file. 1591 1592 We need to have a unique name for every single test case. Since 1593 a testcase.yaml can define multiple tests, the canonical name for 1594 the test case is <workdir>/<name>. 1595 1596 @param testcase_root os.path.abspath() of one of the --testcase-root 1597 @param workdir Sub-directory of testcase_root where the 1598 .yaml test configuration file was found 1599 @param name Name of this test case, corresponding to the entry name 1600 in the test case configuration file. For many test cases that just 1601 define one test, can be anything and is usually "test". This is 1602 really only used to distinguish between different cases when 1603 the testcase.yaml defines multiple tests 1604 """ 1605 1606 1607 self.source_dir = "" 1608 self.yamlfile = "" 1609 self.cases = [] 1610 self.name = self.get_unique(testcase_root, workdir, name) 1611 self.id = name 1612 1613 self.type = None 1614 self.tags = set() 1615 self.extra_args = None 1616 self.extra_configs = None 1617 self.arch_allow = None 1618 self.arch_exclude = None 1619 self.skip = False 1620 self.platform_exclude = None 1621 self.platform_allow = None 1622 self.toolchain_exclude = None 1623 self.toolchain_allow = None 1624 self.tc_filter = None 1625 self.timeout = 60 1626 self.harness = "" 1627 self.harness_config = {} 1628 self.build_only = True 1629 self.build_on_all = False 1630 self.slow = False 1631 self.min_ram = -1 1632 self.depends_on = None 1633 self.min_flash = -1 1634 self.extra_sections = None 1635 self.integration_platforms = [] 1636 1637 @staticmethod 1638 def get_unique(testcase_root, workdir, name): 1639 1640 canonical_testcase_root = os.path.realpath(testcase_root) 1641 if Path(canonical_zephyr_base) in Path(canonical_testcase_root).parents: 1642 # This is in ZEPHYR_BASE, so include path in name for uniqueness 1643 # FIXME: We should not depend on path of test for unique names. 1644 relative_tc_root = os.path.relpath(canonical_testcase_root, 1645 start=canonical_zephyr_base) 1646 else: 1647 relative_tc_root = "" 1648 1649 # workdir can be "." 1650 unique = os.path.normpath(os.path.join(relative_tc_root, workdir, name)) 1651 check = name.split(".") 1652 if len(check) < 2: 1653 raise TwisterException(f"""bad test name '{name}' in {testcase_root}/{workdir}. \ 1654Tests should reference the category and subsystem with a dot as a separator. 1655 """ 1656 ) 1657 return unique 1658 1659 @staticmethod 1660 def scan_file(inf_name): 1661 suite_regex = re.compile( 1662 # do not match until end-of-line, otherwise we won't allow 1663 # stc_regex below to catch the ones that are declared in the same 1664 # line--as we only search starting the end of this match 1665 br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,", 1666 re.MULTILINE) 1667 stc_regex = re.compile( 1668 br"^\s*" # empy space at the beginning is ok 1669 # catch the case where it is declared in the same sentence, e.g: 1670 # 1671 # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME)); 1672 br"(?:ztest_test_suite\([a-zA-Z0-9_]+,\s*)?" 1673 # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME) 1674 br"ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?" 1675 # Consume the argument that becomes the extra testcse 1676 br"\(\s*" 1677 br"(?P<stc_name>[a-zA-Z0-9_]+)" 1678 # _setup_teardown() variant has two extra arguments that we ignore 1679 br"(?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?" 1680 br"\s*\)", 1681 # We don't check how it finishes; we don't care 1682 re.MULTILINE) 1683 suite_run_regex = re.compile( 1684 br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)", 1685 re.MULTILINE) 1686 achtung_regex = re.compile( 1687 br"(#ifdef|#endif)", 1688 re.MULTILINE) 1689 warnings = None 1690 1691 with open(inf_name) as inf: 1692 if os.name == 'nt': 1693 mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ} 1694 else: 1695 mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE, 'prot': mmap.PROT_READ, 1696 'offset': 0} 1697 1698 with contextlib.closing(mmap.mmap(**mmap_args)) as main_c: 1699 suite_regex_match = suite_regex.search(main_c) 1700 if not suite_regex_match: 1701 # can't find ztest_test_suite, maybe a client, because 1702 # it includes ztest.h 1703 return None, None 1704 1705 suite_run_match = suite_run_regex.search(main_c) 1706 if not suite_run_match: 1707 raise ValueError("can't find ztest_run_test_suite") 1708 1709 achtung_matches = re.findall( 1710 achtung_regex, 1711 main_c[suite_regex_match.end():suite_run_match.start()]) 1712 if achtung_matches: 1713 warnings = "found invalid %s in ztest_test_suite()" \ 1714 % ", ".join(sorted({match.decode() for match in achtung_matches},reverse = True)) 1715 _matches = re.findall( 1716 stc_regex, 1717 main_c[suite_regex_match.end():suite_run_match.start()]) 1718 for match in _matches: 1719 if not match.decode().startswith("test_"): 1720 warnings = "Found a test that does not start with test_" 1721 matches = [match.decode().replace("test_", "", 1) for match in _matches] 1722 return matches, warnings 1723 1724 def scan_path(self, path): 1725 subcases = [] 1726 for filename in glob.glob(os.path.join(path, "src", "*.c*")): 1727 try: 1728 _subcases, warnings = self.scan_file(filename) 1729 if warnings: 1730 logger.error("%s: %s" % (filename, warnings)) 1731 raise TwisterRuntimeError("%s: %s" % (filename, warnings)) 1732 if _subcases: 1733 subcases += _subcases 1734 except ValueError as e: 1735 logger.error("%s: can't find: %s" % (filename, e)) 1736 1737 for filename in glob.glob(os.path.join(path, "*.c")): 1738 try: 1739 _subcases, warnings = self.scan_file(filename) 1740 if warnings: 1741 logger.error("%s: %s" % (filename, warnings)) 1742 if _subcases: 1743 subcases += _subcases 1744 except ValueError as e: 1745 logger.error("%s: can't find: %s" % (filename, e)) 1746 return subcases 1747 1748 def parse_subcases(self, test_path): 1749 results = self.scan_path(test_path) 1750 for sub in results: 1751 name = "{}.{}".format(self.id, sub) 1752 self.cases.append(name) 1753 1754 if not results: 1755 self.cases.append(self.id) 1756 1757 def __str__(self): 1758 return self.name 1759 1760 1761class TestInstance(DisablePyTestCollectionMixin): 1762 """Class representing the execution of a particular TestCase on a platform 1763 1764 @param test The TestCase object we want to build/execute 1765 @param platform Platform object that we want to build and run against 1766 @param base_outdir Base directory for all test results. The actual 1767 out directory used is <outdir>/<platform>/<test case name> 1768 """ 1769 1770 def __init__(self, testcase, platform, outdir): 1771 1772 self.testcase = testcase 1773 self.platform = platform 1774 1775 self.status = None 1776 self.reason = "Unknown" 1777 self.metrics = dict() 1778 self.handler = None 1779 self.outdir = outdir 1780 1781 self.name = os.path.join(platform.name, testcase.name) 1782 self.build_dir = os.path.join(outdir, platform.name, testcase.name) 1783 1784 self.run = False 1785 1786 self.results = {} 1787 1788 def __getstate__(self): 1789 d = self.__dict__.copy() 1790 return d 1791 1792 def __setstate__(self, d): 1793 self.__dict__.update(d) 1794 1795 def __lt__(self, other): 1796 return self.name < other.name 1797 1798 1799 @staticmethod 1800 def testcase_runnable(testcase, fixtures): 1801 can_run = False 1802 # console harness allows us to run the test and capture data. 1803 if testcase.harness in [ 'console', 'ztest', 'pytest']: 1804 can_run = True 1805 # if we have a fixture that is also being supplied on the 1806 # command-line, then we need to run the test, not just build it. 1807 fixture = testcase.harness_config.get('fixture') 1808 if fixture: 1809 can_run = (fixture in fixtures) 1810 1811 elif testcase.harness: 1812 can_run = False 1813 else: 1814 can_run = True 1815 1816 return can_run 1817 1818 1819 # Global testsuite parameters 1820 def check_runnable(self, enable_slow=False, filter='buildable', fixtures=[]): 1821 1822 # right now we only support building on windows. running is still work 1823 # in progress. 1824 if os.name == 'nt': 1825 return False 1826 1827 # we asked for build-only on the command line 1828 if self.testcase.build_only: 1829 return False 1830 1831 # Do not run slow tests: 1832 skip_slow = self.testcase.slow and not enable_slow 1833 if skip_slow: 1834 return False 1835 1836 target_ready = bool(self.testcase.type == "unit" or \ 1837 self.platform.type == "native" or \ 1838 self.platform.simulation in ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp"] or \ 1839 filter == 'runnable') 1840 1841 if self.platform.simulation == "nsim": 1842 if not find_executable("nsimdrv"): 1843 target_ready = False 1844 1845 if self.platform.simulation == "mdb-nsim": 1846 if not find_executable("mdb"): 1847 target_ready = False 1848 1849 if self.platform.simulation == "renode": 1850 if not find_executable("renode"): 1851 target_ready = False 1852 1853 if self.platform.simulation == "tsim": 1854 if not find_executable("tsim-leon3"): 1855 target_ready = False 1856 1857 testcase_runnable = self.testcase_runnable(self.testcase, fixtures) 1858 1859 return testcase_runnable and target_ready 1860 1861 def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=[]): 1862 # Create this in a "twister/" subdirectory otherwise this 1863 # will pass this overlay to kconfig.py *twice* and kconfig.cmake 1864 # will silently give that second time precedence over any 1865 # --extra-args=CONFIG_* 1866 subdir = os.path.join(self.build_dir, "twister") 1867 1868 content = "" 1869 1870 if self.testcase.extra_configs: 1871 content = "\n".join(self.testcase.extra_configs) 1872 1873 if enable_coverage: 1874 if platform.name in coverage_platform: 1875 content = content + "\nCONFIG_COVERAGE=y" 1876 content = content + "\nCONFIG_COVERAGE_DUMP=y" 1877 1878 if enable_asan: 1879 if platform.type == "native": 1880 content = content + "\nCONFIG_ASAN=y" 1881 1882 if enable_ubsan: 1883 if platform.type == "native": 1884 content = content + "\nCONFIG_UBSAN=y" 1885 1886 if content: 1887 os.makedirs(subdir, exist_ok=True) 1888 file = os.path.join(subdir, "testcase_extra.conf") 1889 with open(file, "w") as f: 1890 f.write(content) 1891 1892 return content 1893 1894 def calculate_sizes(self): 1895 """Get the RAM/ROM sizes of a test case. 1896 1897 This can only be run after the instance has been executed by 1898 MakeGenerator, otherwise there won't be any binaries to measure. 1899 1900 @return A SizeCalculator object 1901 """ 1902 fns = glob.glob(os.path.join(self.build_dir, "zephyr", "*.elf")) 1903 fns.extend(glob.glob(os.path.join(self.build_dir, "zephyr", "*.exe"))) 1904 fns = [x for x in fns if not x.endswith('_prebuilt.elf')] 1905 if len(fns) != 1: 1906 raise BuildError("Missing/multiple output ELF binary") 1907 1908 return SizeCalculator(fns[0], self.testcase.extra_sections) 1909 1910 def fill_results_by_status(self): 1911 """Fills results according to self.status 1912 1913 The method is used to propagate the instance level status 1914 to the test cases inside. Useful when the whole instance is skipped 1915 and the info is required also at the test cases level for reporting. 1916 Should be used with caution, e.g. should not be used 1917 to fill all results with passes 1918 """ 1919 status_to_verdict = { 1920 'skipped': 'SKIP', 1921 'error': 'BLOCK', 1922 'failure': 'FAILED' 1923 } 1924 1925 for k in self.results: 1926 self.results[k] = status_to_verdict[self.status] 1927 1928 def __repr__(self): 1929 return "<TestCase %s on %s>" % (self.testcase.name, self.platform.name) 1930 1931 1932class CMake(): 1933 config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 1934 dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 1935 1936 def __init__(self, testcase, platform, source_dir, build_dir): 1937 1938 self.cwd = None 1939 self.capture_output = True 1940 1941 self.defconfig = {} 1942 self.cmake_cache = {} 1943 1944 self.instance = None 1945 self.testcase = testcase 1946 self.platform = platform 1947 self.source_dir = source_dir 1948 self.build_dir = build_dir 1949 self.log = "build.log" 1950 self.generator = None 1951 self.generator_cmd = None 1952 1953 def parse_generated(self): 1954 self.defconfig = {} 1955 return {} 1956 1957 def run_build(self, args=[]): 1958 1959 logger.debug("Building %s for %s" % (self.source_dir, self.platform.name)) 1960 1961 cmake_args = [] 1962 cmake_args.extend(args) 1963 cmake = shutil.which('cmake') 1964 cmd = [cmake] + cmake_args 1965 kwargs = dict() 1966 1967 if self.capture_output: 1968 kwargs['stdout'] = subprocess.PIPE 1969 # CMake sends the output of message() to stderr unless it's STATUS 1970 kwargs['stderr'] = subprocess.STDOUT 1971 1972 if self.cwd: 1973 kwargs['cwd'] = self.cwd 1974 1975 p = subprocess.Popen(cmd, **kwargs) 1976 out, _ = p.communicate() 1977 1978 results = {} 1979 if p.returncode == 0: 1980 msg = "Finished building %s for %s" % (self.source_dir, self.platform.name) 1981 1982 self.instance.status = "passed" 1983 results = {'msg': msg, "returncode": p.returncode, "instance": self.instance} 1984 1985 if out: 1986 log_msg = out.decode(sys.getdefaultencoding()) 1987 with open(os.path.join(self.build_dir, self.log), "a") as log: 1988 log.write(log_msg) 1989 1990 else: 1991 return None 1992 else: 1993 # A real error occurred, raise an exception 1994 log_msg = "" 1995 if out: 1996 log_msg = out.decode(sys.getdefaultencoding()) 1997 with open(os.path.join(self.build_dir, self.log), "a") as log: 1998 log.write(log_msg) 1999 2000 if log_msg: 2001 res = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM)' overflowed by", log_msg) 2002 if res and not self.overflow_as_errors: 2003 logger.debug("Test skipped due to {} Overflow".format(res[0])) 2004 self.instance.status = "skipped" 2005 self.instance.reason = "{} overflow".format(res[0]) 2006 else: 2007 self.instance.status = "error" 2008 self.instance.reason = "Build failure" 2009 2010 results = { 2011 "returncode": p.returncode, 2012 "instance": self.instance, 2013 } 2014 2015 return results 2016 2017 def run_cmake(self, args=[]): 2018 2019 if self.warnings_as_errors: 2020 warnings_as_errors = 'y' 2021 gen_defines_args = "--edtlib-Werror" 2022 else: 2023 warnings_as_errors = 'n' 2024 gen_defines_args = "" 2025 2026 logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name)) 2027 cmake_args = [ 2028 f'-B{self.build_dir}', 2029 f'-S{self.source_dir}', 2030 f'-DCONFIG_COMPILER_WARNINGS_AS_ERRORS={warnings_as_errors}', 2031 f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}', 2032 f'-G{self.generator}' 2033 ] 2034 2035 args = ["-D{}".format(a.replace('"', '')) for a in args] 2036 cmake_args.extend(args) 2037 2038 cmake_opts = ['-DBOARD={}'.format(self.platform.name)] 2039 cmake_args.extend(cmake_opts) 2040 2041 2042 logger.debug("Calling cmake with arguments: {}".format(cmake_args)) 2043 cmake = shutil.which('cmake') 2044 cmd = [cmake] + cmake_args 2045 kwargs = dict() 2046 2047 if self.capture_output: 2048 kwargs['stdout'] = subprocess.PIPE 2049 # CMake sends the output of message() to stderr unless it's STATUS 2050 kwargs['stderr'] = subprocess.STDOUT 2051 2052 if self.cwd: 2053 kwargs['cwd'] = self.cwd 2054 2055 p = subprocess.Popen(cmd, **kwargs) 2056 out, _ = p.communicate() 2057 2058 if p.returncode == 0: 2059 filter_results = self.parse_generated() 2060 msg = "Finished building %s for %s" % (self.source_dir, self.platform.name) 2061 logger.debug(msg) 2062 results = {'msg': msg, 'filter': filter_results} 2063 2064 else: 2065 self.instance.status = "error" 2066 self.instance.reason = "Cmake build failure" 2067 self.instance.fill_results_by_status() 2068 logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name)) 2069 results = {"returncode": p.returncode} 2070 2071 if out: 2072 with open(os.path.join(self.build_dir, self.log), "a") as log: 2073 log_msg = out.decode(sys.getdefaultencoding()) 2074 log.write(log_msg) 2075 2076 return results 2077 2078 @staticmethod 2079 def run_cmake_script(args=[]): 2080 2081 logger.debug("Running cmake script %s" % (args[0])) 2082 2083 cmake_args = ["-D{}".format(a.replace('"', '')) for a in args[1:]] 2084 cmake_args.extend(['-P', args[0]]) 2085 2086 logger.debug("Calling cmake with arguments: {}".format(cmake_args)) 2087 cmake = shutil.which('cmake') 2088 if not cmake: 2089 msg = "Unable to find `cmake` in path" 2090 logger.error(msg) 2091 raise Exception(msg) 2092 cmd = [cmake] + cmake_args 2093 2094 kwargs = dict() 2095 kwargs['stdout'] = subprocess.PIPE 2096 # CMake sends the output of message() to stderr unless it's STATUS 2097 kwargs['stderr'] = subprocess.STDOUT 2098 2099 p = subprocess.Popen(cmd, **kwargs) 2100 out, _ = p.communicate() 2101 2102 # It might happen that the environment adds ANSI escape codes like \x1b[0m, 2103 # for instance if twister is executed from inside a makefile. In such a 2104 # scenario it is then necessary to remove them, as otherwise the JSON decoding 2105 # will fail. 2106 ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') 2107 out = ansi_escape.sub('', out.decode()) 2108 2109 if p.returncode == 0: 2110 msg = "Finished running %s" % (args[0]) 2111 logger.debug(msg) 2112 results = {"returncode": p.returncode, "msg": msg, "stdout": out} 2113 2114 else: 2115 logger.error("Cmake script failure: %s" % (args[0])) 2116 results = {"returncode": p.returncode, "returnmsg": out} 2117 2118 return results 2119 2120 2121class FilterBuilder(CMake): 2122 2123 def __init__(self, testcase, platform, source_dir, build_dir): 2124 super().__init__(testcase, platform, source_dir, build_dir) 2125 2126 self.log = "config-twister.log" 2127 2128 def parse_generated(self): 2129 2130 if self.platform.name == "unit_testing": 2131 return {} 2132 2133 cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt") 2134 defconfig_path = os.path.join(self.build_dir, "zephyr", ".config") 2135 2136 with open(defconfig_path, "r") as fp: 2137 defconfig = {} 2138 for line in fp.readlines(): 2139 m = self.config_re.match(line) 2140 if not m: 2141 if line.strip() and not line.startswith("#"): 2142 sys.stderr.write("Unrecognized line %s\n" % line) 2143 continue 2144 defconfig[m.group(1)] = m.group(2).strip() 2145 2146 self.defconfig = defconfig 2147 2148 cmake_conf = {} 2149 try: 2150 cache = CMakeCache.from_file(cmake_cache_path) 2151 except FileNotFoundError: 2152 cache = {} 2153 2154 for k in iter(cache): 2155 cmake_conf[k.name] = k.value 2156 2157 self.cmake_cache = cmake_conf 2158 2159 filter_data = { 2160 "ARCH": self.platform.arch, 2161 "PLATFORM": self.platform.name 2162 } 2163 filter_data.update(os.environ) 2164 filter_data.update(self.defconfig) 2165 filter_data.update(self.cmake_cache) 2166 2167 edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle") 2168 if self.testcase and self.testcase.tc_filter: 2169 try: 2170 if os.path.exists(edt_pickle): 2171 with open(edt_pickle, 'rb') as f: 2172 edt = pickle.load(f) 2173 else: 2174 edt = None 2175 res = expr_parser.parse(self.testcase.tc_filter, filter_data, edt) 2176 2177 except (ValueError, SyntaxError) as se: 2178 sys.stderr.write( 2179 "Failed processing %s\n" % self.testcase.yamlfile) 2180 raise se 2181 2182 if not res: 2183 return {os.path.join(self.platform.name, self.testcase.name): True} 2184 else: 2185 return {os.path.join(self.platform.name, self.testcase.name): False} 2186 else: 2187 self.platform.filter_data = filter_data 2188 return filter_data 2189 2190 2191class ProjectBuilder(FilterBuilder): 2192 2193 def __init__(self, suite, instance, **kwargs): 2194 super().__init__(instance.testcase, instance.platform, instance.testcase.source_dir, instance.build_dir) 2195 2196 self.log = "build.log" 2197 self.instance = instance 2198 self.suite = suite 2199 self.filtered_tests = 0 2200 2201 self.lsan = kwargs.get('lsan', False) 2202 self.asan = kwargs.get('asan', False) 2203 self.ubsan = kwargs.get('ubsan', False) 2204 self.valgrind = kwargs.get('valgrind', False) 2205 self.extra_args = kwargs.get('extra_args', []) 2206 self.device_testing = kwargs.get('device_testing', False) 2207 self.cmake_only = kwargs.get('cmake_only', False) 2208 self.cleanup = kwargs.get('cleanup', False) 2209 self.coverage = kwargs.get('coverage', False) 2210 self.inline_logs = kwargs.get('inline_logs', False) 2211 self.generator = kwargs.get('generator', None) 2212 self.generator_cmd = kwargs.get('generator_cmd', None) 2213 self.verbose = kwargs.get('verbose', None) 2214 self.warnings_as_errors = kwargs.get('warnings_as_errors', True) 2215 self.overflow_as_errors = kwargs.get('overflow_as_errors', False) 2216 2217 @staticmethod 2218 def log_info(filename, inline_logs): 2219 filename = os.path.abspath(os.path.realpath(filename)) 2220 if inline_logs: 2221 logger.info("{:-^100}".format(filename)) 2222 2223 try: 2224 with open(filename) as fp: 2225 data = fp.read() 2226 except Exception as e: 2227 data = "Unable to read log data (%s)\n" % (str(e)) 2228 2229 logger.error(data) 2230 2231 logger.info("{:-^100}".format(filename)) 2232 else: 2233 logger.error("see: " + Fore.YELLOW + filename + Fore.RESET) 2234 2235 def log_info_file(self, inline_logs): 2236 build_dir = self.instance.build_dir 2237 h_log = "{}/handler.log".format(build_dir) 2238 b_log = "{}/build.log".format(build_dir) 2239 v_log = "{}/valgrind.log".format(build_dir) 2240 d_log = "{}/device.log".format(build_dir) 2241 2242 if os.path.exists(v_log) and "Valgrind" in self.instance.reason: 2243 self.log_info("{}".format(v_log), inline_logs) 2244 elif os.path.exists(h_log) and os.path.getsize(h_log) > 0: 2245 self.log_info("{}".format(h_log), inline_logs) 2246 elif os.path.exists(d_log) and os.path.getsize(d_log) > 0: 2247 self.log_info("{}".format(d_log), inline_logs) 2248 else: 2249 self.log_info("{}".format(b_log), inline_logs) 2250 2251 def setup_handler(self): 2252 2253 instance = self.instance 2254 args = [] 2255 2256 # FIXME: Needs simplification 2257 if instance.platform.simulation == "qemu": 2258 instance.handler = QEMUHandler(instance, "qemu") 2259 args.append("QEMU_PIPE=%s" % instance.handler.get_fifo()) 2260 instance.handler.call_make_run = True 2261 elif instance.testcase.type == "unit": 2262 instance.handler = BinaryHandler(instance, "unit") 2263 instance.handler.binary = os.path.join(instance.build_dir, "testbinary") 2264 if self.coverage: 2265 args.append("COVERAGE=1") 2266 elif instance.platform.type == "native": 2267 handler = BinaryHandler(instance, "native") 2268 2269 handler.asan = self.asan 2270 handler.valgrind = self.valgrind 2271 handler.lsan = self.lsan 2272 handler.ubsan = self.ubsan 2273 handler.coverage = self.coverage 2274 2275 handler.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe") 2276 instance.handler = handler 2277 elif instance.platform.simulation == "renode": 2278 if find_executable("renode"): 2279 instance.handler = BinaryHandler(instance, "renode") 2280 instance.handler.pid_fn = os.path.join(instance.build_dir, "renode.pid") 2281 instance.handler.call_make_run = True 2282 elif instance.platform.simulation == "tsim": 2283 instance.handler = BinaryHandler(instance, "tsim") 2284 instance.handler.call_make_run = True 2285 elif self.device_testing: 2286 instance.handler = DeviceHandler(instance, "device") 2287 instance.handler.coverage = self.coverage 2288 elif instance.platform.simulation == "nsim": 2289 if find_executable("nsimdrv"): 2290 instance.handler = BinaryHandler(instance, "nsim") 2291 instance.handler.call_make_run = True 2292 elif instance.platform.simulation == "mdb-nsim": 2293 if find_executable("mdb"): 2294 instance.handler = BinaryHandler(instance, "nsim") 2295 instance.handler.call_make_run = True 2296 elif instance.platform.simulation == "armfvp": 2297 instance.handler = BinaryHandler(instance, "armfvp") 2298 instance.handler.call_make_run = True 2299 2300 if instance.handler: 2301 instance.handler.args = args 2302 instance.handler.generator_cmd = self.generator_cmd 2303 instance.handler.generator = self.generator 2304 2305 def process(self, pipeline, done, message, lock, results): 2306 op = message.get('op') 2307 2308 if not self.instance.handler: 2309 self.setup_handler() 2310 2311 # The build process, call cmake and build with configured generator 2312 if op == "cmake": 2313 res = self.cmake() 2314 if self.instance.status in ["failed", "error"]: 2315 pipeline.put({"op": "report", "test": self.instance}) 2316 elif self.cmake_only: 2317 if self.instance.status is None: 2318 self.instance.status = "passed" 2319 pipeline.put({"op": "report", "test": self.instance}) 2320 else: 2321 if self.instance.name in res['filter'] and res['filter'][self.instance.name]: 2322 logger.debug("filtering %s" % self.instance.name) 2323 self.instance.status = "skipped" 2324 self.instance.reason = "filter" 2325 results.skipped_runtime += 1 2326 for case in self.instance.testcase.cases: 2327 self.instance.results.update({case: 'SKIP'}) 2328 pipeline.put({"op": "report", "test": self.instance}) 2329 else: 2330 pipeline.put({"op": "build", "test": self.instance}) 2331 2332 elif op == "build": 2333 logger.debug("build test: %s" % self.instance.name) 2334 res = self.build() 2335 2336 if not res: 2337 self.instance.status = "error" 2338 self.instance.reason = "Build Failure" 2339 pipeline.put({"op": "report", "test": self.instance}) 2340 else: 2341 # Count skipped cases during build, for example 2342 # due to ram/rom overflow. 2343 inst = res.get("instance", None) 2344 if inst and inst.status == "skipped": 2345 results.skipped_runtime += 1 2346 2347 if res.get('returncode', 1) > 0: 2348 pipeline.put({"op": "report", "test": self.instance}) 2349 else: 2350 if self.instance.run and self.instance.handler: 2351 pipeline.put({"op": "run", "test": self.instance}) 2352 else: 2353 pipeline.put({"op": "report", "test": self.instance}) 2354 # Run the generated binary using one of the supported handlers 2355 elif op == "run": 2356 logger.debug("run test: %s" % self.instance.name) 2357 self.run() 2358 self.instance.status, _ = self.instance.handler.get_state() 2359 logger.debug(f"run status: {self.instance.name} {self.instance.status}") 2360 2361 # to make it work with pickle 2362 self.instance.handler.thread = None 2363 self.instance.handler.suite = None 2364 pipeline.put({ 2365 "op": "report", 2366 "test": self.instance, 2367 "status": self.instance.status, 2368 "reason": self.instance.reason 2369 } 2370 ) 2371 2372 # Report results and output progress to screen 2373 elif op == "report": 2374 with lock: 2375 done.put(self.instance) 2376 self.report_out(results) 2377 2378 if self.cleanup and not self.coverage and self.instance.status == "passed": 2379 pipeline.put({ 2380 "op": "cleanup", 2381 "test": self.instance 2382 }) 2383 2384 elif op == "cleanup": 2385 if self.device_testing: 2386 self.cleanup_device_testing_artifacts() 2387 else: 2388 self.cleanup_artifacts() 2389 2390 def cleanup_artifacts(self, additional_keep=[]): 2391 logger.debug("Cleaning up {}".format(self.instance.build_dir)) 2392 allow = [ 2393 'zephyr/.config', 2394 'handler.log', 2395 'build.log', 2396 'device.log', 2397 'recording.csv', 2398 ] 2399 2400 allow += additional_keep 2401 2402 allow = [os.path.join(self.instance.build_dir, file) for file in allow] 2403 2404 for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False): 2405 for name in filenames: 2406 path = os.path.join(dirpath, name) 2407 if path not in allow: 2408 os.remove(path) 2409 # Remove empty directories and symbolic links to directories 2410 for dir in dirnames: 2411 path = os.path.join(dirpath, dir) 2412 if os.path.islink(path): 2413 os.remove(path) 2414 elif not os.listdir(path): 2415 os.rmdir(path) 2416 2417 def cleanup_device_testing_artifacts(self): 2418 logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir)) 2419 2420 sanitizelist = [ 2421 'CMakeCache.txt', 2422 'zephyr/runners.yaml', 2423 ] 2424 keep = [ 2425 'zephyr/zephyr.hex', 2426 'zephyr/zephyr.bin', 2427 'zephyr/zephyr.elf', 2428 ] 2429 2430 keep += sanitizelist 2431 2432 self.cleanup_artifacts(keep) 2433 2434 # sanitize paths so files are relocatable 2435 for file in sanitizelist: 2436 file = os.path.join(self.instance.build_dir, file) 2437 2438 with open(file, "rt") as fin: 2439 data = fin.read() 2440 data = data.replace(canonical_zephyr_base+"/", "") 2441 2442 with open(file, "wt") as fin: 2443 fin.write(data) 2444 2445 def report_out(self, results): 2446 total_to_do = results.total - results.skipped_configs 2447 total_tests_width = len(str(total_to_do)) 2448 results.done += 1 2449 instance = self.instance 2450 2451 if instance.status in ["error", "failed", "timeout", "flash_error"]: 2452 if instance.status == "error": 2453 results.error += 1 2454 results.failed += 1 2455 if self.verbose: 2456 status = Fore.RED + "FAILED " + Fore.RESET + instance.reason 2457 else: 2458 print("") 2459 logger.error( 2460 "{:<25} {:<50} {}FAILED{}: {}".format( 2461 instance.platform.name, 2462 instance.testcase.name, 2463 Fore.RED, 2464 Fore.RESET, 2465 instance.reason)) 2466 if not self.verbose: 2467 self.log_info_file(self.inline_logs) 2468 elif instance.status == "skipped": 2469 status = Fore.YELLOW + "SKIPPED" + Fore.RESET 2470 elif instance.status == "passed": 2471 status = Fore.GREEN + "PASSED" + Fore.RESET 2472 else: 2473 logger.debug(f"Unknown status = {instance.status}") 2474 status = Fore.YELLOW + "UNKNOWN" + Fore.RESET 2475 2476 if self.verbose: 2477 if self.cmake_only: 2478 more_info = "cmake" 2479 elif instance.status == "skipped": 2480 more_info = instance.reason 2481 else: 2482 if instance.handler and instance.run: 2483 more_info = instance.handler.type_str 2484 htime = instance.handler.duration 2485 if htime: 2486 more_info += " {:.3f}s".format(htime) 2487 else: 2488 more_info = "build" 2489 2490 logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format( 2491 results.done, total_tests_width, total_to_do, instance.platform.name, 2492 instance.testcase.name, status, more_info)) 2493 2494 if instance.status in ["error", "failed", "timeout"]: 2495 self.log_info_file(self.inline_logs) 2496 else: 2497 completed_perc = 0 2498 if total_to_do > 0: 2499 completed_perc = int((float(results.done) / total_to_do) * 100) 2500 2501 skipped = results.skipped_configs + results.skipped_runtime 2502 sys.stdout.write("\rINFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s" % ( 2503 Fore.GREEN, 2504 results.done, 2505 total_to_do, 2506 Fore.RESET, 2507 completed_perc, 2508 Fore.YELLOW if skipped > 0 else Fore.RESET, 2509 skipped, 2510 Fore.RESET, 2511 Fore.RED if results.failed > 0 else Fore.RESET, 2512 results.failed, 2513 Fore.RESET 2514 ) 2515 ) 2516 sys.stdout.flush() 2517 2518 def cmake(self): 2519 2520 instance = self.instance 2521 args = self.testcase.extra_args[:] 2522 args += self.extra_args 2523 2524 if instance.handler: 2525 args += instance.handler.args 2526 2527 # merge overlay files into one variable 2528 def extract_overlays(args): 2529 re_overlay = re.compile('OVERLAY_CONFIG=(.*)') 2530 other_args = [] 2531 overlays = [] 2532 for arg in args: 2533 match = re_overlay.search(arg) 2534 if match: 2535 overlays.append(match.group(1).strip('\'"')) 2536 else: 2537 other_args.append(arg) 2538 2539 args[:] = other_args 2540 return overlays 2541 2542 overlays = extract_overlays(args) 2543 2544 if os.path.exists(os.path.join(instance.build_dir, 2545 "twister", "testcase_extra.conf")): 2546 overlays.append(os.path.join(instance.build_dir, 2547 "twister", "testcase_extra.conf")) 2548 2549 if overlays: 2550 args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays))) 2551 2552 res = self.run_cmake(args) 2553 return res 2554 2555 def build(self): 2556 res = self.run_build(['--build', self.build_dir]) 2557 return res 2558 2559 def run(self): 2560 2561 instance = self.instance 2562 2563 if instance.handler: 2564 if instance.handler.type_str == "device": 2565 instance.handler.suite = self.suite 2566 2567 instance.handler.handle() 2568 2569 sys.stdout.flush() 2570 2571class TestSuite(DisablePyTestCollectionMixin): 2572 config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 2573 dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 2574 2575 tc_schema = scl.yaml_load( 2576 os.path.join(ZEPHYR_BASE, 2577 "scripts", "schemas", "twister", "testcase-schema.yaml")) 2578 quarantine_schema = scl.yaml_load( 2579 os.path.join(ZEPHYR_BASE, 2580 "scripts", "schemas", "twister", "quarantine-schema.yaml")) 2581 2582 testcase_valid_keys = {"tags": {"type": "set", "required": False}, 2583 "type": {"type": "str", "default": "integration"}, 2584 "extra_args": {"type": "list"}, 2585 "extra_configs": {"type": "list"}, 2586 "build_only": {"type": "bool", "default": False}, 2587 "build_on_all": {"type": "bool", "default": False}, 2588 "skip": {"type": "bool", "default": False}, 2589 "slow": {"type": "bool", "default": False}, 2590 "timeout": {"type": "int", "default": 60}, 2591 "min_ram": {"type": "int", "default": 8}, 2592 "depends_on": {"type": "set"}, 2593 "min_flash": {"type": "int", "default": 32}, 2594 "arch_allow": {"type": "set"}, 2595 "arch_exclude": {"type": "set"}, 2596 "extra_sections": {"type": "list", "default": []}, 2597 "integration_platforms": {"type": "list", "default": []}, 2598 "platform_exclude": {"type": "set"}, 2599 "platform_allow": {"type": "set"}, 2600 "toolchain_exclude": {"type": "set"}, 2601 "toolchain_allow": {"type": "set"}, 2602 "filter": {"type": "str"}, 2603 "harness": {"type": "str"}, 2604 "harness_config": {"type": "map", "default": {}} 2605 } 2606 2607 RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "release", 2608 "twister_last_release.csv") 2609 2610 SAMPLE_FILENAME = 'sample.yaml' 2611 TESTCASE_FILENAME = 'testcase.yaml' 2612 2613 def __init__(self, board_root_list=[], testcase_roots=[], outdir=None): 2614 2615 self.roots = testcase_roots 2616 if not isinstance(board_root_list, list): 2617 self.board_roots = [board_root_list] 2618 else: 2619 self.board_roots = board_root_list 2620 2621 # Testsuite Options 2622 self.coverage_platform = [] 2623 self.build_only = False 2624 self.cmake_only = False 2625 self.cleanup = False 2626 self.enable_slow = False 2627 self.device_testing = False 2628 self.fixtures = [] 2629 self.enable_coverage = False 2630 self.enable_ubsan = False 2631 self.enable_lsan = False 2632 self.enable_asan = False 2633 self.enable_valgrind = False 2634 self.extra_args = [] 2635 self.inline_logs = False 2636 self.enable_sizes_report = False 2637 self.west_flash = None 2638 self.west_runner = None 2639 self.generator = None 2640 self.generator_cmd = None 2641 self.warnings_as_errors = True 2642 self.overflow_as_errors = False 2643 self.quarantine_verify = False 2644 2645 # Keep track of which test cases we've filtered out and why 2646 self.testcases = {} 2647 self.quarantine = {} 2648 self.platforms = [] 2649 self.selected_platforms = [] 2650 self.filtered_platforms = [] 2651 self.default_platforms = [] 2652 self.outdir = os.path.abspath(outdir) 2653 self.discards = {} 2654 self.load_errors = 0 2655 self.instances = dict() 2656 2657 self.total_platforms = 0 2658 self.start_time = 0 2659 self.duration = 0 2660 self.warnings = 0 2661 2662 # hardcoded for now 2663 self.duts = [] 2664 2665 # run integration tests only 2666 self.integration = False 2667 2668 self.pipeline = None 2669 self.version = "NA" 2670 2671 def check_zephyr_version(self): 2672 try: 2673 subproc = subprocess.run(["git", "describe", "--abbrev=12"], 2674 stdout=subprocess.PIPE, 2675 universal_newlines=True, 2676 cwd=ZEPHYR_BASE) 2677 if subproc.returncode == 0: 2678 self.version = subproc.stdout.strip() 2679 logger.info(f"Zephyr version: {self.version}") 2680 except OSError: 2681 logger.info("Cannot read zephyr version.") 2682 2683 def get_platform_instances(self, platform): 2684 filtered_dict = {k:v for k,v in self.instances.items() if k.startswith(platform + "/")} 2685 return filtered_dict 2686 2687 def config(self): 2688 logger.info("coverage platform: {}".format(self.coverage_platform)) 2689 2690 # Debug Functions 2691 @staticmethod 2692 def info(what): 2693 sys.stdout.write(what + "\n") 2694 sys.stdout.flush() 2695 2696 def update_counting(self, results=None, initial=False): 2697 results.skipped_configs = 0 2698 results.skipped_cases = 0 2699 for instance in self.instances.values(): 2700 if initial: 2701 results.cases += len(instance.testcase.cases) 2702 if instance.status == 'skipped': 2703 results.skipped_configs += 1 2704 results.skipped_cases += len(instance.testcase.cases) 2705 elif instance.status == "passed": 2706 results.passed += 1 2707 for res in instance.results.values(): 2708 if res == 'SKIP': 2709 results.skipped_cases += 1 2710 2711 def compare_metrics(self, filename): 2712 # name, datatype, lower results better 2713 interesting_metrics = [("ram_size", int, True), 2714 ("rom_size", int, True)] 2715 2716 if not os.path.exists(filename): 2717 logger.error("Cannot compare metrics, %s not found" % filename) 2718 return [] 2719 2720 results = [] 2721 saved_metrics = {} 2722 with open(filename) as fp: 2723 cr = csv.DictReader(fp) 2724 for row in cr: 2725 d = {} 2726 for m, _, _ in interesting_metrics: 2727 d[m] = row[m] 2728 saved_metrics[(row["test"], row["platform"])] = d 2729 2730 for instance in self.instances.values(): 2731 mkey = (instance.testcase.name, instance.platform.name) 2732 if mkey not in saved_metrics: 2733 continue 2734 sm = saved_metrics[mkey] 2735 for metric, mtype, lower_better in interesting_metrics: 2736 if metric not in instance.metrics: 2737 continue 2738 if sm[metric] == "": 2739 continue 2740 delta = instance.metrics.get(metric, 0) - mtype(sm[metric]) 2741 if delta == 0: 2742 continue 2743 results.append((instance, metric, instance.metrics.get(metric, 0), delta, 2744 lower_better)) 2745 return results 2746 2747 def footprint_reports(self, report, show_footprint, all_deltas, 2748 footprint_threshold, last_metrics): 2749 if not report: 2750 return 2751 2752 logger.debug("running footprint_reports") 2753 deltas = self.compare_metrics(report) 2754 warnings = 0 2755 if deltas and show_footprint: 2756 for i, metric, value, delta, lower_better in deltas: 2757 if not all_deltas and ((delta < 0 and lower_better) or 2758 (delta > 0 and not lower_better)): 2759 continue 2760 2761 percentage = 0 2762 if value > delta: 2763 percentage = (float(delta) / float(value - delta)) 2764 2765 if not all_deltas and (percentage < (footprint_threshold / 100.0)): 2766 continue 2767 2768 logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format( 2769 i.platform.name, i.testcase.name, Fore.YELLOW, 2770 "INFO" if all_deltas else "WARNING", Fore.RESET, 2771 metric, delta, value, percentage)) 2772 warnings += 1 2773 2774 if warnings: 2775 logger.warning("Deltas based on metrics from last %s" % 2776 ("release" if not last_metrics else "run")) 2777 2778 def summary(self, results, unrecognized_sections): 2779 failed = 0 2780 run = 0 2781 for instance in self.instances.values(): 2782 if instance.status == "failed": 2783 failed += 1 2784 elif instance.metrics.get("unrecognized") and not unrecognized_sections: 2785 logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" % 2786 (Fore.RED, Fore.RESET, instance.name, 2787 str(instance.metrics.get("unrecognized", [])))) 2788 failed += 1 2789 2790 if instance.metrics.get('handler_time', None): 2791 run += 1 2792 2793 if results.total and results.total != results.skipped_configs: 2794 pass_rate = (float(results.passed) / float(results.total - results.skipped_configs)) 2795 else: 2796 pass_rate = 0 2797 2798 logger.info( 2799 "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format( 2800 Fore.RED if failed else Fore.GREEN, 2801 results.passed, 2802 results.total - results.skipped_configs, 2803 Fore.RESET, 2804 pass_rate, 2805 Fore.RED if results.failed else Fore.RESET, 2806 results.failed, 2807 Fore.RESET, 2808 results.skipped_configs, 2809 Fore.YELLOW if self.warnings else Fore.RESET, 2810 self.warnings, 2811 Fore.RESET, 2812 self.duration)) 2813 2814 self.total_platforms = len(self.platforms) 2815 # if we are only building, do not report about tests being executed. 2816 if self.platforms and not self.build_only: 2817 logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format( 2818 results.cases - results.skipped_cases, 2819 results.skipped_cases, 2820 len(self.filtered_platforms), 2821 self.total_platforms, 2822 (100 * len(self.filtered_platforms) / len(self.platforms)) 2823 )) 2824 2825 logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, \ 2826{Fore.RED}{results.total - run - results.skipped_configs}{Fore.RESET} test configurations were only built.") 2827 2828 def save_reports(self, name, suffix, report_dir, no_update, release, only_failed, platform_reports, json_report): 2829 if not self.instances: 2830 return 2831 2832 logger.info("Saving reports...") 2833 if name: 2834 report_name = name 2835 else: 2836 report_name = "twister" 2837 2838 if report_dir: 2839 os.makedirs(report_dir, exist_ok=True) 2840 filename = os.path.join(report_dir, report_name) 2841 outdir = report_dir 2842 else: 2843 filename = os.path.join(self.outdir, report_name) 2844 outdir = self.outdir 2845 2846 if suffix: 2847 filename = "{}_{}".format(filename, suffix) 2848 2849 if not no_update: 2850 self.xunit_report(filename + ".xml", full_report=False, 2851 append=only_failed, version=self.version) 2852 self.xunit_report(filename + "_report.xml", full_report=True, 2853 append=only_failed, version=self.version) 2854 self.csv_report(filename + ".csv") 2855 2856 if json_report: 2857 self.json_report(filename + ".json", append=only_failed, version=self.version) 2858 2859 if platform_reports: 2860 self.target_report(outdir, suffix, append=only_failed) 2861 if self.discards: 2862 self.discard_report(filename + "_discard.csv") 2863 2864 if release: 2865 self.csv_report(self.RELEASE_DATA) 2866 2867 def add_configurations(self): 2868 2869 for board_root in self.board_roots: 2870 board_root = os.path.abspath(board_root) 2871 2872 logger.debug("Reading platform configuration files under %s..." % 2873 board_root) 2874 2875 for file in glob.glob(os.path.join(board_root, "*", "*", "*.yaml")): 2876 try: 2877 platform = Platform() 2878 platform.load(file) 2879 if platform.name in [p.name for p in self.platforms]: 2880 logger.error(f"Duplicate platform {platform.name} in {file}") 2881 raise Exception(f"Duplicate platform identifier {platform.name} found") 2882 if platform.twister: 2883 self.platforms.append(platform) 2884 if platform.default: 2885 self.default_platforms.append(platform.name) 2886 2887 except RuntimeError as e: 2888 logger.error("E: %s: can't load: %s" % (file, e)) 2889 self.load_errors += 1 2890 2891 def get_all_tests(self): 2892 tests = [] 2893 for _, tc in self.testcases.items(): 2894 for case in tc.cases: 2895 tests.append(case) 2896 2897 return tests 2898 2899 @staticmethod 2900 def get_toolchain(): 2901 toolchain_script = Path(ZEPHYR_BASE) / Path('cmake/verify-toolchain.cmake') 2902 result = CMake.run_cmake_script([toolchain_script, "FORMAT=json"]) 2903 2904 try: 2905 if result['returncode']: 2906 raise TwisterRuntimeError(f"E: {result['returnmsg']}") 2907 except Exception as e: 2908 print(str(e)) 2909 sys.exit(2) 2910 toolchain = json.loads(result['stdout'])['ZEPHYR_TOOLCHAIN_VARIANT'] 2911 logger.info(f"Using '{toolchain}' toolchain.") 2912 2913 return toolchain 2914 2915 def add_testcases(self, testcase_filter=[]): 2916 for root in self.roots: 2917 root = os.path.abspath(root) 2918 2919 logger.debug("Reading test case configuration files under %s..." % root) 2920 2921 for dirpath, _, filenames in os.walk(root, topdown=True): 2922 if self.SAMPLE_FILENAME in filenames: 2923 filename = self.SAMPLE_FILENAME 2924 elif self.TESTCASE_FILENAME in filenames: 2925 filename = self.TESTCASE_FILENAME 2926 else: 2927 continue 2928 2929 logger.debug("Found possible test case in " + dirpath) 2930 2931 tc_path = os.path.join(dirpath, filename) 2932 2933 try: 2934 parsed_data = TwisterConfigParser(tc_path, self.tc_schema) 2935 parsed_data.load() 2936 2937 tc_path = os.path.dirname(tc_path) 2938 workdir = os.path.relpath(tc_path, root) 2939 2940 for name in parsed_data.tests.keys(): 2941 tc = TestCase(root, workdir, name) 2942 2943 tc_dict = parsed_data.get_test(name, self.testcase_valid_keys) 2944 2945 tc.source_dir = tc_path 2946 tc.yamlfile = tc_path 2947 2948 tc.type = tc_dict["type"] 2949 tc.tags = tc_dict["tags"] 2950 tc.extra_args = tc_dict["extra_args"] 2951 tc.extra_configs = tc_dict["extra_configs"] 2952 tc.arch_allow = tc_dict["arch_allow"] 2953 tc.arch_exclude = tc_dict["arch_exclude"] 2954 tc.skip = tc_dict["skip"] 2955 tc.platform_exclude = tc_dict["platform_exclude"] 2956 tc.platform_allow = tc_dict["platform_allow"] 2957 tc.toolchain_exclude = tc_dict["toolchain_exclude"] 2958 tc.toolchain_allow = tc_dict["toolchain_allow"] 2959 tc.tc_filter = tc_dict["filter"] 2960 tc.timeout = tc_dict["timeout"] 2961 tc.harness = tc_dict["harness"] 2962 tc.harness_config = tc_dict["harness_config"] 2963 if tc.harness == 'console' and not tc.harness_config: 2964 raise Exception('Harness config error: console harness defined without a configuration.') 2965 tc.build_only = tc_dict["build_only"] 2966 tc.build_on_all = tc_dict["build_on_all"] 2967 tc.slow = tc_dict["slow"] 2968 tc.min_ram = tc_dict["min_ram"] 2969 tc.depends_on = tc_dict["depends_on"] 2970 tc.min_flash = tc_dict["min_flash"] 2971 tc.extra_sections = tc_dict["extra_sections"] 2972 tc.integration_platforms = tc_dict["integration_platforms"] 2973 2974 tc.parse_subcases(tc_path) 2975 2976 if testcase_filter: 2977 if tc.name and tc.name in testcase_filter: 2978 self.testcases[tc.name] = tc 2979 else: 2980 self.testcases[tc.name] = tc 2981 2982 except Exception as e: 2983 logger.error("%s: can't load (skipping): %s" % (tc_path, e)) 2984 self.load_errors += 1 2985 return len(self.testcases) 2986 2987 def get_platform(self, name): 2988 selected_platform = None 2989 for platform in self.platforms: 2990 if platform.name == name: 2991 selected_platform = platform 2992 break 2993 return selected_platform 2994 2995 def load_quarantine(self, file): 2996 """ 2997 Loads quarantine list from the given yaml file. Creates a dictionary 2998 of all tests configurations (platform + scenario: comment) that shall be 2999 skipped due to quarantine 3000 """ 3001 3002 # Load yaml into quarantine_yaml 3003 quarantine_yaml = scl.yaml_load_verify(file, self.quarantine_schema) 3004 3005 # Create quarantine_list with a product of the listed 3006 # platforms and scenarios for each entry in quarantine yaml 3007 quarantine_list = [] 3008 for quar_dict in quarantine_yaml: 3009 if quar_dict['platforms'][0] == "all": 3010 plat = [p.name for p in self.platforms] 3011 else: 3012 plat = quar_dict['platforms'] 3013 comment = quar_dict.get('comment', "NA") 3014 quarantine_list.append([{".".join([p, s]): comment} 3015 for p in plat for s in quar_dict['scenarios']]) 3016 3017 # Flatten the quarantine_list 3018 quarantine_list = [it for sublist in quarantine_list for it in sublist] 3019 # Change quarantine_list into a dictionary 3020 for d in quarantine_list: 3021 self.quarantine.update(d) 3022 3023 def load_from_file(self, file, filter_status=[], filter_platform=[]): 3024 try: 3025 with open(file, "r") as fp: 3026 cr = csv.DictReader(fp) 3027 instance_list = [] 3028 for row in cr: 3029 if row["status"] in filter_status: 3030 continue 3031 test = row["test"] 3032 3033 platform = self.get_platform(row["platform"]) 3034 if filter_platform and platform.name not in filter_platform: 3035 continue 3036 instance = TestInstance(self.testcases[test], platform, self.outdir) 3037 if self.device_testing: 3038 tfilter = 'runnable' 3039 else: 3040 tfilter = 'buildable' 3041 instance.run = instance.check_runnable( 3042 self.enable_slow, 3043 tfilter, 3044 self.fixtures 3045 ) 3046 instance.create_overlay(platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.coverage_platform) 3047 instance_list.append(instance) 3048 self.add_instances(instance_list) 3049 3050 except KeyError as e: 3051 logger.error("Key error while parsing tests file.({})".format(str(e))) 3052 sys.exit(2) 3053 3054 except FileNotFoundError as e: 3055 logger.error("Couldn't find input file with list of tests. ({})".format(e)) 3056 sys.exit(2) 3057 3058 def apply_filters(self, **kwargs): 3059 3060 toolchain = self.get_toolchain() 3061 3062 discards = {} 3063 platform_filter = kwargs.get('platform') 3064 exclude_platform = kwargs.get('exclude_platform', []) 3065 testcase_filter = kwargs.get('run_individual_tests', []) 3066 arch_filter = kwargs.get('arch') 3067 tag_filter = kwargs.get('tag') 3068 exclude_tag = kwargs.get('exclude_tag') 3069 all_filter = kwargs.get('all') 3070 runnable = kwargs.get('runnable') 3071 force_toolchain = kwargs.get('force_toolchain') 3072 force_platform = kwargs.get('force_platform') 3073 emu_filter = kwargs.get('emulation_only') 3074 3075 logger.debug("platform filter: " + str(platform_filter)) 3076 logger.debug(" arch_filter: " + str(arch_filter)) 3077 logger.debug(" tag_filter: " + str(tag_filter)) 3078 logger.debug(" exclude_tag: " + str(exclude_tag)) 3079 3080 default_platforms = False 3081 emulation_platforms = False 3082 3083 3084 if all_filter: 3085 logger.info("Selecting all possible platforms per test case") 3086 # When --all used, any --platform arguments ignored 3087 platform_filter = [] 3088 elif not platform_filter and not emu_filter: 3089 logger.info("Selecting default platforms per test case") 3090 default_platforms = True 3091 elif emu_filter: 3092 logger.info("Selecting emulation platforms per test case") 3093 emulation_platforms = True 3094 3095 if platform_filter: 3096 platforms = list(filter(lambda p: p.name in platform_filter, self.platforms)) 3097 elif emu_filter: 3098 platforms = list(filter(lambda p: p.simulation != 'na', self.platforms)) 3099 elif arch_filter: 3100 platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms)) 3101 elif default_platforms: 3102 platforms = list(filter(lambda p: p.default, self.platforms)) 3103 else: 3104 platforms = self.platforms 3105 3106 logger.info("Building initial testcase list...") 3107 3108 for tc_name, tc in self.testcases.items(): 3109 3110 if tc.build_on_all and not platform_filter: 3111 platform_scope = self.platforms 3112 elif tc.integration_platforms and self.integration: 3113 platform_scope = list(filter(lambda item: item.name in tc.integration_platforms, \ 3114 self.platforms)) 3115 else: 3116 platform_scope = platforms 3117 3118 integration = self.integration and tc.integration_platforms 3119 3120 # If there isn't any overlap between the platform_allow list and the platform_scope 3121 # we set the scope to the platform_allow list 3122 if tc.platform_allow and not platform_filter and not integration: 3123 a = set(platform_scope) 3124 b = set(filter(lambda item: item.name in tc.platform_allow, self.platforms)) 3125 c = a.intersection(b) 3126 if not c: 3127 platform_scope = list(filter(lambda item: item.name in tc.platform_allow, \ 3128 self.platforms)) 3129 3130 # list of instances per testcase, aka configurations. 3131 instance_list = [] 3132 for plat in platform_scope: 3133 instance = TestInstance(tc, plat, self.outdir) 3134 if runnable: 3135 tfilter = 'runnable' 3136 else: 3137 tfilter = 'buildable' 3138 3139 instance.run = instance.check_runnable( 3140 self.enable_slow, 3141 tfilter, 3142 self.fixtures 3143 ) 3144 3145 for t in tc.cases: 3146 instance.results[t] = None 3147 3148 if runnable and self.duts: 3149 for h in self.duts: 3150 if h.platform == plat.name: 3151 if tc.harness_config.get('fixture') in h.fixtures: 3152 instance.run = True 3153 3154 if not force_platform and plat.name in exclude_platform: 3155 discards[instance] = discards.get(instance, "Platform is excluded on command line.") 3156 3157 if (plat.arch == "unit") != (tc.type == "unit"): 3158 # Discard silently 3159 continue 3160 3161 if runnable and not instance.run: 3162 discards[instance] = discards.get(instance, "Not runnable on device") 3163 3164 if self.integration and tc.integration_platforms and plat.name not in tc.integration_platforms: 3165 discards[instance] = discards.get(instance, "Not part of integration platforms") 3166 3167 if tc.skip: 3168 discards[instance] = discards.get(instance, "Skip filter") 3169 3170 if tag_filter and not tc.tags.intersection(tag_filter): 3171 discards[instance] = discards.get(instance, "Command line testcase tag filter") 3172 3173 if exclude_tag and tc.tags.intersection(exclude_tag): 3174 discards[instance] = discards.get(instance, "Command line testcase exclude filter") 3175 3176 if testcase_filter and tc_name not in testcase_filter: 3177 discards[instance] = discards.get(instance, "Testcase name filter") 3178 3179 if arch_filter and plat.arch not in arch_filter: 3180 discards[instance] = discards.get(instance, "Command line testcase arch filter") 3181 3182 if not force_platform: 3183 3184 if tc.arch_allow and plat.arch not in tc.arch_allow: 3185 discards[instance] = discards.get(instance, "Not in test case arch allow list") 3186 3187 if tc.arch_exclude and plat.arch in tc.arch_exclude: 3188 discards[instance] = discards.get(instance, "In test case arch exclude") 3189 3190 if tc.platform_exclude and plat.name in tc.platform_exclude: 3191 discards[instance] = discards.get(instance, "In test case platform exclude") 3192 3193 if tc.toolchain_exclude and toolchain in tc.toolchain_exclude: 3194 discards[instance] = discards.get(instance, "In test case toolchain exclude") 3195 3196 if platform_filter and plat.name not in platform_filter: 3197 discards[instance] = discards.get(instance, "Command line platform filter") 3198 3199 if tc.platform_allow and plat.name not in tc.platform_allow: 3200 discards[instance] = discards.get(instance, "Not in testcase platform allow list") 3201 3202 if tc.toolchain_allow and toolchain not in tc.toolchain_allow: 3203 discards[instance] = discards.get(instance, "Not in testcase toolchain allow list") 3204 3205 if not plat.env_satisfied: 3206 discards[instance] = discards.get(instance, "Environment ({}) not satisfied".format(", ".join(plat.env))) 3207 3208 if not force_toolchain \ 3209 and toolchain and (toolchain not in plat.supported_toolchains) \ 3210 and "host" not in plat.supported_toolchains \ 3211 and tc.type != 'unit': 3212 discards[instance] = discards.get(instance, "Not supported by the toolchain") 3213 3214 if plat.ram < tc.min_ram: 3215 discards[instance] = discards.get(instance, "Not enough RAM") 3216 3217 if tc.depends_on: 3218 dep_intersection = tc.depends_on.intersection(set(plat.supported)) 3219 if dep_intersection != set(tc.depends_on): 3220 discards[instance] = discards.get(instance, "No hardware support") 3221 3222 if plat.flash < tc.min_flash: 3223 discards[instance] = discards.get(instance, "Not enough FLASH") 3224 3225 if set(plat.ignore_tags) & tc.tags: 3226 discards[instance] = discards.get(instance, "Excluded tags per platform (exclude_tags)") 3227 3228 if plat.only_tags and not set(plat.only_tags) & tc.tags: 3229 discards[instance] = discards.get(instance, "Excluded tags per platform (only_tags)") 3230 3231 test_configuration = ".".join([instance.platform.name, 3232 instance.testcase.id]) 3233 # skip quarantined tests 3234 if test_configuration in self.quarantine and not self.quarantine_verify: 3235 discards[instance] = discards.get(instance, 3236 f"Quarantine: {self.quarantine[test_configuration]}") 3237 # run only quarantined test to verify their statuses (skip everything else) 3238 if self.quarantine_verify and test_configuration not in self.quarantine: 3239 discards[instance] = discards.get(instance, "Not under quarantine") 3240 3241 # if nothing stopped us until now, it means this configuration 3242 # needs to be added. 3243 instance_list.append(instance) 3244 3245 # no configurations, so jump to next testcase 3246 if not instance_list: 3247 continue 3248 3249 # if twister was launched with no platform options at all, we 3250 # take all default platforms 3251 if default_platforms and not tc.build_on_all and not integration: 3252 if tc.platform_allow: 3253 a = set(self.default_platforms) 3254 b = set(tc.platform_allow) 3255 c = a.intersection(b) 3256 if c: 3257 aa = list(filter(lambda tc: tc.platform.name in c, instance_list)) 3258 self.add_instances(aa) 3259 else: 3260 self.add_instances(instance_list) 3261 else: 3262 instances = list(filter(lambda tc: tc.platform.default, instance_list)) 3263 self.add_instances(instances) 3264 elif integration: 3265 instances = list(filter(lambda item: item.platform.name in tc.integration_platforms, instance_list)) 3266 self.add_instances(instances) 3267 3268 3269 3270 elif emulation_platforms: 3271 self.add_instances(instance_list) 3272 for instance in list(filter(lambda inst: not inst.platform.simulation != 'na', instance_list)): 3273 discards[instance] = discards.get(instance, "Not an emulated platform") 3274 else: 3275 self.add_instances(instance_list) 3276 3277 for _, case in self.instances.items(): 3278 case.create_overlay(case.platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.coverage_platform) 3279 3280 self.discards = discards 3281 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 3282 3283 for instance in self.discards: 3284 instance.reason = self.discards[instance] 3285 # If integration mode is on all skips on integration_platforms are treated as errors. 3286 if self.integration and instance.platform.name in instance.testcase.integration_platforms \ 3287 and "Quarantine" not in instance.reason: 3288 instance.status = "error" 3289 instance.reason += " but is one of the integration platforms" 3290 instance.fill_results_by_status() 3291 self.instances[instance.name] = instance 3292 else: 3293 instance.status = "skipped" 3294 instance.fill_results_by_status() 3295 3296 self.filtered_platforms = set(p.platform.name for p in self.instances.values() 3297 if p.status != "skipped" ) 3298 3299 return discards 3300 3301 def add_instances(self, instance_list): 3302 for instance in instance_list: 3303 self.instances[instance.name] = instance 3304 3305 @staticmethod 3306 def calc_one_elf_size(instance): 3307 if instance.status not in ["error", "failed", "skipped"]: 3308 if instance.platform.type != "native": 3309 size_calc = instance.calculate_sizes() 3310 instance.metrics["ram_size"] = size_calc.get_ram_size() 3311 instance.metrics["rom_size"] = size_calc.get_rom_size() 3312 instance.metrics["unrecognized"] = size_calc.unrecognized_sections() 3313 else: 3314 instance.metrics["ram_size"] = 0 3315 instance.metrics["rom_size"] = 0 3316 instance.metrics["unrecognized"] = [] 3317 3318 instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0 3319 3320 def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False): 3321 for instance in self.instances.values(): 3322 if build_only: 3323 instance.run = False 3324 3325 if instance.status not in ['passed', 'skipped', 'error']: 3326 logger.debug(f"adding {instance.name}") 3327 instance.status = None 3328 if test_only and instance.run: 3329 pipeline.put({"op": "run", "test": instance}) 3330 else: 3331 pipeline.put({"op": "cmake", "test": instance}) 3332 # If the instance got 'error' status before, proceed to the report stage 3333 if instance.status == "error": 3334 pipeline.put({"op": "report", "test": instance}) 3335 3336 def pipeline_mgr(self, pipeline, done_queue, lock, results): 3337 while True: 3338 try: 3339 task = pipeline.get_nowait() 3340 except queue.Empty: 3341 break 3342 else: 3343 test = task['test'] 3344 pb = ProjectBuilder(self, 3345 test, 3346 lsan=self.enable_lsan, 3347 asan=self.enable_asan, 3348 ubsan=self.enable_ubsan, 3349 coverage=self.enable_coverage, 3350 extra_args=self.extra_args, 3351 device_testing=self.device_testing, 3352 cmake_only=self.cmake_only, 3353 cleanup=self.cleanup, 3354 valgrind=self.enable_valgrind, 3355 inline_logs=self.inline_logs, 3356 generator=self.generator, 3357 generator_cmd=self.generator_cmd, 3358 verbose=self.verbose, 3359 warnings_as_errors=self.warnings_as_errors, 3360 overflow_as_errors=self.overflow_as_errors 3361 ) 3362 pb.process(pipeline, done_queue, task, lock, results) 3363 3364 return True 3365 3366 def execute(self, pipeline, done, results): 3367 lock = Lock() 3368 logger.info("Adding tasks to the queue...") 3369 self.add_tasks_to_queue(pipeline, self.build_only, self.test_only) 3370 logger.info("Added initial list of jobs to queue") 3371 3372 processes = [] 3373 for job in range(self.jobs): 3374 logger.debug(f"Launch process {job}") 3375 p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, )) 3376 processes.append(p) 3377 p.start() 3378 3379 try: 3380 for p in processes: 3381 p.join() 3382 except KeyboardInterrupt: 3383 logger.info("Execution interrupted") 3384 for p in processes: 3385 p.terminate() 3386 3387 # FIXME: This needs to move out. 3388 if self.enable_size_report and not self.cmake_only: 3389 # Parallelize size calculation 3390 executor = concurrent.futures.ThreadPoolExecutor(self.jobs) 3391 futures = [executor.submit(self.calc_one_elf_size, instance) 3392 for instance in self.instances.values()] 3393 concurrent.futures.wait(futures) 3394 else: 3395 for instance in self.instances.values(): 3396 instance.metrics["ram_size"] = 0 3397 instance.metrics["rom_size"] = 0 3398 instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0 3399 instance.metrics["unrecognized"] = [] 3400 3401 return results 3402 3403 def discard_report(self, filename): 3404 3405 try: 3406 if not self.discards: 3407 raise TwisterRuntimeError("apply_filters() hasn't been run!") 3408 except Exception as e: 3409 logger.error(str(e)) 3410 sys.exit(2) 3411 with open(filename, "wt") as csvfile: 3412 fieldnames = ["test", "arch", "platform", "reason"] 3413 cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) 3414 cw.writeheader() 3415 for instance, reason in sorted(self.discards.items()): 3416 rowdict = {"test": instance.testcase.name, 3417 "arch": instance.platform.arch, 3418 "platform": instance.platform.name, 3419 "reason": reason} 3420 cw.writerow(rowdict) 3421 3422 def target_report(self, outdir, suffix, append=False): 3423 platforms = {inst.platform.name for _, inst in self.instances.items()} 3424 for platform in platforms: 3425 if suffix: 3426 filename = os.path.join(outdir,"{}_{}.xml".format(platform, suffix)) 3427 else: 3428 filename = os.path.join(outdir,"{}.xml".format(platform)) 3429 self.xunit_report(filename, platform, full_report=True, 3430 append=append, version=self.version) 3431 3432 3433 @staticmethod 3434 def process_log(log_file): 3435 filtered_string = "" 3436 if os.path.exists(log_file): 3437 with open(log_file, "rb") as f: 3438 log = f.read().decode("utf-8") 3439 filtered_string = ''.join(filter(lambda x: x in string.printable, log)) 3440 3441 return filtered_string 3442 3443 3444 def xunit_report(self, filename, platform=None, full_report=False, append=False, version="NA"): 3445 total = 0 3446 fails = passes = errors = skips = 0 3447 if platform: 3448 selected = [platform] 3449 logger.info(f"Writing target report for {platform}...") 3450 else: 3451 logger.info(f"Writing xunit report {filename}...") 3452 selected = self.selected_platforms 3453 3454 if os.path.exists(filename) and append: 3455 tree = ET.parse(filename) 3456 eleTestsuites = tree.getroot() 3457 else: 3458 eleTestsuites = ET.Element('testsuites') 3459 3460 for p in selected: 3461 inst = self.get_platform_instances(p) 3462 fails = 0 3463 passes = 0 3464 errors = 0 3465 skips = 0 3466 duration = 0 3467 3468 for _, instance in inst.items(): 3469 handler_time = instance.metrics.get('handler_time', 0) 3470 duration += handler_time 3471 if full_report and instance.run: 3472 for k in instance.results.keys(): 3473 if instance.results[k] == 'PASS': 3474 passes += 1 3475 elif instance.results[k] == 'BLOCK': 3476 errors += 1 3477 elif instance.results[k] == 'SKIP' or instance.status in ['skipped']: 3478 skips += 1 3479 else: 3480 fails += 1 3481 else: 3482 if instance.status in ["error", "failed", "timeout", "flash_error"]: 3483 if instance.reason in ['build_error', 'handler_crash']: 3484 errors += 1 3485 else: 3486 fails += 1 3487 elif instance.status == 'skipped': 3488 skips += 1 3489 elif instance.status == 'passed': 3490 passes += 1 3491 else: 3492 if instance.status: 3493 logger.error(f"{instance.name}: Unknown status {instance.status}") 3494 else: 3495 logger.error(f"{instance.name}: No status") 3496 3497 total = (errors + passes + fails + skips) 3498 # do not produce a report if no tests were actually run (only built) 3499 if total == 0: 3500 continue 3501 3502 run = p 3503 eleTestsuite = None 3504 3505 # When we re-run the tests, we re-use the results and update only with 3506 # the newly run tests. 3507 if os.path.exists(filename) and append: 3508 ts = eleTestsuites.findall(f'testsuite/[@name="{p}"]') 3509 if ts: 3510 eleTestsuite = ts[0] 3511 eleTestsuite.attrib['failures'] = "%d" % fails 3512 eleTestsuite.attrib['errors'] = "%d" % errors 3513 eleTestsuite.attrib['skipped'] = "%d" % skips 3514 else: 3515 logger.info(f"Did not find any existing results for {p}") 3516 eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', 3517 name=run, time="%f" % duration, 3518 tests="%d" % (total), 3519 failures="%d" % fails, 3520 errors="%d" % (errors), skipped="%s" % (skips)) 3521 eleTSPropetries = ET.SubElement(eleTestsuite, 'properties') 3522 # Multiple 'property' can be added to 'properties' 3523 # differing by name and value 3524 ET.SubElement(eleTSPropetries, 'property', name="version", value=version) 3525 3526 else: 3527 eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', 3528 name=run, time="%f" % duration, 3529 tests="%d" % (total), 3530 failures="%d" % fails, 3531 errors="%d" % (errors), skipped="%s" % (skips)) 3532 eleTSPropetries = ET.SubElement(eleTestsuite, 'properties') 3533 # Multiple 'property' can be added to 'properties' 3534 # differing by name and value 3535 ET.SubElement(eleTSPropetries, 'property', name="version", value=version) 3536 3537 for _, instance in inst.items(): 3538 if full_report: 3539 tname = os.path.basename(instance.testcase.name) 3540 else: 3541 tname = instance.testcase.id 3542 handler_time = instance.metrics.get('handler_time', 0) 3543 3544 if full_report: 3545 for k in instance.results.keys(): 3546 # remove testcases that are being re-run from exiting reports 3547 for tc in eleTestsuite.findall(f'testcase/[@name="{k}"]'): 3548 eleTestsuite.remove(tc) 3549 3550 classname = ".".join(tname.split(".")[:2]) 3551 eleTestcase = ET.SubElement( 3552 eleTestsuite, 'testcase', 3553 classname=classname, 3554 name="%s" % (k), time="%f" % handler_time) 3555 if instance.results[k] in ['FAIL', 'BLOCK'] or \ 3556 (not instance.run and instance.status in ["error", "failed", "timeout"]): 3557 if instance.results[k] == 'FAIL': 3558 el = ET.SubElement( 3559 eleTestcase, 3560 'failure', 3561 type="failure", 3562 message="failed") 3563 else: 3564 el = ET.SubElement( 3565 eleTestcase, 3566 'error', 3567 type="failure", 3568 message=instance.reason) 3569 log_root = os.path.join(self.outdir, instance.platform.name, instance.testcase.name) 3570 log_file = os.path.join(log_root, "handler.log") 3571 el.text = self.process_log(log_file) 3572 3573 elif instance.results[k] == 'PASS' \ 3574 or (not instance.run and instance.status in ["passed"]): 3575 pass 3576 elif instance.results[k] == 'SKIP' or (instance.status in ["skipped"]): 3577 el = ET.SubElement(eleTestcase, 'skipped', type="skipped", message=instance.reason) 3578 else: 3579 el = ET.SubElement( 3580 eleTestcase, 3581 'error', 3582 type="error", 3583 message=f"{instance.reason}") 3584 else: 3585 if platform: 3586 classname = ".".join(instance.testcase.name.split(".")[:2]) 3587 else: 3588 classname = p + ":" + ".".join(instance.testcase.name.split(".")[:2]) 3589 3590 # remove testcases that are being re-run from exiting reports 3591 for tc in eleTestsuite.findall(f'testcase/[@classname="{classname}"][@name="{instance.testcase.name}"]'): 3592 eleTestsuite.remove(tc) 3593 3594 eleTestcase = ET.SubElement(eleTestsuite, 'testcase', 3595 classname=classname, 3596 name="%s" % (instance.testcase.name), 3597 time="%f" % handler_time) 3598 3599 if instance.status in ["error", "failed", "timeout", "flash_error"]: 3600 failure = ET.SubElement( 3601 eleTestcase, 3602 'failure', 3603 type="failure", 3604 message=instance.reason) 3605 3606 log_root = ("%s/%s/%s" % (self.outdir, instance.platform.name, instance.testcase.name)) 3607 bl = os.path.join(log_root, "build.log") 3608 hl = os.path.join(log_root, "handler.log") 3609 log_file = bl 3610 if instance.reason != 'Build error': 3611 if os.path.exists(hl): 3612 log_file = hl 3613 else: 3614 log_file = bl 3615 3616 failure.text = self.process_log(log_file) 3617 3618 elif instance.status == "skipped": 3619 ET.SubElement(eleTestcase, 'skipped', type="skipped", message="Skipped") 3620 3621 result = ET.tostring(eleTestsuites) 3622 with open(filename, 'wb') as report: 3623 report.write(result) 3624 3625 return fails, passes, errors, skips 3626 3627 def csv_report(self, filename): 3628 with open(filename, "wt") as csvfile: 3629 fieldnames = ["test", "arch", "platform", "status", 3630 "extra_args", "handler", "handler_time", "ram_size", 3631 "rom_size"] 3632 cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) 3633 cw.writeheader() 3634 for instance in self.instances.values(): 3635 rowdict = {"test": instance.testcase.name, 3636 "arch": instance.platform.arch, 3637 "platform": instance.platform.name, 3638 "extra_args": " ".join(instance.testcase.extra_args), 3639 "handler": instance.platform.simulation} 3640 3641 rowdict["status"] = instance.status 3642 if instance.status not in ["error", "failed", "timeout"]: 3643 if instance.handler: 3644 rowdict["handler_time"] = instance.metrics.get("handler_time", 0) 3645 ram_size = instance.metrics.get("ram_size", 0) 3646 rom_size = instance.metrics.get("rom_size", 0) 3647 rowdict["ram_size"] = ram_size 3648 rowdict["rom_size"] = rom_size 3649 cw.writerow(rowdict) 3650 3651 def json_report(self, filename, append=False, version="NA"): 3652 logger.info(f"Writing JSON report {filename}") 3653 report = {} 3654 selected = self.selected_platforms 3655 report["environment"] = {"os": os.name, 3656 "zephyr_version": version, 3657 "toolchain": self.get_toolchain() 3658 } 3659 json_data = {} 3660 if os.path.exists(filename) and append: 3661 with open(filename, 'r') as json_file: 3662 json_data = json.load(json_file) 3663 3664 suites = json_data.get("testsuites", []) 3665 if suites: 3666 suite = suites[0] 3667 testcases = suite.get("testcases", []) 3668 else: 3669 suite = {} 3670 testcases = [] 3671 3672 for p in selected: 3673 inst = self.get_platform_instances(p) 3674 for _, instance in inst.items(): 3675 testcase = {} 3676 handler_log = os.path.join(instance.build_dir, "handler.log") 3677 build_log = os.path.join(instance.build_dir, "build.log") 3678 device_log = os.path.join(instance.build_dir, "device.log") 3679 3680 handler_time = instance.metrics.get('handler_time', 0) 3681 ram_size = instance.metrics.get ("ram_size", 0) 3682 rom_size = instance.metrics.get("rom_size",0) 3683 for k in instance.results.keys(): 3684 testcases = list(filter(lambda d: not (d.get('testcase') == k and d.get('platform') == p), testcases )) 3685 testcase = {"testcase": k, 3686 "arch": instance.platform.arch, 3687 "platform": p, 3688 } 3689 if ram_size: 3690 testcase["ram_size"] = ram_size 3691 if rom_size: 3692 testcase["rom_size"] = rom_size 3693 3694 if instance.results[k] in ["PASS"] or instance.status == 'passed': 3695 testcase["status"] = "passed" 3696 if instance.handler: 3697 testcase["execution_time"] = handler_time 3698 3699 elif instance.results[k] in ['FAIL', 'BLOCK'] or instance.status in ["error", "failed", "timeout", "flash_error"]: 3700 testcase["status"] = "failed" 3701 testcase["reason"] = instance.reason 3702 testcase["execution_time"] = handler_time 3703 if os.path.exists(handler_log): 3704 testcase["test_output"] = self.process_log(handler_log) 3705 elif os.path.exists(device_log): 3706 testcase["device_log"] = self.process_log(device_log) 3707 else: 3708 testcase["build_log"] = self.process_log(build_log) 3709 elif instance.status == 'skipped': 3710 testcase["status"] = "skipped" 3711 testcase["reason"] = instance.reason 3712 testcases.append(testcase) 3713 3714 suites = [ {"testcases": testcases} ] 3715 report["testsuites"] = suites 3716 3717 with open(filename, "wt") as json_file: 3718 json.dump(report, json_file, indent=4, separators=(',',':')) 3719 3720 def get_testcase(self, identifier): 3721 results = [] 3722 for _, tc in self.testcases.items(): 3723 for case in tc.cases: 3724 if case == identifier: 3725 results.append(tc) 3726 return results 3727 3728class CoverageTool: 3729 """ Base class for every supported coverage tool 3730 """ 3731 3732 def __init__(self): 3733 self.gcov_tool = None 3734 self.base_dir = None 3735 3736 @staticmethod 3737 def factory(tool): 3738 if tool == 'lcov': 3739 t = Lcov() 3740 elif tool == 'gcovr': 3741 t = Gcovr() 3742 else: 3743 logger.error("Unsupported coverage tool specified: {}".format(tool)) 3744 return None 3745 3746 logger.debug(f"Select {tool} as the coverage tool...") 3747 return t 3748 3749 @staticmethod 3750 def retrieve_gcov_data(input_file): 3751 logger.debug("Working on %s" % input_file) 3752 extracted_coverage_info = {} 3753 capture_data = False 3754 capture_complete = False 3755 with open(input_file, 'r') as fp: 3756 for line in fp.readlines(): 3757 if re.search("GCOV_COVERAGE_DUMP_START", line): 3758 capture_data = True 3759 continue 3760 if re.search("GCOV_COVERAGE_DUMP_END", line): 3761 capture_complete = True 3762 break 3763 # Loop until the coverage data is found. 3764 if not capture_data: 3765 continue 3766 if line.startswith("*"): 3767 sp = line.split("<") 3768 if len(sp) > 1: 3769 # Remove the leading delimiter "*" 3770 file_name = sp[0][1:] 3771 # Remove the trailing new line char 3772 hex_dump = sp[1][:-1] 3773 else: 3774 continue 3775 else: 3776 continue 3777 extracted_coverage_info.update({file_name: hex_dump}) 3778 if not capture_data: 3779 capture_complete = True 3780 return {'complete': capture_complete, 'data': extracted_coverage_info} 3781 3782 @staticmethod 3783 def create_gcda_files(extracted_coverage_info): 3784 logger.debug("Generating gcda files") 3785 for filename, hexdump_val in extracted_coverage_info.items(): 3786 # if kobject_hash is given for coverage gcovr fails 3787 # hence skipping it problem only in gcovr v4.1 3788 if "kobject_hash" in filename: 3789 filename = (filename[:-4]) + "gcno" 3790 try: 3791 os.remove(filename) 3792 except Exception: 3793 pass 3794 continue 3795 3796 with open(filename, 'wb') as fp: 3797 fp.write(bytes.fromhex(hexdump_val)) 3798 3799 def generate(self, outdir): 3800 for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True): 3801 gcov_data = self.__class__.retrieve_gcov_data(filename) 3802 capture_complete = gcov_data['complete'] 3803 extracted_coverage_info = gcov_data['data'] 3804 if capture_complete: 3805 self.__class__.create_gcda_files(extracted_coverage_info) 3806 logger.debug("Gcov data captured: {}".format(filename)) 3807 else: 3808 logger.error("Gcov data capture incomplete: {}".format(filename)) 3809 3810 with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog: 3811 ret = self._generate(outdir, coveragelog) 3812 if ret == 0: 3813 logger.info("HTML report generated: {}".format( 3814 os.path.join(outdir, "coverage", "index.html"))) 3815 3816 3817class Lcov(CoverageTool): 3818 3819 def __init__(self): 3820 super().__init__() 3821 self.ignores = [] 3822 3823 def add_ignore_file(self, pattern): 3824 self.ignores.append('*' + pattern + '*') 3825 3826 def add_ignore_directory(self, pattern): 3827 self.ignores.append('*/' + pattern + '/*') 3828 3829 def _generate(self, outdir, coveragelog): 3830 coveragefile = os.path.join(outdir, "coverage.info") 3831 ztestfile = os.path.join(outdir, "ztest.info") 3832 cmd = ["lcov", "--gcov-tool", self.gcov_tool, 3833 "--capture", "--directory", outdir, 3834 "--rc", "lcov_branch_coverage=1", 3835 "--output-file", coveragefile] 3836 cmd_str = " ".join(cmd) 3837 logger.debug(f"Running {cmd_str}...") 3838 subprocess.call(cmd, stdout=coveragelog) 3839 # We want to remove tests/* and tests/ztest/test/* but save tests/ztest 3840 subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--extract", 3841 coveragefile, 3842 os.path.join(self.base_dir, "tests", "ztest", "*"), 3843 "--output-file", ztestfile, 3844 "--rc", "lcov_branch_coverage=1"], stdout=coveragelog) 3845 3846 if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: 3847 subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--remove", 3848 ztestfile, 3849 os.path.join(self.base_dir, "tests/ztest/test/*"), 3850 "--output-file", ztestfile, 3851 "--rc", "lcov_branch_coverage=1"], 3852 stdout=coveragelog) 3853 files = [coveragefile, ztestfile] 3854 else: 3855 files = [coveragefile] 3856 3857 for i in self.ignores: 3858 subprocess.call( 3859 ["lcov", "--gcov-tool", self.gcov_tool, "--remove", 3860 coveragefile, i, "--output-file", 3861 coveragefile, "--rc", "lcov_branch_coverage=1"], 3862 stdout=coveragelog) 3863 3864 # The --ignore-errors source option is added to avoid it exiting due to 3865 # samples/application_development/external_lib/ 3866 return subprocess.call(["genhtml", "--legend", "--branch-coverage", 3867 "--ignore-errors", "source", 3868 "-output-directory", 3869 os.path.join(outdir, "coverage")] + files, 3870 stdout=coveragelog) 3871 3872 3873class Gcovr(CoverageTool): 3874 3875 def __init__(self): 3876 super().__init__() 3877 self.ignores = [] 3878 3879 def add_ignore_file(self, pattern): 3880 self.ignores.append('.*' + pattern + '.*') 3881 3882 def add_ignore_directory(self, pattern): 3883 self.ignores.append(".*/" + pattern + '/.*') 3884 3885 @staticmethod 3886 def _interleave_list(prefix, list): 3887 tuple_list = [(prefix, item) for item in list] 3888 return [item for sublist in tuple_list for item in sublist] 3889 3890 def _generate(self, outdir, coveragelog): 3891 coveragefile = os.path.join(outdir, "coverage.json") 3892 ztestfile = os.path.join(outdir, "ztest.json") 3893 3894 excludes = Gcovr._interleave_list("-e", self.ignores) 3895 3896 # We want to remove tests/* and tests/ztest/test/* but save tests/ztest 3897 cmd = ["gcovr", "-r", self.base_dir, "--gcov-executable", 3898 self.gcov_tool, "-e", "tests/*"] + excludes + ["--json", "-o", 3899 coveragefile, outdir] 3900 cmd_str = " ".join(cmd) 3901 logger.debug(f"Running {cmd_str}...") 3902 subprocess.call(cmd, stdout=coveragelog) 3903 3904 subprocess.call(["gcovr", "-r", self.base_dir, "--gcov-executable", 3905 self.gcov_tool, "-f", "tests/ztest", "-e", 3906 "tests/ztest/test/*", "--json", "-o", ztestfile, 3907 outdir], stdout=coveragelog) 3908 3909 if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: 3910 files = [coveragefile, ztestfile] 3911 else: 3912 files = [coveragefile] 3913 3914 subdir = os.path.join(outdir, "coverage") 3915 os.makedirs(subdir, exist_ok=True) 3916 3917 tracefiles = self._interleave_list("--add-tracefile", files) 3918 3919 return subprocess.call(["gcovr", "-r", self.base_dir, "--html", 3920 "--html-details"] + tracefiles + 3921 ["-o", os.path.join(subdir, "index.html")], 3922 stdout=coveragelog) 3923 3924class DUT(object): 3925 def __init__(self, 3926 id=None, 3927 serial=None, 3928 platform=None, 3929 product=None, 3930 serial_pty=None, 3931 connected=False, 3932 pre_script=None, 3933 post_script=None, 3934 post_flash_script=None, 3935 runner=None): 3936 3937 self.serial = serial 3938 self.platform = platform 3939 self.serial_pty = serial_pty 3940 self._counter = Value("i", 0) 3941 self._available = Value("i", 1) 3942 self.connected = connected 3943 self.pre_script = pre_script 3944 self.id = id 3945 self.product = product 3946 self.runner = runner 3947 self.fixtures = [] 3948 self.post_flash_script = post_flash_script 3949 self.post_script = post_script 3950 self.pre_script = pre_script 3951 self.probe_id = None 3952 self.notes = None 3953 self.lock = Lock() 3954 self.match = False 3955 3956 3957 @property 3958 def available(self): 3959 with self._available.get_lock(): 3960 return self._available.value 3961 3962 @available.setter 3963 def available(self, value): 3964 with self._available.get_lock(): 3965 self._available.value = value 3966 3967 @property 3968 def counter(self): 3969 with self._counter.get_lock(): 3970 return self._counter.value 3971 3972 @counter.setter 3973 def counter(self, value): 3974 with self._counter.get_lock(): 3975 self._counter.value = value 3976 3977 def to_dict(self): 3978 d = {} 3979 exclude = ['_available', '_counter', 'match'] 3980 v = vars(self) 3981 for k in v.keys(): 3982 if k not in exclude and v[k]: 3983 d[k] = v[k] 3984 return d 3985 3986 3987 def __repr__(self): 3988 return f"<{self.platform} ({self.product}) on {self.serial}>" 3989 3990class HardwareMap: 3991 schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml") 3992 3993 manufacturer = [ 3994 'ARM', 3995 'SEGGER', 3996 'MBED', 3997 'STMicroelectronics', 3998 'Atmel Corp.', 3999 'Texas Instruments', 4000 'Silicon Labs', 4001 'NXP Semiconductors', 4002 'Microchip Technology Inc.', 4003 'FTDI', 4004 'Digilent' 4005 ] 4006 4007 runner_mapping = { 4008 'pyocd': [ 4009 'DAPLink CMSIS-DAP', 4010 'MBED CMSIS-DAP' 4011 ], 4012 'jlink': [ 4013 'J-Link', 4014 'J-Link OB' 4015 ], 4016 'openocd': [ 4017 'STM32 STLink', '^XDS110.*', 'STLINK-V3' 4018 ], 4019 'dediprog': [ 4020 'TTL232R-3V3', 4021 'MCP2200 USB Serial Port Emulator' 4022 ] 4023 } 4024 4025 def __init__(self): 4026 self.detected = [] 4027 self.duts = [] 4028 4029 def add_device(self, serial, platform, pre_script, is_pty): 4030 device = DUT(platform=platform, connected=True, pre_script=pre_script) 4031 4032 if is_pty: 4033 device.serial_pty = serial 4034 else: 4035 device.serial = serial 4036 4037 self.duts.append(device) 4038 4039 def load(self, map_file): 4040 hwm_schema = scl.yaml_load(self.schema_path) 4041 duts = scl.yaml_load_verify(map_file, hwm_schema) 4042 for dut in duts: 4043 pre_script = dut.get('pre_script') 4044 post_script = dut.get('post_script') 4045 post_flash_script = dut.get('post_flash_script') 4046 platform = dut.get('platform') 4047 id = dut.get('id') 4048 runner = dut.get('runner') 4049 serial = dut.get('serial') 4050 product = dut.get('product') 4051 fixtures = dut.get('fixtures', []) 4052 new_dut = DUT(platform=platform, 4053 product=product, 4054 runner=runner, 4055 id=id, 4056 serial=serial, 4057 connected=serial is not None, 4058 pre_script=pre_script, 4059 post_script=post_script, 4060 post_flash_script=post_flash_script) 4061 new_dut.fixtures = fixtures 4062 new_dut.counter = 0 4063 self.duts.append(new_dut) 4064 4065 def scan(self, persistent=False): 4066 from serial.tools import list_ports 4067 4068 if persistent and platform.system() == 'Linux': 4069 # On Linux, /dev/serial/by-id provides symlinks to 4070 # '/dev/ttyACMx' nodes using names which are unique as 4071 # long as manufacturers fill out USB metadata nicely. 4072 # 4073 # This creates a map from '/dev/ttyACMx' device nodes 4074 # to '/dev/serial/by-id/usb-...' symlinks. The symlinks 4075 # go into the hardware map because they stay the same 4076 # even when the user unplugs / replugs the device. 4077 # 4078 # Some inexpensive USB/serial adapters don't result 4079 # in unique names here, though, so use of this feature 4080 # requires explicitly setting persistent=True. 4081 by_id = Path('/dev/serial/by-id') 4082 def readlink(link): 4083 return str((by_id / link).resolve()) 4084 4085 persistent_map = {readlink(link): str(link) 4086 for link in by_id.iterdir()} 4087 else: 4088 persistent_map = {} 4089 4090 serial_devices = list_ports.comports() 4091 logger.info("Scanning connected hardware...") 4092 for d in serial_devices: 4093 if d.manufacturer in self.manufacturer: 4094 4095 # TI XDS110 can have multiple serial devices for a single board 4096 # assume endpoint 0 is the serial, skip all others 4097 if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'): 4098 continue 4099 s_dev = DUT(platform="unknown", 4100 id=d.serial_number, 4101 serial=persistent_map.get(d.device, d.device), 4102 product=d.product, 4103 runner='unknown', 4104 connected=True) 4105 4106 for runner, _ in self.runner_mapping.items(): 4107 products = self.runner_mapping.get(runner) 4108 if d.product in products: 4109 s_dev.runner = runner 4110 continue 4111 # Try regex matching 4112 for p in products: 4113 if re.match(p, d.product): 4114 s_dev.runner = runner 4115 4116 s_dev.connected = True 4117 self.detected.append(s_dev) 4118 else: 4119 logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d)) 4120 4121 def save(self, hwm_file): 4122 # use existing map 4123 self.detected.sort(key=lambda x: x.serial or '') 4124 if os.path.exists(hwm_file): 4125 with open(hwm_file, 'r') as yaml_file: 4126 hwm = yaml.load(yaml_file, Loader=SafeLoader) 4127 if hwm: 4128 hwm.sort(key=lambda x: x['serial'] or '') 4129 4130 # disconnect everything 4131 for h in hwm: 4132 h['connected'] = False 4133 h['serial'] = None 4134 4135 for _detected in self.detected: 4136 for h in hwm: 4137 if _detected.id == h['id'] and _detected.product == h['product'] and not _detected.match: 4138 h['connected'] = True 4139 h['serial'] = _detected.serial 4140 _detected.match = True 4141 4142 new_duts = list(filter(lambda d: not d.match, self.detected)) 4143 new = [] 4144 for d in new_duts: 4145 new.append(d.to_dict()) 4146 4147 if hwm: 4148 hwm = hwm + new 4149 else: 4150 hwm = new 4151 4152 with open(hwm_file, 'w') as yaml_file: 4153 yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False) 4154 4155 self.load(hwm_file) 4156 logger.info("Registered devices:") 4157 self.dump() 4158 4159 else: 4160 # create new file 4161 dl = [] 4162 for _connected in self.detected: 4163 platform = _connected.platform 4164 id = _connected.id 4165 runner = _connected.runner 4166 serial = _connected.serial 4167 product = _connected.product 4168 d = { 4169 'platform': platform, 4170 'id': id, 4171 'runner': runner, 4172 'serial': serial, 4173 'product': product, 4174 'connected': _connected.connected 4175 } 4176 dl.append(d) 4177 with open(hwm_file, 'w') as yaml_file: 4178 yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False) 4179 logger.info("Detected devices:") 4180 self.dump(detected=True) 4181 4182 def dump(self, filtered=[], header=[], connected_only=False, detected=False): 4183 print("") 4184 table = [] 4185 if detected: 4186 to_show = self.detected 4187 else: 4188 to_show = self.duts 4189 4190 if not header: 4191 header = ["Platform", "ID", "Serial device"] 4192 for p in to_show: 4193 platform = p.platform 4194 connected = p.connected 4195 if filtered and platform not in filtered: 4196 continue 4197 4198 if not connected_only or connected: 4199 table.append([platform, p.id, p.serial]) 4200 4201 print(tabulate(table, headers=header, tablefmt="github")) 4202