1# Copyright (c) 2022 Nordic Semiconductor ASA 2# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved. 3# 4# SPDX-License-Identifier: Apache-2.0 5 6from __future__ import annotations 7 8import logging 9import re 10from dataclasses import dataclass, field 11from pathlib import Path 12 13import yaml 14 15try: 16 from yaml import CSafeLoader as SafeLoader 17except ImportError: 18 from yaml import SafeLoader 19 20 21logger = logging.getLogger(__name__) 22 23 24class QuarantineException(Exception): 25 pass 26 27 28class Quarantine: 29 """Handle tests under quarantine.""" 30 31 def __init__(self, quarantine_list=None) -> None: 32 if quarantine_list is None: 33 quarantine_list = [] 34 self.quarantine = QuarantineData() 35 for quarantine_file in quarantine_list: 36 self.quarantine.extend(QuarantineData.load_data_from_yaml(quarantine_file)) 37 38 def get_matched_quarantine(self, testname, platform, architecture, simulator): 39 qelem = self.quarantine.get_matched_quarantine(testname, platform, architecture, simulator) 40 if qelem: 41 logger.debug(f'{testname} quarantined with reason: {qelem.comment}') 42 return qelem.comment 43 return None 44 45 46@dataclass 47class QuarantineElement: 48 scenarios: list[str] = field(default_factory=list) 49 platforms: list[str] = field(default_factory=list) 50 architectures: list[str] = field(default_factory=list) 51 simulations: list[str] = field(default_factory=list) 52 comment: str = 'NA' 53 re_scenarios: list = field(default_factory=list) 54 re_platforms: list = field(default_factory=list) 55 re_architectures: list = field(default_factory=list) 56 re_simulations: list = field(default_factory=list) 57 58 def __post_init__(self): 59 # If there is no entry in filters then take all possible values. 60 # To keep backward compatibility, 'all' keyword might be still used. 61 if 'all' in self.scenarios: 62 self.scenarios = [] 63 if 'all' in self.platforms: 64 self.platforms = [] 65 if 'all' in self.architectures: 66 self.architectures = [] 67 if 'all' in self.simulations: 68 self.simulations = [] 69 # keep precompiled regexp entiries to speed-up matching 70 self.re_scenarios = [re.compile(pat) for pat in self.scenarios] 71 self.re_platforms = [re.compile(pat) for pat in self.platforms] 72 self.re_architectures = [re.compile(pat) for pat in self.architectures] 73 self.re_simulations = [re.compile(pat) for pat in self.simulations] 74 75 # However, at least one of the filters ('scenarios', platforms' ...) 76 # must be given (there is no sense to put all possible configuration 77 # into quarantine) 78 if not any([self.scenarios, self.platforms, self.architectures, self.simulations]): 79 raise QuarantineException("At least one of filters ('scenarios', 'platforms' ...) " 80 "must be specified") 81 82 83@dataclass 84class QuarantineData: 85 qlist: list[QuarantineElement] = field(default_factory=list) 86 87 def __post_init__(self): 88 qelements = [] 89 for qelem in self.qlist: 90 if isinstance(qelem, QuarantineElement): 91 qelements.append(qelem) 92 else: 93 qelements.append(QuarantineElement(**qelem)) 94 self.qlist = qelements 95 96 @classmethod 97 def load_data_from_yaml(cls, filename: str | Path) -> QuarantineData: 98 """Load quarantine from yaml file.""" 99 with open(filename, encoding='UTF-8') as yaml_fd: 100 qlist_raw_data: list[dict] = yaml.load(yaml_fd, Loader=SafeLoader) 101 try: 102 if not qlist_raw_data: 103 # in case of loading empty quarantine file 104 return cls() 105 return cls(qlist_raw_data) 106 107 except Exception as e: 108 logger.error(f'When loading {filename} received error: {e}') 109 raise QuarantineException('Cannot load Quarantine data') from e 110 111 def extend(self, qdata: QuarantineData) -> None: 112 self.qlist.extend(qdata.qlist) 113 114 def get_matched_quarantine(self, 115 scenario: str, 116 platform: str, 117 architecture: str, 118 simulator_name: str) -> QuarantineElement | None: 119 """Return quarantine element if test is matched to quarantine rules""" 120 for qelem in self.qlist: 121 matched: bool = False 122 if (qelem.scenarios 123 and (matched := _is_element_matched(scenario, qelem.re_scenarios)) is False): 124 continue 125 if (qelem.platforms 126 and (matched := _is_element_matched(platform, qelem.re_platforms)) is False): 127 continue 128 if ( 129 qelem.architectures 130 and (matched := _is_element_matched(architecture, qelem.re_architectures)) is False 131 ): 132 continue 133 if ( 134 qelem.simulations 135 and (matched := _is_element_matched(simulator_name, qelem.re_simulations)) is False 136 ): 137 continue 138 139 if matched: 140 return qelem 141 return None 142 143 144def _is_element_matched(element: str, list_of_elements: list[re.Pattern]) -> bool: 145 """Return True if given element is matching to any of elements from the list""" 146 return any(pattern.fullmatch(element) for pattern in list_of_elements) 147