#!/usr/bin/env python3
#
#  Copyright (c) 2021, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#
import ipaddress
import json
import logging
import unittest

import config
import thread_cert

# Test description:
#   This test verifies DNS-SD server works on a BR and is accessible from a Host.
#
# Topology:
#    ----------------(eth)--------------------
#           |                   |
#    BR1 (Leader, Server)     HOST
#         /        \
#     CLIENT1    CLIENT2

SERVER = BR1 = 1
CLIENT1, CLIENT2 = 2, 3
HOST = 4

DOMAIN = 'default.service.arpa.'
SERVICE = '_testsrv._udp'
SERVICE_FULL_NAME = f'{SERVICE}.{DOMAIN}'

VALID_SERVICE_NAMES = [
    '_abc._udp.default.service.arpa.',
    '_abc._tcp.default.service.arpa.',
]

# Names that do not match any registered service: one uses an unknown domain,
# the other a malformed protocol label (the ',' is part of the test input).
WRONG_SERVICE_NAMES = [
    '_testsrv._udp.default.service.xxxx.',
    '_testsrv._txp,default.service.arpa.',
]


class TestDnssdServerOnBr(thread_cert.TestCase):
    """Check that the DNS-SD server on a Border Router answers queries sent by a Host."""

    USE_MESSAGE_FACTORY = False

    TOPOLOGY = {
        BR1: {
            'name': 'SERVER',
            'is_otbr': True,
            'version': '1.2',
        },
        CLIENT1: {
            'name': 'CLIENT1',
        },
        CLIENT2: {
            'name': 'CLIENT2',
        },
        HOST: {
            'name': 'Host',
            'is_host': True
        },
    }

    # NOTE: `test` is documented with comments rather than a docstring so the
    # name unittest reports for it stays unchanged.
    def test(self):
        # `server`/`br1` and `digger`/`host` are two names for the same node,
        # chosen to match the role the node plays at each step.
        server = br1 = self.nodes[BR1]
        client1 = self.nodes[CLIENT1]
        client2 = self.nodes[CLIENT2]
        digger = host = self.nodes[HOST]

        host.start(start_radvd=False)
        self.simulator.go(5)

        br1.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual('leader', br1.get_state())
        server.srp_server_set_enabled(True)
        server.dns_upstream_query_state = False

        client1.start()

        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual('router', client1.get_state())

        client2.start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual('router', client2.get_state())

        self.simulator.go(10)

        server_addr = server.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]

        # BR1 can ping to/from the Host on infra link.
        self.assertTrue(br1.ping(host.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)[0], backbone=True))
        self.assertTrue(host.ping(br1.get_ip6_address(config.ADDRESS_TYPE.OMR)[0], backbone=True))

        client1_addrs = [client1.get_mleid(), client1.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]]
        self._config_srp_client_services(client1, 'ins1', 'host1', 11111, 1, 1, client1_addrs)

        client2_addrs = [client2.get_mleid(), client2.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]]
        self._config_srp_client_services(client2, 'ins2', 'host2', 22222, 2, 2, client2_addrs)

        ins1_full_name = f'ins1.{SERVICE_FULL_NAME}'
        ins2_full_name = f'ins2.{SERVICE_FULL_NAME}'
        host1_full_name = f'host1.{DOMAIN}'
        host2_full_name = f'host2.{DOMAIN}'
        EMPTY_TXT = {}

        # check if PTR query works
        dig_result = digger.dns_dig(server_addr, SERVICE_FULL_NAME, 'PTR')

        self._assert_dig_result_matches(
            dig_result, {
                'QUESTION': [(SERVICE_FULL_NAME, 'IN', 'PTR')],
                'ANSWER': [(SERVICE_FULL_NAME, 'IN', 'PTR', ins1_full_name),
                           (SERVICE_FULL_NAME, 'IN', 'PTR', ins2_full_name)],
            })

        # check if SRV query works
        dig_result = digger.dns_dig(server_addr, ins1_full_name, 'SRV')
        self._assert_dig_result_matches(
            dig_result, {
                'QUESTION': [(ins1_full_name, 'IN', 'SRV')],
                'ANSWER': [(ins1_full_name, 'IN', 'SRV', 1, 1, 11111, host1_full_name),],
                'ADDITIONAL': [
                    (host1_full_name, 'IN', 'AAAA', client1_addrs[0]),
                    (host1_full_name, 'IN', 'AAAA', client1_addrs[1]),
                ],
            })

        dig_result = digger.dns_dig(server_addr, ins2_full_name, 'SRV')
        self._assert_dig_result_matches(
            dig_result, {
                'QUESTION': [(ins2_full_name, 'IN', 'SRV')],
                'ANSWER': [(ins2_full_name, 'IN', 'SRV', 2, 2, 22222, host2_full_name),],
                'ADDITIONAL': [
                    (host2_full_name, 'IN', 'AAAA', client2_addrs[0]),
                    (host2_full_name, 'IN', 'AAAA', client2_addrs[1]),
                ],
            })

        # check if TXT query works
        dig_result = digger.dns_dig(server_addr, ins1_full_name, 'TXT')
        self._assert_dig_result_matches(dig_result, {
            'QUESTION': [(ins1_full_name, 'IN', 'TXT')],
            'ANSWER': [(ins1_full_name, 'IN', 'TXT', EMPTY_TXT),],
        })

        dig_result = digger.dns_dig(server_addr, ins2_full_name, 'TXT')
        self._assert_dig_result_matches(dig_result, {
            'QUESTION': [(ins2_full_name, 'IN', 'TXT')],
            'ANSWER': [(ins2_full_name, 'IN', 'TXT', EMPTY_TXT),],
        })

        # check if AAAA query works
        dig_result = digger.dns_dig(server_addr, host1_full_name, 'AAAA')
        self._assert_dig_result_matches(
            dig_result, {
                'QUESTION': [(host1_full_name, 'IN', 'AAAA'),],
                'ANSWER': [
                    (host1_full_name, 'IN', 'AAAA', client1_addrs[0]),
                    (host1_full_name, 'IN', 'AAAA', client1_addrs[1]),
                ],
            })

        dig_result = digger.dns_dig(server_addr, host2_full_name, 'AAAA')
        self._assert_dig_result_matches(
            dig_result, {
                'QUESTION': [(host2_full_name, 'IN', 'AAAA'),],
                'ANSWER': [
                    (host2_full_name, 'IN', 'AAAA', client2_addrs[0]),
                    (host2_full_name, 'IN', 'AAAA', client2_addrs[1]),
                ],
            })

        # check some invalid queries
        for qtype in ['CNAME']:
            dig_result = digger.dns_dig(server_addr, host1_full_name, qtype)
            self._assert_dig_result_matches(dig_result, {
                'status': 'NOTIMP',
            })

        for service_name in WRONG_SERVICE_NAMES:
            dig_result = digger.dns_dig(server_addr, service_name, 'PTR')
            self._assert_dig_result_matches(dig_result, {
                'status': 'NXDOMAIN',
            })

        # verify Discovery Proxy works for _meshcop._udp
        self._verify_discovery_proxy_meshcop(server_addr, server.get_network_name(), digger)

    def _verify_discovery_proxy_meshcop(self, server_addr, network_name, digger):
        """Query the `_meshcop._udp` service through `server_addr` and check the PTR/SRV/TXT/AAAA answers.

        Args:
            server_addr: address of the DNS server the queries are sent to.
            network_name: value the TXT record's 'nn' key is expected to carry.
            digger: node whose `dns_dig` method issues the queries.
        """
        dp_service_name = '_meshcop._udp.default.service.arpa.'

        # Until the first response reveals the real host name, only its domain
        # suffix can be checked.
        def match_dp_hostname(name):
            return name.endswith('.default.service.arpa.')

        def check_border_agent_port(port):
            return 0 < port <= 65535

        def check_meshcop_txt(txt):
            return (isinstance(txt, dict) and txt.get('nn') == network_name and 'xp' in txt and 'tv' in txt and
                    'xa' in txt)

        dig_result = digger.dns_dig(server_addr, dp_service_name, 'PTR')

        # The instance name is taken from the first PTR answer. Fail with the
        # full dig result when there is none, instead of a NameError later on.
        dp_instance_name = next(
            (answer[-1] for answer in dig_result['ANSWER'] if len(answer) >= 2 and answer[-2] == 'PTR'), None)
        self.assertIsNotNone(dp_instance_name, dig_result)

        self._assert_dig_result_matches(
            dig_result, {
                'QUESTION': [(dp_service_name, 'IN', 'PTR'),],
                'ANSWER': [(dp_service_name, 'IN', 'PTR', dp_instance_name),],
                'ADDITIONAL': [
                    (dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, match_dp_hostname),
                    (dp_instance_name, 'IN', 'TXT', check_meshcop_txt),
                ],
            })

        # Find the actual host name and IPv6 address.
        # Raw records still carry the TTL at index 1, so the type is at index 3.
        dp_hostname = None
        dp_ip6_address = None
        for rr in dig_result['ADDITIONAL']:
            if rr[3] == 'SRV':
                dp_hostname = rr[7]
            elif rr[3] == 'AAAA':
                dp_ip6_address = rr[4]

        self.assertIsInstance(dp_hostname, str, dig_result)

        dig_result = digger.dns_dig(server_addr, dp_instance_name, 'SRV')
        self._assert_dig_result_matches(
            dig_result, {
                'QUESTION': [(dp_instance_name, 'IN', 'SRV'),],
                'ANSWER': [(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),],
                'ADDITIONAL': [(dp_instance_name, 'IN', 'TXT', check_meshcop_txt),],
            })

        dig_result = digger.dns_dig(server_addr, dp_instance_name, 'TXT')
        self._assert_dig_result_matches(
            dig_result, {
                'QUESTION': [(dp_instance_name, 'IN', 'TXT'),],
                'ANSWER': [(dp_instance_name, 'IN', 'TXT', check_meshcop_txt),],
                'ADDITIONAL': [(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),],
            })

        # The AAAA lookup is only checked when the PTR response carried an address.
        if dp_ip6_address is not None:
            dig_result = digger.dns_dig(server_addr, dp_hostname, 'AAAA')

            self._assert_dig_result_matches(dig_result, {
                'QUESTION': [(dp_hostname, 'IN', 'AAAA'),],
                'ANSWER': [(dp_hostname, 'IN', 'AAAA', dp_ip6_address),],
            })

    def _config_srp_client_services(self, client, instancename, hostname, port, priority, weight, addrs):
        """Configure one SRP host and service on `client` and assert the host state becomes 'Registered'."""
        client.srp_client_enable_auto_start_mode()
        client.srp_client_set_host_name(hostname)
        client.srp_client_set_host_address(*addrs)
        client.srp_client_add_service(instancename, SERVICE, port, priority, weight)

        self.simulator.go(5)
        self.assertEqual(client.srp_client_get_host_state(), 'Registered')

    def _assert_have_question(self, dig_result, question):
        """Fail unless some entry of `dig_result['QUESTION']` matches `question`."""
        for dig_question in dig_result['QUESTION']:
            if self._match_record(dig_question, question):
                return

        self.fail((dig_result, question))

    def _assert_have_answer(self, dig_result, record, additional=False):
        """Fail unless `record` matches an entry of the ANSWER (or ADDITIONAL) section.

        Args:
            dig_result: parsed dig output, indexed by section name.
            record: expected record without TTL; fields may be callables used as predicates.
            additional: search the 'ADDITIONAL' section instead of 'ANSWER'.
        """
        section = 'ADDITIONAL' if additional else 'ANSWER'

        # The expected record does not depend on the loop, so normalize it once.
        # IPv6 addresses are converted to `ipaddress.IPv6Address` before matching
        # so that different textual forms of the same address compare equal.
        record = list(record)
        if record[2] == 'AAAA':
            record[3] = ipaddress.IPv6Address(record[3])

        for dig_answer in dig_result[section]:
            dig_answer = list(dig_answer)
            dig_answer[1:2] = []  # remove TTL from answer

            if dig_answer[2] == 'AAAA':
                dig_answer[3] = ipaddress.IPv6Address(dig_answer[3])

            if self._match_record(dig_answer, record):
                return

            logging.info('not match: %s %s %s', dig_answer, record,
                         [a == b or (callable(b) and b(a)) for a, b in zip(dig_answer, record)])

        self.fail((record, dig_result))

    def _match_record(self, record, match):
        """Return True when `record` matches `match` field by field.

        A field of `match` is either a value compared with `==` or a callable
        applied to the corresponding field of `record` as a predicate.
        """
        self.assertFalse(any(callable(elem) for elem in record), record)

        # `zip` stops at the shorter sequence, so without this check a record
        # would match whenever it is merely a prefix of the expected one.
        if len(record) != len(match):
            return False

        return all(a == b or (callable(b) and b(a)) for a, b in zip(record, match))

    def _assert_dig_result_matches(self, dig_result, expected_result):
        """Assert that `dig_result` agrees with every key present in `expected_result`.

        'opcode' and 'status' default to 'QUERY' and 'NOERROR'. 'QUESTION' and
        'ANSWER' must have exactly the expected number of entries, while
        'ADDITIONAL' may contain more records than expected.
        """
        self.assertEqual(dig_result['opcode'], expected_result.get('opcode', 'QUERY'), dig_result)
        self.assertEqual(dig_result['status'], expected_result.get('status', 'NOERROR'), dig_result)

        if 'QUESTION' in expected_result:
            self.assertEqual(len(dig_result['QUESTION']), len(expected_result['QUESTION']), dig_result)

            for question in expected_result['QUESTION']:
                self._assert_have_question(dig_result, question)

        if 'ANSWER' in expected_result:
            self.assertEqual(len(dig_result['ANSWER']), len(expected_result['ANSWER']), dig_result)

            for record in expected_result['ANSWER']:
                self._assert_have_answer(dig_result, record, additional=False)

        if 'ADDITIONAL' in expected_result:
            self.assertGreaterEqual(len(dig_result['ADDITIONAL']), len(expected_result['ADDITIONAL']), dig_result)

            for record in expected_result['ADDITIONAL']:
                self._assert_have_answer(dig_result, record, additional=True)

        logging.info("dig result matches:\r%s", json.dumps(dig_result, indent=True))


if __name__ == '__main__':
    unittest.main()