1# Copyright (c) 2022 Nordic Semiconductor ASA
2# SPDX-License-Identifier: Apache-2.0
3
4from __future__ import annotations
5
6import logging
7import re
8
9from pathlib import Path
10from yaml import safe_load
11from dataclasses import dataclass, field
12
13
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
15
16
class QuarantineException(Exception):
    """Raised when quarantine data is invalid or cannot be loaded."""
19
20
class Quarantine:
    """Handle tests under quarantine.

    Merges the entries of zero or more quarantine YAML files into a single
    ``QuarantineData`` and answers whether a test configuration matches any
    of them.
    """

    def __init__(self, quarantine_list=None) -> None:
        """Load and merge every quarantine file listed in *quarantine_list*.

        :param quarantine_list: iterable of paths to quarantine YAML files.
            ``None`` (the default) or an empty iterable gives an empty
            quarantine.
        """
        self.quarantine = QuarantineData()
        # ``None`` replaces the former mutable ``[]`` default; fall back to an
        # empty tuple so the loop is simply skipped when nothing was given.
        for quarantine_file in quarantine_list or ():
            self.quarantine.extend(QuarantineData.load_data_from_yaml(quarantine_file))

    def get_matched_quarantine(self, testname, platform, architecture, simulation):
        """Return the quarantine comment for the given test configuration.

        :param testname: name of the test scenario to look up.
        :param platform: platform the test is built/run for.
        :param architecture: architecture of that platform.
        :param simulation: simulation type used for the run.
        :return: the ``comment`` of the matching quarantine entry, or ``None``
            when no entry matches.
        """
        qelem = self.quarantine.get_matched_quarantine(testname, platform, architecture, simulation)
        if qelem:
            # Lazy logging arguments: the message is only formatted when the
            # DEBUG level is actually enabled.
            logger.debug('%s quarantined with reason: %s', testname, qelem.comment)
            return qelem.comment
        return None
35
36
@dataclass
class QuarantineElement:
    """One quarantine rule: a set of regular-expression filters and a comment.

    Each entry of ``scenarios``, ``platforms``, ``architectures`` and
    ``simulations`` is compiled as a regular expression. An empty filter
    places no restriction on that dimension.
    """

    scenarios: list[str] = field(default_factory=list)
    platforms: list[str] = field(default_factory=list)
    architectures: list[str] = field(default_factory=list)
    simulations: list[str] = field(default_factory=list)
    comment: str = 'NA'
    # Compiled counterparts of the filters above; always rebuilt in
    # __post_init__, so values passed by the caller are overwritten.
    re_scenarios: list = field(default_factory=list)
    re_platforms: list = field(default_factory=list)
    re_architectures: list = field(default_factory=list)
    re_simulations: list = field(default_factory=list)

    def __post_init__(self):
        """Normalise the filters, precompile them and validate the rule.

        :raises QuarantineException: if all four filters end up empty.
        """
        filter_names = ('scenarios', 'platforms', 'architectures', 'simulations')

        # A missing filter already means "every value". The legacy 'all'
        # keyword is still accepted for backward compatibility and simply
        # collapses the filter to the empty list.
        for name in filter_names:
            if 'all' in getattr(self, name):
                setattr(self, name, [])

        # Keep precompiled regexp entries to speed up matching.
        for name in filter_names:
            compiled = [re.compile(pattern) for pattern in getattr(self, name)]
            setattr(self, f're_{name}', compiled)

        # At least one of the filters ('scenarios', 'platforms' ...) must be
        # given: quarantining every possible configuration makes no sense.
        has_filter = (self.scenarios or self.platforms
                      or self.architectures or self.simulations)
        if not has_filter:
            raise QuarantineException("At least one of filters ('scenarios', 'platforms' ...) "
                                      "must be specified")
72
73
@dataclass
class QuarantineData:
    """Ordered collection of quarantine rules."""

    qlist: list[QuarantineElement] = field(default_factory=list)

    def __post_init__(self):
        """Convert raw mapping entries of ``qlist`` into ``QuarantineElement``s."""
        # Ready-made elements are kept as they are; anything else is treated
        # as keyword arguments for the QuarantineElement constructor.
        self.qlist = [
            entry if isinstance(entry, QuarantineElement) else QuarantineElement(**entry)
            for entry in self.qlist
        ]

    @classmethod
    def load_data_from_yaml(cls, filename: str | Path) -> QuarantineData:
        """Build quarantine data from the YAML file *filename*.

        :raises QuarantineException: if the parsed content cannot be turned
            into quarantine elements. Errors raised while opening or parsing
            the file itself are not wrapped.
        """
        with open(filename, 'r', encoding='UTF-8') as yaml_fd:
            raw_entries: list[dict] = safe_load(yaml_fd)
        try:
            # A falsy result (e.g. an empty quarantine file) gives empty data.
            return cls(raw_entries) if raw_entries else cls()
        except Exception as e:
            logger.error(f'When loading {filename} received error: {e}')
            raise QuarantineException('Cannot load Quarantine data') from e

    def extend(self, qdata: QuarantineData) -> None:
        """Append every rule of *qdata* to this collection, in order."""
        self.qlist.extend(qdata.qlist)

    def get_matched_quarantine(self,
                               scenario: str,
                               platform: str,
                               architecture: str,
                               simulation: str) -> QuarantineElement | None:
        """Return the first rule matching the given configuration, else None.

        A rule matches when every filter it defines matches the corresponding
        value; filters left empty are ignored. A rule with no filter at all
        never matches.
        """
        for candidate in self.qlist:
            filters = (
                (candidate.scenarios, scenario, candidate.re_scenarios),
                (candidate.platforms, platform, candidate.re_platforms),
                (candidate.architectures, architecture, candidate.re_architectures),
                (candidate.simulations, simulation, candidate.re_simulations),
            )
            # Only the filters that the rule actually defines take part.
            active = [(value, patterns) for enabled, value, patterns in filters if enabled]
            if active and all(_is_element_matched(value, patterns)
                              for value, patterns in active):
                return candidate
        return None
129
130
def _is_element_matched(element: str, list_of_elements: list[re.Pattern]) -> bool:
    """Tell whether *element* fully matches at least one of the compiled patterns."""
    return any(pattern.fullmatch(element) for pattern in list_of_elements)
137