1#!/usr/bin/env python3
2#
3#  Copyright (c) 2021, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import ipaddress
30import json
31import logging
32import unittest
33
34import config
35import thread_cert
36
37# Test description:
38#   This test verifies DNS-SD server works on a BR and is accessible from a Host.
39#
40# Topology:
41#    ----------------(eth)--------------------
42#           |                      |
43#          BR1 (Leader, Server)   HOST
44#         /        \
45#      CLIENT1    CLIENT2
46
# Node ids; they are the keys of `TestDnssdServerOnBr.TOPOLOGY` and of `self.nodes`.
# BR1 also acts as the DNS-SD/SRP server, hence the SERVER alias.
BR1 = 1
SERVER = BR1
CLIENT1 = 2
CLIENT2 = 3
HOST = 4

# Names used when registering and querying the test services.
DOMAIN = 'default.service.arpa.'
SERVICE = '_testsrv._udp'
SERVICE_FULL_NAME = '.'.join((SERVICE, DOMAIN))

# Well-formed service names under DOMAIN.
VALID_SERVICE_NAMES = [
    '_abc._udp.default.service.arpa.',
    '_abc._tcp.default.service.arpa.',
]

# Deliberately broken variants of SERVICE_FULL_NAME (wrong domain suffix; `_txp` label followed by a comma).
# PTR queries for these are expected to be answered with NXDOMAIN.
WRONG_SERVICE_NAMES = [
    '_testsrv._udp.default.service.xxxx.',
    '_testsrv._txp,default.service.arpa.',
]
64
65
class TestDnssdServerOnBr(thread_cert.TestCase):
    """Verify that the DNS-SD server running on a border router can be queried from a Host on the infra link."""

    # Read by the `thread_cert` framework; its exact effect is not visible in this file.
    USE_MESSAGE_FACTORY = False

    # Node definitions keyed by the module-level node ids.
    TOPOLOGY = {
        # Border router: flagged as an OTBR node with version '1.2'.
        BR1: {
            'name': 'SERVER',
            'is_otbr': True,
            'version': '1.2',
        },
        # The two clients carry no options besides their names.
        CLIENT1: {
            'name': 'CLIENT1',
        },
        CLIENT2: {
            'name': 'CLIENT2',
        },
        # Host node (flagged with 'is_host'); the test issues its DNS queries from here.
        HOST: {
            'name': 'Host',
            'is_host': True
        },
    }
