# Copyright (c) 2022 Nordic Semiconductor ASA
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import logging
import re

from pathlib import Path
from yaml import safe_load
from dataclasses import dataclass, field


logger = logging.getLogger(__name__)


class QuarantineException(Exception):
    """Raised when quarantine data is invalid or cannot be loaded."""


class Quarantine:
    """Handle tests under quarantine."""

    def __init__(self, quarantine_list=None) -> None:
        """Load and merge quarantine rules from the given YAML files.

        :param quarantine_list: iterable of paths to quarantine YAML files;
            ``None`` (the default) or an empty iterable yields no rules.
        :raises QuarantineException: if the content of a file cannot be
            turned into quarantine rules.
        """
        # A ``None`` default is used instead of a shared mutable ``[]``.
        self.quarantine = QuarantineData()
        for quarantine_file in quarantine_list or ():
            self.quarantine.extend(QuarantineData.load_data_from_yaml(quarantine_file))

    def get_matched_quarantine(self, testname, platform, architecture, simulation):
        """Return the quarantine comment for a matching rule, or None.

        The first rule (in load order) matching the given test name,
        platform, architecture and simulation wins.
        """
        qelem = self.quarantine.get_matched_quarantine(testname, platform, architecture, simulation)
        if qelem:
            # Lazy %-style arguments: the message is only formatted when
            # debug logging is actually enabled.
            logger.debug('%s quarantined with reason: %s', testname, qelem.comment)
            return qelem.comment
        return None


@dataclass
class QuarantineElement:
    """Single quarantine rule: a set of regex filters plus a comment.

    Each filter list holds regular expressions. An empty list means the
    filter is not applied. The ``re_*`` fields hold the compiled patterns
    and are always rebuilt in ``__post_init__``.
    """

    scenarios: list[str] = field(default_factory=list)
    platforms: list[str] = field(default_factory=list)
    architectures: list[str] = field(default_factory=list)
    simulations: list[str] = field(default_factory=list)
    comment: str = 'NA'
    re_scenarios: list[re.Pattern] = field(default_factory=list)
    re_platforms: list[re.Pattern] = field(default_factory=list)
    re_architectures: list[re.Pattern] = field(default_factory=list)
    re_simulations: list[re.Pattern] = field(default_factory=list)

    def __post_init__(self):
        """Normalize the filters, precompile them and validate the rule.

        :raises QuarantineException: if no filter is left after
            normalization.
        """
        # If there is no entry in filters then take all possible values.
        # To keep backward compatibility, 'all' keyword might be still used.
        if 'all' in self.scenarios:
            self.scenarios = []
        if 'all' in self.platforms:
            self.platforms = []
        if 'all' in self.architectures:
            self.architectures = []
        if 'all' in self.simulations:
            self.simulations = []
        # keep precompiled regexp entries to speed-up matching
        self.re_scenarios = [re.compile(pat) for pat in self.scenarios]
        self.re_platforms = [re.compile(pat) for pat in self.platforms]
        self.re_architectures = [re.compile(pat) for pat in self.architectures]
        self.re_simulations = [re.compile(pat) for pat in self.simulations]

        # However, at least one of the filters ('scenarios', 'platforms' ...)
        # must be given (there is no sense to put all possible configuration
        # into quarantine)
        if not any((self.scenarios, self.platforms, self.architectures, self.simulations)):
            raise QuarantineException("At least one of filters ('scenarios', 'platforms' ...) "
                                      "must be specified")


@dataclass
class QuarantineData:
    """Ordered collection of quarantine rules."""

    qlist: list[QuarantineElement] = field(default_factory=list)

    def __post_init__(self):
        """Convert raw mapping entries of ``qlist`` into QuarantineElement objects."""
        self.qlist = [
            qelem if isinstance(qelem, QuarantineElement) else QuarantineElement(**qelem)
            for qelem in self.qlist
        ]

    @classmethod
    def load_data_from_yaml(cls, filename: str | Path) -> QuarantineData:
        """Load quarantine from yaml file.

        :param filename: path to the quarantine YAML file.
        :return: loaded rules; an empty collection for an empty file.
        :raises QuarantineException: if the parsed content cannot be
            converted into quarantine rules. Errors raised while opening or
            parsing the file itself are not wrapped.
        """
        with open(filename, 'r', encoding='UTF-8') as yaml_fd:
            qlist_raw_data: list[dict] = safe_load(yaml_fd)
        try:
            if not qlist_raw_data:
                # in case of loading empty quarantine file
                return cls()
            return cls(qlist_raw_data)

        except Exception as e:
            logger.error('When loading %s received error: %s', filename, e)
            raise QuarantineException('Cannot load Quarantine data') from e

    def extend(self, qdata: QuarantineData) -> None:
        """Append all rules from ``qdata`` to this collection."""
        self.qlist.extend(qdata.qlist)

    def get_matched_quarantine(self,
                               scenario: str,
                               platform: str,
                               architecture: str,
                               simulation: str) -> QuarantineElement | None:
        """Return quarantine element if test is matched to quarantine rules

        A rule matches when every filter it defines matches the
        corresponding value; filters left empty are skipped. The first
        matching rule is returned, ``None`` when there is none.
        """
        for qelem in self.qlist:
            checks = (
                (qelem.scenarios, scenario, qelem.re_scenarios),
                (qelem.platforms, platform, qelem.re_platforms),
                (qelem.architectures, architecture, qelem.re_architectures),
                (qelem.simulations, simulation, qelem.re_simulations),
            )
            # Only filters that are actually defined take part in matching.
            active = [(value, patterns) for enabled, value, patterns in checks if enabled]
            # A rule without any active filter never matches.
            if active and all(_is_element_matched(value, patterns)
                              for value, patterns in active):
                return qelem
        return None


def _is_element_matched(element: str, list_of_elements: list[re.Pattern]) -> bool:
    """Return True if given element fully matches any pattern from the list"""
    return any(pattern.fullmatch(element) for pattern in list_of_elements)