1# SPDX-FileCopyrightText: 2016-2022 Espressif Systems (Shanghai) CO LTD
2# SPDX-License-Identifier: Apache-2.0
3
4import logging
5import os
6import re
7from threading import Thread
8
9import ttfw_idf
10from tiny_test_fw import DUT
11
# Logger settings for this test
LOG_LEVEL = logging.DEBUG
LOGGER_NAME = 'modbus_test'

# Allowed options for the test
TEST_READ_MAX_ERR_COUNT = 3         # Maximum allowed parameter read errors before the test is failed
TEST_THREAD_JOIN_TIMEOUT = 60       # Test thread join timeout in seconds
TEST_EXPECT_STR_TIMEOUT = 30        # Test expect timeout in seconds
# Example names: used as sub-directories of the Modbus TCP example path and as thread names
TEST_MASTER_TCP = 'mb_tcp_master'
TEST_SLAVE_TCP = 'mb_tcp_slave'

# Test stage identifiers: keys of the pattern dictionaries below and values of
# DutTestThread.test_stage. The numeric order matters: successful parameter reads
# are counted only when the current stage is >= STACK_START.
STACK_DEFAULT = 0
STACK_IPV4 = 1
STACK_IPV6 = 2
STACK_INIT = 3
STACK_CONNECT = 4
STACK_START = 5
STACK_PAR_OK = 6
STACK_PAR_FAIL = 7
STACK_DESTROY = 8
31
# Regular expressions (as strings, compiled by the users) for the slave DUT output,
# keyed by test stage.
# NOTE(review): in STACK_PAR_FAIL the sequence `[ms]` is a regex character class that
# matches a single 'm' or 's'. If the firmware prints the literal text "[ms]" the
# brackets have to be escaped (`\[ms\]`) - confirm against the slave example log.
# NOTE(review): the trailing '.' of STACK_INIT and STACK_DESTROY is unescaped and
# therefore matches any character, not only a full stop.
pattern_dict_slave = {STACK_IPV4: (r'.*I \([0-9]+\) example_connect: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
                      STACK_IPV6: (r'.*I \([0-9]+\) example_connect: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'),
                      STACK_INIT: (r'.*I \(([0-9]+)\) MB_TCP_SLAVE_PORT: (Protocol stack initialized).'),
                      STACK_CONNECT: (r'.*I\s\(([0-9]+)\) MB_TCP_SLAVE_PORT: Socket \(#[0-9]+\), accept client connection from address: '
                                      r'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
                      STACK_START: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Start modbus test).*'),
                      STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: ([A-Z]+ [A-Z]+) \([a-zA-Z0-9_]+ us\),\s'
                                     r'ADDR:([0-9]+), TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
                      STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) SLAVE_TEST: Response time exceeds configured [0-9]+ [ms], ignore packet.*'),
                      STACK_DESTROY: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Modbus controller destroyed).')}
42
# Regular expressions (as strings, compiled by the users) for the master DUT output,
# keyed by test stage. The IPv4/IPv6 entries are the same as for the slave.
pattern_dict_master = {STACK_IPV4: (r'.*I \([0-9]+\) example_connect: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
                       STACK_IPV6: (r'.*I \([0-9]+\) example_connect: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'),
                       STACK_INIT: (r'.*I \(([0-9]+)\) MASTER_TEST: (Modbus master stack initialized).*'),
                       STACK_CONNECT: (r'.*.*I\s\(([0-9]+)\) MB_TCP_MASTER_PORT: (Connected [0-9]+ slaves), start polling.*'),
                       STACK_START: (r'.*I \(([0-9]+)\) MASTER_TEST: (Start modbus test).*'),
                       STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+ ([a-zA-Z0-9_]+)'
                                      r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.]+ \(0x[a-zA-Z0-9]+\) read successful.*'),
                       STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+\s\(([a-zA-Z0-9_]+)\)\s'
                                        r'read fail, err = [0-9]+ \([_A-Z]+\).*'),
                       STACK_DESTROY: (r'.*I\s\(([0-9]+)\) MASTER_TEST: (Destroy master).*')}
