1#! /usr/bin/env python3
2# Copyright 2019 Oticon A/S
3# SPDX-License-Identifier: Apache-2.0
4
5import os;
6from components.btsnoop import Btsnoop, BtsnoopPriority
7from numpy import random;
8from components.dump import SortedDumps, Packets
9from components.utils import toArray
10
11def parse_arguments():
12    import argparse
13    parser = argparse.ArgumentParser(description="EDTT (Embedded Device Test Tool)",
14                                     epilog="Note: A transport can have its own options")
15
16    parser.add_argument("-v", "--verbose", default=2, type=int, help="Verbosity level");
17
18    parser.add_argument("-t", "--transport", required=True,
19                        help="Type of transport to connect to the devices (the "
20                             "transport module must either exist as "
21                             "src/components/edtt_<transport>.py, or be a path "
22                             "to an importable transport module");
23
24    parser.add_argument("-T", "--test", required=True,
25                        help="Which test module to run. This can either be a module in "
26                        " src/tests, or a file path (relative or absolute)");
27
28    parser.add_argument("-C", "--case", required=False,
29                        default="all",
30                        help='Which testcase to run in that module.'
31                        'Options are: A real test name, "all", "randomize",'
32                        'or a file name containing a list of test names (default "all")')
33
34    parser.add_argument("--shuffle", required=False,
35                        action='store_true',
36                        help='Shuffle test order. '
37                        'The order will be dependent on the random seed. '
38                        'If <case> was set to all, this is equivalent to setting <case> to "randomize". '
39                        'If <case> was a file name, this will shuffle the lines in the file. '
40                        'If <case> was 1 particular testcase, this option has no effect.')
41
42    parser.add_argument("-S","--stop_on_failure", required=False,
43                        action='store_true',
44                        help="Stop as soon as any test fails, instead of "
45                        "continuing with the remaining tests")
46
47    parser.add_argument("--seed", required=False, default=0x1234, help='Random generator seed (0x1234 by default)')
48
49    parser.add_argument("--store_btsnoop", required=False, default=False, help="Store btsnoop to the file")
50
51    parser.add_argument("--btmon_socket_path", required=False, default="/tmp/btmon-sock", help="path to the unix socket used by btmon")
52
53    return parser.parse_known_args()
54
55def try_to_import(module_path, type, def_path):
56    try:
57        if (("." not in module_path) and ("/" not in module_path)):
58            from importlib import import_module;
59            loaded_module = import_module(def_path + module_path)
60        else: #The user seems to want to load the module from a place off-tree
61            #If the user forgot Let's fill in the extension
62            if module_path[-3:] != ".py":
63                module_path = module_path + ".py"
64            import imp;
65            loaded_module = imp.load_source('%s', module_path);
66
67        return loaded_module;
68    except ImportError as e:
69        print(("\n Could not load the %s %s . Does it exist?\n"% (type, module_path)))
70        raise;
71
72# Initialize the transport and connect to devices
73def init_transport(transport, xtra_args, trace):
74    transport_module = try_to_import(transport, "transport", "components.edttt_");
75    transport = transport_module.EDTTT(xtra_args, trace);
76    transport.connect();
77    return transport;
78
79def run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen, packets):
80    trace.trace(4, test_spec);
81    trace.trace(4, "");
82    if test_spec.number_devices > transport.n_devices:
83        raise Exception("This test needs %i connected devices but you only connected to %i" %
84                        (test_spec.number_devices, transport.n_devices));
85
86    trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, test_spec.name)
87    result = test_mod.run_a_test(args, transport, trace, test_spec, packets);
88    trace.trace(2, "%-*s %s %s" % (nameLen, test_spec.name, test_spec.description[1:], ("PASSED" if result == 0 else "FAILED")));
89    trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "%-*s %s %s" % (nameLen, test_spec.name, test_spec.description[1:], ("PASSED" if result == 0 else "FAILED")))
90
91    return result;
92
93# Attempt to load and run the tests
94def run_tests(args, xtra_args, transport, trace, dumps):
95    passed = 0;
96    total = 0;
97    unknown = 0;
98
99    test_mod = try_to_import(args.test, "test", "tests.");
100    test_specs = test_mod.get_tests_specs();
101    nameLen = max([ len(test_specs[key].name) for key in test_specs ]);
102
103    t = args.case
104
105    if t.lower() == "all" or t.lower() == "randomize":
106        tests_list = list(test_specs.items());
107        if t.lower() == "randomize" or args.shuffle:
108            random.shuffle(tests_list)
109
110        for _,test_spec in tests_list:
111            result = run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen, Packets(dumps))
112            passed += 1 if result == 0 else 0;
113            total += 1;
114            if result != 0 and args.stop_on_failure:
115                break;
116
117    elif t in test_specs:
118        result = run_one_test(args, xtra_args, transport, trace, test_mod, test_specs[t], nameLen, Packets(dumps))
119        passed += 1 if result == 0 else 0;
120        total += 1;
121
122    elif os.path.isfile(t):
123        file = open(t, "r");
124        lines = file.readlines();
125
126        if args.shuffle:
127            random.shuffle(lines);
128
129        for line in lines:
130            t = line.split("#",1)[0]; #remove comments
131            t = t.strip().upper();
132            if not t: #Skip empty lines, or those which had only comments
133                continue
134            if t in test_specs:
135                result = run_one_test(args, xtra_args, transport, trace, test_mod, test_specs[t], nameLen, Packets(dumps))
136                passed += 1 if result == 0 else 0;
137                total += 1;
138                if result != 0 and args.stop_on_failure:
139                    break;
140            else:
141                unknown += 1;
142                print(("unknown test " + t + ". Skipping"))
143        file.close();
144
145    else:
146        trace.trace(1, "Test '%s' not found!" % t);
147        total += 1;
148
149    failed = total - passed;
150    if total:
151        trace.trace(2, "\nSummary:\n\nStatus   Count\n%s" % ('='*14));
152        if passed > 0:
153            trace.trace(2, "PASS%10d" % passed);
154            trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "PASS%10d" % passed)
155
156        if failed > 0:
157            trace.trace(2, "FAIL%10d" % failed);
158            trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "FAIL%10d" % failed)
159
160        if unknown > 0:
161            trace.trace(2, "UNKNOWN%7d" % unknown);
162
163        trace.trace(2, "%s\nTotal%9d" % ('='*14, total));
164
165    return failed + unknown
166
167class Trace():
168    def __init__(self, level):
169        self.level = level;
170        self.transport = None
171        self.btsnoop = None
172
173    def trace(self, level, msg):
174        if ( level <= self.level ):
175            if self.transport:
176                from datetime import timedelta
177                td = timedelta(microseconds=self.transport.get_last_t())
178                mm, ss = divmod(td.seconds, 60)
179                hh, mm = divmod(mm, 60)
180                ts = "%02d:%02d:%02d.%06d" % (hh, mm, ss, td.microseconds)
181            else:
182                ts = '--:--:--.------'
183            for line in str(msg).split('\n'):
184                print('edtt: @{}  {}'.format(ts, line), flush=True);
185
186def main():
187    transport = None;
188    try:
189        (args, xtra_args) = parse_arguments();
190
191        random.seed(int(args.seed));
192
193        trace = Trace(args.verbose);
194
195        transport = init_transport(args.transport, xtra_args, trace);
196        trace.transport = transport;
197        trace.btsnoop = Btsnoop(args.store_btsnoop, args.btmon_socket_path)
198        address = 0x000000000000
199        trace.btsnoop.send_index_added(0, toArray(address, 6), "UpperTester")
200        trace.btsnoop.send_index_added(1, toArray(address, 6), "LowerTester")
201
202        device_dumps = SortedDumps()
203        device_dumps.add_rx(0, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_00.Rx.csv'))
204        device_dumps.add_tx(0, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_00.Tx.csv'))
205        device_dumps.add_rx(1, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_01.Rx.csv'))
206        device_dumps.add_tx(1, os.path.join(os.environ['BSIM_OUT_PATH'], 'results', transport.sim_id, 'd_2G4_01.Tx.csv'))
207
208        trace.btsnoop.send_user_data(0, BtsnoopPriority.ALERT, "Testing session started")
209        result = run_tests(args, xtra_args, transport, trace, device_dumps)
210        trace.btsnoop.send_user_data(0, BtsnoopPriority.INFO, "Testing session completed ")
211        trace.btsnoop.close()
212        transport.close();
213
214        from sys import exit;
215        exit(result);
216
217    except:
218        if transport:
219            transport.close();
220        raise;
221
222if __name__ == "__main__":
223    main();
224