1# Need Python 3 string formatting functions
2from __future__ import print_function
3
4import logging
5import os
6import re
7from threading import Thread
8
9import ttfw_idf
10
11LOG_LEVEL = logging.DEBUG
12LOGGER_NAME = 'modbus_test'
13
14# Allowed parameter reads
15TEST_READ_MIN_COUNT = 10            # Minimum number of correct readings
16TEST_READ_MAX_ERR_COUNT = 2         # Maximum allowed read errors during initialization
17
18TEST_THREAD_EXPECT_TIMEOUT = 120    # Test theread expect timeout in seconds
19TEST_THREAD_JOIN_TIMEOUT = 180      # Test theread join timeout in seconds
20
21# Test definitions
22TEST_MASTER_RTU = 'master_rtu'
23TEST_SLAVE_RTU = 'slave_rtu'
24
25TEST_MASTER_ASCII = 'master_ascii'
26TEST_SLAVE_ASCII = 'slave_ascii'
27
28# Define tuple of strings to expect for each DUT.
29#
30master_expect = ('MASTER_TEST: Modbus master stack initialized...', 'MASTER_TEST: Start modbus test...', 'MASTER_TEST: Destroy master...')
31slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.', 'SLAVE_TEST: Start modbus test...', 'SLAVE_TEST: Modbus controller destroyed.')
32
33# The dictionary for expected values in listing
34expect_dict_master_ok = {'START': (),
35                         'READ_PAR_OK': (),
36                         'ALARM_MSG': (u'7',)}
37
38expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'),
39                          'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')}
40
41# The dictionary for regular expression patterns to check in listing
42pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'),
43                          'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+'
44                                          r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'),
45                          'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')}
46
47pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+'
48                                                 r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'),
49                           'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s'
50                                                 r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')}
51
52# The dictionary for expected values in listing
53expect_dict_slave_ok = {'START': (),
54                        'READ_PAR_OK': (),
55                        'DESTROY': ()}
56
57# The dictionary for regular expression patterns to check in listing
58pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'),
59                         'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s'
60                                         r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
61                         'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')}
62
63logger = logging.getLogger(LOGGER_NAME)
64
65
66class DutTestThread(Thread):
67    def __init__(self, dut=None, name=None, expect=None):
68        """ Initialize the thread parameters
69        """
70        self.tname = name
71        self.dut = dut
72        self.expected = expect
73        self.result = False
74        self.data = None
75        super(DutTestThread, self).__init__()
76
77    def run(self):
78        """ The function implements thread functionality
79        """
80        # Must reset again as flashing during start_app will reset multiple times, causing unexpected results
81        self.dut.reset()
82
83        # Capture output from the DUT
84        self.dut.start_capture_raw_data()
85
86        # Check expected strings in the listing
87        for string in self.expected:
88            self.dut.expect(string, TEST_THREAD_EXPECT_TIMEOUT)
89
90        # Check DUT exceptions
91        dut_exceptions = self.dut.get_exceptions()
92        if 'Guru Meditation Error:' in dut_exceptions:
93            raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions))
94
95        # Mark thread has run to completion without any exceptions
96        self.data = self.dut.stop_capture_raw_data()
97        self.result = True
98
99
100def test_filter_output(data=None, start_pattern=None, end_pattern=None):
101    """Use patters to filter output
102    """
103    start_index = str(data).find(start_pattern)
104    end_index = str(data).find(end_pattern)
105    logger.debug('Listing start index= %d, end=%d' % (start_index, end_index))
106    if start_index == -1 or end_index == -1:
107        return data
108    return data[start_index:end_index + len(end_pattern)]
109
110
111def test_expect_re(data, pattern):
112    """
113    Check if re pattern is matched in data cache
114    :param data: data to process
115    :param pattern: compiled RegEx pattern
116    :return: match groups if match succeed otherwise None
117    """
118    ret = None
119    if isinstance(pattern, type(u'')):
120        pattern = pattern.encode('utf-8')
121    regex = re.compile(pattern)
122    if isinstance(data, type(u'')):
123        data = data.encode('utf-8')
124    match = regex.search(data)
125    if match:
126        ret = tuple(None if x is None else x.decode() for x in match.groups())
127        index = match.end()
128    else:
129        index = None
130    return ret, index
131
132
133def test_check_output(data=None, check_dict=None, expect_dict=None):
134    """ Check output for the test
135        Check log using regular expressions:
136    """
137    global logger
138    match_count = 0
139    index = 0
140    data_lines = data.splitlines()
141    for key, pattern in check_dict.items():
142        if key not in expect_dict:
143            break
144        # Check pattern in the each line
145        for line in data_lines:
146            group, index = test_expect_re(line, pattern)
147            if index is not None:
148                logger.debug('Found key{%s}=%s, line: \n%s' % (key, group, line))
149                if expect_dict[key] == group:
150                    logger.debug('The result is correct for the key:%s, expected:%s == returned:%s' % (key, str(expect_dict[key]), str(group)))
151                    match_count += 1
152    return match_count
153
154
155def test_check_mode(dut=None, mode_str=None, value=None):
156    """ Check communication mode for dut
157    """
158    global logger
159    try:
160        opt = dut.app.get_sdkconfig()[mode_str]
161        logger.info('%s {%s} = %s.\n' % (str(dut), mode_str, opt))
162        return value == opt
163    except Exception:
164        logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
165    return False
166
167
168@ttfw_idf.idf_example_test(env_tag='Example_T2_RS485')
169def test_modbus_communication(env, comm_mode):
170    global logger
171
172    # Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver
173    dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT)
174    dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT)
175
176    try:
177        logger.debug('Environment vars: %s\r\n' % os.environ)
178        logger.debug('DUT slave sdkconfig: %s\r\n' % dut_slave.app.get_sdkconfig())
179        logger.debug('DUT master sdkconfig: %s\r\n' % dut_master.app.get_sdkconfig())
180
181        # Check Kconfig configuration options for each built example
182        if test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y'):
183            logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n')
184            slave_name = TEST_SLAVE_ASCII
185            master_name = TEST_MASTER_ASCII
186        elif test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y'):
187            logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n')
188            slave_name = TEST_SLAVE_RTU
189            master_name = TEST_MASTER_RTU
190        else:
191            logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
192            raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
193        # Check if slave address for example application is default one to be able to communicate
194        if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'):
195            logger.error('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
196            raise Exception('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
197
198        # Flash app onto each DUT
199        dut_master.start_app()
200        dut_slave.start_app()
201
202        # Create thread for each dut
203        dut_master_thread = DutTestThread(dut=dut_master, name=master_name, expect=master_expect)
204        dut_slave_thread = DutTestThread(dut=dut_slave, name=slave_name, expect=slave_expect)
205
206        # Start each thread
207        dut_slave_thread.start()
208        dut_master_thread.start()
209
210        # Wait for threads to complete
211        dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
212        dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
213
214        if dut_slave_thread.isAlive():
215            logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
216                         (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
217            raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
218                            (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
219
220        if dut_master_thread.isAlive():
221            logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
222                         (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
223            raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
224                            (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
225    finally:
226        dut_master.close()
227        dut_slave.close()
228
229    # Check if test threads completed successfully and captured data
230    if not dut_slave_thread.result or dut_slave_thread.data is None:
231        logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
232        raise Exception('The thread %s was not run successfully.' % dut_slave_thread.tname)
233
234    if not dut_master_thread.result or dut_master_thread.data is None:
235        logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
236        raise Exception('The thread %s was not run successfully.' % dut_master_thread.tname)
237
238    # Filter output to get test messages
239    master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1])
240    if master_output is not None:
241        logger.info('The data for master thread is captured.')
242        logger.debug(master_output)
243
244    slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1])
245    if slave_output is not None:
246        logger.info('The data for slave thread is captured.')
247        logger.debug(slave_output)
248
249    # Check if parameters are read correctly by master
250    match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok)
251    if match_count < TEST_READ_MIN_COUNT:
252        logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
253        raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
254    logger.info('OK pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
255
256    # If the test completed successfully (alarm triggered) but there are some errors during reading of parameters
257    match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err)
258    if match_count > TEST_READ_MAX_ERR_COUNT:
259        logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
260        raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
261    logger.info('ERROR pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
262
263    match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok)
264    if match_count < TEST_READ_MIN_COUNT:
265        logger.error('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
266        raise Exception('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
267    logger.info('OK pattern test for %s, match_count=%d.' % (dut_slave_thread.tname, match_count))
268
269
270if __name__ == '__main__':
271    logger = logging.getLogger(LOGGER_NAME)
272    # create file handler which logs even debug messages
273    fh = logging.FileHandler('modbus_test.log')
274    fh.setLevel(logging.DEBUG)
275    logger.setLevel(logging.DEBUG)
276    # create console handler
277    ch = logging.StreamHandler()
278    ch.setLevel(logging.INFO)
279    # set format of output for both handlers
280    formatter = logging.Formatter('%(levelname)s:%(message)s')
281    ch.setFormatter(formatter)
282    fh.setFormatter(formatter)
283    logger.addHandler(fh)
284    logger.addHandler(ch)
285    logger.info('Start script %s.' % os.path.basename(__file__))
286    print('Logging file name: %s' % logger.handlers[0].baseFilename)
287    test_modbus_communication()
288    logging.shutdown()
289