1import re
2import argparse
3import os.path
4import itertools
5import subprocess
6import sys
7from os import environ
8
9from colorama import init,Fore, Back, Style
10
11try:
12   os.mkdir("AC6_results")
13except:
14    pass
15
16try:
17   os.mkdir("GCC_results")
18except:
19    pass
20
21try:
22   os.mkdir("CLANG_results")
23except:
24    pass
25
26DEBUG = False
27ERROR_OCCURED = False
28
29all_errors = []
30
31def printTitle(s):
32    print("\n" + Fore.GREEN + Style.BRIGHT +  s + Style.RESET_ALL)
33
34def printSubTitle(s):
35    print(Fore.YELLOW + Style.BRIGHT + s + Style.RESET_ALL)
36
37def printError(s):
38    print(Fore.RED + Style.BRIGHT +  s + Style.RESET_ALL+"\n")
39
40class Result:
41    def __init__(self,msg,error=False):
42        self._error = error
43        self._msg = msg
44
45    @property
46    def error(self):
47        return self._error
48
49    @property
50    def msg(self):
51        return self._msg
52
53def is_error(res,test_name,err):
54    if res.error:
55        printError("Error")
56        all_errors.append(test_name)
57        print(test_name,file=err)
58        print(res.msg,file=err)
59        print("--------------",file=err)
60        return(True)
61    return(False)
62
63def run(args,mustPrint=False,dumpStdErr=True,timeout=20,printCmd=False):
64    global ERROR_OCCURED
65    global DEBUG
66    try:
67        if DEBUG or printCmd:
68            print(" ".join(args))
69        result=subprocess.run(args,text=True,capture_output=True,timeout=timeout)
70        if result.returncode !=0 :
71             ERROR_OCCURED = True
72             if dumpStdErr:
73                return(Result(result.stderr + "\n\nSTDOUT:\n\n" + result.stdout,error=True))
74             else:
75                return(Result(result.stdout,error=True))
76
77        if mustPrint:
78            print(result.stdout)
79        return(Result(result.stdout))
80    except Exception as e:
81        printError("Exception occured")
82        ERROR_OCCURED = True
83        return(Result(str(e),error=True))
84
85parser = argparse.ArgumentParser(description='Parse test description')
86parser.add_argument('-c', nargs='?',type = str, default="M55",help="M55/M4/M0")
87parser.add_argument('-p', nargs='?',type = str, default="VHT",help="VHT/MPS3")
88parser.add_argument('-a', action='store_true', help="Generate allocator definitions")
89parser.add_argument('-i', action='store_true', help="Refresh global allocator index")
90parser.add_argument('-b', action='store_true', help="Only benchmarks")
91parser.add_argument('-d', action='store_true', help="Dry run")
92parser.add_argument('-g', nargs='?',type = str, default="AC6",help="AC6 / CLANG / GCC")
93parser.add_argument('-u', nargs='?',type = str, default="L85986697A",help="Debug UUID")
94parser.add_argument('-t', action='store_true', help="Enable test mode")
95parser.add_argument('-avh', nargs='?',type = str, default="C:/Keil_v5/ARM/avh-fvp/bin/models", help="AVH folder")
96
97args = parser.parse_args()
98
99GHACTION = False
100
101if "AVH_FVP_PLUGINS" in os.environ:
102    GHACTION = True
103
104if not GHACTION:
105   import mps3run
106
107init()
108
109if args.a:
110    printTitle("Mode allocator generations")
111
112if args.i:
113    printTitle("Allocator test index refresh")
114
115NAME_TO_BOARD = {
116    "M55": "Corstone-300",
117    "Corstone-300": "Corstone-300",
118    "M4": "M4",
119    "M0" : "M0P"
120}
121
122def results():
123    if args.g == "AC6":
124        return("AC6_results")
125
126    if args.g == "GCC":
127        return("GCC_results")
128
129    if args.g == "CLANG":
130        return("CLANG_results")
131
132    print(f"Compiler {args.g} not known")
133    exit(1)
134
135def target_name():
136    return(f"{args.p}-{NAME_TO_BOARD[args.c]}")
137
138def cmd_args():
139    # cbuild -O cprj test.csolution.yml -r --toolchain AC6 -c test.Release+MPS3-Corstone-300
140    toolchain = args.g
141    target = f"test.Release+{target_name()}"
142
143    command = ["-O", "cprj",
144               "test.csolution.yml",
145               "--toolchain", toolchain,
146               "-c", target]
147
148    return(command)
149
150
151
152if args.g == "AC6":
153    ext = ".axf"
154else:
155    ext = ".elf"
156
157fvpWin = {"M55":"FVP_Corstone_SSE-300_Ethos-U55.exe",
158          "M4":"FVP_MPS2_Cortex-M4.exe",
159          "M0":"FVP_MPS2_Cortex-M0plus.exe"}
160
161fvpUnix = {"M55":"FVP_Corstone_SSE-300_Ethos-U55",
162          "M4":"FVP_MPS2_Cortex-M4",
163          "M0":"FVP_MPS2_Cortex-M0plus"}
164
165AVHROOT = args.avh
166
167TESTS=["DOT_TEST",
168       "VECTOR_TEST",
169       "ROW_TEST",
170       "COL_TEST",
171       "MATRIX_TEST",
172       "FUSION_TEST"
173       ]
174
175# Some tests are too big (code size) and needs to be decomposed
176# They contain SUBTEST1, SUBTEST2 ... #if in the code
177# This script must know how many subtests are defined in each test
178# suite
179# No need to define an entry in this dictionary when no
180# subtest is defined
181SUBTESTS = {"MATRIX_TEST":19}
182# Subtests that are only for testing and not benchmarks
183ONLY_TESTS = {"MATRIX_TEST":[3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]}
184
185def is_only_test(n,i):
186    if n[0] in ONLY_TESTS:
187       return(i in ONLY_TESTS[n[0]])
188    return False
189
190DATATYPES = ["F64_DT",
191             "F32_DT",
192             "F16_DT",
193             "Q31_DT",
194             "Q15_DT",
195             "Q7_DT"
196             ]
197
198MODE = ["STATIC_TEST",
199        "DYNAMIC_TEST"
200        ]
201
202# Restricted tests for debugging
203#TESTS=["FUSION_TEST"]
204#DATATYPES=["Q15_DT"]
205#MODE = ["STATIC_TEST"]
206
207all_tests = list(itertools.product(TESTS,DATATYPES,MODE))
208
209
210
211ALLOC = "#define POOL_ALLOCATOR"
212TESTMODE = ""
213if args.t:
214    TESTMODE = "#define TESTMODE"
215
216if args.a:
217    # Stat allocator enabled and we do stats on VHT CS300 only
218    ALLOC = "//#define POOL_ALLOCATOR"
219    args.c = "M55"
220    args.p = "VHT"
221
222BENCH = "//#define ONLY_BENCHMARKS"
223if args.b:
224    BENCH = "#define ONLY_BENCHMARKS"
225
226HEADER = f"""#ifndef TEST_CONFIG_H
227#define TEST_CONFIG_H
228
229{TESTMODE}
230{ALLOC}
231{BENCH}
232
233#define %s
234#define %s
235#define %s
236%s
237
238#endif
239"""
240
241
242
243def out_path():
244    return(os.path.join("cprj","out","test",target_name(),"Release","test"+ ext))
245
246def configure_and_build_test(test_name,test,err,subtest,first):
247    if subtest is not None:
248        subteststr = f"#define SUBTEST{subtest}"
249    else:
250        subteststr = ""
251    with open("test_config.h","w") as c:
252        print(HEADER % (test + (subteststr,)),file=c)
253    if first:
254       res = run(["cbuild"] + cmd_args() + ["-r","--update-rte"],timeout=600,printCmd=True)
255    else:
256       res = run(["cbuild"] +cmd_args(),timeout=600,printCmd=True)
257    if not is_error(res,test_name,err):
258        if DEBUG:
259            print(res.msg)
260        return(True)
261    return(False)
262
263def process_allocator_data(test_name,test,msg,subtest):
264    lines = msg.splitlines()
265    state = 0
266    alloc_cpp = []
267    alloc_h = []
268    for l in lines:
269        if re.match(r"^ALLOC_POOL.*$",l):
270            alloc_cpp.append(l.strip())
271        if re.match(r"^POOL.*$",l):
272            alloc_h.append(l.strip())
273    if subtest is not None:
274        HEADER=f"#if defined({test[0]}) && defined({test[1]}) && defined({test[2]}) && defined(SUBTEST{subtest})"
275    else:
276        HEADER=f"#if defined({test[0]}) && defined({test[1]}) && defined({test[2]})"
277    # Gen h
278    with open(os.path.join("allocation",test_name)+".h","w") as h:
279        print(HEADER,file=h)
280        for l in alloc_h:
281            print(l,file=h)
282        print("#endif",file=h)
283
284    # Gen cpp
285    with open(os.path.join("allocation",test_name)+".cpp","w") as h:
286         print(HEADER,file=h)
287         for l in alloc_cpp:
288             print(l,file=h)
289         print("#endif",file=h)
290
291def process_bench(test_name,test,msg,subtest):
292    global DEBUG
293    lines = msg.splitlines()
294    test_name = args.p +"_" + args.c + "_" + test_name
295    if DEBUG:
296       print(os.path.join(results(),test_name)+".txt")
297    with open(os.path.join(results(),test_name)+".txt","w") as h:
298       for l in lines:
299           print(l.rstrip(),file=h)
300
301
302def process_result(test_name,test,msg,subtest):
303    printSubTitle("Process result")
304    if args.a:
305        process_allocator_data(test_name,test,msg,subtest)
306    else:
307        process_bench(test_name,test,msg,subtest)
308
309def runVHT(test_name,test,err,subtest):
310    core = args.c
311    target = target_name()
312    config = os.path.join("fvp_configs",target) + ".txt"
313    #print(target)
314    #print(config)
315    if core == "M55":
316        exe = "cpu0=" + out_path()
317    else:
318        exe = out_path()
319
320    if AVHROOT:
321       avhAttempt = os.path.join(AVHROOT,fvpWin[core])
322       if os.path.exists(avhAttempt):
323          avh = avhAttempt
324
325       avhAttempt = os.path.join(AVHROOT,fvpUnix[core])
326       if os.path.exists(avhAttempt):
327          avh = avhAttempt
328    else:
329       avh = avhUnixExe[core]
330
331    res=run([avh,"-f",config,"-a",exe],printCmd=True)
332    if not is_error(res,test_name,err):
333       process_result(test_name,test,res.msg,subtest)
334
335def runMPS3(test_name,test,err,subtest):
336    lines=""
337    res = None
338    try:
339        exe = out_path()
340        lines = mps3run.run_out(exe,args.u)
341        res = Result(lines)
342    except Exception as e:
343        res = Result(str(e),error = True)
344    if not is_error(res,test_name,err):
345       process_result(test_name,test,res.msg,subtest)
346
347def runATest(test,file_err,nb,NB_MAX,current_nb_axf,nb_axf,first=True,subtest=None):
348    global DEBUG
349    if subtest is not None:
350       maxsub = SUBTESTS[test[0]]
351       test_name=f"{test[0]}_{test[1]}_{test[2]}_{subtest}"
352       printTitle(test_name + f" : AXF {current_nb_axf} / {nb_axf}, TEST {nb}/{NB_MAX} (subtest {subtest}/{maxsub})")
353    else:
354       test_name=f"{test[0]}_{test[1]}_{test[2]}"
355       printTitle(test_name + f" : AXF {current_nb_axf} / {nb_axf}, TEST {nb}/{NB_MAX}")
356    if args.d:
357        return
358    printSubTitle("Configure and build")
359    if configure_and_build_test(test_name,test,file_err,subtest,first):
360        printSubTitle("Run")
361        if args.p == "VHT":
362            runVHT(test_name,test,file_err,subtest)
363        if args.p == "MPS3" and args.c == "M55":
364            runMPS3(test_name,test,file_err,subtest)
365
366nb_axf = 0
367for test in all_tests:
368    if test[0] in SUBTESTS:
369        for subtestnbb in range(SUBTESTS[test[0]]):
370            if not args.b or not is_only_test(test,subtestnbb+1):
371               nb_axf = nb_axf + 1
372    else:
373            nb_axf = nb_axf + 1
374print(f"Number of axf to test = {nb_axf}")
375
376with open(os.path.join(results(),f"errors_{args.c}.txt"),"w") as err:
377    # Generate include for allocations
378    if args.a or args.i:
379        with open(os.path.join("allocation","all.h"),"w") as fh:
380            for test in all_tests:
381                if test[0] in SUBTESTS:
382                   for subtestnbb in range(SUBTESTS[test[0]]):
383                      test_name=f"{test[0]}_{test[1]}_{test[2]}_{subtestnbb+1}"
384                      print(f"#include \"{test_name}.h\"",file=fh)
385                else:
386                    test_name=f"{test[0]}_{test[1]}_{test[2]}"
387                    print(f"#include \"{test_name}.h\"",file=fh)
388
389        with open(os.path.join("allocation","all.cpp"),"w") as fc:
390            for test in all_tests:
391                if test[0] in SUBTESTS:
392                   for subtestnbb in range(SUBTESTS[test[0]]):
393                       test_name=f"{test[0]}_{test[1]}_{test[2]}_{subtestnbb+1}"
394                       print(f"#include \"{test_name}.cpp\"",file=fc)
395                else:
396                   test_name=f"{test[0]}_{test[1]}_{test[2]}"
397                   print(f"#include \"{test_name}.cpp\"",file=fc)
398
399    if not args.i:
400        NB_MAX = len(all_tests)
401        nb = 1 # test cases
402        current_axf = 1
403        first = True
404        for test in all_tests:
405            if test[0] in SUBTESTS:
406                for subtestnbb in range(SUBTESTS[test[0]]):
407                    if not args.b or not is_only_test(test,subtestnbb+1):
408                       runATest(test,err,nb,NB_MAX,current_axf,nb_axf,first,subtestnbb+1)
409                       current_axf = current_axf + 1
410                       first = False
411            else:
412                runATest(test,err,nb,NB_MAX,current_axf,nb_axf,first)
413                current_axf = current_axf + 1
414                first = False
415            nb = nb + 1
416
417
418if not GHACTION:
419   if ERROR_OCCURED:
420       printError("Error in tests:")
421       for n in all_errors:
422           printError(n)
423       sys.exit("Error occurred")
424   else:
425       sys.exit(0)
426