1# Copyright (c) 2022 Nordic Semiconductor ASA
2# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
3#
4# SPDX-License-Identifier: Apache-2.0
5
6from __future__ import annotations
7
8import logging
9import re
10from dataclasses import dataclass, field
11from pathlib import Path
12
13import yaml
14
15try:
16    from yaml import CSafeLoader as SafeLoader
17except ImportError:
18    from yaml import SafeLoader
19
20
21logger = logging.getLogger(__name__)
22
23
24class QuarantineException(Exception):
25    pass
26
27
28class Quarantine:
29    """Handle tests under quarantine."""
30
31    def __init__(self, quarantine_list=None) -> None:
32        if quarantine_list is None:
33            quarantine_list = []
34        self.quarantine = QuarantineData()
35        for quarantine_file in quarantine_list:
36            self.quarantine.extend(QuarantineData.load_data_from_yaml(quarantine_file))
37
38    def get_matched_quarantine(self, testname, platform, architecture, simulator):
39        qelem = self.quarantine.get_matched_quarantine(testname, platform, architecture, simulator)
40        if qelem:
41            logger.debug(f'{testname} quarantined with reason: {qelem.comment}')
42            return qelem.comment
43        return None
44
45
46@dataclass
47class QuarantineElement:
48    scenarios: list[str] = field(default_factory=list)
49    platforms: list[str] = field(default_factory=list)
50    architectures: list[str] = field(default_factory=list)
51    simulations: list[str] = field(default_factory=list)
52    comment: str = 'NA'
53    re_scenarios: list = field(default_factory=list)
54    re_platforms: list = field(default_factory=list)
55    re_architectures: list = field(default_factory=list)
56    re_simulations: list = field(default_factory=list)
57
58    def __post_init__(self):
59        # If there is no entry in filters then take all possible values.
60        # To keep backward compatibility, 'all' keyword might be still used.
61        if 'all' in self.scenarios:
62            self.scenarios = []
63        if 'all' in self.platforms:
64            self.platforms = []
65        if 'all' in self.architectures:
66            self.architectures = []
67        if 'all' in self.simulations:
68            self.simulations = []
69        # keep precompiled regexp entiries to speed-up matching
70        self.re_scenarios = [re.compile(pat) for pat in self.scenarios]
71        self.re_platforms = [re.compile(pat) for pat in self.platforms]
72        self.re_architectures = [re.compile(pat) for pat in self.architectures]
73        self.re_simulations = [re.compile(pat) for pat in self.simulations]
74
75        # However, at least one of the filters ('scenarios', platforms' ...)
76        # must be given (there is no sense to put all possible configuration
77        # into quarantine)
78        if not any([self.scenarios, self.platforms, self.architectures, self.simulations]):
79            raise QuarantineException("At least one of filters ('scenarios', 'platforms' ...) "
80                                      "must be specified")
81
82
83@dataclass
84class QuarantineData:
85    qlist: list[QuarantineElement] = field(default_factory=list)
86
87    def __post_init__(self):
88        qelements = []
89        for qelem in self.qlist:
90            if isinstance(qelem, QuarantineElement):
91                qelements.append(qelem)
92            else:
93                qelements.append(QuarantineElement(**qelem))
94        self.qlist = qelements
95
96    @classmethod
97    def load_data_from_yaml(cls, filename: str | Path) -> QuarantineData:
98        """Load quarantine from yaml file."""
99        with open(filename, encoding='UTF-8') as yaml_fd:
100            qlist_raw_data: list[dict] = yaml.load(yaml_fd, Loader=SafeLoader)
101        try:
102            if not qlist_raw_data:
103                # in case of loading empty quarantine file
104                return cls()
105            return cls(qlist_raw_data)
106
107        except Exception as e:
108            logger.error(f'When loading {filename} received error: {e}')
109            raise QuarantineException('Cannot load Quarantine data') from e
110
111    def extend(self, qdata: QuarantineData) -> None:
112        self.qlist.extend(qdata.qlist)
113
114    def get_matched_quarantine(self,
115                               scenario: str,
116                               platform: str,
117                               architecture: str,
118                               simulator_name: str) -> QuarantineElement | None:
119        """Return quarantine element if test is matched to quarantine rules"""
120        for qelem in self.qlist:
121            matched: bool = False
122            if (qelem.scenarios
123                    and (matched := _is_element_matched(scenario, qelem.re_scenarios)) is False):
124                continue
125            if (qelem.platforms
126                    and (matched := _is_element_matched(platform, qelem.re_platforms)) is False):
127                continue
128            if (
129                qelem.architectures
130                and (matched := _is_element_matched(architecture, qelem.re_architectures)) is False
131            ):
132                continue
133            if (
134                qelem.simulations
135                and (matched := _is_element_matched(simulator_name, qelem.re_simulations)) is False
136            ):
137                continue
138
139            if matched:
140                return qelem
141        return None
142
143
144def _is_element_matched(element: str, list_of_elements: list[re.Pattern]) -> bool:
145    """Return True if given element is matching to any of elements from the list"""
146    return any(pattern.fullmatch(element) for pattern in list_of_elements)
147