53
# Logger shared by the DUT threads and the test function; its handlers are
# attached inside test_modbus_communication()
logger = logging.getLogger(LOGGER_NAME)
55
56
class DutTestThread(Thread):
    """ Thread that follows the output of one DUT and tracks the Modbus test stages

    The object is also a context manager: entering it resets the DUT and starts
    the raw data capture, leaving it only logs an exception raised in the body.

    Attributes updated while the thread runs:
      test_stage - last STACK_* stage recognized in the DUT output
      param_ok_count - successful parameter reports counted after the test start
      param_fail_count - failed parameter reports
      test_finish - set once the destroy stage is seen or an expect times out
      data - captured raw output, stays None unless run() finished without error
    """

    # Text that identifies a panic in the exceptions reported by the DUT
    PANIC_MARKER = 'Guru Meditation Error:'

    def __init__(self, dut=None, name=None, ip_addr=None, expect=None):
        """ Initialize the thread parameters

        dut - device under test handled by this thread
        name - name of the thread used in the log messages
        ip_addr - slave IP address to send to the DUT, None to skip this step
        expect - dictionary of expected patterns (strings) keyed by STACK_* stage
        """
        self.tname = name
        self.dut = dut
        self.expected = expect
        self.data = None
        self.ip_addr = ip_addr
        self.test_finish = False
        self.param_fail_count = 0
        self.param_ok_count = 0
        self.test_stage = STACK_DEFAULT
        super(DutTestThread, self).__init__()

    def __enter__(self):
        """ The enter method of context manager: restart the DUT and start the capture
        """
        logger.debug('Restart %s.', self.tname)
        # Reset DUT first
        self.dut.reset()
        # Capture output from the DUT
        self.dut.start_capture_raw_data(capture_id=self.dut.name)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """ The exit method of context manager

        Only logs the exception (if any); nothing is returned, so the exception
        is not suppressed.
        """
        if exc_type is not None or exc_value is not None:
            logger.info('Thread %s rised an exception type: %s, value: %s', self.tname, str(exc_type), str(exc_value))

    @staticmethod
    def _contains_panic(dut_exceptions):
        """ Check whether the exceptions reported by the DUT contain the panic marker

        dut_exceptions - a single string or an iterable of entries; each entry is
        converted with str() and searched for the marker. This way a list of
        exception dumps is searched entry by entry instead of being compared for
        equality with the bare marker.
        Returns True when the marker is found, False otherwise (also for empty input).
        """
        if not dut_exceptions:
            return False
        if isinstance(dut_exceptions, (str, bytes)):
            dut_exceptions = [dut_exceptions]
        return any(DutTestThread.PANIC_MARKER in str(entry) for entry in dut_exceptions)

    def run(self):
        """ The function implements thread functionality

        Raises RuntimeError when the DUT reported a panic during the test.
        """
        # Initialize slave IP for master board
        if self.ip_addr is not None:
            self.set_ip(0)

        # Check expected strings in the listing
        self.test_start(TEST_EXPECT_STR_TIMEOUT)

        # Check DUT exceptions
        dut_exceptions = self.dut.get_exceptions()
        if self._contains_panic(dut_exceptions):
            raise RuntimeError('%s generated an exception(s): %s\n' % (str(self.dut), dut_exceptions))

        # Mark thread has run to completion without any exceptions
        self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name)

    def set_ip(self, index=0):
        """ The method to send slave IP to master application

        index - IP index the DUT is expected to ask for in its prompt
        Raises RuntimeError when the DUT asks for a different index.
        """
        prompt = r'.*Waiting IP([0-9]{1,2}) from stdin.*'
        # Read all data from previous restart to get prompt correctly
        self.dut.read()
        result = self.dut.expect(re.compile(prompt), timeout=TEST_EXPECT_STR_TIMEOUT)
        if int(result[0]) != index:
            raise RuntimeError('Incorrect index of IP=%s for %s\n' % (result[0], str(self.dut)))
        # Use the same slave IP address for all characteristics during the test
        for slot in range(3):
            self.dut.write('IP%d=' % slot + self.ip_addr, '\n', False)
        logger.debug('Set IP address=%s for %s', self.ip_addr, self.tname)
        # Wait until the DUT confirms that an address has been accepted
        confirmation = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*'
        self.dut.expect(re.compile(confirmation), timeout=TEST_EXPECT_STR_TIMEOUT)
        logger.debug('Thread %s initialized with slave IP=%s.', self.tname, self.ip_addr)

    def test_start(self, timeout_value):
        """ The method to initialize and handle test stages

        Waits for any of the expected stage messages and updates the stage and
        the counters until the destroy stage is seen or an expect times out.

        timeout_value - timeout in seconds for every single expect
        """
        def count_par_ok():
            # Successful reads are counted only after the test has been started;
            # this is evaluated before the stage is switched to STACK_PAR_OK
            if self.test_stage >= STACK_START:
                self.param_ok_count += 1

        def count_par_fail():
            self.param_fail_count += 1

        def finish():
            self.test_finish = True

        def stage_handler(stage, label, action=None):
            """ Build the callback that logs the match, runs the action and sets the stage
            """
            def handler(data):
                logger.debug('%s[%s]: %s', self.tname, label, str(data))
                if action is not None:
                    action()
                self.test_stage = stage
            return handler

        # (stage, label used in the log, extra action); items are passed to
        # expect_any() in this order
        stages = ((STACK_IPV4, 'STACK_IPV4', None),
                  (STACK_IPV6, 'STACK_IPV6', None),
                  (STACK_INIT, 'STACK_INIT', None),
                  (STACK_CONNECT, 'STACK_CONNECT', None),
                  (STACK_START, 'STACK_START', None),
                  (STACK_PAR_OK, 'READ_PAR_OK', count_par_ok),
                  (STACK_PAR_FAIL, 'READ_PAR_FAIL', count_par_fail),
                  (STACK_DESTROY, 'DESTROY', finish))
        # Compile the patterns and build the callbacks once instead of on every iteration
        expect_items = [(re.compile(self.expected[stage]), stage_handler(stage, label, action))
                        for stage, label, action in stages]

        while not self.test_finish:
            try:
                self.dut.expect_any(*expect_items, timeout=timeout_value)
            except DUT.ExpectTimeout:
                logger.debug('%s, expect timeout on stage #%d (%s seconds)', self.tname, self.test_stage, timeout_value)
                self.test_finish = True
