1# SPDX-License-Identifier: Apache-2.0
2import re
3import os
4import subprocess
5from collections import OrderedDict
6import xml.etree.ElementTree as ET
7
8result_re = re.compile(".*(PASS|FAIL|SKIP) - (test_)?(.*) in")
9
10class Harness:
11    GCOV_START = "GCOV_COVERAGE_DUMP_START"
12    GCOV_END = "GCOV_COVERAGE_DUMP_END"
13    FAULT = "ZEPHYR FATAL ERROR"
14    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
15    RUN_FAILED = "PROJECT EXECUTION FAILED"
16
17    def __init__(self):
18        self.state = None
19        self.type = None
20        self.regex = []
21        self.matches = OrderedDict()
22        self.ordered = True
23        self.repeat = 1
24        self.tests = {}
25        self.id = None
26        self.fail_on_fault = True
27        self.fault = False
28        self.capture_coverage = False
29        self.next_pattern = 0
30        self.record = None
31        self.recording = []
32        self.fieldnames = []
33        self.ztest = False
34        self.is_pytest = False
35
36    def configure(self, instance):
37        config = instance.testcase.harness_config
38        self.id = instance.testcase.id
39        if "ignore_faults" in instance.testcase.tags:
40            self.fail_on_fault = False
41
42        if config:
43            self.type = config.get('type', None)
44            self.regex = config.get('regex', [])
45            self.repeat = config.get('repeat', 1)
46            self.ordered = config.get('ordered', True)
47            self.record = config.get('record', {})
48
49    def process_test(self, line):
50
51        if self.RUN_PASSED in line:
52            if self.fault:
53                self.state = "failed"
54            else:
55                self.state = "passed"
56
57        if self.RUN_FAILED in line:
58            self.state = "failed"
59
60        if self.fail_on_fault:
61            if self.FAULT == line:
62                self.fault = True
63
64        if self.GCOV_START in line:
65            self.capture_coverage = True
66        elif self.GCOV_END in line:
67            self.capture_coverage = False
68
69class Console(Harness):
70
71    def configure(self, instance):
72        super(Console, self).configure(instance)
73        if self.type == "one_line":
74            self.pattern = re.compile(self.regex[0])
75        elif self.type == "multi_line":
76            self.patterns = []
77            for r in self.regex:
78                self.patterns.append(re.compile(r))
79
80    def handle(self, line):
81        if self.type == "one_line":
82            if self.pattern.search(line):
83                self.state = "passed"
84        elif self.type == "multi_line" and self.ordered:
85            if (self.next_pattern < len(self.patterns) and
86                self.patterns[self.next_pattern].search(line)):
87                self.next_pattern += 1
88                if self.next_pattern >= len(self.patterns):
89                    self.state = "passed"
90        elif self.type == "multi_line" and not self.ordered:
91            for i, pattern in enumerate(self.patterns):
92                r = self.regex[i]
93                if pattern.search(line) and not r in self.matches:
94                    self.matches[r] = line
95            if len(self.matches) == len(self.regex):
96                self.state = "passed"
97
98        if self.fail_on_fault:
99            if self.FAULT in line:
100                self.fault = True
101
102        if self.GCOV_START in line:
103            self.capture_coverage = True
104        elif self.GCOV_END in line:
105            self.capture_coverage = False
106
107
108        if self.record:
109            pattern = re.compile(self.record.get("regex", ""))
110            match = pattern.search(line)
111            if match:
112                csv = []
113                if not self.fieldnames:
114                    for k,v in match.groupdict().items():
115                        self.fieldnames.append(k)
116
117                for k,v in match.groupdict().items():
118                    csv.append(v.strip())
119                self.recording.append(csv)
120
121        self.process_test(line)
122
123        if self.state == "passed":
124            self.tests[self.id] = "PASS"
125        else:
126            self.tests[self.id] = "FAIL"
127
128class Pytest(Harness):
129    def configure(self, instance):
130        super(Pytest, self).configure(instance)
131        self.running_dir = instance.build_dir
132        self.source_dir = instance.testcase.source_dir
133        self.pytest_root = 'pytest'
134        self.is_pytest = True
135        config = instance.testcase.harness_config
136
137        if config:
138            self.pytest_root = config.get('pytest_root', 'pytest')
139
140    def handle(self, line):
141        ''' Test cases that make use of pytest more care about results given
142            by pytest tool which is called in pytest_run(), so works of this
143            handle is trying to give a PASS or FAIL to avoid timeout, nothing
144            is writen into handler.log
145        '''
146        self.state = "passed"
147        self.tests[self.id] = "PASS"
148
149    def pytest_run(self, log_file):
150        ''' To keep artifacts of pytest in self.running_dir, pass this directory
151            by "--cmdopt". On pytest end, add a command line option and provide
152            the cmdopt through a fixture function
153            If pytest harness report failure, twister will direct user to see
154            handler.log, this method writes test result in handler.log
155        '''
156        cmd = [
157			'pytest',
158			'-s',
159			os.path.join(self.source_dir, self.pytest_root),
160			'--cmdopt',
161			self.running_dir,
162			'--junit-xml',
163			os.path.join(self.running_dir, 'report.xml'),
164			'-q'
165        ]
166
167        log = open(log_file, "a")
168        outs = []
169        errs = []
170
171        with subprocess.Popen(cmd,
172                              stdout = subprocess.PIPE,
173                              stderr = subprocess.PIPE) as proc:
174            try:
175                outs, errs = proc.communicate()
176                tree = ET.parse(os.path.join(self.running_dir, "report.xml"))
177                root = tree.getroot()
178                for child in root:
179                    if child.tag == 'testsuite':
180                        if child.attrib['failures'] != '0':
181                            self.state = "failed"
182                        elif child.attrib['skipped'] != '0':
183                            self.state = "skipped"
184                        elif child.attrib['errors'] != '0':
185                            self.state = "errors"
186                        else:
187                            self.state = "passed"
188            except subprocess.TimeoutExpired:
189                proc.kill()
190                self.state = "failed"
191            except ET.ParseError:
192                self.state = "failed"
193            except IOError:
194                log.write("Can't access report.xml\n")
195                self.state = "failed"
196
197        if self.state == "passed":
198            self.tests[self.id] = "PASS"
199            log.write("Pytest cases passed\n")
200        elif self.state == "skipped":
201            self.tests[self.id] = "SKIP"
202            log.write("Pytest cases skipped\n")
203            log.write("Please refer report.xml for detail")
204        else:
205            self.tests[self.id] = "FAIL"
206            log.write("Pytest cases failed\n")
207
208        log.write("\nOutput from pytest:\n")
209        log.write(outs.decode('UTF-8'))
210        log.write(errs.decode('UTF-8'))
211        log.close()
212
213
214class Test(Harness):
215    RUN_PASSED = "PROJECT EXECUTION SUCCESSFUL"
216    RUN_FAILED = "PROJECT EXECUTION FAILED"
217
218    def handle(self, line):
219        match = result_re.match(line)
220        if match and match.group(2):
221            name = "{}.{}".format(self.id, match.group(3))
222            self.tests[name] = match.group(1)
223            self.ztest = True
224
225        self.process_test(line)
226
227        if not self.ztest and self.state:
228            if self.state == "passed":
229                self.tests[self.id] = "PASS"
230            else:
231                self.tests[self.id] = "FAIL"
232
233
234class Ztest(Test):
235    pass
236