1#!/usr/bin/env python3
2#
3# Copyright (c) 2024 Intel Corporation
4#
5# SPDX-License-Identifier: Apache-2.0
6
7"""
8This script converts memory footprint data prepared by `./footprint/scripts/track.py`
into JSON files compatible with the Twister report schema, making them ready for upload
10to the same ElasticSearch data storage together with other Twister reports
11for analysis, visualization, etc.
12
13The memory footprint input data files (rom.json, ram.json) are expected in directories
structured as 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' under the input path(s).
15The BOARD name itself can be in HWMv2 format as 'BOARD/SOC' or 'BOARD/SOC/VARIANT'
16with the corresponding sub-directories.
17
18For example, an input path `./**/*v3.6.0-rc3-*/footprints/**/frdm_k64f/` will be
expanded by bash to all sub-directories with the 'footprints' data of `v3.6.0-rc3`
release commits collected for the `frdm_k64f` board.
21Note: for the above example to work the bash recursive globbing should be active:
22`shopt -s globstar`.
23
24The output `twister_footprint.json` files will be placed into the same directories
25as the corresponding input files.
26
27In Twister report a test instance has either long or short name, each needs test
28suite name from the test configuration yaml file.
This script has a `--test-name` parameter to customize how to compose test names
from the plan.txt columns including an additional (last) one with an explicit
test suite name ('dot separated' format).
32"""
33
34from __future__ import annotations
35
36from datetime import datetime, timezone
37import argparse
38import os
39import sys
40import re
41import csv
42import logging
43import json
44from git import Repo
45from git.exc import BadName
46
47
# Extracts the 12-hex-digit commit SHA from version strings like 'v3.6.0-rc3-g<sha>'.
VERSION_COMMIT_RE = re.compile(r".*-g([a-f0-9]{12})$")
# Column names of the execution plan CSV file (see --plan); rows may omit
# trailing columns, so 'suite_name' is effectively optional.
PLAN_HEADERS = ['name', 'feature', 'board', 'application', 'options', 'suite_name']
# Test suite configuration file expected for each application type prefix.
TESTSUITE_FILENAME = { 'tests': 'testcase.yaml', 'samples': 'sample.yaml' }
# Footprint section name in the report -> input data file expected in each directory.
FOOTPRINT_FILES = { 'ROM': 'rom.json', 'RAM': 'ram.json' }
# Default output file name, compatible with the Twister report naming.
RESULT_FILENAME = 'twister_footprint.json'
# Maximum number of path components in an HWMv2 board name ('BOARD/SOC/VARIANT').
HWMv2_LEVELS = 3
54
# Module-wide logger, created by init_logs().
logger = None
# Maps LOG_LEVEL environment variable values to (logging level, -v occurrence count).
LOG_LEVELS = {
       'DEBUG': (logging.DEBUG, 3),
       'INFO': (logging.INFO, 2),
       'WARNING': (logging.WARNING, 1),
       'ERROR': (logging.ERROR, 0)
     }


def init_logs(logger_name=''):
    """Create the module-wide logger writing to stdout.

    The initial log level is taken from the LOG_LEVEL environment variable
    ('DEBUG', 'INFO', 'WARNING' or 'ERROR'); unknown or missing values
    default to ERROR.
    """
    global logger

    log_level = os.environ.get('LOG_LEVEL', 'ERROR')
    log_level = LOG_LEVELS[log_level][0] if log_level in LOG_LEVELS else logging.ERROR

    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s'))

    logger = logging.getLogger(logger_name)
    logger.setLevel(log_level)
    logger.addHandler(console)

def set_verbose(verbose: int):
    """Set the logger level from the number of '-v' command line occurrences.

    The count is clamped into the supported range, so any number of extra
    '-v' options simply yields the most verbose (DEBUG) level instead of
    raising a KeyError on an out-of-range lookup.
    """
    levels = { lvl[1]: lvl[0] for lvl in LOG_LEVELS.values() }
    # Valid keys are 0..len(levels)-1; clamp into that range.
    verbose = max(0, min(verbose, len(levels) - 1))
    logger.setLevel(levels[verbose])
