1#!/usr/bin/env python3
2#
3#  Copyright (c) 2022, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import ipaddress
31import unittest
32
33import command
34import config
35import thread_cert
36
37# Test description:
38#   This test verifies SRP client auto host address mode.
39#
40# Topology:
41#     SRP client (leader)
42#       |
43#       |
44#     SRP server (router)
45#
46
# Node identifiers: they key the `TOPOLOGY` map below and index `self.nodes`
# inside the test.
CLIENT, SERVER = 1, 2
49
50
class SrpAutoHostAddress(thread_cert.TestCase):
    """Verifies the SRP client "auto host address" mode.

    Topology: one SRP client (expected to become leader) and one SRP server
    (expected to become router). The test adds and removes addresses and
    SLAAC on-mesh prefixes on the client and, after each change, compares the
    host addresses recorded by the server against the client's addresses.
    """
    USE_MESSAGE_FACTORY = False
    SUPPORT_NCP = False

    TOPOLOGY = {
        CLIENT: {
            'name': 'SRP_CLIENT',
            'mode': 'rdn',
        },
        SERVER: {
            'name': 'SRP_SERVER',
            'mode': 'rdn',
        },
    }

    # NOTE: `test` intentionally has no docstring so that the description
    # reported by unittest for this test is left unchanged.
    def test(self):
        client = self.nodes[CLIENT]
        server = self.nodes[SERVER]

        # Deprecation interval of an SLAAC address before removal.
        deprecate_time = 300

        #-------------------------------------------------------------------
        # Form the network.

        client.srp_server_set_enabled(False)
        client.start()
        self.simulator.go(15)
        self.assertEqual(client.get_state(), 'leader')
        client.srp_client_stop()

        server.start()
        self.simulator.go(5)
        self.assertEqual(server.get_state(), 'router')

        #-------------------------------------------------------------------
        # Enable SRP server

        server.srp_server_set_enabled(True)
        self.simulator.go(5)

        #-------------------------------------------------------------------
        # Check auto start mode on SRP client

        client.srp_client_enable_auto_start_mode()
        self.assertEqual(client.srp_client_get_auto_start_mode(), 'Enabled')
        self.simulator.go(15)

        self.assertEqual(client.srp_client_get_state(), 'Enabled')

        #-------------------------------------------------------------------
        # Set host name and enable auto host address on client

        client.srp_client_set_host_name('host')
        client.srp_client_enable_auto_host_address()

        #-------------------------------------------------------------------
        # Register a service on client

        # The service type must end in a valid transport label (`_udp` or
        # `_tcp`).
        client.srp_client_add_service('test_srv', '_test._udp', 12345, 0, 0)
        self.simulator.go(2)
        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Add an address and check the SRP client re-registered and updated
        # server with new address.

        client.add_ipaddr('fd00:1:2:3:4:5:6:7')

        self.simulator.go(5)
        client_addresses = [addr.strip() for addr in client.get_addrs()]
        self.assertIn('fd00:1:2:3:4:5:6:7', client_addresses)
        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Remove the address and check the SRP client re-registered and updated
        # server.

        client.del_ipaddr('fd00:1:2:3:4:5:6:7')

        self.simulator.go(5)
        client_addresses = [addr.strip() for addr in client.get_addrs()]
        self.assertNotIn('fd00:1:2:3:4:5:6:7', client_addresses)
        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Add an SLAAC on-mesh prefix (which will trigger an address to be
        # added) and check that the SRP client re-registered and updated
        # server with the new address.

        self._add_prefix_and_check(client, server, 'fd00:abba:cafe:bee::/64', 'paos', 'fd00:abba:cafe:bee:')

        #-------------------------------------------------------------------
        # Add another SLAAC on-mesh prefix and check that the SRP client
        # re-registered and updated server with all addresses.

        self._add_prefix_and_check(client, server, 'fd00:9:8:7::/64', 'paos', 'fd00:9:8:7:')

        #-------------------------------------------------------------------
        # Add a non-preferred SLAAC on-mesh prefix and check that the
        # set of registered addresses remains unchanged and that the
        # non-preferred address is not registered by SRP client.

        self._add_prefix_and_check(client, server, 'fd00:a:b:c::/64', 'aos', 'fd00:a:b:c:')

        #-------------------------------------------------------------------
        # Remove the on-mesh prefix. This should trigger the
        # associated SLAAC address to be deprecated, but it should
        # not yet cause the client to re-register. Verify that the
        # registered addresses on server remain unchanged.

        self._remove_prefix_and_check_unchanged(client, server, 'fd00:abba:cafe:bee::/64')

        # Wait until the SLAAC address deprecation time has elapsed
        # and the address is removed. Verify that the SRP client
        # re-registers and updates the server with the remaining
        # address.

        self.simulator.go(deprecate_time)

        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Remove the next on-mesh prefix. Verify that the client does
        # not re-register while the address is deprecating. After the
        # address is removed, confirm that the SRP client
        # re-registers using only the ML-EID.

        self._remove_prefix_and_check_unchanged(client, server, 'fd00:9:8:7::/64')

        self.simulator.go(deprecate_time)

        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Add and remove the on-mesh prefix again. However, before the
        # address deprecation time elapses and the address is removed,
        # restart the server. This should trigger the client to
        # re-register. Verify that the client re-registers with the
        # most up-to-date addresses and does not register the deprecating
        # address.

        self._add_prefix_and_check(client, server, 'fd00:9:8:7::/64', 'paos', 'fd00:9:8:7:')

        # Remove the prefix and verify that client does not
        # register while the SLAAC address is deprecating.

        self._remove_prefix_and_check_unchanged(client, server, 'fd00:9:8:7::/64')

        # Disable and re-enable the server. This should trigger the
        # client to re-register. Verify that the ML-EID address is
        # now registered.

        server.srp_server_set_enabled(False)
        server.srp_server_set_enabled(True)

        self.simulator.go(20)

        self.check_registered_addresses(client, server)

        registered_addresses = self.get_registered_host_addresses_from_server(server)
        self.assertEqual(len(registered_addresses), 1)
        self.assertEqual(registered_addresses[0], client.get_mleid())

        # Check that SLAAC address is still deprecating.

        self.assertEqual(len(self._addresses_with_prefix(client, 'fd00:9:8:7:')), 1)

        #-------------------------------------------------------------------
        # Explicitly set the host addresses (which disables the auto host
        # address mode) and check that only the new addresses are registered.

        client.srp_client_set_host_address('fd00:f:e:d:c:b:a:9')
        self.simulator.go(5)

        self.assertEqual(client.srp_client_get_host_state(), 'Registered')
        host_addresses = self.get_registered_host_addresses_from_server(server)
        self.assertEqual(len(host_addresses), 1)
        self.assertEqual(host_addresses[0], 'fd00:f:e:d:c:b:a:9')

        #-------------------------------------------------------------------
        # Re-enable auto host address mode and check that addresses are
        # updated and registered properly.

        client.srp_client_enable_auto_host_address()
        self.simulator.go(5)
        self.check_registered_addresses(client, server)

    def _addresses_with_prefix(self, node, addr_start):
        """Returns the stripped addresses of `node` whose text starts with `addr_start`."""
        stripped = (addr.strip() for addr in node.get_addrs())
        return [addr for addr in stripped if addr.startswith(addr_start)]

    def _add_prefix_and_check(self, client, server, prefix, flags, addr_start):
        """Adds an on-mesh prefix on `client` and validates the resulting registration.

        After publishing the prefix and advancing the simulation by 15
        seconds, asserts that the client has exactly one address starting
        with `addr_start` and that the server's view matches the client
        (see `check_registered_addresses`).
        """
        client.add_prefix(prefix, flags)
        client.register_netdata()
        self.simulator.go(15)

        self.assertEqual(len(self._addresses_with_prefix(client, addr_start)), 1)
        self.check_registered_addresses(client, server)

    def _remove_prefix_and_check_unchanged(self, client, server, prefix):
        """Removes an on-mesh prefix and asserts the server's host addresses do not change.

        The registered addresses are sampled before the removal and again
        after advancing the simulation by 15 seconds; both samples must be
        equal.
        """
        old_registered_addresses = self.get_registered_host_addresses_from_server(server)

        client.remove_prefix(prefix)
        client.register_netdata()

        self.simulator.go(15)
        self.assertEqual(old_registered_addresses, self.get_registered_host_addresses_from_server(server))

    def get_registered_host_addresses_from_server(self, server):
        """Returns the stripped address list of the single host known to `server`.

        Asserts that the server reports exactly one host, that it is not
        marked deleted, and that its full name is
        `host.default.service.arpa.`.
        """
        server_hosts = server.srp_server_get_hosts()
        self.assertEqual(len(server_hosts), 1)
        server_host = server_hosts[0]
        self.assertEqual(server_host['deleted'], 'false')
        self.assertEqual(server_host['fullname'], 'host.default.service.arpa.')
        return [addr.strip() for addr in server_host['addresses']]

    def check_registered_addresses(self, client, server):
        """Asserts that the host addresses on `server` are consistent with `client`.

        Checks that the client's host state is `Registered`, that every
        address registered on the server is either a preferred client
        address or the client's ML-EID, and that every eligible client
        address is registered on the server.
        """
        # Ensure client has registered successfully.
        self.assertEqual(client.srp_client_get_host_state(), 'Registered')

        # Check the host addresses on server to match client.

        host_addresses = self.get_registered_host_addresses_from_server(server)

        client_mleid = client.get_mleid()
        # Only lines tagged `preferred:1` in the verbose listing are kept; the
        # address itself is the first space-separated token of each line.
        client_addresses = [addr.split(' ')[0] for addr in client.get_addrs(verbose=True) if 'preferred:1' in addr]
        client_addresses += [client_mleid]

        # All registered addresses must be in client list of addresses.

        for addr in host_addresses:
            self.assertIn(addr, client_addresses)

        # All preferred addresses on client excluding link-local and
        # mesh-local addresses must be seen on server side. But if there
        # was no address, then mesh-local address should be the only
        # one registered.

        checked_address = False

        for addr in client_addresses:
            if not self.is_address_link_local(addr) and not self.is_address_locator(addr) and addr != client_mleid:
                self.assertIn(addr, host_addresses)
                checked_address = True

        if not checked_address:
            self.assertEqual(len(host_addresses), 1)
            self.assertIn(client_mleid, host_addresses)

    def is_address_locator(self, addr):
        """Checks if an IPv6 address is a locator (IID should match `0:ff:fe00:xxxx`).

        Raises `ipaddress.AddressValueError` (a `ValueError`) if `addr` is
        not a valid IPv6 address string.
        """
        # Normalize through `ipaddress` so that the comparison is done on the
        # eight fully expanded 16-bit groups; this also handles address text
        # that uses zero compression (`::`).
        groups = ipaddress.IPv6Address(addr).exploded.split(':')
        return groups[4:7] == ['0000', '00ff', 'fe00']

    def is_address_link_local(self, addr):
        """Checks if an IPv6 address is link-local (its text starts with `fe80:`)."""
        return addr.startswith('fe80:')
339
# Run the test case(s) in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
342