#!/usr/bin/env python3
#
#  Copyright (c) 2021, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#
import binascii
import ipaddress
import logging
import unittest

import config
import thread_cert

# Test description:
#   This test verifies that OTBR publishes the meshcop service using a proper
#   configuration.
#
# Topology:
#    ----------------(eth)-----------------------------
#           |             |                    |
#          BR1           BR2           HOST (mDNS Browser)
#
#

BR1 = 1
BR2 = 2
HOST = 3

# mDNS service types browsed by this test.
MESHCOP_SERVICE_TYPE = '_meshcop._udp'
MESHCOP_E_SERVICE_TYPE = '_meshcop-e._udp'


class PublishMeshCopService(thread_cert.TestCase):
    """Check the MeshCoP mDNS services published by two border routers.

    A host node browses the services over the shared link and the TXT data of
    each discovered service is compared against the state reported by the
    border router that owns it.
    """

    USE_MESSAGE_FACTORY = False

    TOPOLOGY = {
        BR1: {
            'name': 'BR_1',
            'allowlist': [],
            'is_otbr': True,
            'version': '1.2',
            'network_name': 'ot-br1',
            'boot_delay': 5,
        },
        BR2: {
            'name': 'BR_2',
            'allowlist': [],
            'is_otbr': True,
            'version': '1.2',
            'network_name': 'ot-br2',
            'boot_delay': 5,
        },
        HOST: {
            'name': 'Host',
            'is_host': True
        },
    }

    # Expected (Thread interface status, Thread role) fields of the state
    # bitmap for each device role returned by `get_state()`:
    #   interface status 0: not active / not initialized with valid network parameters
    #   interface status 1: initialized with valid parameters, not participating in a network
    #   interface status 2: initialized with valid parameters and actively part of a network
    _EXPECTED_STATE_BY_ROLE = {
        'disabled': (0, 0),
        'detached': (1, 0),
        'child': (2, 1),
        'router': (2, 2),
        'leader': (2, 3),
    }

    def test(self):
        """Drive both border routers through several states and verify the published services."""
        host = self.nodes[HOST]
        br1 = self.nodes[BR1]
        br2 = self.nodes[BR2]
        br2.disable_br()

        # Use different network names to distinguish meshcop services
        br1.set_active_dataset(updateExisting=True, network_name='ot-br1')
        br2.set_active_dataset(updateExisting=True, network_name='ot-br2')

        host.start(start_radvd=False)
        self.simulator.go(20)

        self.assertEqual(br1.get_state(), 'disabled')
        # TODO enable this line when renaming with mDNSResponder is stable
        # self.check_meshcop_service(br1, host)
        br1.start()
        self.simulator.go(config.BORDER_ROUTER_STARTUP_DELAY)
        self.assertEqual('leader', br1.get_state())

        # start to test ephemeral key mode (ePSKc)
        br1.ephemeral_key_enabled = True
        self.assertTrue(br1.ephemeral_key_enabled)
        self.simulator.go(10)
        self.check_meshcop_service(br1, host)

        # activate ePSKc mode
        lifetime = 500_000
        ephemeral_key = br1.activate_ephemeral_key_mode(lifetime)
        self.assertEqual(len(ephemeral_key), 9)
        self.assertEqual(br1.get_ephemeral_key_state(), 'active')
        # check Meshcop-e service
        self.check_meshcop_e_service(host, True)

        # deactivate ePSKc mode
        br1.deactivate_ephemeral_key_mode()
        self.assertEqual(br1.get_ephemeral_key_state(), 'inactive')
        self.simulator.go(10)
        # check Meshcop-e service
        self.check_meshcop_e_service(host, False)

        # change ephemeral key mode (ePSKc) and check Meshcop
        br1.ephemeral_key_enabled = False
        self.assertFalse(br1.ephemeral_key_enabled)
        self.simulator.go(10)
        self.check_meshcop_service(br1, host)
        # check Meshcop-e format
        self.check_meshcop_e_service(host, False)
        # end of ephemeral key mode (ePSKc) test

        br1.disable_backbone_router()
        self.simulator.go(10)
        self.check_meshcop_service(br1, host)

        br1.stop()
        br1.set_active_dataset(updateExisting=True, network_name='ot-br1-1')
        br1.start()
        self.simulator.go(config.BORDER_ROUTER_STARTUP_DELAY)
        self.simulator.go(5)  # Needs to wait extra some time to update meshcop service on state changes.
        self.check_meshcop_service(br1, host)

        # verify that there are two meshcop services
        br2.set_active_dataset(updateExisting=True, network_name='ot-br2-1')
        br2.start()
        br2.disable_backbone_router()
        br2.enable_br()
        self.simulator.go(config.BORDER_ROUTER_STARTUP_DELAY)

        service_instances = host.browse_mdns_services(MESHCOP_SERVICE_TYPE)
        self.assertEqual(len(service_instances), 2)
        br1_service = self.check_meshcop_service(br1, host)
        br2_service = self.check_meshcop_service(br2, host)
        self.assertNotEqual(br1_service['host'], br2_service['host'])

        br1.stop_otbr_service()
        self.simulator.go(5)
        br2.enable_backbone_router()
        self.simulator.go(5)
        self.assertEqual(len(host.browse_mdns_services(MESHCOP_SERVICE_TYPE)), 1)
        br1.start_otbr_service()
        self.simulator.go(10)
        self.assertEqual(len(host.browse_mdns_services(MESHCOP_SERVICE_TYPE)), 2)
        self.check_meshcop_service(br1, host)
        self.check_meshcop_service(br2, host)

        br1.factory_reset()

        dataset = {
            'timestamp': 1,
            'channel': config.CHANNEL,
            'channel_mask': config.CHANNEL_MASK,
            'extended_panid': config.EXTENDED_PANID,
            'mesh_local_prefix': config.MESH_LOCAL_PREFIX.split('/')[0],
            'network_key': binascii.hexlify(config.DEFAULT_NETWORK_KEY).decode(),
            'network_name': 'ot-br-1-3',
            'panid': config.PANID,
            'pskc': config.PSKC,
            'security_policy': config.SECURITY_POLICY,
        }

        br1.set_active_dataset(**dataset)
        self.simulator.go(10)

        self.assertEqual(len(host.browse_mdns_services(MESHCOP_SERVICE_TYPE)), 2)
        self.check_meshcop_service(br1, host)
        self.check_meshcop_service(br2, host)

    def check_meshcop_service(self, br, host):
        """Find and validate the MeshCoP service that belongs to `br`.

        The service is matched by comparing its `nn` TXT entry with the network
        name reported by `br`.

        Args:
            br: Border router node whose service is expected to be published.
            host: Node used to browse and resolve the mDNS services.

        Returns:
            The discovered service data of the matching service.

        The test fails if no discovered service carries the network name of `br`.
        """
        # The network name does not change while iterating; query it once.
        network_name = br.get_network_name()
        for service in self.discover_all_meshcop_services(host):
            if service['txt']['nn'] == network_name:
                self.check_meshcop_service_by_data(br, service)
                return service
        self.fail('MeshCoP service not found')

    def check_meshcop_service_by_data(self, br, service_data):
        """Validate the TXT data of one MeshCoP service against the state of `br`.

        State bitmap (`sb`) fields checked here, counted from the least significant bit:
            bits 0-2:  connection mode
            bits 3-4:  Thread interface status
            bits 5-6:  availability
            bit 7:     backbone router function enabled
            bit 8:     backbone router is primary
            bits 9-10: Thread role
            bit 11:    ePSKc supported

        Args:
            br: Border router node that is expected to own the service.
            service_data: Discovered service data; its 'txt' entry is read.
        """
        sb_data = service_data['txt']['sb'].encode('raw_unicode_escape')
        state_bitmap = int.from_bytes(sb_data, byteorder='big')
        logging.info(bin(state_bitmap))
        self.assertEqual((state_bitmap & 7), 1)  # connection mode = PskC
        sb_thread_interface_status = state_bitmap >> 3 & 3
        sb_thread_role = state_bitmap >> 9 & 3
        device_role = br.get_state()

        # An unrecognised role must not silently skip the interface status and role checks.
        if device_role not in self._EXPECTED_STATE_BY_ROLE:
            self.fail(f'Unexpected device role: {device_role!r}')
        expected_interface_status, expected_thread_role = self._EXPECTED_STATE_BY_ROLE[device_role]
        self.assertEqual(sb_thread_interface_status, expected_interface_status)
        self.assertEqual(sb_thread_role, expected_thread_role)

        self.assertEqual((state_bitmap >> 5 & 3), 1)  # high availability

        # The backbone router bits are only expected to be set while the node is
        # attached; the backbone router state is queried once and only in that case.
        is_attached = device_role not in ('disabled', 'detached')
        bbr_state = br.get_backbone_router_state() if is_attached else None
        self.assertEqual(bool(state_bitmap >> 7 & 1), is_attached and
                         bbr_state != 'Disabled')  # BBR is enabled or not
        self.assertEqual(bool(state_bitmap >> 8 & 1), is_attached and bbr_state == 'Primary')  # BBR is primary or not
        self.assertEqual(bool(state_bitmap >> 11 & 1), br.ephemeral_key_enabled)  # ePSKc is supported or not
        self.assertEqual(service_data['txt']['nn'], br.get_network_name())
        self.assertEqual(service_data['txt']['rv'], '1')
        self.assertIn(service_data['txt']['tv'], ['1.1.0', '1.1.1', '1.2.0', '1.3.0', '1.4.0'])

    def discover_services(self, host, type):
        """Browse all instances of an mDNS service type and resolve each of them.

        Args:
            host: Node used to browse and resolve the services.
            type: Service type to browse, e.g. '_meshcop._udp'. The name shadows
                the builtin but is kept so existing keyword callers still work.

        Returns:
            A list with the resolved data of every browsed instance.
        """
        return [
            host.discover_mdns_service(instance_name, type, None)
            for instance_name in host.browse_mdns_services(type)
        ]

    def discover_all_meshcop_services(self, host):
        """Return the resolved data of every '_meshcop._udp' service seen by `host`."""
        return self.discover_services(host, MESHCOP_SERVICE_TYPE)

    def check_meshcop_e_service(self, host, isactive):
        """Check whether a '_meshcop-e._udp' service is published.

        Args:
            host: Node used to browse and resolve the services.
            isactive: True if at least one service must be found, False if none
                may be found.
        """
        services = self.discover_services(host, MESHCOP_E_SERVICE_TYPE)
        # TODO: Meshcop-e port check etc.
        if isactive:
            self.assertGreater(len(services), 0, msg='Meshcop-e service not found')
        else:
            self.assertEqual(len(services), 0, msg='Meshcop-e service still found after disabled')


if __name__ == '__main__':
    unittest.main()