84
85
def parse_args():
    """Parse the command line and return the argparse.Namespace result."""
    # RawDescriptionHelpFormatter keeps the module docstring formatting intact
    # when it is shown as the --help description.
    parser = argparse.ArgumentParser(allow_abbrev=False,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)

    parser.add_argument('input_paths', metavar='INPUT_PATHS', nargs='+',
        help="Directories with the memory footprint data to convert. "
             "Each directory must have 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' path structure.")

    parser.add_argument('-p', '--plan', metavar='PLAN_FILE_CSV', required=True,
        help="An execution plan (CSV file) with details of what footprint applications "
             "and platforms were chosen to generate the input data. "
             "It is also applied to filter input directories and check their names.")

    parser.add_argument('-o', '--output-fname', metavar='OUTPUT_FNAME', required=False,
        default=RESULT_FILENAME,
        help="Destination JSON file name to create at each of INPUT_PATHS. "
             "Default: '%(default)s'")

    # Defaults to the ZEPHYR_BASE environment variable captured at parse time.
    parser.add_argument('-z', '--zephyr_base', metavar='ZEPHYR_BASE', required=False,
        default = os.environ.get('ZEPHYR_BASE'),
        help="Zephyr code base path to use instead of the current ZEPHYR_BASE environment variable. "
             "The script needs Zephyr repository there to read SHA and commit time of builds. "
             "Current default: '%(default)s'")

    # The '/' or '.' in the choice doubles as the separator of the resulting name.
    parser.add_argument("--test-name",
        choices=['application/suite_name', 'suite_name', 'application', 'name.feature'],
        default='name.feature',
        help="How to compose Twister test instance names using plan.txt columns. "
             "Default: '%(default)s'" )

    parser.add_argument("--no-testsuite-check",
        dest='testsuite_check', action="store_false",
        help="Don't check for applications' testsuite configs in ZEPHYR_BASE.")

    parser.add_argument('-v', '--verbose', required=False, action='count', default=0,
        help="Increase the logging level for each occurrence. Default level: ERROR")

    return parser.parse_args()
125
126
def read_plan(fname: str) -> dict[str, dict]:
    """Read the execution plan CSV file.

    Each row is zipped with PLAN_HEADERS; short rows simply omit the
    trailing columns (e.g. the optional 'suite_name').

    Returns:
        A mapping of 'name/feature/board' identifiers to their plan row
        dictionaries.  (The original annotation claimed list[dict], but a
        dict keyed by the identifier is what callers index into.)
    """
    with open(fname) as plan_file:
        # csv.reader is lazy, so the rows must be consumed inside the 'with'.
        plan_vals = [dict(zip(PLAN_HEADERS, row)) for row in csv.reader(plan_file)]
    return { f"{p['name']}/{p['feature']}/{p['board']}" : p for p in plan_vals }
134
135
def get_id_from_path(plan, in_path, max_levels=HWMv2_LEVELS):
    """Extract version/app/feature/board identification from a report path.

    The path is expected to end with 'VERSION/APPLICATION/FEATURE/BOARD'
    components, where BOARD itself may span up to `max_levels` trailing
    sub-directories for HWMv2 names like 'BOARD/SOC/VARIANT'.

    Returns:
        A dict with 'version', 'app', 'feature' and 'board' keys when the
        path matches a `plan` entry; None when the path is malformed (a
        required component is empty); {} when nothing in `plan` matches.
    """
    data_id = {}
    in_path, board = os.path.split(in_path)
    if not board:
        # A trailing separator makes the first split yield ''; split again
        # to get the real leaf component.
        in_path, board = os.path.split(in_path)
    data_id['board'] = board

    for _ in range(max_levels):
        in_path, data_id['feature'] = os.path.split(in_path)
        head, data_id['app'] = os.path.split(in_path)
        _unused, data_id['version'] = os.path.split(head)
        if not all(data_id.values()):
            # Ran out of path components: not a correct report directory.
            return None
        plan_key = f"{data_id['app']}/{data_id['feature']}/{data_id['board']}"
        if plan_key in plan:
            return data_id
        # No match: retry assuming an HWMv2 board name one level deeper.
        data_id['board'] = f"{data_id['feature']}/{data_id['board']}"

    # Exhausted all board-name depths without a plan match.
    return {}
