1#! /usr/bin/env python3
2# Copyright 2019 Oticon A/S
3# SPDX-License-Identifier: Apache-2.0
4
5import os;
6from numpy import random;
7
def parse_arguments():
    """Parse the EDTT command line.

    Options this parser does not know about are not treated as errors; they
    are collected separately so they can be handed on (the epilog notes that
    a transport can have its own options).

    Returns:
        The ``(namespace, extra_args)`` tuple produced by
        ``ArgumentParser.parse_known_args()``. ``namespace`` carries
        ``verbose``, ``transport``, ``test``, ``case``, ``shuffle``,
        ``stop_on_failure`` and ``seed``; ``extra_args`` is the list of
        unrecognised command-line tokens.
    """
    import argparse

    def seed_value(text):
        # Base 0 accepts both decimal ("42") and prefixed ("0x1234") input,
        # so a user-supplied seed is an int just like the default.
        return int(text, 0)

    parser = argparse.ArgumentParser(
        description="EDTT (Embedded Device Test Tool)",
        epilog="Note: A transport can have its own options")

    parser.add_argument("-v", "--verbose", default=2, type=int,
                        help="Verbosity level")

    parser.add_argument("-t", "--transport", required=True,
                        help="Type of transport to connect to the devices (the "
                             "transport module must either exist as "
                             "src/components/edtt_<transport>.py, or be a path "
                             "to an importable transport module)")

    parser.add_argument("-T", "--test", required=True,
                        help="Which test module to run. This can either be a "
                             "module in src/tests, or a file path (relative or "
                             "absolute)")

    parser.add_argument("-C", "--case", required=False,
                        default="all",
                        help='Which testcase to run in that module. '
                             'Options are: A real test name, "all", "randomize", '
                             'or a file name containing a list of test names '
                             '(default "all")')

    parser.add_argument("--shuffle", required=False,
                        action='store_true',
                        help='Shuffle test order. '
                             'The order will be dependent on the random seed. '
                             'If <case> was set to all, this is equivalent to '
                             'setting <case> to "randomize". '
                             'If <case> was a file name, this will shuffle the '
                             'lines in the file. '
                             'If <case> was 1 particular testcase, this option '
                             'has no effect.')

    parser.add_argument("-S", "--stop_on_failure", required=False,
                        action='store_true',
                        help="Stop as soon as any test fails, instead of "
                             "continuing with the remaining tests")

    parser.add_argument("--seed", required=False, default=0x1234,
                        type=seed_value,
                        help='Random generator seed (0x1234 by default)')

    return parser.parse_known_args()
47
def try_to_import(module_path, type, def_path):
    """Load a transport or test module, either in-tree or from a file path.

    Args:
        module_path: A bare module name (no "." and no "/"), which is imported
            as ``def_path + module_path``; or a file path, with or without the
            ".py" extension, which is loaded directly from that file.
        type: Human-readable kind of module ("transport", "test", ...); only
            used in the diagnostic printed on failure.
        def_path: Dotted package prefix prepended to bare module names.

    Returns:
        The loaded module object.

    Raises:
        ImportError: The module could not be imported.
        OSError: The source file could not be read (e.g. it does not exist).
        In both cases a hint is printed before the exception is re-raised.
    """
    try:
        if "." not in module_path and "/" not in module_path:
            from importlib import import_module
            return import_module(def_path + module_path)

        # The user seems to want to load the module from a place off-tree.
        # If the extension was left out, fill it in.
        if not module_path.endswith(".py"):
            module_path = module_path + ".py"

        import sys
        from importlib.util import module_from_spec, spec_from_file_location

        # Name the module after its file so that two off-tree modules (e.g. a
        # transport and a test) do not end up sharing one module object.
        module_name = os.path.splitext(os.path.basename(module_path))[0]
        spec = spec_from_file_location(module_name, module_path)
        if spec is None or spec.loader is None:
            raise ImportError("No loader available for %s" % module_path)

        loaded_module = module_from_spec(spec)
        sys.modules[module_name] = loaded_module
        try:
            spec.loader.exec_module(loaded_module)
        except BaseException:
            # Do not leave a half-initialised module registered.
            sys.modules.pop(module_name, None)
            raise
        return loaded_module
    except (ImportError, OSError):
        print(("\n Could not load the %s %s . Does it exist?\n" % (type, module_path)))
        raise
