# Need Python 3 string formatting functions
from __future__ import print_function

from threading import Thread

import ttfw_idf

# Strings each DUT is expected to output, in the order they must be matched.
master_expect = ('TWAI Master: Driver installed', 'TWAI Master: Driver uninstalled')
slave_expect = ('TWAI Slave: Driver installed', 'TWAI Slave: Driver uninstalled')
listen_only_expect = ('TWAI Listen Only: Driver installed', 'TWAI Listen Only: Driver uninstalled')


def dut_thread_callback(**kwargs):
    """Reset one DUT and wait for each of its expected strings in order.

    Keyword arguments:
        dut: the device under test; must provide ``reset()`` and ``expect()``.
        expected: iterable of strings passed one by one to ``dut.expect``.
        result: mutable sequence (e.g. a one-element list) used as an output
            parameter; ``result[0]`` is set to ``True`` only when every
            expectation was met without an exception.
    """
    # Pull out all arguments first so a missing key fails before the DUT is touched
    device = kwargs['dut']
    patterns = kwargs['expected']
    outcome = kwargs['result']

    # Flashing during start_app resets the DUT several times, which gives
    # unexpected results, so reset once more before matching output
    device.reset()

    for pattern in patterns:
        # NOTE(review): 20 is presumably the expect timeout in seconds - confirm
        device.expect(pattern, 20)

    # Only reached when no expect() call raised
    outcome[0] = True


@ttfw_idf.idf_example_test(env_tag='Example_TWAI2', ignore=True)
def test_twai_network_example(env, extra_data):
    """Run the TWAI network example on three DUTs, one thread per DUT.

    Each DUT (master, slave, listen only) is flashed, then a thread resets it
    and checks its expected output. Raises ``Exception`` if any of the three
    threads did not run to completion.
    """
    # "dut1", "dut2", and "dut3" must be properly defined in EnvConfig
    dut_master = env.get_dut('dut1', 'examples/peripherals/twai/twai_network/twai_network_master',
                             dut_class=ttfw_idf.ESP32DUT)
    dut_slave = env.get_dut('dut2', 'examples/peripherals/twai/twai_network/twai_network_slave',
                            dut_class=ttfw_idf.ESP32DUT)
    dut_listen_only = env.get_dut('dut3', 'examples/peripherals/twai/twai_network/twai_network_listen_only',
                                  dut_class=ttfw_idf.ESP32DUT)

    # Flash the app onto every DUT; each one is reset again inside its thread
    for device in (dut_master, dut_slave, dut_listen_only):
        device.start_app()

    # (thread name, DUT, expected output) for each role
    roles = (
        ('Master Thread', dut_master, master_expect),
        ('Slave Thread', dut_slave, slave_expect),
        ('Listen Only Thread', dut_listen_only, listen_only_expect),
    )

    # One independent single-element list per thread, used as its out-parameter
    results = [[False] for _ in roles]

    workers = [
        Thread(target=dut_thread_callback, name=label,
               kwargs={'dut': device, 'result': flag, 'expected': expected})
        for (label, device, expected), flag in zip(roles, results)
    ]
    master_worker, slave_worker, listen_only_worker = workers

    # The listen only node is started (and joined) first, then master, then slave
    run_order = (listen_only_worker, master_worker, slave_worker)
    for worker in run_order:
        worker.start()

    # Wait for all threads to finish
    for worker in run_order:
        worker.join()

    # Every thread must have flagged successful completion
    if not all(flag[0] is True for flag in results):
        raise Exception('One or more threads did not run successfully')


if __name__ == '__main__':
    # NOTE(review): called without arguments; presumably the ttfw_idf decorator
    # supplies env and extra_data - confirm
    test_twai_network_example()