86
87    def test(self):
88        server = br1 = self.nodes[BR1]
89        client1 = self.nodes[CLIENT1]
90        client2 = self.nodes[CLIENT2]
91        digger = host = self.nodes[HOST]
92
93        host.start(start_radvd=False)
94        self.simulator.go(5)
95
96        br1.start()
97        self.simulator.go(config.LEADER_STARTUP_DELAY)
98        self.assertEqual('leader', br1.get_state())
99        server.srp_server_set_enabled(True)
100        server.dns_upstream_query_state = False
101
102        client1.start()
103
104        self.simulator.go(config.ROUTER_STARTUP_DELAY)
105        self.assertEqual('router', client1.get_state())
106
107        client2.start()
108        self.simulator.go(config.ROUTER_STARTUP_DELAY)
109        self.assertEqual('router', client2.get_state())
110
111        self.simulator.go(10)
112
113        server_addr = server.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]
114
115        # Router1 can ping to/from the Host on infra link.
116        self.assertTrue(br1.ping(host.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)[0], backbone=True))
117        self.assertTrue(host.ping(br1.get_ip6_address(config.ADDRESS_TYPE.OMR)[0], backbone=True))
118
119        client1_addrs = [client1.get_mleid(), client1.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]]
120        self._config_srp_client_services(client1, 'ins1', 'host1', 11111, 1, 1, client1_addrs)
121
122        client2_addrs = [client2.get_mleid(), client2.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]]
123        self._config_srp_client_services(client2, 'ins2', 'host2', 22222, 2, 2, client2_addrs)
124
125        ins1_full_name = f'ins1.{SERVICE_FULL_NAME}'
126        ins2_full_name = f'ins2.{SERVICE_FULL_NAME}'
127        host1_full_name = f'host1.{DOMAIN}'
128        host2_full_name = f'host2.{DOMAIN}'
129        EMPTY_TXT = {}
130
131        # check if PTR query works
132        dig_result = digger.dns_dig(server_addr, SERVICE_FULL_NAME, 'PTR')
133
134        self._assert_dig_result_matches(
135            dig_result, {
136                'QUESTION': [(SERVICE_FULL_NAME, 'IN', 'PTR')],
137                'ANSWER': [(SERVICE_FULL_NAME, 'IN', 'PTR', f'ins1.{SERVICE_FULL_NAME}'),
138                           (SERVICE_FULL_NAME, 'IN', 'PTR', f'ins2.{SERVICE_FULL_NAME}')],
139            })
140
141        # check if SRV query works
142        dig_result = digger.dns_dig(server_addr, ins1_full_name, 'SRV')
143        self._assert_dig_result_matches(
144            dig_result, {
145                'QUESTION': [(ins1_full_name, 'IN', 'SRV')],
146                'ANSWER': [(ins1_full_name, 'IN', 'SRV', 1, 1, 11111, host1_full_name),],
147                'ADDITIONAL': [
148                    (host1_full_name, 'IN', 'AAAA', client1_addrs[0]),
149                    (host1_full_name, 'IN', 'AAAA', client1_addrs[1]),
150                ],
151            })
152
153        dig_result = digger.dns_dig(server_addr, ins2_full_name, 'SRV')
154        self._assert_dig_result_matches(
155            dig_result, {
156                'QUESTION': [(ins2_full_name, 'IN', 'SRV')],
157                'ANSWER': [(ins2_full_name, 'IN', 'SRV', 2, 2, 22222, host2_full_name),],
158                'ADDITIONAL': [
159                    (host2_full_name, 'IN', 'AAAA', client2_addrs[0]),
160                    (host2_full_name, 'IN', 'AAAA', client2_addrs[1]),
161                ],
162            })
163
164        # check if TXT query works
165        dig_result = digger.dns_dig(server_addr, ins1_full_name, 'TXT')
166        self._assert_dig_result_matches(dig_result, {
167            'QUESTION': [(ins1_full_name, 'IN', 'TXT')],
168            'ANSWER': [(ins1_full_name, 'IN', 'TXT', EMPTY_TXT),],
169        })
170
171        dig_result = digger.dns_dig(server_addr, ins2_full_name, 'TXT')
172        self._assert_dig_result_matches(dig_result, {
173            'QUESTION': [(ins2_full_name, 'IN', 'TXT')],
174            'ANSWER': [(ins2_full_name, 'IN', 'TXT', EMPTY_TXT),],
175        })
176
177        # check if AAAA query works
178        dig_result = digger.dns_dig(server_addr, host1_full_name, 'AAAA')
179        self._assert_dig_result_matches(
180            dig_result, {
181                'QUESTION': [(host1_full_name, 'IN', 'AAAA'),],
182                'ANSWER': [
183                    (host1_full_name, 'IN', 'AAAA', client1_addrs[0]),
184                    (host1_full_name, 'IN', 'AAAA', client1_addrs[1]),
185                ],
186            })
187
188        dig_result = digger.dns_dig(server_addr, host2_full_name, 'AAAA')
189        self._assert_dig_result_matches(
190            dig_result, {
191                'QUESTION': [(host2_full_name, 'IN', 'AAAA'),],
192                'ANSWER': [
193                    (host2_full_name, 'IN', 'AAAA', client2_addrs[0]),
194                    (host2_full_name, 'IN', 'AAAA', client2_addrs[1]),
195                ],
196            })
197
198        # check some invalid queries
199        for qtype in ['CNAME']:
200            dig_result = digger.dns_dig(server_addr, host1_full_name, qtype)
201            self._assert_dig_result_matches(dig_result, {
202                'status': 'NOTIMP',
203            })
204
205        for service_name in WRONG_SERVICE_NAMES:
206            dig_result = digger.dns_dig(server_addr, service_name, 'PTR')
207            self._assert_dig_result_matches(dig_result, {
208                'status': 'NXDOMAIN',
209            })
210
211        # verify Discovery Proxy works for _meshcop._udp
212        self._verify_discovery_proxy_meshcop(server_addr, server.get_network_name(), digger)
213
214    def _verify_discovery_proxy_meshcop(self, server_addr, network_name, digger):
215        dp_service_name = '_meshcop._udp.default.service.arpa.'
216        dp_hostname = lambda x: x.endswith('.default.service.arpa.')
217
218        def check_border_agent_port(port):
219            return 0 < port <= 65535
220
221        dig_result = digger.dns_dig(server_addr, dp_service_name, 'PTR')
222        for answer in dig_result['ANSWER']:
223            if len(answer) >= 2 and answer[-2] == 'PTR':
224                dp_instance_name = answer[-1]
225                break
226        self._assert_dig_result_matches(
227            dig_result, {
228                'QUESTION': [(dp_service_name, 'IN', 'PTR'),],
229                'ANSWER': [(dp_service_name, 'IN', 'PTR', dp_instance_name),],
230                'ADDITIONAL': [
231                    (dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),
232                    (dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get(
233                        'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)),
234                ],
235            })
236
237        # Find the actual host name and IPv6 address
238        dp_ip6_address = None
239        for rr in dig_result['ADDITIONAL']:
240            if rr[3] == 'SRV':
241                dp_hostname = rr[7]
242            elif rr[3] == 'AAAA':
243                dp_ip6_address = rr[4]
244
245        assert isinstance(dp_hostname, str), dig_result
246
247        dig_result = digger.dns_dig(server_addr, dp_instance_name, 'SRV')
248        self._assert_dig_result_matches(
249            dig_result, {
250                'QUESTION': [(dp_instance_name, 'IN', 'SRV'),],
251                'ANSWER': [(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),],
252                'ADDITIONAL': [(dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get(
253                    'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)),],
254            })
255
256        dig_result = digger.dns_dig(server_addr, dp_instance_name, 'TXT')
257        self._assert_dig_result_matches(
258            dig_result, {
259                'QUESTION': [(dp_instance_name, 'IN', 'TXT'),],
260                'ANSWER': [(dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get(
261                    'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)),],
262                'ADDITIONAL': [(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),],
263            })
264
265        if dp_ip6_address is not None:
266            dig_result = digger.dns_dig(server_addr, dp_hostname, 'AAAA')
267
268            self._assert_dig_result_matches(dig_result, {
269                'QUESTION': [(dp_hostname, 'IN', 'AAAA'),],
270                'ANSWER': [(dp_hostname, 'IN', 'AAAA', dp_ip6_address),],
271            })
272
273    def _config_srp_client_services(self, client, instancename, hostname, port, priority, weight, addrs):
274        client.srp_client_enable_auto_start_mode()
275        client.srp_client_set_host_name(hostname)
276        client.srp_client_set_host_address(*addrs)
277        client.srp_client_add_service(instancename, SERVICE, port, priority, weight)
278
279        self.simulator.go(5)
280        self.assertEqual(client.srp_client_get_host_state(), 'Registered')
281
282    def _assert_have_question(self, dig_result, question):
283        for dig_question in dig_result['QUESTION']:
284            if self._match_record(dig_question, question):
285                return
286
287        self.fail((dig_result, question))
288
289    def _assert_have_answer(self, dig_result, record, additional=False):
290        for dig_answer in dig_result['ANSWER' if not additional else 'ADDITIONAL']:
291            dig_answer = list(dig_answer)
292            dig_answer[1:2] = []  # remove TTL from answer
293
294            record = list(record)
295
296            # convert IPv6 addresses to `ipaddress.IPv6Address` before matching
297            if dig_answer[2] == 'AAAA':
298                dig_answer[3] = ipaddress.IPv6Address(dig_answer[3])
299
300            if record[2] == 'AAAA':
301                record[3] = ipaddress.IPv6Address(record[3])
302
303            if self._match_record(dig_answer, record):
304                return
305
306            print('not match: ', dig_answer, record,
307                  list(a == b or (callable(b) and b(a)) for a, b in zip(dig_answer, record)))
308
309        self.fail((record, dig_result))
310
311    def _match_record(self, record, match):
312        assert not any(callable(elem) for elem in record), record
313
314        if record == match:
315            return True
316
317        return all(a == b or (callable(b) and b(a)) for a, b in zip(record, match))
318
319    def _assert_dig_result_matches(self, dig_result, expected_result):
320        self.assertEqual(dig_result['opcode'], expected_result.get('opcode', 'QUERY'), dig_result)
321        self.assertEqual(dig_result['status'], expected_result.get('status', 'NOERROR'), dig_result)
322
323        if 'QUESTION' in expected_result:
324            self.assertEqual(len(dig_result['QUESTION']), len(expected_result['QUESTION']), dig_result)
325
326            for question in expected_result['QUESTION']:
327                self._assert_have_question(dig_result, question)
328
329        if 'ANSWER' in expected_result:
330            self.assertEqual(len(dig_result['ANSWER']), len(expected_result['ANSWER']), dig_result)
331
332            for record in expected_result['ANSWER']:
333                self._assert_have_answer(dig_result, record, additional=False)
334
335        if 'ADDITIONAL' in expected_result:
336            self.assertGreaterEqual(len(dig_result['ADDITIONAL']), len(expected_result['ADDITIONAL']), dig_result)
337
338            for record in expected_result['ADDITIONAL']:
339                self._assert_have_answer(dig_result, record, additional=True)
340
341        logging.info("dig result matches:\r%s", json.dumps(dig_result, indent=True))
342
343
# Run the test case through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
346