1# Need Python 3 string formatting functions 2from __future__ import print_function 3 4import logging 5import os 6import re 7from threading import Thread 8 9import ttfw_idf 10 11LOG_LEVEL = logging.DEBUG 12LOGGER_NAME = 'modbus_test' 13 14# Allowed parameter reads 15TEST_READ_MIN_COUNT = 10 # Minimum number of correct readings 16TEST_READ_MAX_ERR_COUNT = 2 # Maximum allowed read errors during initialization 17 18TEST_THREAD_EXPECT_TIMEOUT = 120 # Test theread expect timeout in seconds 19TEST_THREAD_JOIN_TIMEOUT = 180 # Test theread join timeout in seconds 20 21# Test definitions 22TEST_MASTER_RTU = 'master_rtu' 23TEST_SLAVE_RTU = 'slave_rtu' 24 25TEST_MASTER_ASCII = 'master_ascii' 26TEST_SLAVE_ASCII = 'slave_ascii' 27 28# Define tuple of strings to expect for each DUT. 29# 30master_expect = ('MASTER_TEST: Modbus master stack initialized...', 'MASTER_TEST: Start modbus test...', 'MASTER_TEST: Destroy master...') 31slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.', 'SLAVE_TEST: Start modbus test...', 'SLAVE_TEST: Modbus controller destroyed.') 32 33# The dictionary for expected values in listing 34expect_dict_master_ok = {'START': (), 35 'READ_PAR_OK': (), 36 'ALARM_MSG': (u'7',)} 37 38expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'), 39 'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')} 40 41# The dictionary for regular expression patterns to check in listing 42pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'), 43 'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+' 44 r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'), 45 'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')} 46 47pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+' 48 r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'), 49 'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s' 50 r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')} 51 52# The dictionary for expected values in listing 53expect_dict_slave_ok = {'START': (), 54 'READ_PAR_OK': (), 55 'DESTROY': ()} 56 57# The dictionary for regular expression patterns to check in listing 58pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'), 59 'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s' 60 r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'), 61 'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')} 62 63logger = logging.getLogger(LOGGER_NAME) 64 65 66class DutTestThread(Thread): 67 def __init__(self, dut=None, name=None, expect=None): 68 """ Initialize the thread parameters 69 """ 70 self.tname = name 71 self.dut = dut 72 self.expected = expect 73 self.result = False 74 self.data = None 75 super(DutTestThread, self).__init__() 76 77 def run(self): 78 """ The function implements thread functionality 79 """ 80 # Must reset again as flashing during start_app will reset multiple times, causing unexpected results 81 self.dut.reset() 82 83 # Capture output from the DUT 84 self.dut.start_capture_raw_data() 85 86 # Check expected strings in the listing 87 for string in self.expected: 88 self.dut.expect(string, TEST_THREAD_EXPECT_TIMEOUT) 89 90 # Check DUT exceptions 91 dut_exceptions = self.dut.get_exceptions() 92 if 'Guru Meditation Error:' in dut_exceptions: 93 raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions)) 94 95 # Mark thread has run to completion without any exceptions 96 self.data = self.dut.stop_capture_raw_data() 97 self.result = True 98 99 100def test_filter_output(data=None, start_pattern=None, end_pattern=None): 101 """Use patters to filter output 102 """ 103 start_index = str(data).find(start_pattern) 104 end_index = str(data).find(end_pattern) 105 logger.debug('Listing start index= %d, end=%d' % (start_index, end_index)) 106 if start_index == -1 or end_index == -1: 107 return data 108 return data[start_index:end_index + len(end_pattern)] 109 110 111def test_expect_re(data, pattern): 112 """ 113 Check if re pattern is matched in data cache 114 :param data: data to process 115 :param pattern: compiled RegEx pattern 116 :return: match groups if match succeed otherwise None 117 """ 118 ret = None 119 if isinstance(pattern, type(u'')): 120 pattern = pattern.encode('utf-8') 121 regex = re.compile(pattern) 122 if isinstance(data, type(u'')): 123 data = data.encode('utf-8') 124 match = regex.search(data) 125 if match: 126 ret = tuple(None if x is None else x.decode() for x in match.groups()) 127 index = match.end() 128 else: 129 index = None 130 return ret, index 131 132 133def test_check_output(data=None, check_dict=None, expect_dict=None): 134 """ Check output for the test 135 Check log using regular expressions: 136 """ 137 global logger 138 match_count = 0 139 index = 0 140 data_lines = data.splitlines() 141 for key, pattern in check_dict.items(): 142 if key not in expect_dict: 143 break 144 # Check pattern in the each line 145 for line in data_lines: 146 group, index = test_expect_re(line, pattern) 147 if index is not None: 148 logger.debug('Found key{%s}=%s, line: \n%s' % (key, group, line)) 149 if expect_dict[key] == group: 150 logger.debug('The result is correct for the key:%s, expected:%s == returned:%s' % (key, str(expect_dict[key]), str(group))) 151 match_count += 1 152 return match_count 153 154 155def test_check_mode(dut=None, mode_str=None, value=None): 156 """ Check communication mode for dut 157 """ 158 global logger 159 try: 160 opt = dut.app.get_sdkconfig()[mode_str] 161 logger.info('%s {%s} = %s.\n' % (str(dut), mode_str, opt)) 162 return value == opt 163 except Exception: 164 logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str)) 165 return False 166 167 168@ttfw_idf.idf_example_test(env_tag='Example_T2_RS485') 169def test_modbus_communication(env, comm_mode): 170 global logger 171 172 # Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver 173 dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT) 174 dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT) 175 176 try: 177 logger.debug('Environment vars: %s\r\n' % os.environ) 178 logger.debug('DUT slave sdkconfig: %s\r\n' % dut_slave.app.get_sdkconfig()) 179 logger.debug('DUT master sdkconfig: %s\r\n' % dut_master.app.get_sdkconfig()) 180 181 # Check Kconfig configuration options for each built example 182 if test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y'): 183 logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n') 184 slave_name = TEST_SLAVE_ASCII 185 master_name = TEST_MASTER_ASCII 186 elif test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y'): 187 logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n') 188 slave_name = TEST_SLAVE_RTU 189 master_name = TEST_MASTER_RTU 190 else: 191 logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n") 192 raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n") 193 # Check if slave address for example application is default one to be able to communicate 194 if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'): 195 logger.error('ENV_TEST_FAILURE: Slave address option is incorrect.\n') 196 raise Exception('ENV_TEST_FAILURE: Slave address option is incorrect.\n') 197 198 # Flash app onto each DUT 199 dut_master.start_app() 200 dut_slave.start_app() 201 202 # Create thread for each dut 203 dut_master_thread = DutTestThread(dut=dut_master, name=master_name, expect=master_expect) 204 dut_slave_thread = DutTestThread(dut=dut_slave, name=slave_name, expect=slave_expect) 205 206 # Start each thread 207 dut_slave_thread.start() 208 dut_master_thread.start() 209 210 # Wait for threads to complete 211 dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) 212 dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) 213 214 if dut_slave_thread.isAlive(): 215 logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % 216 (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) 217 raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % 218 (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) 219 220 if dut_master_thread.isAlive(): 221 logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % 222 (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) 223 raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % 224 (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) 225 finally: 226 dut_master.close() 227 dut_slave.close() 228 229 # Check if test threads completed successfully and captured data 230 if not dut_slave_thread.result or dut_slave_thread.data is None: 231 logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname) 232 raise Exception('The thread %s was not run successfully.' % dut_slave_thread.tname) 233 234 if not dut_master_thread.result or dut_master_thread.data is None: 235 logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname) 236 raise Exception('The thread %s was not run successfully.' % dut_master_thread.tname) 237 238 # Filter output to get test messages 239 master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1]) 240 if master_output is not None: 241 logger.info('The data for master thread is captured.') 242 logger.debug(master_output) 243 244 slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1]) 245 if slave_output is not None: 246 logger.info('The data for slave thread is captured.') 247 logger.debug(slave_output) 248 249 # Check if parameters are read correctly by master 250 match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok) 251 if match_count < TEST_READ_MIN_COUNT: 252 logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) 253 raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) 254 logger.info('OK pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count)) 255 256 # If the test completed successfully (alarm triggered) but there are some errors during reading of parameters 257 match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err) 258 if match_count > TEST_READ_MAX_ERR_COUNT: 259 logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) 260 raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) 261 logger.info('ERROR pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count)) 262 263 match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok) 264 if match_count < TEST_READ_MIN_COUNT: 265 logger.error('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count)) 266 raise Exception('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count)) 267 logger.info('OK pattern test for %s, match_count=%d.' % (dut_slave_thread.tname, match_count)) 268 269 270if __name__ == '__main__': 271 logger = logging.getLogger(LOGGER_NAME) 272 # create file handler which logs even debug messages 273 fh = logging.FileHandler('modbus_test.log') 274 fh.setLevel(logging.DEBUG) 275 logger.setLevel(logging.DEBUG) 276 # create console handler 277 ch = logging.StreamHandler() 278 ch.setLevel(logging.INFO) 279 # set format of output for both handlers 280 formatter = logging.Formatter('%(levelname)s:%(message)s') 281 ch.setFormatter(formatter) 282 fh.setFormatter(formatter) 283 logger.addHandler(fh) 284 logger.addHandler(ch) 285 logger.info('Start script %s.' % os.path.basename(__file__)) 286 print('Logging file name: %s' % logger.handlers[0].baseFilename) 287 test_modbus_communication() 288 logging.shutdown() 289