193
194
def test_check_mode(dut=None, mode_str=None, value=None):
    """ Compare one sdkconfig option of the dut application with the expected value

    dut - device under test whose application configuration is read
    mode_str - name of the sdkconfig option
    value - value the option is compared with
    Returns the result of the comparison, or False (after logging an error)
    when the option can not be read.
    """
    matches = False
    try:
        configured = dut.app.get_sdkconfig()[mode_str]
        logger.debug('%s {%s} = %s.\n', str(dut), mode_str, configured)
        matches = (value == configured)
    except Exception:
        # Any failure to read the option is reported and treated as "no match"
        logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.', str(dut), mode_str)
    return matches
206
207
@ttfw_idf.idf_example_test(env_tag='Example_Modbus_TCP', target=['esp32'])
def test_modbus_communication(env, comm_mode):
    """ Run the Modbus TCP master and slave examples against each other

    The slave IP address is read from the slave output and sent to the master,
    then one DutTestThread per DUT follows the output and counts the parameter
    reports. The test fails when a thread does not finish in time, when one of
    the error counters exceeds TEST_READ_MAX_ERR_COUNT or when no successful
    parameter report was seen on either side.

    env - test environment, provides the DUTs and the log path
    comm_mode - not used by the test body
    Raises RuntimeError on any configuration or test failure.
    """
    rel_project_path = os.path.join('examples', 'protocols', 'modbus', 'tcp')
    # Get device under test. Both duts must be able to be connected to WiFi router
    dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP))
    dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP))
    log_file = os.path.join(env.log_path, 'modbus_tcp_test.log')
    print('Logging file name: %s' % log_file)

    # Handlers attached by this run. They are detached in the finally block, so
    # repeated runs in one process neither duplicate the messages nor keep the
    # log file open.
    test_handlers = []

    try:
        logger.setLevel(LOG_LEVEL)
        # the same output format is used for both handlers
        formatter = logging.Formatter('%(levelname)s:%(message)s')
        # create file handler which logs even debug messages
        fh = logging.FileHandler(log_file)
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        test_handlers.append(fh)
        # create console handler
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        logger.addHandler(ch)
        test_handlers.append(ch)

        # Check Kconfig configuration options for each built example
        if (test_check_mode(dut_master, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y') and
           test_check_mode(dut_slave, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y')):
            slave_name = TEST_SLAVE_TCP
            master_name = TEST_MASTER_TCP
        else:
            failure = 'ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n'
            logger.error(failure)
            raise RuntimeError(failure)

        if not test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'):
            raise RuntimeError('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n')

        logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n')
        # Flash app onto DUT.
        # TODO: debug the case when the slave is flashed before the master and
        # expect does not work correctly for no reason
        dut_slave.start_app()
        dut_master.start_app()
        if test_check_mode(dut_master, 'CONFIG_EXAMPLE_CONNECT_IPV6', 'y'):
            address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV6]), TEST_EXPECT_STR_TIMEOUT)
        else:
            address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV4]), TEST_EXPECT_STR_TIMEOUT)
        if address is None:
            raise RuntimeError('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n')
        print('Found IP slave address: %s' % address[0])

        # Create thread for each dut
        with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread:
            with DutTestThread(dut=dut_slave, name=slave_name, ip_addr=None, expect=pattern_dict_slave) as dut_slave_thread:

                # Start each thread
                dut_slave_thread.start()
                dut_master_thread.start()

                # Wait for threads to complete
                dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
                dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)

                # A thread that is still alive after join() did not finish in time
                if dut_slave_thread.is_alive():
                    failure = ('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
                               (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
                    logger.error(failure)
                    raise RuntimeError(failure)

                if dut_master_thread.is_alive():
                    failure = ('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
                               (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
                    logger.error(failure)
                    raise RuntimeError(failure)

                logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n',
                            dut_master_thread.tname, dut_master_thread.param_fail_count,
                            dut_slave_thread.tname, dut_slave_thread.param_fail_count)
                logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n',
                            dut_master_thread.tname, dut_master_thread.param_ok_count,
                            dut_slave_thread.tname, dut_slave_thread.param_ok_count)

                if ((dut_master_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
                   (dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
                   (dut_slave_thread.param_ok_count == 0) or
                   (dut_master_thread.param_ok_count == 0)):
                    raise RuntimeError('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' %
                                       (dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count,
                                        dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count))
                logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n')

    finally:
        # Nested so that a failure while closing one DUT does not skip the
        # remaining cleanup steps
        try:
            dut_master.close()
        finally:
            try:
                dut_slave.close()
            finally:
                for handler in test_handlers:
                    logger.removeHandler(handler)
                    handler.close()
                logging.shutdown()
305
306
# Script entry point.
# NOTE(review): the test is called without arguments; presumably the
# ttfw_idf.idf_example_test decorator supplies `env` and the extra data - confirm.
if __name__ == '__main__':
    test_modbus_communication()
309