1#! /usr/bin/env python3 2# Copyright 2019 Oticon A/S 3# SPDX-License-Identifier: Apache-2.0 4 5import os; 6from components.btsnoop import Btsnoop, BtsnoopPriority 7from numpy import random; 8from components.dump import SortedDumps, Packets 9from components.utils import toArray 10 11def parse_arguments(): 12 import argparse 13 parser = argparse.ArgumentParser(description="EDTT (Embedded Device Test Tool)", 14 epilog="Note: A transport can have its own options") 15 16 parser.add_argument("-v", "--verbose", default=2, type=int, help="Verbosity level"); 17 18 parser.add_argument("-t", "--transport", required=True, 19 help="Type of transport to connect to the devices (the " 20 "transport module must either exist as " 21 "src/components/edtt_<transport>.py, or be a path " 22 "to an importable transport module"); 23 24 parser.add_argument("-T", "--test", required=True, 25 help="Which test module to run. This can either be a module in " 26 " src/tests, or a file path (relative or absolute)"); 27 28 parser.add_argument("-C", "--case", required=False, 29 default="all", 30 help='Which testcase to run in that module.' 31 'Options are: A real test name, "all", "randomize",' 32 'or a file name containing a list of test names (default "all")') 33 34 parser.add_argument("--shuffle", required=False, 35 action='store_true', 36 help='Shuffle test order. ' 37 'The order will be dependent on the random seed. ' 38 'If <case> was set to all, this is equivalent to setting <case> to "randomize". ' 39 'If <case> was a file name, this will shuffle the lines in the file. ' 40 'If <case> was 1 particular testcase, this option has no effect.') 41 42 parser.add_argument("-S","--stop_on_failure", required=False, 43 action='store_true', 44 help="Stop as soon as any test fails, instead of " 45 "continuing with the remaining tests") 46 47 parser.add_argument("--seed", required=False, default=0x1234, help='Random generator seed (0x1234 by default)') 48 49 parser.add_argument("--store_btsnoop", required=False, default=False, help="Store btsnoop to the file") 50 51 parser.add_argument("--btmon_socket_path", required=False, default="/tmp/btmon-sock", help="path to the unix socket used by btmon") 52 53 return parser.parse_known_args() 54 55def try_to_import(module_path, type, def_path): 56 try: 57 if (("." not in module_path) and ("/" not in module_path)): 58 from importlib import import_module; 59 loaded_module = import_module(def_path + module_path) 60 else: #The user seems to want to load the module from a place off-tree 61 #If the user forgot Let's fill in the extension 62 if module_path[-3:] != ".py": 63 module_path = module_path + ".py" 64 import imp; 65 loaded_module = imp.load_source('%s', module_path); 66 67 return loaded_module; 68 except ImportError as e: 69 print(("\n Could not load the %s %s . Does it exist?\n"% (type, module_path))) 70 raise; 71 72# Initialize the transport and connect to devices 73def init_transport(transport, xtra_args, trace): 74 transport_module = try_to_import(transport, "transport", "components.edttt_"); 75 transport = transport_module.EDTTT(xtra_args, trace); 76 transport.connect(); 77 return transport; 78 79def run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen, packets): 80 trace.trace(4, test_spec); 81 trace.trace(4, ""); 82 if test_spec.number_devices > transport.n_devices: 83 raise Exception("This test needs %i connected devices but you only connected to %i" % 84 (test_spec.number_devices, transport.n_devices)); 85 86 trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, test_spec.name) 87 result = test_mod.run_a_test(args, transport, trace, test_spec, packets); 88 trace.trace(2, "%-*s %s %s" % (nameLen, test_spec.name, test_spec.description[1:], ("PASSED" if result == 0 else "FAILED"))); 89 trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "%-*s %s %s" % (nameLen, test_spec.name, test_spec.description[1:], ("PASSED" if result == 0 else "FAILED"))) 90 91 return result; 92 93# Attempt to load and run the tests 94def run_tests(args, xtra_args, transport, trace, dumps): 95 passed = 0; 96 total = 0; 97 unknown = 0; 98 99 test_mod = try_to_import(args.test, "test", "tests."); 100 test_specs = test_mod.get_tests_specs(); 101 nameLen = max([ len(test_specs[key].name) for key in test_specs ]); 102 103 t = args.case 104 105 if t.lower() == "all" or t.lower() == "randomize": 106 tests_list = list(test_specs.items()); 107 if t.lower() == "randomize" or args.shuffle: 108 random.shuffle(tests_list) 109 110 for _,test_spec in tests_list: 111 result = run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen, Packets(dumps)) 112 passed += 1 if result == 0 else 0; 113 total += 1; 114 if result != 0 and args.stop_on_failure: 115 break; 116 117 elif t in test_specs: 118 result = run_one_test(args, xtra_args, transport, trace, test_mod, test_specs[t], nameLen, Packets(dumps)) 119 passed += 1 if result == 0 else 0; 120 total += 1; 121 122 elif os.path.isfile(t): 123 file = open(t, "r"); 124 lines = file.readlines(); 125 126 if args.shuffle: 127 random.shuffle(lines); 128 129 for line in lines: 130 t = line.split("#",1)[0]; #remove comments 131 t = t.strip().upper(); 132 if not t: #Skip empty lines, or those which had only comments 133 continue 134 if t in test_specs: 135 result = run_one_test(args, xtra_args, transport, trace, test_mod, test_specs[t], nameLen, Packets(dumps)) 136 passed += 1 if result == 0 else 0; 137 total += 1; 138 if result != 0 and args.stop_on_failure: 139 break; 140 else: 141 unknown += 1; 142 print(("unknown test " + t + ". Skipping")) 143 file.close(); 144 145 else: 146 trace.trace(1, "Test '%s' not found!" % t); 147 total += 1; 148 149 failed = total - passed; 150 if total: 151 trace.trace(2, "\nSummary:\n\nStatus Count\n%s" % ('='*14)); 152 if passed > 0: 153 trace.trace(2, "PASS%10d" % passed); 154 trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "PASS%10d" % passed) 155 156 if failed > 0: 157 trace.trace(2, "FAIL%10d" % failed); 158 trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "FAIL%10d" % failed) 159 160 if unknown > 0: 161 trace.trace(2, "UNKNOWN%7d" % unknown); 162 163 trace.trace(2, "%s\nTotal%9d" % ('='*14, total)); 164 165 return failed + unknown 166 167class Trace(): 168 def __init__(self, level): 169 self.level = level; 170 self.transport = None 171 self.btsnoop = None 172 173 def trace(self, level, msg): 174 if ( level <= self.level ): 175 if self.transport: 176 from datetime import timedelta 177 td = timedelta(microseconds=self.transport.get_last_t()) 178 mm, ss = divmod(td.seconds, 60) 179 hh, mm = divmod(mm, 60) 180 ts = "%02d:%02d:%02d.%06d" % (hh, mm, ss, td.microseconds) 181 else: 182 ts = '--:--:--.------' 183 for line in str(msg).split('\n'): 184 print('edtt: @{} {}'.format(ts, line), flush=True); 185 186def main(): 187 transport = None; 188 try: 189 (args, xtra_args) = parse_arguments(); 190 191 random.seed(int(args.seed)); 192 193 trace = Trace(args.verbose); 194 195 transport = init_transport(args.transport, xtra_args, trace); 196 trace.transport = transport; 197 trace.btsnoop = Btsnoop(args.store_btsnoop, args.btmon_socket_path) 198 address = 0x000000000000 199 trace.btsnoop.send_index_added(0, toArray(address, 6), "UpperTester") 200 trace.btsnoop.send_index_added(1, toArray(address, 6), "LowerTester") 201 202 device_dumps = SortedDumps() 203 device_dumps.add_rx(0, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_00.Rx.csv')) 204 device_dumps.add_tx(0, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_00.Tx.csv')) 205 device_dumps.add_rx(1, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_01.Rx.csv')) 206 device_dumps.add_tx(1, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_01.Tx.csv')) 207 208 trace.btsnoop.send_user_data(0, BtsnoopPriority.ALERT, "Testing session started") 209 result = run_tests(args, xtra_args, transport, trace, device_dumps) 210 trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "Testing session completed ") 211 trace.btsnoop.close() 212 transport.close(); 213 214 from sys import exit; 215 exit(result); 216 217 except: 218 if transport: 219 transport.close(); 220 raise; 221 222if __name__ == "__main__": 223 main(); 224