1# SPDX-FileCopyrightText: 2016-2022 Espressif Systems (Shanghai) CO LTD 2# SPDX-License-Identifier: Apache-2.0 3 4import logging 5import os 6import re 7from threading import Thread 8 9import ttfw_idf 10from tiny_test_fw import DUT 11 12LOG_LEVEL = logging.DEBUG 13LOGGER_NAME = 'modbus_test' 14 15# Allowed options for the test 16TEST_READ_MAX_ERR_COUNT = 3 # Maximum allowed read errors during initialization 17TEST_THREAD_JOIN_TIMEOUT = 60 # Test theread join timeout in seconds 18TEST_EXPECT_STR_TIMEOUT = 30 # Test expect timeout in seconds 19TEST_MASTER_TCP = 'mb_tcp_master' 20TEST_SLAVE_TCP = 'mb_tcp_slave' 21 22STACK_DEFAULT = 0 23STACK_IPV4 = 1 24STACK_IPV6 = 2 25STACK_INIT = 3 26STACK_CONNECT = 4 27STACK_START = 5 28STACK_PAR_OK = 6 29STACK_PAR_FAIL = 7 30STACK_DESTROY = 8 31 32pattern_dict_slave = {STACK_IPV4: (r'.*I \([0-9]+\) example_connect: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'), 33 STACK_IPV6: (r'.*I \([0-9]+\) example_connect: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'), 34 STACK_INIT: (r'.*I \(([0-9]+)\) MB_TCP_SLAVE_PORT: (Protocol stack initialized).'), 35 STACK_CONNECT: (r'.*I\s\(([0-9]+)\) MB_TCP_SLAVE_PORT: Socket \(#[0-9]+\), accept client connection from address: ' 36 r'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'), 37 STACK_START: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Start modbus test).*'), 38 STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: ([A-Z]+ [A-Z]+) \([a-zA-Z0-9_]+ us\),\s' 39 r'ADDR:([0-9]+), TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'), 40 STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) SLAVE_TEST: Response time exceeds configured [0-9]+ [ms], ignore packet.*'), 41 STACK_DESTROY: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Modbus controller destroyed).')} 42 43pattern_dict_master = {STACK_IPV4: (r'.*I \([0-9]+\) example_connect: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'), 44 STACK_IPV6: (r'.*I \([0-9]+\) example_connect: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'), 45 STACK_INIT: (r'.*I \(([0-9]+)\) MASTER_TEST: (Modbus master stack initialized).*'), 46 STACK_CONNECT: (r'.*.*I\s\(([0-9]+)\) MB_TCP_MASTER_PORT: (Connected [0-9]+ slaves), start polling.*'), 47 STACK_START: (r'.*I \(([0-9]+)\) MASTER_TEST: (Start modbus test).*'), 48 STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+ ([a-zA-Z0-9_]+)' 49 r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.]+ \(0x[a-zA-Z0-9]+\) read successful.*'), 50 STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+\s\(([a-zA-Z0-9_]+)\)\s' 51 r'read fail, err = [0-9]+ \([_A-Z]+\).*'), 52 STACK_DESTROY: (r'.*I\s\(([0-9]+)\) MASTER_TEST: (Destroy master).*')} 53 54logger = logging.getLogger(LOGGER_NAME) 55 56 57class DutTestThread(Thread): 58 """ Test thread class 59 """ 60 def __init__(self, dut=None, name=None, ip_addr=None, expect=None): 61 """ Initialize the thread parameters 62 """ 63 self.tname = name 64 self.dut = dut 65 self.expected = expect 66 self.data = None 67 self.ip_addr = ip_addr 68 self.test_finish = False 69 self.param_fail_count = 0 70 self.param_ok_count = 0 71 self.test_stage = STACK_DEFAULT 72 super(DutTestThread, self).__init__() 73 74 def __enter__(self): 75 logger.debug('Restart %s.', self.tname) 76 # Reset DUT first 77 self.dut.reset() 78 # Capture output from the DUT 79 self.dut.start_capture_raw_data(capture_id=self.dut.name) 80 return self 81 82 def __exit__(self, exc_type, exc_value, traceback): 83 """ The exit method of context manager 84 """ 85 if exc_type is not None or exc_value is not None: 86 logger.info('Thread %s rised an exception type: %s, value: %s', self.tname, str(exc_type), str(exc_value)) 87 88 def run(self): 89 """ The function implements thread functionality 90 """ 91 # Initialize slave IP for master board 92 if (self.ip_addr is not None): 93 self.set_ip(0) 94 95 # Check expected strings in the listing 96 self.test_start(TEST_EXPECT_STR_TIMEOUT) 97 98 # Check DUT exceptions 99 dut_exceptions = self.dut.get_exceptions() 100 if 'Guru Meditation Error:' in dut_exceptions: 101 raise RuntimeError('%s generated an exception(s): %s\n' % (str(self.dut), dut_exceptions)) 102 103 # Mark thread has run to completion without any exceptions 104 self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name) 105 106 def set_ip(self, index=0): 107 """ The method to send slave IP to master application 108 """ 109 message = r'.*Waiting IP([0-9]{1,2}) from stdin.*' 110 # Read all data from previous restart to get prompt correctly 111 self.dut.read() 112 result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT) 113 if int(result[0]) != index: 114 raise RuntimeError('Incorrect index of IP=%s for %s\n' % (result[0], str(self.dut))) 115 # Use the same slave IP address for all characteristics during the test 116 self.dut.write('IP0=' + self.ip_addr, '\n', False) 117 self.dut.write('IP1=' + self.ip_addr, '\n', False) 118 self.dut.write('IP2=' + self.ip_addr, '\n', False) 119 logger.debug('Set IP address=%s for %s', self.ip_addr, self.tname) 120 message = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*' 121 result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT) 122 logger.debug('Thread %s initialized with slave IP=%s.', self.tname, self.ip_addr) 123 124 def test_start(self, timeout_value): 125 """ The method to initialize and handle test stages 126 """ 127 def handle_get_ip4(data): 128 """ Handle get_ip v4 129 """ 130 logger.debug('%s[STACK_IPV4]: %s', self.tname, str(data)) 131 self.test_stage = STACK_IPV4 132 133 def handle_get_ip6(data): 134 """ Handle get_ip v6 135 """ 136 logger.debug('%s[STACK_IPV6]: %s', self.tname, str(data)) 137 self.test_stage = STACK_IPV6 138 139 def handle_init(data): 140 """ Handle init 141 """ 142 logger.debug('%s[STACK_INIT]: %s', self.tname, str(data)) 143 self.test_stage = STACK_INIT 144 145 def handle_connect(data): 146 """ Handle connect 147 """ 148 logger.debug('%s[STACK_CONNECT]: %s', self.tname, str(data)) 149 self.test_stage = STACK_CONNECT 150 151 def handle_test_start(data): 152 """ Handle connect 153 """ 154 logger.debug('%s[STACK_START]: %s', self.tname, str(data)) 155 self.test_stage = STACK_START 156 157 def handle_par_ok(data): 158 """ Handle parameter ok 159 """ 160 logger.debug('%s[READ_PAR_OK]: %s', self.tname, str(data)) 161 if self.test_stage >= STACK_START: 162 self.param_ok_count += 1 163 self.test_stage = STACK_PAR_OK 164 165 def handle_par_fail(data): 166 """ Handle parameter fail 167 """ 168 logger.debug('%s[READ_PAR_FAIL]: %s', self.tname, str(data)) 169 self.param_fail_count += 1 170 self.test_stage = STACK_PAR_FAIL 171 172 def handle_destroy(data): 173 """ Handle destroy 174 """ 175 logger.debug('%s[DESTROY]: %s', self.tname, str(data)) 176 self.test_stage = STACK_DESTROY 177 self.test_finish = True 178 179 while not self.test_finish: 180 try: 181 self.dut.expect_any((re.compile(self.expected[STACK_IPV4]), handle_get_ip4), 182 (re.compile(self.expected[STACK_IPV6]), handle_get_ip6), 183 (re.compile(self.expected[STACK_INIT]), handle_init), 184 (re.compile(self.expected[STACK_CONNECT]), handle_connect), 185 (re.compile(self.expected[STACK_START]), handle_test_start), 186 (re.compile(self.expected[STACK_PAR_OK]), handle_par_ok), 187 (re.compile(self.expected[STACK_PAR_FAIL]), handle_par_fail), 188 (re.compile(self.expected[STACK_DESTROY]), handle_destroy), 189 timeout=timeout_value) 190 except DUT.ExpectTimeout: 191 logger.debug('%s, expect timeout on stage #%d (%s seconds)', self.tname, self.test_stage, timeout_value) 192 self.test_finish = True 193 194 195def test_check_mode(dut=None, mode_str=None, value=None): 196 """ Check communication mode for dut 197 """ 198 global logger 199 try: 200 opt = dut.app.get_sdkconfig()[mode_str] 201 logger.debug('%s {%s} = %s.\n', str(dut), mode_str, opt) 202 return value == opt 203 except Exception: 204 logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.', str(dut), mode_str) 205 return False 206 207 208@ttfw_idf.idf_example_test(env_tag='Example_Modbus_TCP', target=['esp32']) 209def test_modbus_communication(env, comm_mode): 210 global logger 211 212 rel_project_path = os.path.join('examples', 'protocols', 'modbus', 'tcp') 213 # Get device under test. Both duts must be able to be connected to WiFi router 214 dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP)) 215 dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP)) 216 log_file = os.path.join(env.log_path, 'modbus_tcp_test.log') 217 print('Logging file name: %s' % log_file) 218 219 try: 220 # create file handler which logs even debug messages 221 logger.setLevel(logging.DEBUG) 222 fh = logging.FileHandler(log_file) 223 fh.setLevel(logging.DEBUG) 224 # set format of output for both handlers 225 formatter = logging.Formatter('%(levelname)s:%(message)s') 226 fh.setFormatter(formatter) 227 logger.addHandler(fh) 228 # create console handler 229 ch = logging.StreamHandler() 230 ch.setLevel(logging.INFO) 231 # set format of output for both handlers 232 formatter = logging.Formatter('%(levelname)s:%(message)s') 233 ch.setFormatter(formatter) 234 logger.addHandler(ch) 235 236 # Check Kconfig configuration options for each built example 237 if (test_check_mode(dut_master, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y') and 238 test_check_mode(dut_slave, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y')): 239 slave_name = TEST_SLAVE_TCP 240 master_name = TEST_MASTER_TCP 241 else: 242 logger.error('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n') 243 raise RuntimeError('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n') 244 address = None 245 if test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'): 246 logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n') 247 # Flash app onto DUT (Todo: Debug case when the slave flashed before master then expect does not work correctly for no reason 248 dut_slave.start_app() 249 dut_master.start_app() 250 if test_check_mode(dut_master, 'CONFIG_EXAMPLE_CONNECT_IPV6', 'y'): 251 address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV6]), TEST_EXPECT_STR_TIMEOUT) 252 else: 253 address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV4]), TEST_EXPECT_STR_TIMEOUT) 254 if address is not None: 255 print('Found IP slave address: %s' % address[0]) 256 else: 257 raise RuntimeError('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n') 258 else: 259 raise RuntimeError('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n') 260 261 # Create thread for each dut 262 with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread: 263 with DutTestThread(dut=dut_slave, name=slave_name, ip_addr=None, expect=pattern_dict_slave) as dut_slave_thread: 264 265 # Start each thread 266 dut_slave_thread.start() 267 dut_master_thread.start() 268 269 # Wait for threads to complete 270 dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) 271 dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) 272 273 if dut_slave_thread.is_alive(): 274 logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n', 275 dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT) 276 raise RuntimeError('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % 277 (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) 278 279 if dut_master_thread.is_alive(): 280 logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n', 281 dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT) 282 raise RuntimeError('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % 283 (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) 284 285 logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n', 286 dut_master_thread.tname, dut_master_thread.param_fail_count, 287 dut_slave_thread.tname, dut_slave_thread.param_fail_count) 288 logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n', 289 dut_master_thread.tname, dut_master_thread.param_ok_count, 290 dut_slave_thread.tname, dut_slave_thread.param_ok_count) 291 292 if ((dut_master_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or 293 (dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or 294 (dut_slave_thread.param_ok_count == 0) or 295 (dut_master_thread.param_ok_count == 0)): 296 raise RuntimeError('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' % 297 (dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count, 298 dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count)) 299 logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n') 300 301 finally: 302 dut_master.close() 303 dut_slave.close() 304 logging.shutdown() 305 306 307if __name__ == '__main__': 308 test_modbus_communication() 309