1import re 2import argparse 3import os.path 4import itertools 5import subprocess 6import sys 7from os import environ 8 9from colorama import init,Fore, Back, Style 10 11try: 12 os.mkdir("AC6_results") 13except: 14 pass 15 16try: 17 os.mkdir("GCC_results") 18except: 19 pass 20 21try: 22 os.mkdir("CLANG_results") 23except: 24 pass 25 26DEBUG = False 27ERROR_OCCURED = False 28 29all_errors = [] 30 31def printTitle(s): 32 print("\n" + Fore.GREEN + Style.BRIGHT + s + Style.RESET_ALL) 33 34def printSubTitle(s): 35 print(Fore.YELLOW + Style.BRIGHT + s + Style.RESET_ALL) 36 37def printError(s): 38 print(Fore.RED + Style.BRIGHT + s + Style.RESET_ALL+"\n") 39 40class Result: 41 def __init__(self,msg,error=False): 42 self._error = error 43 self._msg = msg 44 45 @property 46 def error(self): 47 return self._error 48 49 @property 50 def msg(self): 51 return self._msg 52 53def is_error(res,test_name,err): 54 if res.error: 55 printError("Error") 56 all_errors.append(test_name) 57 print(test_name,file=err) 58 print(res.msg,file=err) 59 print("--------------",file=err) 60 return(True) 61 return(False) 62 63def run(args,mustPrint=False,dumpStdErr=True,timeout=20,printCmd=False): 64 global ERROR_OCCURED 65 global DEBUG 66 try: 67 if DEBUG or printCmd: 68 print(" ".join(args)) 69 result=subprocess.run(args,text=True,capture_output=True,timeout=timeout) 70 if result.returncode !=0 : 71 ERROR_OCCURED = True 72 if dumpStdErr: 73 return(Result(result.stderr + "\n\nSTDOUT:\n\n" + result.stdout,error=True)) 74 else: 75 return(Result(result.stdout,error=True)) 76 77 if mustPrint: 78 print(result.stdout) 79 return(Result(result.stdout)) 80 except Exception as e: 81 printError("Exception occured") 82 ERROR_OCCURED = True 83 return(Result(str(e),error=True)) 84 85parser = argparse.ArgumentParser(description='Parse test description') 86parser.add_argument('-c', nargs='?',type = str, default="M55",help="M55/M4/M0") 87parser.add_argument('-p', nargs='?',type = str, default="VHT",help="VHT/MPS3") 88parser.add_argument('-a', action='store_true', help="Generate allocator definitions") 89parser.add_argument('-i', action='store_true', help="Refresh global allocator index") 90parser.add_argument('-b', action='store_true', help="Only benchmarks") 91parser.add_argument('-d', action='store_true', help="Dry run") 92parser.add_argument('-g', nargs='?',type = str, default="AC6",help="AC6 / CLANG / GCC") 93parser.add_argument('-u', nargs='?',type = str, default="L85986697A",help="Debug UUID") 94parser.add_argument('-t', action='store_true', help="Enable test mode") 95parser.add_argument('-avh', nargs='?',type = str, default="C:/Keil_v5/ARM/avh-fvp/bin/models", help="AVH folder") 96 97args = parser.parse_args() 98 99GHACTION = False 100 101if "AVH_FVP_PLUGINS" in os.environ: 102 GHACTION = True 103 104if not GHACTION: 105 import mps3run 106 107init() 108 109if args.a: 110 printTitle("Mode allocator generations") 111 112if args.i: 113 printTitle("Allocator test index refresh") 114 115NAME_TO_BOARD = { 116 "M55": "Corstone-300", 117 "Corstone-300": "Corstone-300", 118 "M4": "M4", 119 "M0" : "M0P" 120} 121 122def results(): 123 if args.g == "AC6": 124 return("AC6_results") 125 126 if args.g == "GCC": 127 return("GCC_results") 128 129 if args.g == "CLANG": 130 return("CLANG_results") 131 132 print(f"Compiler {args.g} not known") 133 exit(1) 134 135def target_name(): 136 return(f"{args.p}-{NAME_TO_BOARD[args.c]}") 137 138def cmd_args(): 139 # cbuild -O cprj test.csolution.yml -r --toolchain AC6 -c test.Release+MPS3-Corstone-300 140 toolchain = args.g 141 target = f"test.Release+{target_name()}" 142 143 command = ["-O", "cprj", 144 "test.csolution.yml", 145 "--toolchain", toolchain, 146 "-c", target] 147 148 return(command) 149 150 151 152if args.g == "AC6": 153 ext = ".axf" 154else: 155 ext = ".elf" 156 157fvpWin = {"M55":"FVP_Corstone_SSE-300_Ethos-U55.exe", 158 "M4":"FVP_MPS2_Cortex-M4.exe", 159 "M0":"FVP_MPS2_Cortex-M0plus.exe"} 160 161fvpUnix = {"M55":"FVP_Corstone_SSE-300_Ethos-U55", 162 "M4":"FVP_MPS2_Cortex-M4", 163 "M0":"FVP_MPS2_Cortex-M0plus"} 164 165AVHROOT = args.avh 166 167TESTS=["DOT_TEST", 168 "VECTOR_TEST", 169 "ROW_TEST", 170 "COL_TEST", 171 "MATRIX_TEST", 172 "FUSION_TEST" 173 ] 174 175# Some tests are too big (code size) and needs to be decomposed 176# They contain SUBTEST1, SUBTEST2 ... #if in the code 177# This script must know how many subtests are defined in each test 178# suite 179# No need to define an entry in this dictionary when no 180# subtest is defined 181SUBTESTS = {"MATRIX_TEST":19} 182# Subtests that are only for testing and not benchmarks 183ONLY_TESTS = {"MATRIX_TEST":[3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]} 184 185def is_only_test(n,i): 186 if n[0] in ONLY_TESTS: 187 return(i in ONLY_TESTS[n[0]]) 188 return False 189 190DATATYPES = ["F64_DT", 191 "F32_DT", 192 "F16_DT", 193 "Q31_DT", 194 "Q15_DT", 195 "Q7_DT" 196 ] 197 198MODE = ["STATIC_TEST", 199 "DYNAMIC_TEST" 200 ] 201 202# Restricted tests for debugging 203#TESTS=["FUSION_TEST"] 204#DATATYPES=["Q15_DT"] 205#MODE = ["STATIC_TEST"] 206 207all_tests = list(itertools.product(TESTS,DATATYPES,MODE)) 208 209 210 211ALLOC = "#define POOL_ALLOCATOR" 212TESTMODE = "" 213if args.t: 214 TESTMODE = "#define TESTMODE" 215 216if args.a: 217 # Stat allocator enabled and we do stats on VHT CS300 only 218 ALLOC = "//#define POOL_ALLOCATOR" 219 args.c = "M55" 220 args.p = "VHT" 221 222BENCH = "//#define ONLY_BENCHMARKS" 223if args.b: 224 BENCH = "#define ONLY_BENCHMARKS" 225 226HEADER = f"""#ifndef TEST_CONFIG_H 227#define TEST_CONFIG_H 228 229{TESTMODE} 230{ALLOC} 231{BENCH} 232 233#define %s 234#define %s 235#define %s 236%s 237 238#endif 239""" 240 241 242 243def out_path(): 244 return(os.path.join("cprj","out","test",target_name(),"Release","test"+ ext)) 245 246def configure_and_build_test(test_name,test,err,subtest,first): 247 if subtest is not None: 248 subteststr = f"#define SUBTEST{subtest}" 249 else: 250 subteststr = "" 251 with open("test_config.h","w") as c: 252 print(HEADER % (test + (subteststr,)),file=c) 253 if first: 254 res = run(["cbuild"] + cmd_args() + ["-r","--update-rte"],timeout=600,printCmd=True) 255 else: 256 res = run(["cbuild"] +cmd_args(),timeout=600,printCmd=True) 257 if not is_error(res,test_name,err): 258 if DEBUG: 259 print(res.msg) 260 return(True) 261 return(False) 262 263def process_allocator_data(test_name,test,msg,subtest): 264 lines = msg.splitlines() 265 state = 0 266 alloc_cpp = [] 267 alloc_h = [] 268 for l in lines: 269 if re.match(r"^ALLOC_POOL.*$",l): 270 alloc_cpp.append(l.strip()) 271 if re.match(r"^POOL.*$",l): 272 alloc_h.append(l.strip()) 273 if subtest is not None: 274 HEADER=f"#if defined({test[0]}) && defined({test[1]}) && defined({test[2]}) && defined(SUBTEST{subtest})" 275 else: 276 HEADER=f"#if defined({test[0]}) && defined({test[1]}) && defined({test[2]})" 277 # Gen h 278 with open(os.path.join("allocation",test_name)+".h","w") as h: 279 print(HEADER,file=h) 280 for l in alloc_h: 281 print(l,file=h) 282 print("#endif",file=h) 283 284 # Gen cpp 285 with open(os.path.join("allocation",test_name)+".cpp","w") as h: 286 print(HEADER,file=h) 287 for l in alloc_cpp: 288 print(l,file=h) 289 print("#endif",file=h) 290 291def process_bench(test_name,test,msg,subtest): 292 global DEBUG 293 lines = msg.splitlines() 294 test_name = args.p +"_" + args.c + "_" + test_name 295 if DEBUG: 296 print(os.path.join(results(),test_name)+".txt") 297 with open(os.path.join(results(),test_name)+".txt","w") as h: 298 for l in lines: 299 print(l.rstrip(),file=h) 300 301 302def process_result(test_name,test,msg,subtest): 303 printSubTitle("Process result") 304 if args.a: 305 process_allocator_data(test_name,test,msg,subtest) 306 else: 307 process_bench(test_name,test,msg,subtest) 308 309def runVHT(test_name,test,err,subtest): 310 core = args.c 311 target = target_name() 312 config = os.path.join("fvp_configs",target) + ".txt" 313 #print(target) 314 #print(config) 315 if core == "M55": 316 exe = "cpu0=" + out_path() 317 else: 318 exe = out_path() 319 320 if AVHROOT: 321 avhAttempt = os.path.join(AVHROOT,fvpWin[core]) 322 if os.path.exists(avhAttempt): 323 avh = avhAttempt 324 325 avhAttempt = os.path.join(AVHROOT,fvpUnix[core]) 326 if os.path.exists(avhAttempt): 327 avh = avhAttempt 328 else: 329 avh = avhUnixExe[core] 330 331 res=run([avh,"-f",config,"-a",exe],printCmd=True) 332 if not is_error(res,test_name,err): 333 process_result(test_name,test,res.msg,subtest) 334 335def runMPS3(test_name,test,err,subtest): 336 lines="" 337 res = None 338 try: 339 exe = out_path() 340 lines = mps3run.run_out(exe,args.u) 341 res = Result(lines) 342 except Exception as e: 343 res = Result(str(e),error = True) 344 if not is_error(res,test_name,err): 345 process_result(test_name,test,res.msg,subtest) 346 347def runATest(test,file_err,nb,NB_MAX,current_nb_axf,nb_axf,first=True,subtest=None): 348 global DEBUG 349 if subtest is not None: 350 maxsub = SUBTESTS[test[0]] 351 test_name=f"{test[0]}_{test[1]}_{test[2]}_{subtest}" 352 printTitle(test_name + f" : AXF {current_nb_axf} / {nb_axf}, TEST {nb}/{NB_MAX} (subtest {subtest}/{maxsub})") 353 else: 354 test_name=f"{test[0]}_{test[1]}_{test[2]}" 355 printTitle(test_name + f" : AXF {current_nb_axf} / {nb_axf}, TEST {nb}/{NB_MAX}") 356 if args.d: 357 return 358 printSubTitle("Configure and build") 359 if configure_and_build_test(test_name,test,file_err,subtest,first): 360 printSubTitle("Run") 361 if args.p == "VHT": 362 runVHT(test_name,test,file_err,subtest) 363 if args.p == "MPS3" and args.c == "M55": 364 runMPS3(test_name,test,file_err,subtest) 365 366nb_axf = 0 367for test in all_tests: 368 if test[0] in SUBTESTS: 369 for subtestnbb in range(SUBTESTS[test[0]]): 370 if not args.b or not is_only_test(test,subtestnbb+1): 371 nb_axf = nb_axf + 1 372 else: 373 nb_axf = nb_axf + 1 374print(f"Number of axf to test = {nb_axf}") 375 376with open(os.path.join(results(),f"errors_{args.c}.txt"),"w") as err: 377 # Generate include for allocations 378 if args.a or args.i: 379 with open(os.path.join("allocation","all.h"),"w") as fh: 380 for test in all_tests: 381 if test[0] in SUBTESTS: 382 for subtestnbb in range(SUBTESTS[test[0]]): 383 test_name=f"{test[0]}_{test[1]}_{test[2]}_{subtestnbb+1}" 384 print(f"#include \"{test_name}.h\"",file=fh) 385 else: 386 test_name=f"{test[0]}_{test[1]}_{test[2]}" 387 print(f"#include \"{test_name}.h\"",file=fh) 388 389 with open(os.path.join("allocation","all.cpp"),"w") as fc: 390 for test in all_tests: 391 if test[0] in SUBTESTS: 392 for subtestnbb in range(SUBTESTS[test[0]]): 393 test_name=f"{test[0]}_{test[1]}_{test[2]}_{subtestnbb+1}" 394 print(f"#include \"{test_name}.cpp\"",file=fc) 395 else: 396 test_name=f"{test[0]}_{test[1]}_{test[2]}" 397 print(f"#include \"{test_name}.cpp\"",file=fc) 398 399 if not args.i: 400 NB_MAX = len(all_tests) 401 nb = 1 # test cases 402 current_axf = 1 403 first = True 404 for test in all_tests: 405 if test[0] in SUBTESTS: 406 for subtestnbb in range(SUBTESTS[test[0]]): 407 if not args.b or not is_only_test(test,subtestnbb+1): 408 runATest(test,err,nb,NB_MAX,current_axf,nb_axf,first,subtestnbb+1) 409 current_axf = current_axf + 1 410 first = False 411 else: 412 runATest(test,err,nb,NB_MAX,current_axf,nb_axf,first) 413 current_axf = current_axf + 1 414 first = False 415 nb = nb + 1 416 417 418if not GHACTION: 419 if ERROR_OCCURED: 420 printError("Error in tests:") 421 for n in all_errors: 422 printError(n) 423 sys.exit("Error occurred") 424 else: 425 sys.exit(0) 426