#! /usr/bin/env python3 # Copyright 2019 Oticon A/S # SPDX-License-Identifier: Apache-2.0 import os; from components.btsnoop import Btsnoop, BtsnoopPriority from numpy import random; from components.dump import SortedDumps, Packets from components.utils import toArray def parse_arguments(): import argparse parser = argparse.ArgumentParser(description="EDTT (Embedded Device Test Tool)", epilog="Note: A transport can have its own options") parser.add_argument("-v", "--verbose", default=2, type=int, help="Verbosity level"); parser.add_argument("-t", "--transport", required=True, help="Type of transport to connect to the devices (the " "transport module must either exist as " "src/components/edtt_<transport>.py, or be a path " "to an importable transport module"); parser.add_argument("-T", "--test", required=True, help="Which test module to run. This can either be a module in " " src/tests, or a file path (relative or absolute)"); parser.add_argument("-C", "--case", required=False, default="all", help='Which testcase to run in that module.' 'Options are: A real test name, "all", "randomize",' 'or a file name containing a list of test names (default "all")') parser.add_argument("--shuffle", required=False, action='store_true', help='Shuffle test order. ' 'The order will be dependent on the random seed. ' 'If <case> was set to all, this is equivalent to setting <case> to "randomize". ' 'If <case> was a file name, this will shuffle the lines in the file. ' 'If <case> was 1 particular testcase, this option has no effect.') parser.add_argument("-S","--stop_on_failure", required=False, action='store_true', help="Stop as soon as any test fails, instead of " "continuing with the remaining tests") parser.add_argument("--seed", required=False, default=0x1234, help='Random generator seed (0x1234 by default)') parser.add_argument("--store_btsnoop", required=False, default=False, help="Store btsnoop to the file") parser.add_argument("--btmon_socket_path", required=False, default="/tmp/btmon-sock", help="path to the unix socket used by btmon") return parser.parse_known_args() def try_to_import(module_path, type, def_path): try: if (("." not in module_path) and ("/" not in module_path)): from importlib import import_module; loaded_module = import_module(def_path + module_path) else: #The user seems to want to load the module from a place off-tree #If the user forgot Let's fill in the extension if module_path[-3:] != ".py": module_path = module_path + ".py" import imp; loaded_module = imp.load_source('%s', module_path); return loaded_module; except ImportError as e: print(("\n Could not load the %s %s . Does it exist?\n"% (type, module_path))) raise; # Initialize the transport and connect to devices def init_transport(transport, xtra_args, trace): transport_module = try_to_import(transport, "transport", "components.edttt_"); transport = transport_module.EDTTT(xtra_args, trace); transport.connect(); return transport; def run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen, packets): trace.trace(4, test_spec); trace.trace(4, ""); if test_spec.number_devices > transport.n_devices: raise Exception("This test needs %i connected devices but you only connected to %i" % (test_spec.number_devices, transport.n_devices)); trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, test_spec.name) result = test_mod.run_a_test(args, transport, trace, test_spec, packets); trace.trace(2, "%-*s %s %s" % (nameLen, test_spec.name, test_spec.description[1:], ("PASSED" if result == 0 else "FAILED"))); trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "%-*s %s %s" % (nameLen, test_spec.name, test_spec.description[1:], ("PASSED" if result == 0 else "FAILED"))) return result; # Attempt to load and run the tests def run_tests(args, xtra_args, transport, trace, dumps): passed = 0; total = 0; unknown = 0; test_mod = try_to_import(args.test, "test", "tests."); test_specs = test_mod.get_tests_specs(); nameLen = max([ len(test_specs[key].name) for key in test_specs ]); t = args.case if t.lower() == "all" or t.lower() == "randomize": tests_list = list(test_specs.items()); if t.lower() == "randomize" or args.shuffle: random.shuffle(tests_list) for _,test_spec in tests_list: result = run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen, Packets(dumps)) passed += 1 if result == 0 else 0; total += 1; if result != 0 and args.stop_on_failure: break; elif t in test_specs: result = run_one_test(args, xtra_args, transport, trace, test_mod, test_specs[t], nameLen, Packets(dumps)) passed += 1 if result == 0 else 0; total += 1; elif os.path.isfile(t): file = open(t, "r"); lines = file.readlines(); if args.shuffle: random.shuffle(lines); for line in lines: t = line.split("#",1)[0]; #remove comments t = t.strip().upper(); if not t: #Skip empty lines, or those which had only comments continue if t in test_specs: result = run_one_test(args, xtra_args, transport, trace, test_mod, test_specs[t], nameLen, Packets(dumps)) passed += 1 if result == 0 else 0; total += 1; if result != 0 and args.stop_on_failure: break; else: unknown += 1; print(("unknown test " + t + ". Skipping")) file.close(); else: trace.trace(1, "Test '%s' not found!" % t); total += 1; failed = total - passed; if total: trace.trace(2, "\nSummary:\n\nStatus Count\n%s" % ('='*14)); if passed > 0: trace.trace(2, "PASS%10d" % passed); trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "PASS%10d" % passed) if failed > 0: trace.trace(2, "FAIL%10d" % failed); trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "FAIL%10d" % failed) if unknown > 0: trace.trace(2, "UNKNOWN%7d" % unknown); trace.trace(2, "%s\nTotal%9d" % ('='*14, total)); return failed + unknown class Trace(): def __init__(self, level): self.level = level; self.transport = None self.btsnoop = None def trace(self, level, msg): if ( level <= self.level ): if self.transport: from datetime import timedelta td = timedelta(microseconds=self.transport.get_last_t()) mm, ss = divmod(td.seconds, 60) hh, mm = divmod(mm, 60) ts = "%02d:%02d:%02d.%06d" % (hh, mm, ss, td.microseconds) else: ts = '--:--:--.------' for line in str(msg).split('\n'): print('edtt: @{} {}'.format(ts, line), flush=True); def main(): transport = None; try: (args, xtra_args) = parse_arguments(); random.seed(int(args.seed)); trace = Trace(args.verbose); transport = init_transport(args.transport, xtra_args, trace); trace.transport = transport; trace.btsnoop = Btsnoop(args.store_btsnoop, args.btmon_socket_path) address = 0x000000000000 trace.btsnoop.send_index_added(0, toArray(address, 6), "UpperTester") trace.btsnoop.send_index_added(1, toArray(address, 6), "LowerTester") device_dumps = SortedDumps() device_dumps.add_rx(0, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_00.Rx.csv')) device_dumps.add_tx(0, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_00.Tx.csv')) device_dumps.add_rx(1, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_01.Rx.csv')) device_dumps.add_tx(1, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_01.Tx.csv')) trace.btsnoop.send_user_data(0, BtsnoopPriority.ALERT, "Testing session started") result = run_tests(args, xtra_args, transport, trace, device_dumps) trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "Testing session completed ") trace.btsnoop.close() transport.close(); from sys import exit; exit(result); except: if transport: transport.close(); raise; if __name__ == "__main__": main();