158
159
def main():
    """Convert footprint report directories into Twister-schema JSON files.

    Exits with status 0 when every input directory was processed without
    errors, 1 otherwise.  Directories not matching the plan are filtered,
    malformed ones are skipped; neither counts as an error.
    """
    errors = 0
    converted = 0
    skipped = 0
    filtered = 0

    # One shared timestamp for all reports produced by this run.
    run_date = datetime.now(timezone.utc).isoformat(timespec='seconds')

    init_logs()

    args = parse_args()

    set_verbose(args.verbose)

    # Use the configured module logger everywhere (the original mixed in
    # module-level logging.* calls, which go to the root logger and ignore
    # the format and verbosity set up by init_logs()/set_verbose()).
    if not args.zephyr_base:
        logger.error("ZEPHYR_BASE is not defined.")
        sys.exit(1)

    zephyr_base = os.path.abspath(args.zephyr_base)
    zephyr_base_repo = Repo(zephyr_base)

    logger.info(f"scanning {len(args.input_paths)} directories ...")

    logger.info(f"use plan '{args.plan}'")
    plan = read_plan(args.plan)

    # A '/' in the --test-name choice means path-style composition
    # ('application/suite_name'); otherwise parts are 'dot separated'.
    test_name_sep = '/' if '/' in args.test_name else '.'
    test_name_parts = args.test_name.split(test_name_sep)

    for report_path in args.input_paths:
        logger.info(f"convert {report_path}")
        p_head = os.path.normcase(report_path)
        p_head = os.path.normpath(p_head)
        if not os.path.isdir(p_head):
            logger.error(f"not a directory '{p_head}'")
            errors += 1
            continue

        data_id = get_id_from_path(plan, p_head)
        if data_id is None:
            # Malformed path: the expected components could not be extracted.
            logger.warning(f"skipped '{report_path}' - not a correct report directory")
            skipped += 1
            continue
        elif not data_id:
            # Correct structure, but this combination is not in the plan.
            logger.info(f"filtered '{report_path}' - not in the plan")
            filtered += 1
            continue

        r_plan = f"{data_id['app']}/{data_id['feature']}/{data_id['board']}"

        if 'suite_name' in test_name_parts and 'suite_name' not in plan[r_plan]:
            logger.info(f"filtered '{report_path}' - no Twister suite name in the plan.")
            filtered += 1
            continue

        # Compose the Twister test instance name from the requested plan columns.
        suite_name = test_name_sep.join([plan[r_plan].get(n, '') for n in test_name_parts])

        # Just some sanity checks of the 'application' in the current ZEPHYR_BASE
        if args.testsuite_check:
            suite_type = plan[r_plan]['application'].split('/')
            if suite_type and suite_type[0] in TESTSUITE_FILENAME:
                suite_conf_name = TESTSUITE_FILENAME[suite_type[0]]
            else:
                logger.error(f"unknown app type to get configuration in '{report_path}'")
                errors += 1
                continue

            suite_conf_fname = os.path.join(zephyr_base, plan[r_plan]['application'], suite_conf_name)
            if not os.path.isfile(suite_conf_fname):
                logger.error(f"test configuration not found for '{report_path}' at '{suite_conf_fname}'")
                errors += 1
                continue

        # Check SHA presence in the current ZEPHYR_BASE
        sha_match = VERSION_COMMIT_RE.search(data_id['version'])
        version_sha = sha_match.group(1) if sha_match else data_id['version']
        try:
            git_commit = zephyr_base_repo.commit(version_sha)
        except BadName:
            logger.error(f"SHA:'{version_sha}' is not found in ZEPHYR_BASE for '{report_path}'")
            errors += 1
            continue

        # Compose twister_footprint.json record - each application (test suite)
        # will have its own simplified header with options, SHA, etc.
        res = {}

        res['environment'] = {
            'zephyr_version': data_id['version'],
            'commit_date':
                git_commit.committed_datetime.astimezone(timezone.utc).isoformat(timespec='seconds'),
            'run_date': run_date,
            'options': {
                'testsuite_root': [ plan[r_plan]['application'] ],
                'build_only': True,
                'create_rom_ram_report': True,
                'footprint_report': 'all',
                'platform': [ plan[r_plan]['board'] ]
            }
        }

        test_suite = {
            'name': suite_name,
            'arch': None,
            'platform': plan[r_plan]['board'],
            'status': 'passed',
            'footprint': {}
        }

        # Attach ROM/RAM data; a missing input file is tolerated (best effort).
        for k, v in FOOTPRINT_FILES.items():
            footprint_fname = os.path.join(report_path, v)
            try:
                with open(footprint_fname, "rt") as footprint_json:
                    logger.debug(f"reading {footprint_fname}")
                    test_suite['footprint'][k] = json.load(footprint_json)
            except FileNotFoundError:
                logger.warning(f"{report_path} missing {v}")

        res['testsuites'] = [test_suite]

        report_fname = os.path.join(report_path, args.output_fname)
        with open(report_fname, "wt") as json_file:
            logger.debug(f"writing {report_fname}")
            json.dump(res, json_file, indent=4, separators=(',',':'))

        converted += 1

    logger.info(f'found={len(args.input_paths)}, converted={converted}, '
                f'skipped={skipped}, filtered={filtered}, errors={errors}')
    # Exit status: 1 when any error occurred, 0 otherwise.
    sys.exit(errors != 0)
294
295
# Script entry point: run the conversion when executed directly.
if __name__ == '__main__':
    main()
298