#!/usr/bin/env python3
# Copyright (c) 2025 Nordic Semiconductor NA
# SPDX-License-Identifier: Apache-2.0
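"""Analyze Twister JSON reports (twister.json) and summarize the most common errors.

The summary is written to a JSON file and, optionally, to CSV or a markdown table;
run with --help for the full option list and usage examples.
"""
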
from __future__ import annotations

import argparse
import csv
import json
import logging
import os
import textwrap
from dataclasses import asdict, dataclass, field, is_dataclass

logger = logging.getLogger(__name__)


def create_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
        description='Analyzes Twister JSON reports',
        epilog=(
            textwrap.dedent("""
            Example usage:
                To analyze errors with the predefined CMake and Build error patterns, run:
                > python %(prog)s twister_reports/*.json --long-summary
                The summary is saved to twister_report_summary.json unless --output is used.
                To save the error summary to CSV (max 100 test files per entry), use --output-csv:
                > python %(prog)s twister_reports/*.json --output-csv twister_report_summary.csv
                Use the --error-patterns option to provide a custom error patterns file:
                > python %(prog)s **/twister.json --error-patterns error_patterns.txt
        """)
        ),
    )
    parser.add_argument('inputs', type=str, nargs="+", help='twister.json files to read')
    parser.add_argument(
        '--error-patterns',
        type=str,
        help='text file with custom error patterns, one pattern per line',
    )
    parser.add_argument(
        '--output',
        type=str,
        default='twister_report_summary.json',
        help='output json file name, default: twister_report_summary.json',
    )
    parser.add_argument('--output-csv', type=str, help='output csv file name')
    parser.add_argument(
        '--output-md', type=str, help='output markdown file name to store table with errors'
    )
    parser.add_argument(
        '--status',
        action='store_true',
        help='add statuses of testsuites and testcases to the summary',
    )
    parser.add_argument(
        '--platforms',
        action='store_true',
        help='add errors per platform to the summary',
    )
    parser.add_argument(
        '--long-summary',
        action='store_true',
        help='show all matched errors grouped by reason, otherwise show only most common errors',
    )

    parser.add_argument(
        '-ll',
        '--log-level',
        type=str.upper,
        default='INFO',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
    )
    return parser


@dataclass
class Counters:
    counters: dict[str, TestCollection] = field(default_factory=dict)

    def add_counter(self, key: str, test: str = '') -> None:
        self.counters.setdefault(key, TestCollection()).append(test)

    def print_counters(self, indent: int = 0):
        for key, value in self.counters.items():
            print(f'{" " * indent}{value.quantity:4}    {key}')
            if value.has_subcounters():
                value.subcounters.print_counters(indent + 4)

    def sort_by_quantity(self):
        self.counters = dict(
            sorted(self.counters.items(), key=lambda item: item[1].quantity, reverse=True)
        )
        for value in self.counters.values():
            if value.has_subcounters():
                value.subcounters.sort_by_quantity()

    def get_next_entry(self, depth: int = 0, max_depth: int = 10):
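        """Yield (depth, quantity, key, joined tests) tuples depth-first for tabular output."""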
        for key, value in self.counters.items():
            # limit number of test files to 100 to not exceed CSV cell limit
            yield depth, value.quantity, key, ', '.join(value.tests[0:100])
            if value.has_subcounters() and depth < max_depth:
                yield from value.subcounters.get_next_entry(depth + 1, max_depth)

    def _flatten(self):
        """
        Yield all deepest counters in a flat structure.
        Deepest counters refer to those counters which
        do not contain any further nested subcounters.
        """
        for key, value in self.counters.items():
            if value.has_subcounters():
                yield from value.subcounters._flatten()
            else:
                yield key, value

    def get_most_common(self, n: int = 10):
        return dict(sorted(self._flatten(), key=lambda item: item[1].quantity, reverse=True)[:n])


@dataclass
class TestCollection:
    quantity: int = 0
    tests: list[str] = field(default_factory=list)
    subcounters: Counters = field(default_factory=Counters)

    def append(self, test: str = ''):
        self.quantity += 1
        if test:
            self.tests.append(test)

    def has_subcounters(self):
        return bool(self.subcounters.counters)
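
# Illustrative sketch of the nested structure (comments only, not executed), using a
# hypothetical error key and a hypothetical <platform>:<name> test identifier:
#
#   errors = Counters()
#   errors.add_counter('Build failure')
#   errors.counters['Build failure'].subcounters.add_counter(
#       "undefined reference to `foo'", 'native_sim:sample.basic'
#   )
#   assert errors.counters['Build failure'].quantity == 1
#   assert errors.counters['Build failure'].subcounters.counters[
#       "undefined reference to `foo'"
#   ].tests == ['native_sim:sample.basic']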


class TwisterReports:
    def __init__(self):
        self.status: Counters = Counters()
        self.errors: Counters = Counters()
        self.platforms: Counters = Counters()

    def parse_report(self, json_filename):
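        """Load a single twister.json report and update the counters.

        Only the fields read by this script matter; a simplified sketch of the
        assumed report structure:
            {"testsuites": [{"name": ..., "platform": ..., "status": ...,
                             "reason": ..., "log": ...,
                             "testcases": [{"status": ..., "reason": ..., "log": ...}]}]}
        """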
        logger.info(f'Process {json_filename}')
        with open(json_filename) as json_results:
            json_data = json.load(json_results)

        for ts in json_data.get('testsuites', []):
            self.parse_statuses(ts)

        for ts in json_data.get('testsuites', []):
            self.parse_testsuite(ts)

    def parse_statuses(self, testsuite):
        ts_status = testsuite.get('status') or 'no status in testsuite'
        self.status.add_counter(ts_status)
        # Process testcases
        for tc in testsuite.get('testcases', []):
            tc_status = tc.get('status') or 'no status in testcase'
            self.status.counters[ts_status].subcounters.add_counter(tc_status)

    def parse_testsuite(self, testsuite):
        ts_status = testsuite.get('status') or 'no status in testsuite'
        if ts_status not in ('error', 'failed'):
            return

        ts_platform = testsuite.get('platform') or 'Unknown platform'
        self.platforms.add_counter(ts_platform)
        ts_reason = testsuite.get('reason') or 'Unknown reason'
        ts_log = testsuite.get('log') or ''
        test_identifier = f'{testsuite.get("platform")}:{testsuite.get("name")}'

        # CMake and Build failures are treated separately: detailed information is
        # extracted from the log to group the errors. The parsing methods are kept
        # to allow further customization and to preserve backward compatibility.
        if ts_reason.startswith('CMake build failure'):
            reason = 'CMake build failure'
            self.errors.add_counter(reason)
            error_key = ts_reason.split(reason, 1)[-1].lstrip(' -')
            if not error_key:
                error_key = self._parse_cmake_build_failure(ts_log)
            self.errors.counters[reason].subcounters.add_counter(error_key, test_identifier)
            ts_reason = reason
        elif ts_reason.startswith('Build failure'):
            reason = 'Build failure'
            self.errors.add_counter(reason)
            error_key = ts_reason.split(reason, 1)[-1].lstrip(' -')
            if not error_key:
                error_key = self._parse_build_failure(ts_log)
            self.errors.counters[reason].subcounters.add_counter(error_key, test_identifier)
            ts_reason = reason
        else:
            self.errors.add_counter(ts_reason)

        # Process testcases
        for tc in testsuite.get('testcases', []):
            tc_reason = tc.get('reason')
            tc_log = tc.get('log')
            if tc_reason and tc_log:
                self.errors.counters[ts_reason].subcounters.add_counter(tc_reason, test_identifier)

        if not self.errors.counters[ts_reason].has_subcounters():
            self.errors.counters[ts_reason].tests.append(test_identifier)

    def _parse_cmake_build_failure(self, log: str) -> str:
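        """Extract a short grouping key from the log of a 'CMake build failure'.

        Matching is line-by-line, in order; e.g. a hypothetical log line
        "foo.c:3:10: fatal error: bar.h: No such file or directory" is grouped
        under "fatal error: bar.h: No such file or directory".
        """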
        last_warning = 'no warning found'
        lines = log.splitlines()
        for i, line in enumerate(lines):
            if "warning: " in line:
                last_warning = line
            elif "devicetree error: " in line:
                return "devicetree error"
            elif "fatal error: " in line:
                return line[line.index('fatal error: ') :].strip()
            elif "error: " in line:  # error: Aborting due to Kconfig warnings
                if "undefined symbol" in last_warning:
                    return last_warning[last_warning.index('undefined symbol') :].strip()
                return last_warning
            elif "CMake Error at" in line:
                for next_line in lines[i + 1 :]:
                    if next_line.strip():
                        return line + ' ' + next_line
                return line
        return "No matching CMake error pattern found"

    def _parse_build_failure(self, log: str) -> str:
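        """Extract a short grouping key from the log of a 'Build failure'.

        E.g. a hypothetical log line "app.c:12: undefined reference to `foo'"
        is grouped under "undefined reference to `foo'".
        """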
        last_warning = ''
        lines = log.splitlines()
        for i, line in enumerate(lines):
            if "undefined reference" in line:
                return line[line.index('undefined reference') :].strip()
            elif "error: ld returned" in line:
                if last_warning:
                    return last_warning
                elif "overflowed by" in lines[i - 1]:
                    return "ld.bfd: region overflowed"
                elif "ld.bfd: warning: " in lines[i - 1]:
                    return "ld.bfd:" + lines[i - 1].split("ld.bfd:", 1)[-1]
                return line
            elif "error: " in line:
                return line[line.index('error: ') :].strip()
            elif ": in function " in line:
                last_warning = line[line.index('in function') :].strip()
        return "No matching build error pattern found"

    def sort_counters(self):
        self.status.sort_by_quantity()
        self.platforms.sort_by_quantity()
        self.errors.sort_by_quantity()


class TwisterReportsWithPatterns(TwisterReports):
    def __init__(self, error_patterns_file):
        super().__init__()
        self.error_patterns = []
        self.add_error_patterns(error_patterns_file)

    def add_error_patterns(self, filename):
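        """Load error patterns from a plain-text file.

        Blank lines and lines starting with '#' are ignored; every other line is
        used as a literal substring to search for in the logs. A hypothetical file:

            # Kconfig issues
            error: Aborting due to Kconfig warnings
            undefined reference
        """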
        with open(filename) as f:
            self.error_patterns = [
                line
                for line in f.read().splitlines()
                if line.strip() and not line.strip().startswith('#')
            ]
        logger.info(f'Loaded {len(self.error_patterns)} error patterns from {filename}')

    def parse_testsuite(self, testsuite):
        ts_status = testsuite.get('status') or 'no status in testsuite'
        if ts_status not in ('error', 'failed'):
            return

        ts_log = testsuite.get('log') or ''
        test_identifier = f'{testsuite.get("platform")}:{testsuite.get("name")}'
        if key := self._parse_log_with_error_patterns(ts_log):
            self.errors.add_counter(key, test_identifier)
        # Process testcases
        for tc in testsuite.get('testcases', []):
            tc_log = tc.get('log')
            if tc_log and (key := self._parse_log_with_error_patterns(tc_log)):
                self.errors.add_counter(key, test_identifier)

    def _parse_log_with_error_patterns(self, log: str) -> str | None:
        for line in log.splitlines():
            for error_pattern in self.error_patterns:
                if error_pattern in line:
                    logger.debug(f'Matched: {error_pattern} in {line}')
                    return error_pattern
        return None


class EnhancedJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if is_dataclass(o):
            return asdict(o)
        return super().default(o)


def dump_to_json(filename, data):
    with open(filename, 'w') as f:
        json.dump(data, f, indent=4, cls=EnhancedJSONEncoder)
    logger.info(f'Data saved to {filename}')


def dump_to_csv(filename, data: Counters):
    with open(filename, 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        # Write headers
        csvwriter.writerow(['Depth', 'Counter', 'Key', 'Tests'])
        # Write error data
        for csv_data in data.get_next_entry():
            csvwriter.writerow(csv_data)
    logger.info(f'Data saved to {filename}')


def dump_markdown_table(filename, data: Counters, max_depth=2):
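    """Write the error counters as markdown tables, one table per depth-0 key.

    The depth-0 row doubles as the table header; a sketch with hypothetical data:

        |    2 | Build failure |
        |-------|------|
        |    2 | undefined reference to `foo' |
    """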
    with open(filename, 'w', newline='') as md:
        for depth, quantity, key, _ in data.get_next_entry(max_depth=max_depth):
            if depth == 0:
                md.write('\n')
            md.write(f'| {quantity:4} | {key} |\n')
            if depth == 0:
                md.write('|-------|------|\n')
    logger.info(f'Markdown table saved to {filename}')


def summary_with_most_common_errors(errors: Counters, limit: int = 15):
    print('\nMost common errors summary:')
    for key, value in errors.get_most_common(n=limit).items():
        print(f'{value.quantity:4}    {key}')


def main():
    parser = create_parser()
    args = parser.parse_args()

    logging.basicConfig(level=args.log_level, format='%(levelname)-8s:  %(message)s')

    if args.error_patterns:
        reports = TwisterReportsWithPatterns(args.error_patterns)
    else:
        reports = TwisterReports()

    for filename in args.inputs:
        if os.path.exists(filename):
            reports.parse_report(filename)
        else:
            logger.warning(f'File not found: {filename}')

    reports.sort_counters()
    dump_to_json(
        args.output,
        {'status': reports.status, 'platforms': reports.platforms, 'errors': reports.errors},
    )

    if args.status:
        print('\nTestsuites and testcases status summary:')
        reports.status.print_counters()

    if not reports.errors.counters:
        return

    if args.platforms and reports.platforms.counters:
        print('\nErrors per platform:')
        reports.platforms.print_counters()

    if args.long_summary:
        print('\nErrors summary:')
        reports.errors.print_counters()
    else:
        summary_with_most_common_errors(reports.errors)

    if args.output_csv:
        dump_to_csv(args.output_csv, reports.errors)
    if args.output_md:
        dump_markdown_table(args.output_md, reports.errors, max_depth=2)


if __name__ == '__main__':
    main()