64
65# Initialize the transport and connect to devices
def init_transport(transport, xtra_args, trace):
    """Load the named transport, instantiate it and connect it.

    The transport module is resolved through try_to_import() using the
    "components.edttt_" prefix for bare names; its ``EDTTT`` class is built
    with ``(xtra_args, trace)`` and ``connect()`` is called on the instance.

    Returns:
        The connected transport instance.
    """
    module = try_to_import(transport, "transport", "components.edttt_")
    link = module.EDTTT(xtra_args, trace)
    link.connect()
    return link
71
def run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen):
    """Execute one test and report its verdict through *trace*.

    Args:
        args: Parsed command-line namespace, forwarded to the test module.
        xtra_args: Unused here.
        transport: Object exposing ``n_devices``; forwarded to the test module.
        trace: Object exposing ``trace(level, msg)``.
        test_mod: Test module exposing ``run_a_test(args, transport, trace, spec)``.
        test_spec: Spec with ``name``, ``description`` and ``number_devices``.
        nameLen: Column width the test name is left-justified to.

    Returns:
        Whatever ``test_mod.run_a_test`` returned; 0 is reported as PASSED,
        anything else as FAILED.

    Raises:
        Exception: The test needs more devices than the transport offers.
    """
    # Dump the spec followed by an empty separator line at debug verbosity.
    for entry in (test_spec, ""):
        trace.trace(4, entry)

    if transport.n_devices < test_spec.number_devices:
        raise Exception("This test needs %i connected devices but you only connected to %i" %
                        (test_spec.number_devices, transport.n_devices))

    outcome = test_mod.run_a_test(args, transport, trace, test_spec)

    if outcome == 0:
        verdict = "PASSED"
    else:
        verdict = "FAILED"

    # The first character of the description is dropped in the report line.
    report = "%-*s %s %s" % (nameLen, test_spec.name, test_spec.description[1:], verdict)
    trace.trace(2, report)

    return outcome
83
84# Attempt to load and run the tests
def run_tests(args, xtra_args, transport, trace):
    """Load the test module, run the selected test case(s) and print a summary.

    ``args.case`` selects what runs, checked in this order:

    * "all" / "randomize" (case-insensitive): every test in the module;
      shuffled when "randomize" was given or ``args.shuffle`` is set.
    * the exact name of a test in the module: that single test.
    * the path of an existing file: each line names one test ("#" starts a
      comment, names are upper-cased, unknown names are skipped); the lines
      are shuffled first when ``args.shuffle`` is set.
    * anything else: reported as not found and counted as one failure.

    With ``args.stop_on_failure`` set, a run stops after the first failing
    test.

    Returns:
        The number of failed tests (0 when everything passed).
    """
    test_mod = try_to_import(args.test, "test", "tests.")
    test_specs = test_mod.get_tests_specs()
    # default=0 keeps a module without any tests from crashing max().
    nameLen = max((len(spec.name) for spec in test_specs.values()), default=0)

    def run_sequence(specs):
        return _run_spec_sequence(args, xtra_args, transport, trace,
                                  test_mod, specs, nameLen)

    passed = 0
    total = 0
    t = args.case
    mode = t.lower()

    if mode in ("all", "randomize"):
        tests_list = list(test_specs.items())
        if mode == "randomize" or args.shuffle:
            random.shuffle(tests_list)
        passed, total = run_sequence(spec for _, spec in tests_list)

    elif t in test_specs:
        passed, total = run_sequence([test_specs[t]])

    elif os.path.isfile(t):
        # Read the list up front so the file is closed before any test runs,
        # even if a test later raises.
        with open(t, "r") as file:
            lines = file.readlines()

        if args.shuffle:
            random.shuffle(lines)

        passed, total = run_sequence(_specs_named_in(lines, test_specs))

    else:
        trace.trace(1, "Test '%s' not found!" % t)
        # Count the missing test so the caller sees a non-zero failure count.
        total += 1

    failed = total - passed
    if total:
        trace.trace(2, "\nSummary:\n\nStatus   Count\n%s" % ('='*14))
        if passed > 0:
            trace.trace(2, "PASS%10d" % passed)

        if failed > 0:
            trace.trace(2, "FAIL%10d" % failed)
        trace.trace(2, "%s\nTotal%9d" % ('='*14, total))

    return failed


