1#!/usr/bin/env python3 2# 3# Copyright (c) 2024 Intel Corporation 4# 5# SPDX-License-Identifier: Apache-2.0 6 7""" 8This script converts memory footprint data prepared by `./footprint/scripts/track.py` 9into a JSON files compatible with Twister report schema making them ready for upload 10to the same ElasticSearch data storage together with other Twister reports 11for analysis, visualization, etc. 12 13The memory footprint input data files (rom.json, ram.json) are expected in directories 14sturctured as 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' under the input path(s). 15The BOARD name itself can be in HWMv2 format as 'BOARD/SOC' or 'BOARD/SOC/VARIANT' 16with the corresponding sub-directories. 17 18For example, an input path `./**/*v3.6.0-rc3-*/footprints/**/frdm_k64f/` will be 19expanded by bash to all sub-directories with the 'footprints' data `v3.6.0-rc3` 20release commits collected for `frdm_k64f` board. 21Note: for the above example to work the bash recursive globbing should be active: 22`shopt -s globstar`. 23 24The output `twister_footprint.json` files will be placed into the same directories 25as the corresponding input files. 26 27In Twister report a test instance has either long or short name, each needs test 28suite name from the test configuration yaml file. 29This scripts has `--test-name` parameter to customize how to compose test names 30from the plan.txt columns including an additional (last) one whth explicit 31test suite name ('dot separated' format). 32""" 33 34from __future__ import annotations 35 36from datetime import datetime, timezone 37import argparse 38import os 39import sys 40import re 41import csv 42import logging 43import json 44from git import Repo 45from git.exc import BadName 46 47 48VERSION_COMMIT_RE = re.compile(r".*-g([a-f0-9]{12})$") 49PLAN_HEADERS = ['name', 'feature', 'board', 'application', 'options', 'suite_name'] 50TESTSUITE_FILENAME = { 'tests': 'testcase.yaml', 'samples': 'sample.yaml' } 51FOOTPRINT_FILES = { 'ROM': 'rom.json', 'RAM': 'ram.json' } 52RESULT_FILENAME = 'twister_footprint.json' 53HWMv2_LEVELS = 3 54 55logger = None 56LOG_LEVELS = { 57 'DEBUG': (logging.DEBUG, 3), 58 'INFO': (logging.INFO, 2), 59 'WARNING': (logging.WARNING, 1), 60 'ERROR': (logging.ERROR, 0) 61 } 62 63 64def init_logs(logger_name=''): 65 global logger 66 67 log_level = os.environ.get('LOG_LEVEL', 'ERROR') 68 log_level = LOG_LEVELS[log_level][0] if log_level in LOG_LEVELS else logging.ERROR 69 70 console = logging.StreamHandler(sys.stdout) 71 console.setFormatter(logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s')) 72 73 logger = logging.getLogger(logger_name) 74 logger.setLevel(log_level) 75 logger.addHandler(console) 76 77def set_verbose(verbose: int): 78 levels = { lvl[1]: lvl[0] for lvl in LOG_LEVELS.values() } 79 if verbose > len(levels): 80 verbose = len(levels) 81 if verbose <= 0: 82 verbose = 0 83 logger.setLevel(levels[verbose]) 84 85 86def parse_args(): 87 parser = argparse.ArgumentParser(allow_abbrev=False, 88 formatter_class=argparse.RawDescriptionHelpFormatter, 89 description=__doc__) 90 91 parser.add_argument('input_paths', metavar='INPUT_PATHS', nargs='+', 92 help="Directories with the memory footprint data to convert. " 93 "Each directory must have 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' path structure.") 94 95 parser.add_argument('-p', '--plan', metavar='PLAN_FILE_CSV', required=True, 96 help="An execution plan (CSV file) with details of what footprint applications " 97 "and platforms were chosen to generate the input data. " 98 "It is also applied to filter input directories and check their names.") 99 100 parser.add_argument('-o', '--output-fname', metavar='OUTPUT_FNAME', required=False, 101 default=RESULT_FILENAME, 102 help="Destination JSON file name to create at each of INPUT_PATHS. " 103 "Default: '%(default)s'") 104 105 parser.add_argument('-z', '--zephyr_base', metavar='ZEPHYR_BASE', required=False, 106 default = os.environ.get('ZEPHYR_BASE'), 107 help="Zephyr code base path to use instead of the current ZEPHYR_BASE environment variable. " 108 "The script needs Zephyr repository there to read SHA and commit time of builds. " 109 "Current default: '%(default)s'") 110 111 parser.add_argument("--test-name", 112 choices=['application/suite_name', 'suite_name', 'application', 'name.feature'], 113 default='name.feature', 114 help="How to compose Twister test instance names using plan.txt columns. " 115 "Default: '%(default)s'" ) 116 117 parser.add_argument("--no-testsuite-check", 118 dest='testsuite_check', action="store_false", 119 help="Don't check for applications' testsuite configs in ZEPHYR_BASE.") 120 121 parser.add_argument('-v', '--verbose', required=False, action='count', default=0, 122 help="Increase the logging level for each occurrence. Default level: ERROR") 123 124 return parser.parse_args() 125 126 127def read_plan(fname: str) -> list[dict]: 128 plan = [] 129 with open(fname) as plan_file: 130 plan_rows = csv.reader(plan_file) 131 plan_vals = [ dict(zip(PLAN_HEADERS, row)) for row in plan_rows ] 132 plan = { f"{p['name']}/{p['feature']}/{p['board']}" : p for p in plan_vals } 133 return plan 134 135 136def get_id_from_path(plan, in_path, max_levels=HWMv2_LEVELS): 137 data_id = {} 138 (in_path, data_id['board']) = os.path.split(in_path) 139 if not data_id['board']: 140 # trailing '/' 141 (in_path, data_id['board']) = os.path.split(in_path) 142 143 for _ in range(max_levels): 144 (in_path, data_id['feature']) = os.path.split(in_path) 145 (c_head, data_id['app']) = os.path.split(in_path) 146 (c_head, data_id['version']) = os.path.split(c_head) 147 if not all(data_id.values()): 148 # incorrect plan id 149 return None 150 if f"{data_id['app']}/{data_id['feature']}/{data_id['board']}" in plan: 151 return data_id 152 else: 153 # try with HWMv2 board name one more level deep 154 data_id['board'] = f"{data_id['feature']}/{data_id['board']}" 155 156 # not found 157 return {} 158 159 160def main(): 161 errors = 0 162 converted = 0 163 skipped = 0 164 filtered = 0 165 166 run_date = datetime.now(timezone.utc).isoformat(timespec='seconds') 167 168 init_logs() 169 170 args = parse_args() 171 172 set_verbose(args.verbose) 173 174 if not args.zephyr_base: 175 logging.error("ZEPHYR_BASE is not defined.") 176 sys.exit(1) 177 178 zephyr_base = os.path.abspath(args.zephyr_base) 179 zephyr_base_repo = Repo(zephyr_base) 180 181 logging.info(f"scanning {len(args.input_paths)} directories ...") 182 183 logging.info(f"use plan '{args.plan}'") 184 plan = read_plan(args.plan) 185 186 test_name_sep = '/' if '/' in args.test_name else '.' 187 test_name_parts = args.test_name.split(test_name_sep) 188 189 for report_path in args.input_paths: 190 logging.info(f"convert {report_path}") 191 # print(p) 192 p_head = os.path.normcase(report_path) 193 p_head = os.path.normpath(p_head) 194 if not os.path.isdir(p_head): 195 logging.error(f"not a directory '{p_head}'") 196 errors += 1 197 continue 198 199 data_id = get_id_from_path(plan, p_head) 200 if data_id is None: 201 logging.warning(f"skipped '{report_path}' - not a correct report directory") 202 skipped += 1 203 continue 204 elif not data_id: 205 logging.info(f"filtered '{report_path}' - not in the plan") 206 filtered += 1 207 continue 208 209 r_plan = f"{data_id['app']}/{data_id['feature']}/{data_id['board']}" 210 211 if 'suite_name' in test_name_parts and 'suite_name' not in plan[r_plan]: 212 logging.info(f"filtered '{report_path}' - no Twister suite name in the plan.") 213 filtered += 1 214 continue 215 216 suite_name = test_name_sep.join([plan[r_plan][n] if n in plan[r_plan] else '' for n in test_name_parts]) 217 218 # Just some sanity checks of the 'application' in the current ZEPHYR_BASE 219 if args.testsuite_check: 220 suite_type = plan[r_plan]['application'].split('/') 221 if len(suite_type) and suite_type[0] in TESTSUITE_FILENAME: 222 suite_conf_name = TESTSUITE_FILENAME[suite_type[0]] 223 else: 224 logging.error(f"unknown app type to get configuration in '{report_path}'") 225 errors += 1 226 continue 227 228 suite_conf_fname = os.path.join(zephyr_base, plan[r_plan]['application'], suite_conf_name) 229 if not os.path.isfile(suite_conf_fname): 230 logging.error(f"test configuration not found for '{report_path}' at '{suite_conf_fname}'") 231 errors += 1 232 continue 233 234 235 # Check SHA presence in the current ZEPHYR_BASE 236 sha_match = VERSION_COMMIT_RE.search(data_id['version']) 237 version_sha = sha_match.group(1) if sha_match else data_id['version'] 238 try: 239 git_commit = zephyr_base_repo.commit(version_sha) 240 except BadName: 241 logging.error(f"SHA:'{version_sha}' is not found in ZEPHYR_BASE for '{report_path}'") 242 errors += 1 243 continue 244 245 246 # Compose twister_footprint.json record - each application (test suite) will have its own 247 # simplified header with options, SHA, etc. 248 249 res = {} 250 251 res['environment'] = { 252 'zephyr_version': data_id['version'], 253 'commit_date': 254 git_commit.committed_datetime.astimezone(timezone.utc).isoformat(timespec='seconds'), 255 'run_date': run_date, 256 'options': { 257 'testsuite_root': [ plan[r_plan]['application'] ], 258 'build_only': True, 259 'create_rom_ram_report': True, 260 'footprint_report': 'all', 261 'platform': [ plan[r_plan]['board'] ] 262 } 263 } 264 265 test_suite = { 266 'name': suite_name, 267 'arch': None, 268 'platform': plan[r_plan]['board'], 269 'status': 'passed', 270 'footprint': {} 271 } 272 273 for k,v in FOOTPRINT_FILES.items(): 274 footprint_fname = os.path.join(report_path, v) 275 try: 276 with open(footprint_fname, "rt") as footprint_json: 277 logger.debug(f"reading {footprint_fname}") 278 test_suite['footprint'][k] = json.load(footprint_json) 279 except FileNotFoundError: 280 logger.warning(f"{report_path} missing {v}") 281 282 res['testsuites'] = [test_suite] 283 284 report_fname = os.path.join(report_path, args.output_fname) 285 with open(report_fname, "wt") as json_file: 286 logger.debug(f"writing {report_fname}") 287 json.dump(res, json_file, indent=4, separators=(',',':')) 288 289 converted += 1 290 291 logging.info(f'found={len(args.input_paths)}, converted={converted}, ' 292 f'skipped={skipped}, filtered={filtered}, errors={errors}') 293 sys.exit(errors != 0) 294 295 296if __name__ == '__main__': 297 main() 298