1#!/usr/bin/env python3
2#
3#  Copyright (c) 2021, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import binascii
30import ipaddress
31import logging
32import unittest
33
34import config
35import thread_cert
36
37# Test description:
38#   This test verifies that OTBR publishes the meshcop service using a proper
39#   configuration.
40#
41# Topology:
42#    ----------------(eth)-----------------------------
43#           |             |                    |
44#          BR1           BR2           HOST (mDNS Browser)
45#
46#
47
48BR1 = 1
49BR2 = 2
50HOST = 3
51
52
53class PublishMeshCopService(thread_cert.TestCase):
54    USE_MESSAGE_FACTORY = False
55
56    TOPOLOGY = {
57        BR1: {
58            'name': 'BR_1',
59            'allowlist': [],
60            'is_otbr': True,
61            'version': '1.2',
62            'network_name': 'ot-br1',
63            'boot_delay': 5,
64        },
65        BR2: {
66            'name': 'BR_2',
67            'allowlist': [],
68            'is_otbr': True,
69            'version': '1.2',
70            'network_name': 'ot-br2',
71            'boot_delay': 5,
72        },
73        HOST: {
74            'name': 'Host',
75            'is_host': True
76        },
77    }
78
79    def test(self):
80        host = self.nodes[HOST]
81        br1 = self.nodes[BR1]
82        br2 = self.nodes[BR2]
83        br2.disable_br()
84
85        # Use different network names to distinguish meshcop services
86        br1.set_active_dataset(updateExisting=True, network_name='ot-br1')
87        br2.set_active_dataset(updateExisting=True, network_name='ot-br2')
88
89        host.start(start_radvd=False)
90        self.simulator.go(20)
91
92        self.assertEqual(br1.get_state(), 'disabled')
93        # TODO enable this line when renaming with mDNSResponder is stable
94        # self.check_meshcop_service(br1, host)
95        br1.start()
96        self.simulator.go(config.BORDER_ROUTER_STARTUP_DELAY)
97        self.assertEqual('leader', br1.get_state())
98
99        # start to test ephemeral key mode (ePSKc)
100        br1.ephemeral_key_enabled = True
101        self.assertTrue(br1.ephemeral_key_enabled)
102        self.simulator.go(10)
103        self.check_meshcop_service(br1, host)
104
105        # activate ePSKc mode
106        lifetime = 500_000
107        ephemeral_key = br1.activate_ephemeral_key_mode(lifetime)
108        self.assertEqual(len(ephemeral_key), 9)
109        self.assertEqual(br1.get_ephemeral_key_state(), 'active')
110        # check Meshcop-e service
111        self.check_meshcop_e_service(host, True)
112
113        # deactivate ePSKc mode
114        br1.deactivate_ephemeral_key_mode()
115        self.assertEqual(br1.get_ephemeral_key_state(), 'inactive')
116        self.simulator.go(10)
117        # check Meshcop-e service
118        self.check_meshcop_e_service(host, False)
119
120        # change ephemeral key mode (ePSKc) and check Meshcop
121        br1.ephemeral_key_enabled = False
122        self.assertFalse(br1.ephemeral_key_enabled)
123        self.simulator.go(10)
124        self.check_meshcop_service(br1, host)
125        # check Meshcop-e format
126        self.check_meshcop_e_service(host, False)
127        # end of ephemeral key mode (ePSKc) test
128
129        br1.disable_backbone_router()
130        self.simulator.go(10)
131        self.check_meshcop_service(br1, host)
132
133        br1.stop()
134        br1.set_active_dataset(updateExisting=True, network_name='ot-br1-1')
135        br1.start()
136        self.simulator.go(config.BORDER_ROUTER_STARTUP_DELAY)
137        self.simulator.go(5)  # Needs to wait extra some time to update meshcop service on state changes.
138        self.check_meshcop_service(br1, host)
139
140        # verify that there are two meshcop services
141        br2.set_active_dataset(updateExisting=True, network_name='ot-br2-1')
142        br2.start()
143        br2.disable_backbone_router()
144        br2.enable_br()
145        self.simulator.go(config.BORDER_ROUTER_STARTUP_DELAY)
146
147        service_instances = host.browse_mdns_services('_meshcop._udp')
148        self.assertEqual(len(service_instances), 2)
149        br1_service = self.check_meshcop_service(br1, host)
150        br2_service = self.check_meshcop_service(br2, host)
151        self.assertNotEqual(br1_service['host'], br2_service['host'])
152
153        br1.stop_otbr_service()
154        self.simulator.go(5)
155        br2.enable_backbone_router()
156        self.simulator.go(5)
157        self.assertEqual(len(host.browse_mdns_services('_meshcop._udp')), 1)
158        br1.start_otbr_service()
159        self.simulator.go(10)
160        self.assertEqual(len(host.browse_mdns_services('_meshcop._udp')), 2)
161        self.check_meshcop_service(br1, host)
162        self.check_meshcop_service(br2, host)
163
164        br1.factory_reset()
165
166        dataset = {
167            'timestamp': 1,
168            'channel': config.CHANNEL,
169            'channel_mask': config.CHANNEL_MASK,
170            'extended_panid': config.EXTENDED_PANID,
171            'mesh_local_prefix': config.MESH_LOCAL_PREFIX.split('/')[0],
172            'network_key': binascii.hexlify(config.DEFAULT_NETWORK_KEY).decode(),
173            'network_name': 'ot-br-1-3',
174            'panid': config.PANID,
175            'pskc': config.PSKC,
176            'security_policy': config.SECURITY_POLICY,
177        }
178
179        br1.set_active_dataset(**dataset)
180        self.simulator.go(10)
181
182        self.assertEqual(len(host.browse_mdns_services('_meshcop._udp')), 2)
183        self.check_meshcop_service(br1, host)
184        self.check_meshcop_service(br2, host)
185
186    def check_meshcop_service(self, br, host):
187        services = self.discover_all_meshcop_services(host)
188        for service in services:
189            if service['txt']['nn'] == br.get_network_name():
190                self.check_meshcop_service_by_data(br, service)
191                return service
192        self.fail('MeshCoP service not found')
193
194    def check_meshcop_service_by_data(self, br, service_data):
195        sb_data = service_data['txt']['sb'].encode('raw_unicode_escape')
196        state_bitmap = int.from_bytes(sb_data, byteorder='big')
197        logging.info(bin(state_bitmap))
198        self.assertEqual((state_bitmap & 7), 1)  # connection mode = PskC
199        sb_thread_interface_status = state_bitmap >> 3 & 3
200        sb_thread_role = state_bitmap >> 9 & 3
201        device_role = br.get_state()
202
203        if device_role == 'disabled':
204            # Thread interface is not active and is not initialized with a set of valid operation network parameters
205            self.assertEqual(sb_thread_interface_status, 0)
206            self.assertEqual(sb_thread_role, 0)  # Thread role is disabled
207
208        elif device_role == 'detached':
209            # Thread interface is initialized with a set of valid operation network parameters, but is not actively participating in a network
210            self.assertEqual(sb_thread_interface_status, 1)
211            self.assertEqual(sb_thread_role, 0)  # Thread role is detached
212
213        elif device_role == 'child':
214            # Thread interface is initialized with a set of valid operation network parameters, and is actively part of a Network
215            self.assertEqual(sb_thread_interface_status, 2)
216            self.assertEqual(sb_thread_role, 1)  # Thread role is child
217
218        elif device_role == 'router':
219            # Thread interface is initialized with a set of valid operation network parameters, and is actively part of a Network
220            self.assertEqual(sb_thread_interface_status, 2)
221            self.assertEqual(sb_thread_role, 2)  # Thread role is router
222
223        elif device_role == 'leader':
224            # Thread interface is initialized with a set of valid operation network parameters, and is actively part of a Network
225            self.assertEqual(sb_thread_interface_status, 2)
226            self.assertEqual(sb_thread_role, 3)  # Thread role is leader
227
228        self.assertEqual((state_bitmap >> 5 & 3), 1)  # high availability
229        self.assertEqual((state_bitmap >> 7 & 1), device_role not in ['disabled', 'detached'] and
230                         br.get_backbone_router_state() != 'Disabled')  # BBR is enabled or not
231        self.assertEqual((state_bitmap >> 8 & 1), device_role not in ['disabled', 'detached'] and
232                         br.get_backbone_router_state() == 'Primary')  # BBR is primary or not
233        self.assertEqual(bool(state_bitmap >> 11 & 1), br.ephemeral_key_enabled)  # ePSKc is supported or not
234        self.assertEqual(service_data['txt']['nn'], br.get_network_name())
235        self.assertEqual(service_data['txt']['rv'], '1')
236        self.assertIn(service_data['txt']['tv'], ['1.1.0', '1.1.1', '1.2.0', '1.3.0', '1.4.0'])
237
238    def discover_services(self, host, type):
239        instance_names = host.browse_mdns_services(type)
240        services = []
241        for instance_name in instance_names:
242            services.append(host.discover_mdns_service(instance_name, type, None))
243        return services
244
245    def discover_all_meshcop_services(self, host):
246        return self.discover_services(host, '_meshcop._udp')
247
248    def check_meshcop_e_service(self, host, isactive):
249        services = self.discover_services(host, '_meshcop-e._udp')
250        # TODO: Meshcop-e port check etc.
251        if isactive:
252            self.assertTrue(len(services) > 0, msg='Meshcop-e service not found')
253        else:
254            self.assertEqual(len(services), 0, msg='Meshcop-e service still found after disabled')
255
256
257if __name__ == '__main__':
258    unittest.main()
259