def _run_spec_sequence(args, xtra_args, transport, trace, test_mod, specs, nameLen):
    """Run each spec in order; return ``(passed, total)``, honouring stop_on_failure."""
    passed = 0
    total = 0
    for test_spec in specs:
        result = run_one_test(args, xtra_args, transport, trace, test_mod, test_spec, nameLen)
        total += 1
        if result == 0:
            passed += 1
        elif args.stop_on_failure:
            break
    return passed, total


def _specs_named_in(lines, test_specs):
    """Lazily yield the spec named on each line, reporting unknown names as they are met."""
    for line in lines:
        name = line.split("#", 1)[0]  # remove comments
        name = name.strip().upper()
        if not name:  # Skip empty lines, or those which had only comments
            continue
        if name in test_specs:
            yield test_specs[name]
        else:
            print(("unkown test " + name + ". Skipping"))
149
class Trace():
    """Verbosity-filtered console logger that timestamps every line.

    Attributes are plain and assigned by the caller:
      level     -- messages with a level above this value are dropped.
      transport -- None until set; once set, its ``get_last_t()`` value is
                   interpreted as microseconds and used as the timestamp.
    """

    def __init__(self, level):
        self.level = level
        self.transport = None

    def trace(self, level, msg):
        """Print *msg* (one output line per "\\n"-separated line) if *level* is enabled.

        Each line is written as ``edtt: @HH:MM:SS.uuuuuu  <text>`` and flushed
        immediately. Before a transport is attached the timestamp is a
        placeholder of dashes.
        """
        if level > self.level:
            return

        if self.transport is not None:
            from datetime import timedelta
            td = timedelta(microseconds=self.transport.get_last_t())
            # td.seconds excludes whole days; fold them in so the hours field
            # keeps counting past 24 h instead of wrapping back to 00.
            whole_seconds = td.days * 86400 + td.seconds
            mm, ss = divmod(whole_seconds, 60)
            hh, mm = divmod(mm, 60)
            ts = "%02d:%02d:%02d.%06d" % (hh, mm, ss, td.microseconds)
        else:
            ts = '--:--:--.------'

        for line in str(msg).split('\n'):
            print('edtt: @{}  {}'.format(ts, line), flush=True)
167
def main():
    """Entry point: parse options, connect the transport, run tests and exit.

    The random generator is seeded from ``--seed`` before anything else runs.
    The transport, once created, is closed exactly once whether the run
    finishes normally or an exception propagates. The process exit status is
    the number of failed tests returned by run_tests().
    """
    from sys import exit

    transport = None
    try:
        (args, xtra_args) = parse_arguments()

        random.seed(args.seed)

        trace = Trace(args.verbose)

        transport = init_transport(args.transport, xtra_args, trace)
        trace.transport = transport

        result = run_tests(args, xtra_args, transport, trace)
    finally:
        # Single cleanup point: covers success, test errors and Ctrl-C alike.
        if transport is not None:
            transport.close()

    # Exit outside the try block so SystemExit cannot trigger a second close().
    exit(result)
191
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
194