#!/usr/bin/env python3
# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import json
import logging
import os
import string
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import PosixPath

from colorama import Fore

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

class Reporting:
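    """Report generation for a completed Twister run.

    Turns the test plan's instances into JSON, JUnit/xunit XML and footprint
    reports, and logs the run summary and synopsis.

    A minimal usage sketch (assumes an already populated test plan ``plan``
    and Twister environment ``env``):

        reporting = Reporting(plan, env)
        reporting.save_reports("twister", suffix=None, report_dir=None,
                               no_update=False, platform_reports=True)
    """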

    json_filters = {
        'twister.json': {
            'deny_suite': ['footprint']
        },
        'footprint.json': {
            'deny_status': ['filtered'],
            'deny_suite': ['testcases', 'execution_time', 'recording', 'retries', 'runnable']
        }
    }

    def __init__(self, plan, env) -> None:
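        """Capture the test plan, environment and output directory used by all reports."""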
        self.plan = plan  # FIXME
        self.instances = plan.instances
        self.platforms = plan.platforms
        self.selected_platforms = plan.selected_platforms
        self.filtered_platforms = plan.filtered_platforms
        self.env = env
        self.timestamp = datetime.now().isoformat()
        self.outdir = os.path.abspath(env.options.outdir)
        self.instance_fail_count = plan.instance_fail_count
        self.footprint = None

    @staticmethod
    def process_log(log_file):
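        """Return the content of log_file reduced to printable characters,
        or an empty string if the file does not exist."""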
        filtered_string = ""
        if os.path.exists(log_file):
            with open(log_file, "rb") as f:
                log = f.read().decode("utf-8")
                filtered_string = ''.join(filter(lambda x: x in string.printable, log))

        return filtered_string


    @staticmethod
    def xunit_testcase(eleTestsuite, name, classname, status, ts_status, reason, duration, runnable, stats, log, build_only_as_skip):
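        """Append one <testcase> element to eleTestsuite and return the updated
        (fails, passes, errors, skips) counters passed in via stats."""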
        fails, passes, errors, skips = stats

        if status in ['skipped', 'filtered']:
            duration = 0

        eleTestcase = ET.SubElement(
            eleTestsuite, "testcase",
            classname=classname,
            name=f"{name}",
            time=f"{duration}")

        if status in ['skipped', 'filtered']:
            skips += 1
            # temporarily add build_only_as_skip to restore existing CI report behaviour
            if ts_status == "passed" and not runnable:
                tc_type = "build"
            else:
                tc_type = status
            ET.SubElement(eleTestcase, 'skipped', type=f"{tc_type}", message=f"{reason}")
        elif status in ["failed", "blocked"]:
            fails += 1
            el = ET.SubElement(eleTestcase, 'failure', type="failure", message=f"{reason}")
            if log:
                el.text = log
        elif status == "error":
            errors += 1
            el = ET.SubElement(eleTestcase, 'error', type="failure", message=f"{reason}")
            if log:
                el.text = log
        elif status == 'passed':
            if not runnable and build_only_as_skip:
                ET.SubElement(eleTestcase, 'skipped', type="build", message="built only")
                skips += 1
            else:
                passes += 1
        else:
            if not status:
                logger.debug(f"{name}: No status")
                ET.SubElement(eleTestcase, 'skipped', type="untested", message="No results captured, testsuite misconfiguration?")
            else:
                logger.error(f"{name}: Unknown status '{status}'")

        return (fails, passes, errors, skips)

    # Generate a report with all testsuites instead of doing this per platform
    def xunit_report_suites(self, json_file, filename):
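        """Write a JUnit XML report to filename with one <testsuite> element per
        test suite found in the twister JSON report."""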

        json_data = {}
        with open(json_file, "r") as json_results:
            json_data = json.load(json_results)

        env = json_data.get('environment', {})
        version = env.get('zephyr_version', None)

        eleTestsuites = ET.Element('testsuites')
        all_suites = json_data.get("testsuites", [])

        suites_to_report = all_suites
        # do not create entries for filtered suites
        if not self.env.options.detailed_skipped_report:
            suites_to_report = list(filter(lambda d: d.get('status') != "filtered", all_suites))

        for suite in suites_to_report:
            duration = 0
            eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
                                            name=suite.get("name"), time="0",
                                            timestamp=self.timestamp,
                                            tests="0",
                                            failures="0",
                                            errors="0", skipped="0")
            eleTSProperties = ET.SubElement(eleTestsuite, 'properties')
            # Multiple 'property' elements can be added to 'properties',
            # differing by name and value.
            ET.SubElement(eleTSProperties, 'property', name="version", value=version)
            ET.SubElement(eleTSProperties, 'property', name="platform", value=suite.get("platform"))
            ET.SubElement(eleTSProperties, 'property', name="architecture", value=suite.get("arch"))

            total = 0
            fails = passes = errors = skips = 0
            handler_time = suite.get('execution_time', 0)
            runnable = suite.get('runnable', 0)
            duration += float(handler_time)
            ts_status = suite.get('status')
            for tc in suite.get("testcases", []):
                status = tc.get('status')
                reason = tc.get('reason', suite.get('reason', 'Unknown'))
                log = tc.get("log", suite.get("log"))

                tc_duration = tc.get('execution_time', handler_time)
                name = tc.get("identifier")
                classname = ".".join(name.split(".")[:2])
                fails, passes, errors, skips = self.xunit_testcase(eleTestsuite,
                    name, classname, status, ts_status, reason, tc_duration, runnable,
                    (fails, passes, errors, skips), log, True)

            total = errors + passes + fails + skips

            eleTestsuite.attrib['time'] = f"{duration}"
            eleTestsuite.attrib['failures'] = f"{fails}"
            eleTestsuite.attrib['errors'] = f"{errors}"
            eleTestsuite.attrib['skipped'] = f"{skips}"
            eleTestsuite.attrib['tests'] = f"{total}"

        result = ET.tostring(eleTestsuites)
        with open(filename, 'wb') as report:
            report.write(result)

    def xunit_report(self, json_file, filename, selected_platform=None, full_report=False):
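        """Write a JUnit XML report with one <testsuite> element per platform.

        With full_report=True every test case becomes its own <testcase>
        element; otherwise one <testcase> is emitted per test suite.
        """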
        if selected_platform:
            selected = [selected_platform]
            logger.info(f"Writing target report for {selected_platform}...")
        else:
            logger.info(f"Writing xunit report {filename}...")
            selected = self.selected_platforms

        json_data = {}
        with open(json_file, "r") as json_results:
            json_data = json.load(json_results)

        env = json_data.get('environment', {})
        version = env.get('zephyr_version', None)

        eleTestsuites = ET.Element('testsuites')
        all_suites = json_data.get("testsuites", [])

        for platform in selected:
            suites = list(filter(lambda d: d['platform'] == platform, all_suites))
            # do not create entry if everything is filtered out
            if not self.env.options.detailed_skipped_report:
                non_filtered = list(filter(lambda d: d.get('status') != "filtered", suites))
                if not non_filtered:
                    continue

            duration = 0
            eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
                                            name=platform,
                                            timestamp=self.timestamp,
                                            time="0",
                                            tests="0",
                                            failures="0",
                                            errors="0", skipped="0")
            eleTSProperties = ET.SubElement(eleTestsuite, 'properties')
            # Multiple 'property' elements can be added to 'properties',
            # differing by name and value.
            ET.SubElement(eleTSProperties, 'property', name="version", value=version)

            total = 0
            fails = passes = errors = skips = 0
            for ts in suites:
                handler_time = ts.get('execution_time', 0)
                runnable = ts.get('runnable', 0)
                duration += float(handler_time)

                ts_status = ts.get('status')
                # Do not report filtered testcases
                if ts_status == 'filtered' and not self.env.options.detailed_skipped_report:
                    continue
                if full_report:
                    for tc in ts.get("testcases", []):
                        status = tc.get('status')
                        reason = tc.get('reason', ts.get('reason', 'Unknown'))
                        log = tc.get("log", ts.get("log"))

                        tc_duration = tc.get('execution_time', handler_time)
                        name = tc.get("identifier")
                        classname = ".".join(name.split(".")[:2])
                        fails, passes, errors, skips = self.xunit_testcase(eleTestsuite,
                            name, classname, status, ts_status, reason, tc_duration, runnable,
                            (fails, passes, errors, skips), log, True)
                else:
                    reason = ts.get('reason', 'Unknown')
                    name = ts.get("name")
                    classname = f"{platform}:{name}"
                    log = ts.get("log")
                    fails, passes, errors, skips = self.xunit_testcase(eleTestsuite,
                        name, classname, ts_status, ts_status, reason, duration, runnable,
                        (fails, passes, errors, skips), log, False)

            total = errors + passes + fails + skips

            eleTestsuite.attrib['time'] = f"{duration}"
            eleTestsuite.attrib['failures'] = f"{fails}"
            eleTestsuite.attrib['errors'] = f"{errors}"
            eleTestsuite.attrib['skipped'] = f"{skips}"
            eleTestsuite.attrib['tests'] = f"{total}"

        result = ET.tostring(eleTestsuites)
        with open(filename, 'wb') as report:
            report.write(result)

    def json_report(self, filename, version="NA", platform=None, filters=None):
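        """Write a JSON report of the test instances (optionally limited to one
        platform) to filename, applying the given status/suite filters."""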
        logger.info(f"Writing JSON report {filename}")

        if self.env.options.report_all_options:
            report_options = vars(self.env.options)
        else:
            report_options = self.env.non_default_options()

        # Resolve known JSON serialization problems.
        for k, v in report_options.items():
            report_options[k] = str(v) if type(v) in [PosixPath] else v

        report = {}
        report["environment"] = {"os": os.name,
                                 "zephyr_version": version,
                                 "toolchain": self.env.toolchain,
                                 "commit_date": self.env.commit_date,
                                 "run_date": self.env.run_date,
                                 "options": report_options
                                 }
        suites = []

        for instance in self.instances.values():
            if platform and platform != instance.platform.name:
                continue
            if instance.status == "filtered" and not self.env.options.report_filtered:
                continue
            if (filters and 'allow_status' in filters and instance.status not in filters['allow_status']):
                logger.debug(f"Skip test suite '{instance.testsuite.name}' status '{instance.status}' "
                             f"not allowed for {filename}")
                continue
            if (filters and 'deny_status' in filters and instance.status in filters['deny_status']):
                logger.debug(f"Skip test suite '{instance.testsuite.name}' status '{instance.status}' "
                             f"denied for {filename}")
                continue
            suite = {}
            handler_log = os.path.join(instance.build_dir, "handler.log")
            pytest_log = os.path.join(instance.build_dir, "twister_harness.log")
            build_log = os.path.join(instance.build_dir, "build.log")
            device_log = os.path.join(instance.build_dir, "device.log")

            handler_time = instance.metrics.get('handler_time', 0)
            used_ram = instance.metrics.get("used_ram", 0)
            used_rom = instance.metrics.get("used_rom", 0)
            available_ram = instance.metrics.get("available_ram", 0)
            available_rom = instance.metrics.get("available_rom", 0)
            suite = {
                "name": instance.testsuite.name,
                "arch": instance.platform.arch,
                "platform": instance.platform.name,
                "path": instance.testsuite.source_dir_rel
            }
            if instance.run_id:
                suite['run_id'] = instance.run_id

            suite["runnable"] = False
            if instance.status != 'filtered':
                suite["runnable"] = instance.run

            if used_ram:
                suite["used_ram"] = used_ram
            if used_rom:
                suite["used_rom"] = used_rom

            suite['retries'] = instance.retries

            if instance.dut:
                suite["dut"] = instance.dut
            if available_ram:
                suite["available_ram"] = available_ram
            if available_rom:
                suite["available_rom"] = available_rom
            if instance.status in ["error", "failed"]:
                suite['status'] = instance.status
                suite["reason"] = instance.reason
                # FIXME
                if os.path.exists(pytest_log):
                    suite["log"] = self.process_log(pytest_log)
                elif os.path.exists(handler_log):
                    suite["log"] = self.process_log(handler_log)
                elif os.path.exists(device_log):
                    suite["log"] = self.process_log(device_log)
                else:
                    suite["log"] = self.process_log(build_log)
            elif instance.status == 'filtered':
                suite["status"] = "filtered"
                suite["reason"] = instance.reason
            elif instance.status == 'passed':
                suite["status"] = "passed"
            elif instance.status == 'skipped':
                suite["status"] = "skipped"
                suite["reason"] = instance.reason

            if instance.status is not None:
                suite["execution_time"] = f"{float(handler_time):.2f}"
            suite["build_time"] = f"{float(instance.build_time):.2f}"

            testcases = []

            if len(instance.testcases) == 1:
                single_case_duration = f"{float(handler_time):.2f}"
            else:
                single_case_duration = 0

            for case in instance.testcases:
                # freeform was set when no sub testcases were parsed; however,
                # if we discover them at runtime, the fallback testcase is no
                # longer needed and can be removed from the output. It has no
                # status and would otherwise be reported as skipped.
                if case.freeform and case.status is None and len(instance.testcases) > 1:
                    continue
                testcase = {}
                testcase['identifier'] = case.name
                if instance.status:
                    if single_case_duration:
                        testcase['execution_time'] = single_case_duration
                    else:
                        testcase['execution_time'] = f"{float(case.duration):.2f}"

                if case.output != "":
                    testcase['log'] = case.output

                if case.status == "skipped":
                    if instance.status == "filtered":
                        testcase["status"] = "filtered"
                    else:
                        testcase["status"] = "skipped"
                        testcase["reason"] = case.reason or instance.reason
                else:
                    testcase["status"] = case.status
                    if case.reason:
                        testcase["reason"] = case.reason

                testcases.append(testcase)

            suite['testcases'] = testcases

            if instance.recording is not None:
                suite['recording'] = instance.recording

            if (instance.status
                    and instance.status not in ["error", "filtered"]
                    and self.env.options.create_rom_ram_report
                    and self.env.options.footprint_report is not None):
                # Init as empty data preparing for filtering properties.
                suite['footprint'] = {}

            # Pass suite properties through the context filters.
            if filters and 'allow_suite' in filters:
                suite = {k: v for k, v in suite.items() if k in filters['allow_suite']}

            if filters and 'deny_suite' in filters:
                suite = {k: v for k, v in suite.items() if k not in filters['deny_suite']}

            # Compose external data only into those properties which pass filtering.
            if 'footprint' in suite:
                do_all = 'all' in self.env.options.footprint_report
                footprint_files = {'ROM': 'rom.json', 'RAM': 'ram.json'}
                for k, v in footprint_files.items():
                    if do_all or k in self.env.options.footprint_report:
                        footprint_fname = os.path.join(instance.build_dir, v)
                        try:
                            with open(footprint_fname, "rt") as footprint_json:
                                logger.debug(f"Collect footprint.{k} for '{instance.name}'")
                                suite['footprint'][k] = json.load(footprint_json)
                        except FileNotFoundError:
                            logger.error(f"Missing footprint.{k} for '{instance.name}'")

            suites.append(suite)

        report["testsuites"] = suites
        with open(filename, "wt") as json_file:
            json.dump(report, json_file, indent=4, separators=(',', ':'))


    def compare_metrics(self, filename):
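        """Compare used_ram/used_rom of the current run against a previous JSON
        report and return (instance, metric, value, delta, lower_better)
        tuples for every metric that changed."""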
        # name, datatype, lower results better
        interesting_metrics = [("used_ram", int, True),
                               ("used_rom", int, True)]

        if not os.path.exists(filename):
            logger.error("Cannot compare metrics, %s not found" % filename)
            return []

        results = []
        saved_metrics = {}
        with open(filename) as fp:
            jt = json.load(fp)
            for ts in jt.get("testsuites", []):
                d = {}
                for m, _, _ in interesting_metrics:
                    d[m] = ts.get(m, 0)
                ts_name = ts.get('name')
                ts_platform = ts.get('platform')
                saved_metrics[(ts_name, ts_platform)] = d

        for instance in self.instances.values():
            mkey = (instance.testsuite.name, instance.platform.name)
            if mkey not in saved_metrics:
                continue
            sm = saved_metrics[mkey]
            for metric, mtype, lower_better in interesting_metrics:
                if metric not in instance.metrics:
                    continue
                if sm[metric] == "":
                    continue
                delta = instance.metrics.get(metric, 0) - mtype(sm[metric])
                if delta == 0:
                    continue
                results.append((instance, metric, instance.metrics.get(metric, 0), delta,
                                lower_better))
        return results

    def footprint_reports(self, report, show_footprint, all_deltas,
                          footprint_threshold, last_metrics):
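        """Log footprint (RAM/ROM) deltas against the given baseline report,
        honouring the delta threshold and the show_footprint/all_deltas options."""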
        if not report:
            return

        logger.debug("running footprint_reports")
        deltas = self.compare_metrics(report)
        warnings = 0
        if deltas:
            for i, metric, value, delta, lower_better in deltas:
                if not all_deltas and ((delta < 0 and lower_better) or
                                       (delta > 0 and not lower_better)):
                    continue

                percentage = 0
                if value > delta:
                    percentage = (float(delta) / float(value - delta))

                if not all_deltas and (percentage < (footprint_threshold / 100.0)):
                    continue

                if show_footprint:
                    logger.log(
                        logging.INFO if all_deltas else logging.WARNING,
                        "{:<25} {:<60} {} {:<+4}, is now {:6} {:+.2%}".format(
                            i.platform.name, i.testsuite.name,
                            metric, delta, value, percentage))

                warnings += 1

        if warnings:
            logger.warning("Found {} footprint deltas to {} as a baseline.".format(
                           warnings,
                           (report if not last_metrics else "the last twister run")))

    def synopsis(self):
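        """Log a short synopsis of failing/erroring instances, limited by the
        report_summary option, together with command lines to rerun them."""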
        if self.env.options.report_summary == 0:
            count = self.instance_fail_count
            log_txt = f"The following issues were found (showing all {count} items):"
        elif self.env.options.report_summary:
            count = self.env.options.report_summary
            log_txt = "The following issues were found "
            if count > self.instance_fail_count:
                log_txt += f"(presenting {self.instance_fail_count} out of the {count} items requested):"
            else:
                log_txt += f"(showing {count} of {self.instance_fail_count} items):"
        else:
            count = 10
            log_txt = f"The following issues were found (showing the top {count} items):"
        cnt = 0
        example_instance = None
        detailed_test_id = self.env.options.detailed_test_id
        for instance in self.instances.values():
            if instance.status not in ["passed", "filtered", "skipped"]:
                cnt += 1
                if cnt == 1:
                    logger.info("-+" * 40)
                    logger.info(log_txt)

                logger.info(f"{cnt}) {instance.testsuite.name} on {instance.platform.name} {instance.status} ({instance.reason})")
                example_instance = instance
            if cnt == count:
                break
        if cnt == 0 and self.env.options.report_summary is not None:
            logger.info("-+" * 40)
            logger.info("No errors/failures found")

        if cnt and example_instance:
            logger.info("")
            logger.info("To rerun the tests, call twister using the following command line:")
            extra_parameters = '' if detailed_test_id else ' --no-detailed-test-id'
            logger.info(f"west twister -p <PLATFORM> -s <TEST ID>{extra_parameters}, for example:")
            logger.info("")
            logger.info(f"west twister -p {example_instance.platform.name} -s {example_instance.testsuite.name}"
                        f"{extra_parameters}")
            logger.info("or build it directly with west:")
            logger.info(f"west build -p -b {example_instance.platform.name} "
                        f"{example_instance.testsuite.source_dir_rel} -T {example_instance.testsuite.id}")
            logger.info("-+" * 40)

    def summary(self, results, ignore_unrecognized_sections, duration):
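        """Log overall result statistics: pass rate, failures, errors, skips,
        warnings and the number of executed vs. build-only configurations."""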
        failed = 0
        run = 0
        for instance in self.instances.values():
            if instance.status == "failed":
                failed += 1
            elif not ignore_unrecognized_sections and instance.metrics.get("unrecognized"):
                logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" %
                             (Fore.RED, Fore.RESET, instance.name,
                              str(instance.metrics.get("unrecognized", []))))
                failed += 1

            # FIXME: need a better way to identify executed tests
            handler_time = instance.metrics.get('handler_time', 0)
            if float(handler_time) > 0:
                run += 1

        if results.total and results.total != results.skipped_configs:
            pass_rate = (float(results.passed) / float(results.total - results.skipped_configs))
        else:
            pass_rate = 0

        logger.info(
            "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {}{}{} errored, {} skipped with {}{}{} warnings in {:.2f} seconds".format(
                Fore.RED if failed else Fore.GREEN,
                results.passed,
                results.total,
                Fore.RESET,
                pass_rate,
                Fore.RED if results.failed else Fore.RESET,
                results.failed,
                Fore.RESET,
                Fore.RED if results.error else Fore.RESET,
                results.error,
                Fore.RESET,
                results.skipped_configs,
                Fore.YELLOW if self.plan.warnings else Fore.RESET,
                self.plan.warnings,
                Fore.RESET,
                duration))

        total_platforms = len(self.platforms)
        # if we are only building, do not report about tests being executed.
        if self.platforms and not self.env.options.build_only:
            logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format(
                results.cases - results.skipped_cases,
                results.skipped_cases,
                len(self.filtered_platforms),
                total_platforms,
                (100 * len(self.filtered_platforms) / len(self.platforms))
            ))

        built_only = results.total - run - results.skipped_configs
        logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations were executed on platforms, \
{Fore.RED}{built_only}{Fore.RESET} test configurations were only built.")

    def save_reports(self, name, suffix, report_dir, no_update, platform_reports):
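        """Write the twister JSON and xunit reports (plus footprint and
        per-platform reports when requested) unless no_update is set."""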
        if not self.instances:
            return

        logger.info("Saving reports...")
        if name:
            report_name = name
        else:
            report_name = "twister"

        if report_dir:
            os.makedirs(report_dir, exist_ok=True)
            filename = os.path.join(report_dir, report_name)
            outdir = report_dir
        else:
            outdir = self.outdir
            filename = os.path.join(outdir, report_name)

        if suffix:
            filename = "{}_{}".format(filename, suffix)

        if not no_update:
            json_file = filename + ".json"
            self.json_report(json_file, version=self.env.version,
                             filters=self.json_filters['twister.json'])
            if self.env.options.footprint_report is not None:
                self.json_report(filename + "_footprint.json", version=self.env.version,
                                 filters=self.json_filters['footprint.json'])
            self.xunit_report(json_file, filename + ".xml", full_report=False)
            self.xunit_report(json_file, filename + "_report.xml", full_report=True)
            self.xunit_report_suites(json_file, filename + "_suite_report.xml")

            if platform_reports:
                self.target_report(json_file, outdir, suffix)


    def target_report(self, json_file, outdir, suffix):
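        """Write per-platform xunit and JSON reports into outdir, one set of
        files for each platform that has at least one test instance."""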
        platforms = {repr(inst.platform): inst.platform for _, inst in self.instances.items()}
        for platform in platforms.values():
            if suffix:
                filename = os.path.join(outdir, "{}_{}.xml".format(platform.normalized_name, suffix))
                json_platform_file = os.path.join(outdir, "{}_{}".format(platform.normalized_name, suffix))
            else:
                filename = os.path.join(outdir, "{}.xml".format(platform.normalized_name))
                json_platform_file = os.path.join(outdir, platform.normalized_name)
            self.xunit_report(json_file, filename, platform.name, full_report=True)
            self.json_report(json_platform_file + ".json",
                             version=self.env.version, platform=platform.name,
                             filters=self.json_filters['twister.json'])
            if self.env.options.footprint_report is not None:
                self.json_report(json_platform_file + "_footprint.json",
                                 version=self.env.version, platform=platform.name,
                                 filters=self.json_filters['footprint.json'])