1#!/usr/bin/env python3
2#
3#  Copyright (c) 2021, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import ipaddress
31import unittest
32
33import command
34import config
35import thread_cert
36
37# Test description:
38#   This test verifies SRP client auto-start functionality that SRP client can
39#   correctly discovers and connects to SRP server.
40#
41# Topology:
42#
43#   Four routers, one acting as SRP client, others as SRP server.
44#
45
46CLIENT = 1
47SERVER1 = 2
48SERVER2 = 3
49SERVER3 = 4
50
51
52class SrpAutoStartMode(thread_cert.TestCase):
53    USE_MESSAGE_FACTORY = False
54    SUPPORT_NCP = False
55
56    TOPOLOGY = {
57        CLIENT: {
58            'name': 'SRP_CLIENT',
59            'mode': 'rdn',
60        },
61        SERVER1: {
62            'name': 'SRP_SERVER1',
63            'mode': 'rdn',
64        },
65        SERVER2: {
66            'name': 'SRP_SERVER2',
67            'mode': 'rdn',
68        },
69        SERVER3: {
70            'name': 'SRP_SERVER3',
71            'mode': 'rdn',
72        },
73    }
74
75    def test(self):
76        client = self.nodes[CLIENT]
77        server1 = self.nodes[SERVER1]
78        server2 = self.nodes[SERVER2]
79        server3 = self.nodes[SERVER3]
80
81        #-------------------------------------------------------------------
82        # Form the network.
83
84        client.srp_server_set_enabled(False)
85        client.start()
86        self.simulator.go(config.LEADER_STARTUP_DELAY)
87        self.assertEqual(client.get_state(), 'leader')
88
89        server1.start()
90        server2.start()
91        server3.start()
92        self.simulator.go(config.ROUTER_STARTUP_DELAY)
93        self.assertEqual(server1.get_state(), 'router')
94        self.assertEqual(server2.get_state(), 'router')
95        self.assertEqual(server3.get_state(), 'router')
96
97        server1_mleid = server1.get_mleid()
98        server2_mleid = server2.get_mleid()
99        server3_mleid = server3.get_mleid()
100        anycast_port = 53
101
102        #-------------------------------------------------------------------
103        # Enable server1 with unicast address mode
104
105        server1.srp_server_set_addr_mode('unicast')
106        server1.srp_server_set_enabled(True)
107        self.simulator.go(5)
108
109        #-------------------------------------------------------------------
110        # Check auto start mode on client and check that server1 is selected
111
112        self.assertEqual(client.srp_client_get_auto_start_mode(), 'Enabled')
113        self.simulator.go(2)
114
115        self.assertEqual(client.srp_client_get_state(), 'Enabled')
116        self.assertEqual(client.srp_client_get_server_address(), server1_mleid)
117
118        #-------------------------------------------------------------------
119        # Disable server1 and check client is stopped/disabled.
120
121        server1.srp_server_set_enabled(False)
122        self.simulator.go(5)
123        self.assertEqual(client.srp_client_get_state(), 'Disabled')
124
125        #-------------------------------------------------------------------
126        # Enable server2 with unicast address mode and check client starts
127        # again.
128
129        server1.srp_server_set_addr_mode('unicast')
130        server2.srp_server_set_enabled(True)
131        self.simulator.go(5)
132        self.assertEqual(client.srp_client_get_state(), 'Enabled')
133        self.assertEqual(client.srp_client_get_server_address(), server2_mleid)
134
135        #-------------------------------------------------------------------
136        # Enable server1 and check that client stays with server2
137
138        server1.srp_server_set_enabled(True)
139        self.simulator.go(5)
140        self.assertEqual(client.srp_client_get_state(), 'Enabled')
141        self.assertEqual(client.srp_client_get_server_address(), server2_mleid)
142
143        #-------------------------------------------------------------------
144        # Disable server2 and check client switches to server1.
145
146        server2.srp_server_set_enabled(False)
147        self.simulator.go(5)
148        self.assertEqual(client.srp_client_get_state(), 'Enabled')
149        self.assertEqual(client.srp_client_get_server_address(), server1_mleid)
150
151        #-------------------------------------------------------------------
152        # Enable server2 with anycast mode seq-num 1, and check that client
153        # switched to it.
154
155        server2.srp_server_set_addr_mode('anycast')
156        server2.srp_server_set_anycast_seq_num(1)
157        server2.srp_server_set_enabled(True)
158        self.simulator.go(5)
159        server2_alocs = server2.get_ip6_address(config.ADDRESS_TYPE.ALOC)
160        self.assertEqual(server2.srp_server_get_anycast_seq_num(), 1)
161        self.assertEqual(client.srp_client_get_state(), 'Enabled')
162        self.assertIn(client.srp_client_get_server_address(), server2_alocs)
163        self.assertEqual(client.srp_client_get_server_port(), anycast_port)
164
165        #-------------------------------------------------------------------
166        # Enable server3 with anycast mode seq-num 2, and check that client
167        # switched to it since seq number is higher.
168
169        server3.srp_server_set_addr_mode('anycast')
170        server3.srp_server_set_anycast_seq_num(2)
171        server3.srp_server_set_enabled(True)
172        self.simulator.go(5)
173        server3_alocs = server3.get_ip6_address(config.ADDRESS_TYPE.ALOC)
174        self.assertEqual(server3.srp_server_get_anycast_seq_num(), 2)
175        self.assertEqual(client.srp_client_get_state(), 'Enabled')
176        self.assertIn(client.srp_client_get_server_address(), server3_alocs)
177        self.assertEqual(client.srp_client_get_server_port(), anycast_port)
178
179        #-------------------------------------------------------------------
180        # Disable server3 and check that client goes back to server2.
181
182        server3.srp_server_set_enabled(False)
183        self.simulator.go(5)
184        self.assertEqual(client.srp_client_get_state(), 'Enabled')
185        self.assertIn(client.srp_client_get_server_address(), server2_alocs)
186        self.assertEqual(client.srp_client_get_server_port(), anycast_port)
187
188        #-------------------------------------------------------------------
189        # Enable server3 with anycast mode seq-num 0 (which is smaller than
190        # server2 seq-num 1) and check that client stays with server2.
191
192        server3.srp_server_set_anycast_seq_num(0)
193        server3.srp_server_set_enabled(True)
194        self.simulator.go(5)
195        self.assertEqual(server3.srp_server_get_anycast_seq_num(), 0)
196        self.assertEqual(client.srp_client_get_state(), 'Enabled')
197        self.assertIn(client.srp_client_get_server_address(), server2_alocs)
198        self.assertEqual(client.srp_client_get_server_port(), anycast_port)
199
200        #-------------------------------------------------------------------
201        # Disable server2 and check that client goes back to server3.
202
203        server2.srp_server_set_enabled(False)
204        self.simulator.go(5)
205        self.assertEqual(client.srp_client_get_state(), 'Enabled')
206        server3_alocs = server3.get_ip6_address(config.ADDRESS_TYPE.ALOC)
207        self.assertIn(client.srp_client_get_server_address(), server3_alocs)
208        self.assertEqual(client.srp_client_get_server_port(), anycast_port)
209
210        #-------------------------------------------------------------------
211        # Disable server3 and check that client goes back to server1 with
212        # unicast address.
213
214        server3.srp_server_set_enabled(False)
215        self.simulator.go(5)
216        self.assertEqual(client.srp_client_get_state(), 'Enabled')
217        self.assertEqual(client.srp_client_get_server_address(), server1_mleid)
218
219        #-------------------------------------------------------------------
220        # Enable server2 with anycast mode seq-num 5, and check that client
221        # switched to it.
222
223        server2.srp_server_set_addr_mode('anycast')
224        server2.srp_server_set_anycast_seq_num(5)
225        server2.srp_server_set_enabled(True)
226        self.simulator.go(5)
227        server2_alocs = server2.get_ip6_address(config.ADDRESS_TYPE.ALOC)
228        self.assertEqual(server2.srp_server_get_anycast_seq_num(), 5)
229        self.assertEqual(client.srp_client_get_state(), 'Enabled')
230        self.assertIn(client.srp_client_get_server_address(), server2_alocs)
231        self.assertEqual(client.srp_client_get_server_port(), anycast_port)
232
233        #-------------------------------------------------------------------
234        # Publish an entry on server3 with specific unicast address
235        # This entry should be now preferred over anycast of server2.
236
237        unicast_addr3 = 'fd00:0:0:0:0:3333:beef:cafe'
238        unicast_port3 = 1234
239        server3.netdata_publish_dnssrp_unicast(unicast_addr3, unicast_port3)
240        self.simulator.go(65)
241        self.assertEqual(client.srp_client_get_state(), 'Enabled')
242        self.assertEqual(client.srp_client_get_server_address(), unicast_addr3)
243        self.assertEqual(client.srp_client_get_server_port(), unicast_port3)
244
245        #-------------------------------------------------------------------
246        # Publish an entry on server1 with specific unicast address
247        # Client should still stay with server3 which it originally selected.
248
249        unicast_addr1 = 'fd00:0:0:0:0:2222:beef:cafe'
250        unicast_port1 = 10203
251        server1.srp_server_set_enabled(False)
252        server1.netdata_publish_dnssrp_unicast(unicast_addr1, unicast_port1)
253        self.simulator.go(65)
254        self.assertEqual(client.srp_client_get_state(), 'Enabled')
255        self.assertEqual(client.srp_client_get_server_address(), unicast_addr3)
256        self.assertEqual(client.srp_client_get_server_port(), unicast_port3)
257
258        #-------------------------------------------------------------------
259        # Unpublish the entry on server3. Now client should switch to entry
260        # from server1.
261
262        server3.netdata_unpublish_dnssrp()
263        self.simulator.go(65)
264        self.assertEqual(client.srp_client_get_state(), 'Enabled')
265        self.assertEqual(client.srp_client_get_server_address(), unicast_addr1)
266        self.assertEqual(client.srp_client_get_server_port(), unicast_port1)
267
268        #-------------------------------------------------------------------
269        # Unpublish the entry on server1 and check client goes back to anycast
270        # entry from server2.
271
272        server1.netdata_unpublish_dnssrp()
273        self.simulator.go(65)
274        self.assertEqual(client.srp_client_get_state(), 'Enabled')
275        self.assertIn(client.srp_client_get_server_address(), server2_alocs)
276        self.assertEqual(client.srp_client_get_server_port(), anycast_port)
277
278        #-------------------------------------------------------------------
279        # Finally disable server2, and check that client is disabled.
280
281        server2.srp_server_set_enabled(False)
282        self.simulator.go(5)
283        self.assertEqual(client.srp_client_get_state(), 'Disabled')
284
285
286if __name__ == '__main__':
287    unittest.main()
288