#!/usr/bin/env python3
#
#  Copyright (c) 2022, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import ipaddress
import unittest

import command
import config
import thread_cert

# Test description:
#   This test verifies SRP client auto host address mode.
#
# Topology:
#     SRP client (leader)
#       |
#       |
#     SRP server (router)
#

CLIENT = 1
SERVER = 2


class SrpAutoHostAddress(thread_cert.TestCase):
    """Verifies the SRP client auto host address mode against an SRP server."""

    USE_MESSAGE_FACTORY = False
    SUPPORT_NCP = False

    TOPOLOGY = {
        CLIENT: {
            'name': 'SRP_CLIENT',
            'mode': 'rdn',
        },
        SERVER: {
            'name': 'SRP_SERVER',
            'mode': 'rdn',
        },
    }

    def test(self):
        client = self.nodes[CLIENT]
        server = self.nodes[SERVER]

        # Deprecation interval of an SLAAC address before removal.
        deprecate_time = 300

        #-------------------------------------------------------------------
        # Form the network.

        client.srp_server_set_enabled(False)
        client.start()
        self.simulator.go(15)
        self.assertEqual(client.get_state(), 'leader')
        client.srp_client_stop()

        server.start()
        self.simulator.go(5)
        self.assertEqual(server.get_state(), 'router')

        #-------------------------------------------------------------------
        # Enable SRP server

        server.srp_server_set_enabled(True)
        self.simulator.go(5)

        #-------------------------------------------------------------------
        # Check auto start mode on SRP client

        client.srp_client_enable_auto_start_mode()
        self.assertEqual(client.srp_client_get_auto_start_mode(), 'Enabled')
        self.simulator.go(15)

        self.assertEqual(client.srp_client_get_state(), 'Enabled')

        #-------------------------------------------------------------------
        # Set host name and enable auto host address on client

        client.srp_client_set_host_name('host')
        client.srp_client_enable_auto_host_address()

        #-------------------------------------------------------------------
        # Register a service on client

        # The transport label of a DNS-SD service type is `_udp` or `_tcp`.
        client.srp_client_add_service('test_srv', '_test._udp', 12345, 0, 0)
        self.simulator.go(2)
        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Add an address and check the SRP client re-registered and updated
        # server with new address.

        client.add_ipaddr('fd00:1:2:3:4:5:6:7')

        self.simulator.go(5)
        client_addresses = [addr.strip() for addr in client.get_addrs()]
        self.assertIn('fd00:1:2:3:4:5:6:7', client_addresses)
        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Remove the address and check the SRP client re-registered and updated
        # server.

        client.del_ipaddr('fd00:1:2:3:4:5:6:7')

        self.simulator.go(5)
        client_addresses = [addr.strip() for addr in client.get_addrs()]
        self.assertNotIn('fd00:1:2:3:4:5:6:7', client_addresses)
        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Add an SLAAC on-mesh prefix (which will trigger an address to be
        # added) and check that the SRP client re-registered and updated
        # server with the new address.

        self._add_prefix_and_check(client, server, 'fd00:abba:cafe:bee::/64', 'paos', 'fd00:abba:cafe:bee:')

        #-------------------------------------------------------------------
        # Add another SLAAC on-mesh prefix and check that the SRP client
        # re-registered and updated server with all addresses.

        self._add_prefix_and_check(client, server, 'fd00:9:8:7::/64', 'paos', 'fd00:9:8:7:')

        #-------------------------------------------------------------------
        # Add a non-preferred SLAAC on-mesh prefix and check that the
        # set of registered addresses remains unchanged and that the
        # non-preferred address is not registered by SRP client.

        self._add_prefix_and_check(client, server, 'fd00:a:b:c::/64', 'aos', 'fd00:a:b:c:')

        #-------------------------------------------------------------------
        # Remove the on-mesh prefix. This should trigger the
        # associated SLAAC address to be deprecated, but it should
        # not yet cause the client to re-register. Verify that the
        # registered addresses on server remain unchanged.

        self._remove_prefix_and_check_unchanged(client, server, 'fd00:abba:cafe:bee::/64')

        # Wait until the SLAAC address deprecation time has elapsed
        # and the address is removed. Verify that the SRP client
        # re-registers and updates the server with the remaining
        # address.

        self.simulator.go(deprecate_time)

        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Remove the next on-mesh prefix. Verify that the client does
        # not re-register while the address is deprecating. After the
        # address is removed, confirm that the SRP client
        # re-registers using only the ML-EID.

        self._remove_prefix_and_check_unchanged(client, server, 'fd00:9:8:7::/64')

        self.simulator.go(deprecate_time)

        self.check_registered_addresses(client, server)

        #-------------------------------------------------------------------
        # Add and remove the on-mesh prefix again. However, before the
        # address deprecation time elapses and the address is removed,
        # restart the server. This should trigger the client to
        # re-register. Verify that the client re-registers with the
        # most up-to-date addresses and does not register the deprecating
        # address.

        self._add_prefix_and_check(client, server, 'fd00:9:8:7::/64', 'paos', 'fd00:9:8:7:')

        # Remove the prefix and verify that client does not
        # register while the SLAAC address is deprecating.

        self._remove_prefix_and_check_unchanged(client, server, 'fd00:9:8:7::/64')

        # Disable and re-enable the server. This should trigger the
        # client to re-register. Verify that the ML-EID address is
        # now registered.

        server.srp_server_set_enabled(False)
        server.srp_server_set_enabled(True)

        self.simulator.go(20)

        self.check_registered_addresses(client, server)

        registered_addresses = self.get_registered_host_addresses_from_server(server)
        self.assertEqual(len(registered_addresses), 1)
        self.assertEqual(registered_addresses[0], client.get_mleid())

        # Check that SLAAC address is still deprecating.

        self.assertEqual(len(self._get_addresses_with_prefix(client, 'fd00:9:8:7:')), 1)

        #-------------------------------------------------------------------
        # Explicitly set the host addresses (which disables the auto host
        # address mode) and check that only the new addresses are registered.

        client.srp_client_set_host_address('fd00:f:e:d:c:b:a:9')
        self.simulator.go(5)

        self.assertEqual(client.srp_client_get_host_state(), 'Registered')
        host_addresses = self.get_registered_host_addresses_from_server(server)
        self.assertEqual(len(host_addresses), 1)
        self.assertEqual(host_addresses[0], 'fd00:f:e:d:c:b:a:9')

        #-------------------------------------------------------------------
        # Re-enable auto host address mode and check that addresses are
        # updated and registered properly.

        client.srp_client_enable_auto_host_address()
        self.simulator.go(5)
        self.check_registered_addresses(client, server)

    def _get_addresses_with_prefix(self, node, addr_prefix):
        """Returns the (stripped) addresses of `node` whose text starts with `addr_prefix`."""
        stripped = (addr.strip() for addr in node.get_addrs())
        return [addr for addr in stripped if addr.startswith(addr_prefix)]

    def _add_prefix_and_check(self, client, server, prefix, flags, addr_prefix):
        """Adds on-mesh `prefix` with `flags` on `client` and validates the registration.

        After registering the network data and advancing the simulation by
        15 seconds, checks that `client` has exactly one address starting
        with `addr_prefix` and that the addresses registered on `server`
        are consistent with the client (see `check_registered_addresses`).
        """
        client.add_prefix(prefix, flags)
        client.register_netdata()
        self.simulator.go(15)

        self.assertEqual(len(self._get_addresses_with_prefix(client, addr_prefix)), 1)
        self.check_registered_addresses(client, server)

    def _remove_prefix_and_check_unchanged(self, client, server, prefix):
        """Removes on-mesh `prefix` on `client` and checks the server's view stays the same.

        The addresses registered on `server` are sampled before the removal
        and compared again after advancing the simulation by 15 seconds.
        """
        old_registered_addresses = self.get_registered_host_addresses_from_server(server)

        client.remove_prefix(prefix)
        client.register_netdata()

        self.simulator.go(15)
        self.assertEqual(old_registered_addresses, self.get_registered_host_addresses_from_server(server))

    def get_registered_host_addresses_from_server(self, server):
        """Returns the addresses registered for the single SRP host on `server`.

        Asserts that the server has exactly one host entry, that it is not
        marked deleted and that its full name is `host.default.service.arpa.`.
        """
        server_hosts = server.srp_server_get_hosts()
        self.assertEqual(len(server_hosts), 1)
        server_host = server_hosts[0]
        self.assertEqual(server_host['deleted'], 'false')
        self.assertEqual(server_host['fullname'], 'host.default.service.arpa.')
        return [addr.strip() for addr in server_host['addresses']]

    def check_registered_addresses(self, client, server):
        """Checks that the host addresses on `server` match the addresses of `client`."""
        # Ensure client has registered successfully.
        self.assertEqual(client.srp_client_get_host_state(), 'Registered')

        # Check the host addresses on server to match client.

        host_addresses = self.get_registered_host_addresses_from_server(server)

        client_mleid = client.get_mleid()
        client_addresses = [addr.split(' ')[0] for addr in client.get_addrs(verbose=True) if 'preferred:1' in addr]
        client_addresses += [client_mleid]

        # All registered addresses must be in client list of addresses.

        for addr in host_addresses:
            self.assertIn(addr, client_addresses)

        # All preferred addresses on client excluding link-local and
        # mesh-local addresses must be seen on server side. But if there
        # was no address, then mesh-local address should be the only
        # one registered.

        checked_address = False

        for addr in client_addresses:
            if not self.is_address_link_local(addr) and not self.is_address_locator(addr) and addr != client_mleid:
                self.assertIn(addr, host_addresses)
                checked_address = True

        if not checked_address:
            self.assertEqual(len(host_addresses), 1)
            self.assertIn(client_mleid, host_addresses)

    def _parse_ip6_address(self, addr):
        """Parses `addr` into an `ipaddress.IPv6Address`, failing the test if it is malformed."""
        try:
            return ipaddress.IPv6Address(addr.strip())
        except ValueError:
            # Report malformed input as a test failure (not an error), as the
            # former group-count assertion did.
            self.fail('Invalid IPv6 address: %r' % (addr,))

    def is_address_locator(self, addr):
        """Checks if an IPv6 address is a locator (IID should match `0:ff:fe00:xxxx`).

        The address is parsed rather than compared as text, so compressed
        (`::`), zero-padded and upper-case spellings are handled as well.
        """
        iid = int(self._parse_ip6_address(addr)) & 0xFFFFFFFFFFFFFFFF
        # Drop the low 16 bits (`xxxx`) and compare the remaining 48 bits of the IID.
        return (iid >> 16) == 0x000000FFFE00

    def is_address_link_local(self, addr):
        """Checks if an IPv6 address is link-local (within `fe80::/10`)."""
        return self._parse_ip6_address(addr).is_link_local


if __name__ == '__main__':
    unittest.main()