#!/usr/bin/env python3
# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import json
import logging
import os
import string
import xml.etree.ElementTree as ET
from datetime import datetime

from colorama import Fore

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)


class Reporting:
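    """Generate reports for a completed twister run.

    Produces the JSON results file, the xUnit XML variants, footprint
    comparisons and the console summary/synopsis output.

    Minimal usage sketch (``plan`` and ``env`` are assumed to be the test
    plan and environment objects twister normally constructs):

        reporting = Reporting(plan, env)
        reporting.save_reports("twister", None, None, no_update=False,
                               platform_reports=False)
    """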

    def __init__(self, plan, env) -> None:
        self.plan = plan  # FIXME
        self.instances = plan.instances
        self.platforms = plan.platforms
        self.selected_platforms = plan.selected_platforms
        self.filtered_platforms = plan.filtered_platforms
        self.env = env
        self.timestamp = datetime.now().isoformat()
        self.outdir = os.path.abspath(env.options.outdir)
    @staticmethod
    def process_log(log_file):
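        """Return the contents of ``log_file`` restricted to printable characters.

        Returns an empty string if the file does not exist.
        """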
        filtered_string = ""
        if os.path.exists(log_file):
            with open(log_file, "rb") as f:
                log = f.read().decode("utf-8")
                filtered_string = ''.join(filter(lambda x: x in string.printable, log))

        return filtered_string

    @staticmethod
    def xunit_testcase(eleTestsuite, name, classname, status, ts_status, reason,
                       duration, runnable, stats, log, build_only_as_skip):
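        """Add a single <testcase> element to ``eleTestsuite``.

        ``stats`` is a (fails, passes, errors, skips) tuple; the updated
        counters are returned so callers can accumulate per-suite totals.
        """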
        fails, passes, errors, skips = stats

        if status in ['skipped', 'filtered']:
            duration = 0

        eleTestcase = ET.SubElement(
            eleTestsuite, "testcase",
            classname=classname,
            name=f"{name}",
            time=f"{duration}")

        if status in ['skipped', 'filtered']:
            skips += 1
            # temporarily add build_only_as_skip to restore existing CI report behaviour
            if ts_status == "passed" and not runnable:
                tc_type = "build"
            else:
                tc_type = status
            ET.SubElement(eleTestcase, 'skipped', type=f"{tc_type}", message=f"{reason}")
        elif status in ["failed", "blocked"]:
            fails += 1
            el = ET.SubElement(eleTestcase, 'failure', type="failure", message=f"{reason}")
            if log:
                el.text = log
        elif status == "error":
            errors += 1
            el = ET.SubElement(eleTestcase, 'error', type="failure", message=f"{reason}")
            if log:
                el.text = log
        elif status == 'passed':
            if not runnable and build_only_as_skip:
                ET.SubElement(eleTestcase, 'skipped', type="build", message="built only")
                skips += 1
            else:
                passes += 1
        else:
            if not status:
                logger.debug(f"{name}: No status")
                ET.SubElement(eleTestcase, 'skipped', type="untested", message="No results captured, testsuite misconfiguration?")
            else:
                logger.error(f"{name}: Unknown status '{status}'")

        return (fails, passes, errors, skips)

    # Generate a report with all testsuites instead of doing this per platform
    def xunit_report_suites(self, json_file, filename):
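        """Write an xUnit XML file with one <testsuite> element per test suite.

        ``json_file`` is a results file previously written by json_report();
        the XML output is written to ``filename``.
        """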
        with open(json_file, "r") as json_results:
            json_data = json.load(json_results)

        env = json_data.get('environment', {})
        version = env.get('zephyr_version', None)

        eleTestsuites = ET.Element('testsuites')
        all_suites = json_data.get("testsuites", [])

        suites_to_report = all_suites
        # do not create entry if everything is filtered out
        if not self.env.options.detailed_skipped_report:
            suites_to_report = list(filter(lambda d: d.get('status') != "filtered", all_suites))

        for suite in suites_to_report:
            duration = 0
            eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
                                         name=suite.get("name"), time="0",
                                         timestamp=self.timestamp,
                                         tests="0",
                                         failures="0",
                                         errors="0", skipped="0")
            eleTSProperties = ET.SubElement(eleTestsuite, 'properties')
            # Multiple 'property' can be added to 'properties'
            # differing by name and value
            ET.SubElement(eleTSProperties, 'property', name="version", value=version)
            ET.SubElement(eleTSProperties, 'property', name="platform", value=suite.get("platform"))
            ET.SubElement(eleTSProperties, 'property', name="architecture", value=suite.get("arch"))

            total = 0
            fails = passes = errors = skips = 0
            handler_time = suite.get('execution_time', 0)
            runnable = suite.get('runnable', 0)
            duration += float(handler_time)
            ts_status = suite.get('status')
            for tc in suite.get("testcases", []):
                status = tc.get('status')
                reason = tc.get('reason', suite.get('reason', 'Unknown'))
                log = tc.get("log", suite.get("log"))

                tc_duration = tc.get('execution_time', handler_time)
                name = tc.get("identifier")
                classname = ".".join(name.split(".")[:2])
                fails, passes, errors, skips = self.xunit_testcase(eleTestsuite,
                    name, classname, status, ts_status, reason, tc_duration, runnable,
                    (fails, passes, errors, skips), log, True)

            total = errors + passes + fails + skips

            eleTestsuite.attrib['time'] = f"{duration}"
            eleTestsuite.attrib['failures'] = f"{fails}"
            eleTestsuite.attrib['errors'] = f"{errors}"
            eleTestsuite.attrib['skipped'] = f"{skips}"
            eleTestsuite.attrib['tests'] = f"{total}"

        result = ET.tostring(eleTestsuites)
        with open(filename, 'wb') as report:
            report.write(result)

    def xunit_report(self, json_file, filename, selected_platform=None, full_report=False):
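        """Write an xUnit XML file with one <testsuite> element per platform.

        If ``selected_platform`` is given, only that platform is reported.
        With ``full_report`` individual test cases are listed, otherwise one
        entry per test suite is emitted.
        """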
        if selected_platform:
            selected = [selected_platform]
            logger.info(f"Writing target report for {selected_platform}...")
        else:
            logger.info(f"Writing xunit report {filename}...")
            selected = self.selected_platforms

        with open(json_file, "r") as json_results:
            json_data = json.load(json_results)

        env = json_data.get('environment', {})
        version = env.get('zephyr_version', None)

        eleTestsuites = ET.Element('testsuites')
        all_suites = json_data.get("testsuites", [])

        for platform in selected:
            suites = list(filter(lambda d: d['platform'] == platform, all_suites))
            # do not create entry if everything is filtered out
            if not self.env.options.detailed_skipped_report:
                non_filtered = list(filter(lambda d: d.get('status') != "filtered", suites))
                if not non_filtered:
                    continue

            duration = 0
            eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
                                         name=platform,
                                         timestamp=self.timestamp,
                                         time="0",
                                         tests="0",
                                         failures="0",
                                         errors="0", skipped="0")
            eleTSProperties = ET.SubElement(eleTestsuite, 'properties')
            # Multiple 'property' can be added to 'properties'
            # differing by name and value
            ET.SubElement(eleTSProperties, 'property', name="version", value=version)

            total = 0
            fails = passes = errors = skips = 0
            for ts in suites:
                handler_time = ts.get('execution_time', 0)
                runnable = ts.get('runnable', 0)
                duration += float(handler_time)

                ts_status = ts.get('status')
                # Do not report filtered testcases
                if ts_status == 'filtered' and not self.env.options.detailed_skipped_report:
                    continue
                if full_report:
                    for tc in ts.get("testcases", []):
                        status = tc.get('status')
                        reason = tc.get('reason', ts.get('reason', 'Unknown'))
                        log = tc.get("log", ts.get("log"))

                        tc_duration = tc.get('execution_time', handler_time)
                        name = tc.get("identifier")
                        classname = ".".join(name.split(".")[:2])
                        fails, passes, errors, skips = self.xunit_testcase(eleTestsuite,
                            name, classname, status, ts_status, reason, tc_duration, runnable,
                            (fails, passes, errors, skips), log, True)
                else:
                    reason = ts.get('reason', 'Unknown')
                    name = ts.get("name")
                    classname = f"{platform}:{name}"
                    log = ts.get("log")
                    fails, passes, errors, skips = self.xunit_testcase(eleTestsuite,
                        name, classname, ts_status, ts_status, reason, duration, runnable,
                        (fails, passes, errors, skips), log, False)

            total = errors + passes + fails + skips

            eleTestsuite.attrib['time'] = f"{duration}"
            eleTestsuite.attrib['failures'] = f"{fails}"
            eleTestsuite.attrib['errors'] = f"{errors}"
            eleTestsuite.attrib['skipped'] = f"{skips}"
            eleTestsuite.attrib['tests'] = f"{total}"

        result = ET.tostring(eleTestsuites)
        with open(filename, 'wb') as report:
            report.write(result)

    def json_report(self, filename, version="NA", platform=None):
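        """Write the results of all instances (optionally limited to
        ``platform``) to ``filename`` as JSON, including environment
        information, per-suite metrics and per-testcase status.
        """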
        logger.info(f"Writing JSON report {filename}")
        report = {}
        report["environment"] = {"os": os.name,
                                 "zephyr_version": version,
                                 "toolchain": self.env.toolchain,
                                 "commit_date": self.env.commit_date,
                                 "run_date": self.env.run_date
                                 }
        suites = []

        for instance in self.instances.values():
            if platform and platform != instance.platform.name:
                continue
            handler_log = os.path.join(instance.build_dir, "handler.log")
            pytest_log = os.path.join(instance.build_dir, "twister_harness.log")
            build_log = os.path.join(instance.build_dir, "build.log")
            device_log = os.path.join(instance.build_dir, "device.log")

            handler_time = instance.metrics.get('handler_time', 0)
            used_ram = instance.metrics.get("used_ram", 0)
            used_rom = instance.metrics.get("used_rom", 0)
            available_ram = instance.metrics.get("available_ram", 0)
            available_rom = instance.metrics.get("available_rom", 0)
            suite = {
                "name": instance.testsuite.name,
                "arch": instance.platform.arch,
                "platform": instance.platform.name,
                "path": instance.testsuite.source_dir_rel
            }
            if instance.run_id:
                suite['run_id'] = instance.run_id

            suite["runnable"] = False
            if instance.status != 'filtered':
                suite["runnable"] = instance.run

            if used_ram:
                suite["used_ram"] = used_ram
            if used_rom:
                suite["used_rom"] = used_rom

            suite['retries'] = instance.retries

            if instance.dut:
                suite["dut"] = instance.dut
            if available_ram:
                suite["available_ram"] = available_ram
            if available_rom:
                suite["available_rom"] = available_rom
            if instance.status in ["error", "failed"]:
                suite['status'] = instance.status
                suite["reason"] = instance.reason
                # FIXME
                if os.path.exists(pytest_log):
                    suite["log"] = self.process_log(pytest_log)
                elif os.path.exists(handler_log):
                    suite["log"] = self.process_log(handler_log)
                elif os.path.exists(device_log):
                    suite["log"] = self.process_log(device_log)
                else:
                    suite["log"] = self.process_log(build_log)
            elif instance.status == 'filtered':
                suite["status"] = "filtered"
                suite["reason"] = instance.reason
            elif instance.status == 'passed':
                suite["status"] = "passed"
            elif instance.status == 'skipped':
                suite["status"] = "skipped"
                suite["reason"] = instance.reason

            if instance.status is not None:
                suite["execution_time"] = f"{float(handler_time):.2f}"
            suite["build_time"] = f"{float(instance.build_time):.2f}"

            testcases = []

            if len(instance.testcases) == 1:
                single_case_duration = f"{float(handler_time):.2f}"
            else:
                single_case_duration = 0

            for case in instance.testcases:
                # 'freeform' is set when no sub-testcases were parsed. If we
                # discover them at runtime, the fallback testcase is no longer
                # needed and can be dropped from the output; it has no status
                # and would otherwise be reported as skipped.
                if case.freeform and case.status is None and len(instance.testcases) > 1:
                    continue
                testcase = {}
                testcase['identifier'] = case.name
                if instance.status:
                    if single_case_duration:
                        testcase['execution_time'] = single_case_duration
                    else:
                        testcase['execution_time'] = f"{float(case.duration):.2f}"

                if case.output != "":
                    testcase['log'] = case.output

                if case.status == "skipped":
                    if instance.status == "filtered":
                        testcase["status"] = "filtered"
                    else:
                        testcase["status"] = "skipped"
                        testcase["reason"] = case.reason or instance.reason
                else:
                    testcase["status"] = case.status
                    if case.reason:
                        testcase["reason"] = case.reason

                testcases.append(testcase)

            suite['testcases'] = testcases

            if instance.recording is not None:
                suite['recording'] = instance.recording

            suites.append(suite)

        report["testsuites"] = suites
        with open(filename, "wt") as json_file:
            json.dump(report, json_file, indent=4, separators=(',', ':'))

    def compare_metrics(self, filename):
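        """Compare footprint metrics of this run against the report in ``filename``.

        Returns a list of (instance, metric, value, delta, lower_is_better)
        tuples for every metric that changed.
        """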
        # name, datatype, lower results better
        interesting_metrics = [("used_ram", int, True),
                               ("used_rom", int, True)]

        if not os.path.exists(filename):
            logger.error("Cannot compare metrics, %s not found" % filename)
            return []

        results = []
        saved_metrics = {}
        with open(filename) as fp:
            jt = json.load(fp)
            for ts in jt.get("testsuites", []):
                d = {}
                for m, _, _ in interesting_metrics:
                    d[m] = ts.get(m, 0)
                ts_name = ts.get('name')
                ts_platform = ts.get('platform')
                saved_metrics[(ts_name, ts_platform)] = d

        for instance in self.instances.values():
            mkey = (instance.testsuite.name, instance.platform.name)
            if mkey not in saved_metrics:
                continue
            sm = saved_metrics[mkey]
            for metric, mtype, lower_better in interesting_metrics:
                if metric not in instance.metrics:
                    continue
                if sm[metric] == "":
                    continue
                delta = instance.metrics.get(metric, 0) - mtype(sm[metric])
                if delta == 0:
                    continue
                results.append((instance, metric, instance.metrics.get(metric, 0), delta,
                                lower_better))
        return results

    def footprint_reports(self, report, show_footprint, all_deltas,
                          footprint_threshold, last_metrics):
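        """Log footprint deltas relative to a previous report.

        Unless ``all_deltas`` is set, only regressions exceeding
        ``footprint_threshold`` percent are shown. ``last_metrics`` only
        affects the wording of the trailing warning message.
        """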
        if not report:
            return

        logger.debug("running footprint_reports")
        deltas = self.compare_metrics(report)
        warnings = 0
        if deltas and show_footprint:
            for i, metric, value, delta, lower_better in deltas:
                if not all_deltas and ((delta < 0 and lower_better) or
                                       (delta > 0 and not lower_better)):
                    continue

                percentage = 0
                if value > delta:
                    percentage = (float(delta) / float(value - delta))

                if not all_deltas and (percentage < (footprint_threshold / 100.0)):
                    continue

                logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format(
                    i.platform.name, i.testsuite.name, Fore.YELLOW,
                    "INFO" if all_deltas else "WARNING", Fore.RESET,
                    metric, delta, value, percentage))
                warnings += 1

        if warnings:
            logger.warning("Deltas based on metrics from last %s" %
                           ("release" if not last_metrics else "run"))

    def synopsis(self):
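        """Log a short synopsis of instances that did not pass (up to 10),
        together with command lines that reproduce one of them.
        """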
        cnt = 0
        example_instance = None
        detailed_test_id = self.env.options.detailed_test_id
        for instance in self.instances.values():
            if instance.status not in ["passed", "filtered", "skipped"]:
                cnt = cnt + 1
                if cnt == 1:
                    logger.info("-+" * 40)
                    logger.info("The following issues were found (showing the top 10 items):")

                logger.info(f"{cnt}) {instance.testsuite.name} on {instance.platform.name} {instance.status} ({instance.reason})")
                example_instance = instance
            if cnt == 10:
                break

        if cnt and example_instance:
            logger.info("")
            logger.info("To rerun the tests, call twister using the following commandline:")
            extra_parameters = '' if detailed_test_id else ' --no-detailed-test-id'
            logger.info(f"west twister -p <PLATFORM> -s <TEST ID>{extra_parameters}, for example:")
            logger.info("")
            logger.info(f"west twister -p {example_instance.platform.name} -s {example_instance.testsuite.name}"
                        f"{extra_parameters}")
            logger.info("or with west:")
            logger.info(f"west build -p -b {example_instance.platform.name} "
                        f"{example_instance.testsuite.source_dir_rel} -T {example_instance.testsuite.id}")
            logger.info("-+" * 40)

    def summary(self, results, unrecognized_sections, duration):
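        """Log the end-of-run summary: pass rate, failure/error/skip counts,
        platform coverage and how many configurations were executed versus
        only built.
        """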
        failed = 0
        run = 0
        for instance in self.instances.values():
            if instance.status == "failed":
                failed += 1
            elif instance.metrics.get("unrecognized") and not unrecognized_sections:
                logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" %
                             (Fore.RED, Fore.RESET, instance.name,
                              str(instance.metrics.get("unrecognized", []))))
                failed += 1

            # FIXME: need a better way to identify executed tests
            handler_time = instance.metrics.get('handler_time', 0)
            if float(handler_time) > 0:
                run += 1

        if results.total and results.total != results.skipped_configs:
            pass_rate = (float(results.passed) / float(results.total - results.skipped_configs))
        else:
            pass_rate = 0

        logger.info(
            "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {}{}{} errored, {} skipped with {}{}{} warnings in {:.2f} seconds".format(
                Fore.RED if failed else Fore.GREEN,
                results.passed,
                results.total,
                Fore.RESET,
                pass_rate,
                Fore.RED if results.failed else Fore.RESET,
                results.failed,
                Fore.RESET,
                Fore.RED if results.error else Fore.RESET,
                results.error,
                Fore.RESET,
                results.skipped_configs,
                Fore.YELLOW if self.plan.warnings else Fore.RESET,
                self.plan.warnings,
                Fore.RESET,
                duration))

        total_platforms = len(self.platforms)
        # if we are only building, do not report about tests being executed.
        if self.platforms and not self.env.options.build_only:
            logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format(
                results.cases - results.skipped_cases,
                results.skipped_cases,
                len(self.filtered_platforms),
                total_platforms,
                (100 * len(self.filtered_platforms) / len(self.platforms))
            ))

        built_only = results.total - run - results.skipped_configs
        logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, "
                    f"{Fore.RED}{built_only}{Fore.RESET} test configurations were only built.")

    def save_reports(self, name, suffix, report_dir, no_update, platform_reports):
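        """Write the standard report set: <name>.json, <name>.xml,
        <name>_report.xml and <name>_suite_report.xml, plus per-platform
        reports when ``platform_reports`` is set.
        """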
        if not self.instances:
            return

        logger.info("Saving reports...")
        if name:
            report_name = name
        else:
            report_name = "twister"

        if report_dir:
            os.makedirs(report_dir, exist_ok=True)
            filename = os.path.join(report_dir, report_name)
            outdir = report_dir
        else:
            outdir = self.outdir
            filename = os.path.join(outdir, report_name)

        if suffix:
            filename = "{}_{}".format(filename, suffix)

        if not no_update:
            json_file = filename + ".json"
            self.json_report(json_file, version=self.env.version)
            self.xunit_report(json_file, filename + ".xml", full_report=False)
            self.xunit_report(json_file, filename + "_report.xml", full_report=True)
            self.xunit_report_suites(json_file, filename + "_suite_report.xml")

            if platform_reports:
                self.target_report(json_file, outdir, suffix)

    def target_report(self, json_file, outdir, suffix):
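        """Write per-platform xUnit and JSON reports into ``outdir``."""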
        platforms = {inst.platform.name for inst in self.instances.values()}
        for platform in platforms:
            if suffix:
                filename = os.path.join(outdir, "{}_{}.xml".format(platform, suffix))
                json_platform_file = os.path.join(outdir, "{}_{}.json".format(platform, suffix))
            else:
                filename = os.path.join(outdir, "{}.xml".format(platform))
                json_platform_file = os.path.join(outdir, "{}.json".format(platform))
            self.xunit_report(json_file, filename, platform, full_report=True)
            self.json_report(json_platform_file, version=self.env.version, platform=platform)