#!/usr/bin/env python3
#
# Copyright (c) 2019, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

import unittest

import command
import mesh_cop
import thread_cert
from pktverify.consts import (
    MLE_DATA_RESPONSE,
    LEAD_PET_URI,
    LEAD_KA_URI,
    MGMT_COMMISSIONER_SET_URI,
    NM_CHANNEL_TLV,
    NM_COMMISSIONER_ID_TLV,
    NM_COMMISSIONER_SESSION_ID_TLV,
    NM_STATE_TLV,
    NM_STEERING_DATA_TLV,
    NM_BORDER_AGENT_LOCATOR_TLV,
    LEADER_DATA_TLV,
    NETWORK_DATA_TLV,
    ACTIVE_TIMESTAMP_TLV,
    SOURCE_ADDRESS_TLV,
    NWD_COMMISSIONING_DATA_TLV,
    MESHCOP_ACCEPT,
    MESHCOP_REJECT,
    LEADER_ALOC,
)
from pktverify.packet_verifier import PacketVerifier
from pktverify.bytes import Bytes

COMMISSIONER = 1
LEADER = 2

# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to verify Leader's and active Commissioner's behavior via
# MGMT_COMMISSIONER_SET request and response
#
# Test Topology:
# -------------
# Commissioner
#    |
#  Leader
#
# DUT Types:
# ----------
#  Leader
#  Commissioner


