1#! /usr/bin/env python3 2# Copyright 2019 Oticon A/S 3# SPDX-License-Identifier: Apache-2.0 4 5import os; 6from numpy import random; 7 8def parse_arguments(): 9 import argparse 10 parser = argparse.ArgumentParser(description="EDTT (Embedded Device Test Tool)", 11 epilog="Note: A transport can have its own options") 12 13 parser.add_argument("-v", "--verbose", default=2, type=int, help="Verbosity level"); 14 15 parser.add_argument("-t", "--transport", required=True, 16 help="Type of transport to connect to the devices (the " 17 "transport module must either exist as " 18 "src/components/edtt_<transport>.py, or be a path " 19 "to an importable transport module"); 20 21 parser.add_argument("-T", "--test", required=True, 22 help="Which test module to run. This can either be a module in " 23 " src/tests, or a file path (relative or absolute)"); 24 25 parser.add_argument("-C", "--case", required=False, 26 default="all", 27 help='Which testcase to run in that module.' 28 'Options are: A real test name, "all", "randomize",' 29 'or a file name containing a list of test names (default "all")') 30 31 parser.add_argument("--shuffle", required=False, 32 action='store_true', 33 help='Shuffle test order. ' 34 'The order will be dependent on the random seed. ' 35 'If <case> was set to all, this is equivalent to setting <case> to "randomize". ' 36 'If <case> was a file name, this will shuffle the lines in the file. ' 37 'If <case> was 1 particular testcase, this option has no effect.') 38 39 parser.add_argument("-S","--stop_on_failure", required=False, 40 action='store_true', 41 help="Stop as soon as any test fails, instead of " 42 "continuing with the remaining tests") 43 44 parser.add_argument("--seed", required=False, default=0x1234, help='Random generator seed (0x1234 by default)') 45 46 return parser.parse_known_args() 47 48def try_to_import(module_path, type, def_path): 49 try: 50 if (("." not in module_path) and ("/" not in module_path)): 51 from importlib import import_module; 52 loaded_module = import_module(def_path + module_path) 53 else: #The user seems to want to load the module from a place off-tree 54 #If the user forgot Let's fill in the extension 55 if module_path[-3:] != ".py": 56 module_path = module_path + ".py" 57 import imp; 58 loaded_module = imp.load_source('%s', module_path); 59 60 return loaded_module; 61 except ImportError as e: 62 print(("\n Could not load the %s %s . Does it exist?\n"% (type, module_path))) 63 raise; 64 65# Initialize the transport and connect to devices 66def init_transport(transport, xtra_args, trace): 67 transport_module = try_to_import(transport, "transport", "components.edttt_"); 68 transport = transport_module.EDTTT(xtra_args, trace); 69 transport.connect(); 70 return transport; 71 72def run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen): 73 trace.trace(4, test_spec); 74 trace.trace(4, ""); 75 if test_spec.number_devices > transport.n_devices: 76 raise Exception("This test needs %i connected devices but you only connected to %i" % 77 (test_spec.number_devices, transport.n_devices)); 78 79 result = test_mod.run_a_test(args, transport, trace, test_spec); 80 trace.trace(2, "%-*s %s %s" % (nameLen, test_spec.name, test_spec.description[1:], ("PASSED" if result == 0 else "FAILED"))); 81 82 return result; 83 84# Attempt to load and run the tests 85def run_tests(args, xtra_args, transport, trace): 86 passed = 0; 87 total = 0; 88 89 test_mod = try_to_import(args.test, "test", "tests."); 90 test_specs = test_mod.get_tests_specs(); 91 nameLen = max([ len(test_specs[key].name) for key in test_specs ]); 92 93 t = args.case 94 95 if t.lower() == "all" or t.lower() == "randomize": 96 tests_list = list(test_specs.items()); 97 if t.lower() == "randomize" or args.shuffle: 98 random.shuffle(tests_list) 99 100 for _,test_spec in tests_list: 101 result = run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen); 102 passed += 1 if result == 0 else 0; 103 total += 1; 104 if result != 0 and args.stop_on_failure: 105 break; 106 107 elif t in test_specs: 108 result = run_one_test(args, xtra_args, transport, trace, test_mod, test_specs[t], nameLen); 109 passed += 1 if result == 0 else 0; 110 total += 1; 111 112 elif os.path.isfile(t): 113 file = open(t, "r"); 114 lines = file.readlines(); 115 116 if args.shuffle: 117 random.shuffle(lines); 118 119 for line in lines: 120 t = line.split("#",1)[0]; #remove comments 121 t = t.strip().upper(); 122 if not t: #Skip empty lines, or those which had only comments 123 continue 124 if t in test_specs: 125 result = run_one_test(args, xtra_args, transport, trace, test_mod, test_specs[t], nameLen); 126 passed += 1 if result == 0 else 0; 127 total += 1; 128 if result != 0 and args.stop_on_failure: 129 break; 130 else: 131 print(("unkown test " + t + ". Skipping")) 132 file.close(); 133 134 else: 135 trace.trace(1, "Test '%s' not found!" % t); 136 total += 1; 137 138 failed = total - passed; 139 if total: 140 trace.trace(2, "\nSummary:\n\nStatus Count\n%s" % ('='*14)); 141 if passed > 0: 142 trace.trace(2, "PASS%10d" % passed); 143 144 if failed > 0: 145 trace.trace(2, "FAIL%10d" % failed); 146 trace.trace(2, "%s\nTotal%9d" % ('='*14, total)); 147 148 return failed 149 150class Trace(): 151 def __init__(self, level): 152 self.level = level; 153 self.transport = None 154 155 def trace(self, level, msg): 156 if ( level <= self.level ): 157 if self.transport: 158 from datetime import timedelta 159 td = timedelta(microseconds=self.transport.get_last_t()) 160 mm, ss = divmod(td.seconds, 60) 161 hh, mm = divmod(mm, 60) 162 ts = "%02d:%02d:%02d.%06d" % (hh, mm, ss, td.microseconds) 163 else: 164 ts = '--:--:--.------' 165 for line in str(msg).split('\n'): 166 print('edtt: @{} {}'.format(ts, line), flush=True); 167 168def main(): 169 transport = None; 170 try: 171 (args, xtra_args) = parse_arguments(); 172 173 random.seed(args.seed); 174 175 trace = Trace(args.verbose); 176 177 transport = init_transport(args.transport, xtra_args, trace); 178 trace.transport = transport; 179 180 result = run_tests(args, xtra_args, transport, trace); 181 182 transport.close(); 183 184 from sys import exit; 185 exit(result); 186 187 except: 188 if transport: 189 transport.close(); 190 raise; 191 192if __name__ == "__main__": 193 main(); 194