1#!/usr/bin/env python3 2# 3# Copyright (c) 2021, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS' 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29import binascii 30import ipaddress 31import logging 32import unittest 33 34import config 35import thread_cert 36 37# Test description: 38# This test verifies that OTBR publishes the meshcop service using a proper 39# configuration. 
#
# Topology:
#    ----------------(eth)-----------------------------
#           |                  |                  |
#          BR1                BR2          HOST (mDNS Browser)
#
#

BR1 = 1
BR2 = 2
HOST = 3


class PublishMeshCopService(thread_cert.TestCase):
    """Verify the `_meshcop._udp` / `_meshcop-e._udp` mDNS services published by OTBR.

    Two border routers (BR1, BR2) and one host acting as an mDNS browser share
    the infrastructure link.  The test walks BR1 through several state changes
    (ephemeral key mode, backbone router, dataset changes, service restart,
    factory reset) and checks the advertised service data after each step.
    """
    USE_MESSAGE_FACTORY = False

    TOPOLOGY = {
        BR1: {
            'name': 'BR_1',
            'allowlist': [],
            'is_otbr': True,
            'version': '1.2',
            'network_name': 'ot-br1',
            'boot_delay': 5,
        },
        BR2: {
            'name': 'BR_2',
            'allowlist': [],
            'is_otbr': True,
            'version': '1.2',
            'network_name': 'ot-br2',
            'boot_delay': 5,
        },
        HOST: {
            'name': 'Host',
            'is_host': True
        },
    }

    # Expected (Thread interface status, Thread role) fields of the `sb` state
    # bitmap for each device role reported by `get_state()`:
    #   interface status 0: not active and not initialized with valid
    #                       operational network parameters
    #   interface status 1: initialized with valid parameters, but not actively
    #                       participating in a network
    #   interface status 2: initialized with valid parameters and actively part
    #                       of a network
    _EXPECTED_SB_FIELDS_BY_ROLE = {
        'disabled': (0, 0),
        'detached': (1, 0),
        'child': (2, 1),
        'router': (2, 2),
        'leader': (2, 3),
    }

    def test(self):
        """Run the full MeshCoP service publication scenario."""
        host = self.nodes[HOST]
        br1 = self.nodes[BR1]
        br2 = self.nodes[BR2]
        br2.disable_br()

        # Use different network names to distinguish meshcop services
        br1.set_active_dataset(updateExisting=True, network_name='ot-br1')
        br2.set_active_dataset(updateExisting=True, network_name='ot-br2')

        host.start(start_radvd=False)
        self.simulator.go(20)

        self.assertEqual(br1.get_state(), 'disabled')
        # TODO enable this line when renaming with mDNSResponder is stable
        # self.check_meshcop_service(br1, host)
        br1.start()
        self.simulator.go(config.BORDER_ROUTER_STARTUP_DELAY)
        self.assertEqual('leader', br1.get_state())

        # start to test ephemeral key mode (ePSKc)
        br1.ephemeral_key_enabled = True
        self.assertTrue(br1.ephemeral_key_enabled)
        self.simulator.go(10)
        self.check_meshcop_service(br1, host)

        # activate ePSKc mode
        lifetime = 500_000
        ephemeral_key = br1.activate_ephemeral_key_mode(lifetime)
        self.assertEqual(len(ephemeral_key), 9)
        self.assertEqual(br1.get_ephemeral_key_state(), 'active')
        # check Meshcop-e service
        self.check_meshcop_e_service(host, True)

        # deactivate ePSKc mode in force
        br1.deactivate_ephemeral_key_mode(retain_active_session=False)
        self.assertEqual(br1.get_ephemeral_key_state(), 'inactive')
        self.simulator.go(10)
        # check Meshcop-e service
        self.check_meshcop_e_service(host, False)

        # activate ePSKc mode by default lifetime
        lifetime = 0
        ephemeral_key = br1.activate_ephemeral_key_mode(lifetime)
        self.assertEqual(len(ephemeral_key), 9)
        self.assertEqual(br1.get_ephemeral_key_state(), 'active')
        # check Meshcop-e service
        self.check_meshcop_e_service(host, True)

        # deactivate ePSKc mode NOT in force
        br1.deactivate_ephemeral_key_mode(retain_active_session=True)
        self.assertEqual(br1.get_ephemeral_key_state(), 'inactive')
        self.simulator.go(10)
        # check Meshcop-e service
        self.check_meshcop_e_service(host, False)

        # change ephemeral key mode (ePSKc) and check Meshcop
        br1.ephemeral_key_enabled = False
        self.assertFalse(br1.ephemeral_key_enabled)
        self.simulator.go(10)
        self.check_meshcop_service(br1, host)
        # check Meshcop-e format
        self.check_meshcop_e_service(host, False)
        # end of ephemeral key mode (ePSKc) test

        br1.disable_backbone_router()
        self.simulator.go(10)
        self.check_meshcop_service(br1, host)

        br1.stop()
        br1.set_active_dataset(updateExisting=True, network_name='ot-br1-1')
        br1.start()
        self.simulator.go(config.LEADER_REBOOT_DELAY)
        self.check_meshcop_service(br1, host)

        # verify that there are two meshcop services
        br2.set_active_dataset(updateExisting=True, network_name='ot-br2-1')
        br2.start()
        br2.disable_backbone_router()
        br2.enable_br()
        self.simulator.go(config.LEADER_REBOOT_DELAY)

        service_instances = host.browse_mdns_services('_meshcop._udp')
        self.assertEqual(len(service_instances), 2)
        br1_service = self.check_meshcop_service(br1, host)
        br2_service = self.check_meshcop_service(br2, host)
        self.assertNotEqual(br1_service['host'], br2_service['host'])

        br1.stop_otbr_service()
        self.simulator.go(5)
        br2.enable_backbone_router()
        self.simulator.go(5)
        self.assertEqual(len(host.browse_mdns_services('_meshcop._udp')), 1)
        br1.start_otbr_service()
        self.simulator.go(10)
        self.assertEqual(len(host.browse_mdns_services('_meshcop._udp')), 2)
        self.check_meshcop_service(br1, host)
        self.check_meshcop_service(br2, host)

        br1.factory_reset()

        dataset = {
            'timestamp': 1,
            'channel': config.CHANNEL,
            'channel_mask': config.CHANNEL_MASK,
            'extended_panid': config.EXTENDED_PANID,
            'mesh_local_prefix': config.MESH_LOCAL_PREFIX.split('/')[0],
            'network_key': binascii.hexlify(config.DEFAULT_NETWORK_KEY).decode(),
            'network_name': 'ot-br-1-3',
            'panid': config.PANID,
            'pskc': config.PSKC,
            'security_policy': config.SECURITY_POLICY,
        }

        br1.set_active_dataset(**dataset)
        self.simulator.go(10)

        self.assertEqual(len(host.browse_mdns_services('_meshcop._udp')), 2)
        self.check_meshcop_service(br1, host)
        self.check_meshcop_service(br2, host)

    def check_meshcop_service(self, br, host):
        """Find the MeshCoP service whose `nn` TXT entry matches `br` and validate it.

        Returns the matching service data.  Fails the test when no discovered
        service advertises the network name reported by `br`.
        """
        services = self.discover_all_meshcop_services(host)
        # The network name does not change while we scan the results, so query
        # the node once instead of once per discovered service.
        network_name = br.get_network_name()
        for service in services:
            if service['txt']['nn'] == network_name:
                self.check_meshcop_service_by_data(br, service)
                return service
        self.fail(f'MeshCoP service not found for network name {network_name!r}')

    def check_meshcop_service_by_data(self, br, service_data):
        """Validate the TXT entries of one MeshCoP service against the state of `br`.

        The `sb` entry is decoded as a big-endian integer and checked field by
        field: bits 0-2 connection mode, bits 3-4 Thread interface status,
        bits 5-6 availability, bit 7 BBR enabled, bit 8 BBR primary,
        bits 9-10 Thread role, bit 11 ePSKc supported.
        """
        txt = service_data['txt']
        sb_data = txt['sb'].encode('raw_unicode_escape')
        state_bitmap = int.from_bytes(sb_data, byteorder='big')
        logging.info(bin(state_bitmap))
        self.assertEqual((state_bitmap & 7), 1)  # connection mode = PskC
        sb_thread_interface_status = (state_bitmap >> 3) & 3
        sb_thread_role = (state_bitmap >> 9) & 3
        device_role = br.get_state()

        expected_fields = self._EXPECTED_SB_FIELDS_BY_ROLE.get(device_role)
        if expected_fields is None:
            # An unrecognized role used to skip the two checks below silently.
            self.fail(f'Unexpected device role {device_role!r}')
        expected_interface_status, expected_role = expected_fields
        self.assertEqual(sb_thread_interface_status, expected_interface_status)
        self.assertEqual(sb_thread_role, expected_role)

        self.assertEqual(((state_bitmap >> 5) & 3), 1)  # high availability

        # The BBR bits are only expected to be set while the node is attached;
        # the BBR state is queried only in that case.
        is_attached = device_role not in ('disabled', 'detached')
        bbr_state = br.get_backbone_router_state() if is_attached else None
        self.assertEqual(bool((state_bitmap >> 7) & 1), is_attached and
                         bbr_state != 'Disabled')  # BBR is enabled or not
        self.assertEqual(bool((state_bitmap >> 8) & 1), is_attached and
                         bbr_state == 'Primary')  # BBR is primary or not
        self.assertEqual(bool((state_bitmap >> 11) & 1), br.ephemeral_key_enabled)  # ePSKc is supported or not
        self.assertEqual(txt['nn'], br.get_network_name())
        self.assertEqual(txt['rv'], '1')
        self.assertIn(txt['tv'], ['1.1.0', '1.1.1', '1.2.0', '1.3.0', '1.4.0'])

    def discover_services(self, host, type):
        """Browse `type` from `host` and resolve every instance found.

        Returns a list with one `discover_mdns_service` result per browsed
        instance name.  The parameter name `type` shadows the builtin; it is
        kept so existing keyword callers keep working.
        """
        return [
            host.discover_mdns_service(instance_name, type, None)
            for instance_name in host.browse_mdns_services(type)
        ]

    def discover_all_meshcop_services(self, host):
        """Return the resolved data of every `_meshcop._udp` service seen by `host`."""
        return self.discover_services(host, '_meshcop._udp')

    def check_meshcop_e_service(self, host, isactive):
        """Assert that a `_meshcop-e._udp` service is present iff `isactive` is true."""
        services = self.discover_services(host, '_meshcop-e._udp')
        # TODO: Meshcop-e port check etc.
        if isactive:
            self.assertGreater(len(services), 0, msg='Meshcop-e service not found')
        else:
            self.assertEqual(len(services), 0, msg='Meshcop-e service still found after disabled')


if __name__ == '__main__':
    unittest.main()