class Cert_9_2_02_MGMTCommissionerSet(thread_cert.TestCase):
    """Exercise MGMT_COMMISSIONER_SET requests/responses between a Commissioner and a Leader.

    `test` drives the two simulated nodes through six MGMT_COMMISSIONER_SET
    requests with different TLV combinations; `verify` then checks the
    captured packets step by step.
    """

    SUPPORT_NCP = False

    # Two nodes that only allow each other.
    TOPOLOGY = {
        COMMISSIONER: {
            'name': 'COMMISSIONER',
            'mode': 'rdn',
            'allowlist': [LEADER]
        },
        LEADER: {
            'name': 'LEADER',
            'mode': 'rdn',
            'allowlist': [COMMISSIONER]
        },
    }

    def test(self):
        """Bring up the topology and send the sequence of MGMT_COMMISSIONER_SET requests."""
        leader = self.nodes[LEADER]
        commissioner = self.nodes[COMMISSIONER]

        leader.start()
        self.simulator.go(5)
        self.assertEqual(leader.get_state(), 'leader')

        commissioner.start()
        self.simulator.go(5)
        self.assertEqual(commissioner.get_state(), 'router')
        # Result deliberately discarded. NOTE(review): presumably this drains the
        # Leader's messages sent so far so the query below only sees newer ones -
        # confirm against the simulator implementation.
        self.simulator.get_messages_sent_by(LEADER)

        self.collect_rlocs()
        self.collect_rloc16s()

        commissioner.commissioner_start()
        self.simulator.go(3)

        # Reuse the Commissioner Session ID carried in the Leader's next CoAP 2.04
        # message (presumably the petition response) for the valid requests below.
        leader_messages = self.simulator.get_messages_sent_by(LEADER)
        msg = leader_messages.next_coap_message('2.04', assert_enabled=True)
        commissioner_session_id_tlv = command.get_sub_tlv(msg.coap.payload, mesh_cop.CommissionerSessionId)

        steering_data_tlv = mesh_cop.SteeringData(bytes([0xff]))
        border_agent_locator_tlv = mesh_cop.BorderAgentLocator(0x0400)

        # Steering Data only (no Commissioner Session ID TLV).
        commissioner.commissioner_mgmtset_with_tlvs([steering_data_tlv])
        self.simulator.go(5)

        # Steering Data + Commissioner Session ID.
        commissioner.commissioner_mgmtset_with_tlvs([steering_data_tlv, commissioner_session_id_tlv])
        self.simulator.go(5)

        # Commissioner Session ID + Border Agent Locator.
        commissioner.commissioner_mgmtset_with_tlvs([commissioner_session_id_tlv, border_agent_locator_tlv])
        self.simulator.go(5)

        # Steering Data + Commissioner Session ID + Border Agent Locator.
        commissioner.commissioner_mgmtset_with_tlvs([
            steering_data_tlv,
            commissioner_session_id_tlv,
            border_agent_locator_tlv,
        ])
        self.simulator.go(5)

        # Commissioner Session ID 0xffff + Steering Data.
        commissioner.commissioner_mgmtset_with_tlvs([mesh_cop.CommissionerSessionId(0xffff), steering_data_tlv])
        self.simulator.go(5)

        # Commissioner Session ID + Steering Data + Channel.
        commissioner.commissioner_mgmtset_with_tlvs([
            commissioner_session_id_tlv,
            steering_data_tlv,
            mesh_cop.Channel(0x0, 0x0),
        ])
        self.simulator.go(5)

        # Connectivity check in both directions over the RLOC addresses.
        leader_rloc = leader.get_rloc()
        commissioner_rloc = commissioner.get_rloc()
        self.assertTrue(commissioner.ping(leader_rloc))
        self.simulator.go(1)
        self.assertTrue(leader.ping(commissioner_rloc))

    def verify(self, pv):
        """Check the captured packets against the expected request/response sequence.

        Args:
            pv: packet verifier object; this method reads `pv.pkts`, `pv.vars`
                and `pv.summary`, and calls `pv.verify_attached`.
        """
        pkts = pv.pkts
        pv.summary.show()

        # These locals intentionally shadow the module-level node ids with the
        # values recorded in pv.vars.
        LEADER = pv.vars['LEADER']
        LEADER_RLOC = pv.vars['LEADER_RLOC']
        COMMISSIONER = pv.vars['COMMISSIONER']
        COMMISSIONER_RLOC = pv.vars['COMMISSIONER_RLOC']

        def next_mgmt_set_request(predicate):
            # Return the next MGMT_COMMISSIONER_SET.req from the Commissioner that satisfies `predicate`.
            return (pkts.filter_wpan_src64(COMMISSIONER)
                    .filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC)
                    .filter_coap_request(MGMT_COMMISSIONER_SET_URI)
                    .filter(predicate)
                    .must_next())

        def next_mgmt_set_response(request, state):
            # Return the next MGMT_COMMISSIONER_SET.rsp answering `request` whose only TLV is State == `state`.
            # The response source is whichever address (ALOC or RLOC) the request was sent to.
            return (pkts.filter_ipv6_src_dst(request.ipv6.dst, COMMISSIONER_RLOC)
                    .filter_coap_ack(MGMT_COMMISSIONER_SET_URI)
                    .filter(lambda p: [NM_STATE_TLV] == p.coap.tlv.type and
                            p.thread_meshcop.tlv.state == state)
                    .must_next())

        # Step 1: Ensure topology is formed correctly
        pv.verify_attached('COMMISSIONER', 'LEADER')

        # Step 2: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
        #         to Leader Anycast or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/cs
        #         CoAP Payload
        #             (missing Commissioner Session ID TLV)
        #             Steering Data TLV (0xFF)
        _mgmt_set_pkt = next_mgmt_set_request(
            lambda p: [NM_STEERING_DATA_TLV] == p.coap.tlv.type and
            p.thread_meshcop.tlv.steering_data == Bytes('ff'))

        # Step 3: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
        #         Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             State TLV (value = Reject)
        next_mgmt_set_response(_mgmt_set_pkt, MESHCOP_REJECT)

        # Step 4: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
        #         to Leader Anycast or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/cs
        #         CoAP Payload
        #             Commissioner Session ID TLV
        #             Steering Data TLV (0xFF)
        _mgmt_set_pkt = next_mgmt_set_request(
            lambda p: {
                NM_COMMISSIONER_SESSION_ID_TLV,
                NM_STEERING_DATA_TLV
            } == set(p.thread_meshcop.tlv.type) and
            p.thread_meshcop.tlv.steering_data == Bytes('ff'))

        # Step 5: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
        #         Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             State TLV (value = Accept)
        next_mgmt_set_response(_mgmt_set_pkt, MESHCOP_ACCEPT)

        # Step 6: Leader sends a MLE Data Response to the network with the
        #         following TLVs:
        #             - Active Timestamp TLV
        #             - Leader Data TLV
        #             - Network Data TLV
        #             - Source Address TLV
        (pkts.filter_wpan_src64(LEADER)
         .filter_LLANMA()
         .filter_mle_cmd(MLE_DATA_RESPONSE)
         .filter(lambda p: {
             NETWORK_DATA_TLV,
             SOURCE_ADDRESS_TLV,
             ACTIVE_TIMESTAMP_TLV,
             LEADER_DATA_TLV
         } == set(p.mle.tlv.type) and {
             NWD_COMMISSIONING_DATA_TLV
         } == set(p.thread_nwd.tlv.type) and {
             NM_BORDER_AGENT_LOCATOR_TLV,
             NM_COMMISSIONER_SESSION_ID_TLV,
             NM_STEERING_DATA_TLV
         } == set(p.thread_meshcop.tlv.type) and
                 p.thread_nwd.tlv.stable == [0])
         .must_next())

        # Step 7: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
        #         to Leader Anycast or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/cs
        #         CoAP Payload
        #             Commissioner Session ID TLV
        #             Border Agent Locator TLV (0x0400) (not allowed TLV)
        _mgmt_set_pkt = next_mgmt_set_request(
            lambda p: {
                NM_COMMISSIONER_SESSION_ID_TLV,
                NM_BORDER_AGENT_LOCATOR_TLV
            } == set(p.thread_meshcop.tlv.type) and
            p.thread_meshcop.tlv.ba_locator == 0x0400)

        # Step 8: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
        #         Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             State TLV (value = Reject)
        next_mgmt_set_response(_mgmt_set_pkt, MESHCOP_REJECT)

        # Step 9: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
        #         to Leader Anycast or Routing Locator:
        #         CoAP Request URI
        #             CON POST coap://<L>:MM/c/cs
        #         CoAP Payload
        #             Commissioner Session ID TLV
        #             Steering Data TLV (0xFF)
        #             Border Agent Locator TLV (0x0400) (not allowed TLV)
        _mgmt_set_pkt = next_mgmt_set_request(
            lambda p: {
                NM_COMMISSIONER_SESSION_ID_TLV,
                NM_STEERING_DATA_TLV,
                NM_BORDER_AGENT_LOCATOR_TLV
            } == set(p.thread_meshcop.tlv.type) and
            p.thread_meshcop.tlv.ba_locator == 0x0400 and
            p.thread_meshcop.tlv.steering_data == Bytes('ff'))

        # Step 10: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
        #          Commissioner:
        #          CoAP Response Code
        #              2.04 Changed
        #          CoAP Payload
        #              State TLV (value = Reject)
        next_mgmt_set_response(_mgmt_set_pkt, MESHCOP_REJECT)

        # Step 11: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
        #          to Leader Anycast or Routing Locator:
        #          CoAP Request URI
        #              CON POST coap://<L>:MM/c/cs
        #          CoAP Payload
        #              Commissioner Session ID TLV (0xFFFF) (invalid value)
        #              Steering Data TLV (0xFF)
        _mgmt_set_pkt = next_mgmt_set_request(
            lambda p: {
                NM_COMMISSIONER_SESSION_ID_TLV,
                NM_STEERING_DATA_TLV
            } == set(p.thread_meshcop.tlv.type) and
            p.thread_meshcop.tlv.commissioner_sess_id == 0xFFFF and
            p.thread_meshcop.tlv.steering_data == Bytes('ff'))

        # Step 12: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
        #          Commissioner:
        #          CoAP Response Code
        #              2.04 Changed
        #          CoAP Payload
        #              State TLV (value = Reject)
        next_mgmt_set_response(_mgmt_set_pkt, MESHCOP_REJECT)

        # Step 13: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
        #          to Leader Anycast or Routing Locator:
        #          CoAP Request URI
        #              CON POST coap://<L>:MM/c/cs
        #          CoAP Payload
        #              Commissioner Session ID TLV
        #              Steering Data TLV (0xFF)
        #              Channel TLV (not allowed TLV)
        _mgmt_set_pkt = next_mgmt_set_request(
            lambda p: {
                NM_COMMISSIONER_SESSION_ID_TLV,
                NM_STEERING_DATA_TLV,
                NM_CHANNEL_TLV
            } == set(p.thread_meshcop.tlv.type) and
            p.thread_meshcop.tlv.steering_data == Bytes('ff'))

        # Step 14: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
        #          Commissioner:
        #          CoAP Response Code
        #              2.04 Changed
        #          CoAP Payload
        #              State TLV (value = Accept)
        next_mgmt_set_response(_mgmt_set_pkt, MESHCOP_ACCEPT)

        # Step 15: Verify connectivity by sending an ICMPv6 Echo Request to the DUT mesh local address
        _pkt = (pkts.filter_ping_request()
                .filter_ipv6_src_dst(COMMISSIONER_RLOC, LEADER_RLOC)
                .must_next())
        (pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier)
         .filter_ipv6_src_dst(LEADER_RLOC, COMMISSIONER_RLOC)
         .must_next())

        _pkt = (pkts.filter_ping_request()
                .filter_ipv6_src_dst(LEADER_RLOC, COMMISSIONER_RLOC)
                .must_next())
        (pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier)
         .filter_ipv6_src_dst(COMMISSIONER_RLOC, LEADER_RLOC)
         .must_next())


if __name__ == '__main__':
    unittest.main()