1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#

import unittest

import config
import thread_cert
from pktverify.consts import (
    MLE_PARENT_REQUEST,
    MLE_DATA_RESPONSE,
    MLE_DATA_REQUEST,
    MGMT_PENDING_SET_URI,
    SOURCE_ADDRESS_TLV,
    LEADER_DATA_TLV,
    ACTIVE_OPERATION_DATASET_TLV,
    ACTIVE_TIMESTAMP_TLV,
    PENDING_TIMESTAMP_TLV,
    TLV_REQUEST_TLV,
    NETWORK_DATA_TLV,
    NM_BORDER_AGENT_LOCATOR_TLV,
    NM_COMMISSIONER_SESSION_ID_TLV,
    NM_DELAY_TIMER_TLV,
    PENDING_OPERATION_DATASET_TLV,
    NWD_COMMISSIONING_DATA_TLV,
)
from pktverify.packet_verifier import PacketVerifier
from pktverify.null_field import nullField

# Network keys used by the test: KEY1 is the key in the initial topology,
# KEY2 is the key the first MGMT_PENDING_SET request switches to.
KEY1 = '00112233445566778899aabbccddeeff'
KEY2 = 'ffeeddccbbaa99887766554433221100'

# Radio parameters shared by every node in the topology.
CHANNEL_INIT = 19
PANID_INIT = 0xface

# Node identifiers (keys of the TOPOLOGY dictionary).
COMMISSIONER = 1
LEADER = 2
ROUTER1 = 3
ED1 = 4
SED1 = 5

# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to confirm the DUT correctly applies
# DELAY_TIMER_DEFAULT when the network key is changed.
# The Commissioner first tries to set a network key update to happen too
# soon (delay of 60s vs DELAY_TIMER_DEFAULT of 300s); the DUT is expected
# to override the short value and communicate an appropriately longer delay
# to the Router.
# The Commissioner then sets a delay time longer than default; the DUT is
# validated to not artificially clamp the longer time back to the
# DELAY_TIMER_DEFAULT value.
#
# Test Topology:
# -------------
# Commissioner
#     |
#   Leader
#     |
#   Router
#    /  \
#  ED   SED
#
# DUT Types:
# ----------
#  Leader

# MLE TLVs that the Leader's unicast MLE Data Response to the Router must
# carry (verification Steps 5 and 13).
_ROUTER_DATA_RESPONSE_TLVS = frozenset((
    SOURCE_ADDRESS_TLV,
    LEADER_DATA_TLV,
    ACTIVE_TIMESTAMP_TLV,
    PENDING_TIMESTAMP_TLV,
    PENDING_OPERATION_DATASET_TLV,
))


def _has_commissioning_data(p):
    """Return a truthy value if packet ``p`` carries the expected Commissioning Data.

    The packet must contain a Commissioning Data TLV whose stable flag list is
    ``[0]``, plus a Commissioner Session ID TLV and a Border Agent Locator TLV.
    The checks are evaluated in this fixed order and short-circuit.
    """
    return (p.thread_nwd.tlv.stable == [0] and NWD_COMMISSIONING_DATA_TLV in p.thread_nwd.tlv.type and
            NM_COMMISSIONER_SESSION_ID_TLV in p.thread_meshcop.tlv.type and
            NM_BORDER_AGENT_LOCATOR_TLV in p.thread_meshcop.tlv.type)


def _leader_data_not_older(p, baseline):
    """Return True if the Leader Data versions of ``p`` are not behind ``baseline``.

    Both the data version and the stable data version are compared modulo 256:
    a difference in the range 0..127 is accepted. Note that this also accepts
    equal versions, not only strictly incremented ones.
    """
    return ((p.mle.tlv.leader_data.data_version - baseline.mle.tlv.leader_data.data_version) % 256 <= 127 and
            (p.mle.tlv.leader_data.stable_data_version - baseline.mle.tlv.leader_data.stable_data_version) % 256
            <= 127)


def _verify_pending_set_accepted(pkts, leader, commissioner_rloc):
    """Require the Leader's MGMT_PENDING_SET.rsp to the Commissioner with State TLV == 1 (Accept)."""
    return (pkts.filter_coap_ack(MGMT_PENDING_SET_URI)
            .filter_wpan_src64(leader)
            .filter_ipv6_dst(commissioner_rloc)
            .must_next()
            .must_verify(lambda p: p.thread_meshcop.tlv.state == 1))


def _verify_multicast_data_response(pkts, leader, baseline, active_tstamp, pending_tstamp):
    """Require a multicast MLE Data Response from the Leader announcing the new network data.

    Args:
        pkts: packet filter to search (its position is advanced by the match).
        leader: extended address of the Leader.
        baseline: earlier packet whose Leader Data versions serve as reference.
        active_tstamp: expected value of the MLE Active Timestamp TLV.
        pending_tstamp: expected value of the MLE Pending Timestamp TLV.

    Returns:
        The matching packet.
    """
    return (pkts.filter_mle_cmd(MLE_DATA_RESPONSE)
            .filter_wpan_src64(leader)
            .filter_LLANMA()
            .filter(lambda p: p.mle.tlv.active_tstamp == active_tstamp and
                    p.mle.tlv.pending_tstamp == pending_tstamp and
                    _leader_data_not_older(p, baseline) and
                    _has_commissioning_data(p))
            .must_next())


def _verify_ping_answered(pkts, requester, responder_mleid, responder, requester_mleid):
    """Require an ICMPv6 Echo Request to ``responder_mleid`` followed by the matching Echo Reply.

    The reply is matched on the echo identifier of the request.
    """
    request = (pkts.filter_ping_request()
               .filter_wpan_src64(requester)
               .filter_ipv6_dst(responder_mleid)
               .must_next())
    return (pkts.filter_ping_reply(identifier=request.icmpv6.echo.identifier)
            .filter_wpan_src64(responder)
            .filter_ipv6_dst(requester_mleid)
            .must_next())


class Cert_9_2_11_NetworkKey(thread_cert.TestCase):
    """Certification test 9.2.11: network key change through MGMT_PENDING_SET.

    ``test`` drives the simulated network; ``verify`` inspects the captured
    packets.
    """
    USE_MESSAGE_FACTORY = False
    SUPPORT_NCP = False

    TOPOLOGY = {
        COMMISSIONER: {
            'name': 'COMMISSIONER',
            'active_dataset': {
                'timestamp': 10,
                'panid': PANID_INIT,
                'channel': CHANNEL_INIT,
                'network_key': KEY1
            },
            'mode': 'rdn',
            'allowlist': [LEADER]
        },
        LEADER: {
            'name': 'LEADER',
            'active_dataset': {
                'timestamp': 10,
                'panid': PANID_INIT,
                'channel': CHANNEL_INIT,
                'network_key': KEY1
            },
            'mode': 'rdn',
            'allowlist': [COMMISSIONER, ROUTER1]
        },
        ROUTER1: {
            'name': 'ROUTER',
            'active_dataset': {
                'timestamp': 10,
                'panid': PANID_INIT,
                'channel': CHANNEL_INIT,
                'network_key': KEY1
            },
            'mode': 'rdn',
            'allowlist': [LEADER, ED1, SED1]
        },
        ED1: {
            'name': 'ED',
            'channel': CHANNEL_INIT,
            'is_mtd': True,
            'networkkey': KEY1,
            'mode': 'rn',
            'panid': PANID_INIT,
            'allowlist': [ROUTER1]
        },
        SED1: {
            'name': 'SED',
            'channel': CHANNEL_INIT,
            'is_mtd': True,
            'networkkey': KEY1,
            'mode': '-',
            'panid': PANID_INIT,
            'timeout': config.DEFAULT_CHILD_TIMEOUT,
            'allowlist': [ROUTER1]
        },
    }

    def _start_and_expect_state(self, node_id, settle_time, expected_state):
        """Start node ``node_id``, run the simulator for ``settle_time`` and assert its state."""
        self.nodes[node_id].start()
        self.simulator.go(settle_time)
        self.assertEqual(self.nodes[node_id].get_state(), expected_state)

    def _change_network_key(self, *, pending_timestamp, active_timestamp, delay_timer, settle_time, network_key):
        """Request a network key change and check that the whole network adopted it.

        The Commissioner sends MGMT_PENDING_SET with the given values, the
        simulator runs for ``settle_time``, every node is asserted to report
        ``network_key``, and finally the Router must be able to ping the
        Leader's ML-EID.
        """
        self.nodes[COMMISSIONER].send_mgmt_pending_set(
            pending_timestamp=pending_timestamp,
            active_timestamp=active_timestamp,
            delay_timer=delay_timer,
            network_key=network_key,
        )
        self.simulator.go(settle_time)

        for node_id in (COMMISSIONER, LEADER, ROUTER1, ED1, SED1):
            # The message identifies the failing node, since all nodes share this line.
            self.assertEqual(self.nodes[node_id].get_networkkey(), network_key,
                             'unexpected network key on node %d' % node_id)

        leader_mleid = self.nodes[LEADER].get_ip6_address(config.ADDRESS_TYPE.ML_EID)
        self.assertTrue(self.nodes[ROUTER1].ping(leader_mleid))

    def test(self):
        # NOTE: deliberately no docstring here; unittest would use it as the
        # test's short description in its output.

        # Form the network: Leader first, then the Commissioner (which also
        # starts its commissioner role), the Router and its two children.
        self._start_and_expect_state(LEADER, config.LEADER_STARTUP_DELAY, 'leader')

        self._start_and_expect_state(COMMISSIONER, config.ROUTER_STARTUP_DELAY, 'router')
        self.nodes[COMMISSIONER].commissioner_start()
        self.simulator.go(3)

        self._start_and_expect_state(ROUTER1, config.ROUTER_STARTUP_DELAY, 'router')
        self._start_and_expect_state(ED1, 5, 'child')
        self._start_and_expect_state(SED1, 5, 'child')

        self.collect_rlocs()
        self.collect_ipaddrs()

        # First change: the requested delay (60000, the 60s of the test
        # description) is shorter than DELAY_TIMER_DEFAULT (300s), so the
        # switch to KEY2 is only checked after 310s.
        self._change_network_key(
            pending_timestamp=10,
            active_timestamp=70,
            delay_timer=60000,
            settle_time=310,
            network_key=KEY2,
        )

        # Second change: the requested delay (500000) is longer than the
        # default; the switch back to KEY1 is checked after 510s.
        self._change_network_key(
            pending_timestamp=20,
            active_timestamp=30,
            delay_timer=500000,
            settle_time=510,
            network_key=KEY1,
        )

    def verify(self, pv):
        """Check the captured packets against the expected message sequence.

        Args:
            pv: packet verifier providing the capture (``pv.pkts``) and the
                node address variables (``pv.vars``).
        """
        pkts = pv.pkts
        pv.summary.show()

        leader = pv.vars['LEADER']
        leader_mleid = pv.vars['LEADER_MLEID']
        commissioner_rloc = pv.vars['COMMISSIONER_RLOC']
        router = pv.vars['ROUTER']
        router_mleid = pv.vars['ROUTER_MLEID']

        # Step 1: Ensure the topology is formed correctly
        for node in ('COMMISSIONER', 'ROUTER'):
            pv.verify_attached(node, 'LEADER')
        for node in ('ED', 'SED'):
            pv.verify_attached(node, 'ROUTER', 'MTD')
        # Reference packet for the Leader Data version comparison in Step 3.
        baseline = pkts.last()

        # Step 3: Leader sends MGMT_PENDING_SET.rsp to the Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             - State TLV (value = Accept)
        #
        #         Leader MUST multicast MLE Data Response with the new network data,
        #         including the following TLVs:
        #             - Leader Data TLV:
        #                 Data Version field not older than before the request
        #                 Stable Version field not older than before the request
        #             - Network Data TLV:
        #                 - Commissioner Data TLV:
        #                     Stable flag set to 0
        #                     Border Agent Locator TLV
        #                     Commissioner Session ID TLV
        #             - Active Timestamp TLV: 10s
        #             - Pending Timestamp TLV: 10s
        _verify_pending_set_accepted(pkts, leader, commissioner_rloc)
        _verify_multicast_data_response(pkts, leader, baseline, active_tstamp=10, pending_tstamp=10)

        # Step 5: Leader sends a MLE Data Response to Router including the following TLVs:
        #             - Source Address TLV
        #             - Leader Data TLV
        #             - Network Data TLV
        #                 - Commissioner Data TLV:
        #                     Stable flag set to 0
        #                     Border Agent Locator TLV
        #                     Commissioner Session ID TLV
        #             - Active Timestamp TLV
        #             - Pending Timestamp TLV
        #             - Pending Operational Dataset TLV
        #                 - Delay Timer TLV <greater than 200s>
        #                 - Network Key TLV: New Network Key
        #                 - Active Timestamp TLV <70s>
        router_update = (pkts.filter_mle_cmd(MLE_DATA_RESPONSE)
                         .filter_wpan_src64(leader)
                         .filter_wpan_dst64(router)
                         .filter(lambda p: _ROUTER_DATA_RESPONSE_TLVS <= set(p.mle.tlv.type) and
                                 _has_commissioning_data(p) and
                                 p.thread_meshcop.tlv.delay_timer > 200000 and
                                 p.thread_meshcop.tlv.master_key == KEY2 and
                                 p.thread_meshcop.tlv.active_tstamp == 70)
                         .must_next())

        # Step 8: Verify all devices now use New Network key.
        # checked in test()

        # Step 9: Verify the new MAC key is generated and used: the Router's
        #         ICMPv6 Echo Request to the Leader must be answered with an
        #         ICMPv6 Echo Reply.
        _verify_ping_answered(pkts, router, leader_mleid, leader, router_mleid)

        # Step 11: Leader sends MGMT_PENDING_SET.rsp to the Commissioner:
        #         CoAP Response Code
        #             2.04 Changed
        #         CoAP Payload
        #             - State TLV (value = Accept)
        #
        #         Leader MUST multicast MLE Data Response with the new network data,
        #         including the following TLVs:
        #             - Leader Data TLV:
        #                 Data Version field not older than in the Step 5 packet
        #                 Stable Version field not older than in the Step 5 packet
        #             - Network Data TLV:
        #                 - Commissioner Data TLV:
        #                     Stable flag set to 0
        #                     Border Agent Locator TLV
        #                     Commissioner Session ID TLV
        #             - Active Timestamp TLV: 70s
        #             - Pending Timestamp TLV: 20s
        _verify_pending_set_accepted(pkts, leader, commissioner_rloc)
        _verify_multicast_data_response(pkts, leader, router_update, active_tstamp=70, pending_tstamp=20)

        # Step 13: Leader sends a MLE Data Response to Router including the following TLVs:
        #             - Source Address TLV
        #             - Leader Data TLV
        #             - Network Data TLV
        #                 - Commissioner Data TLV:
        #                     Stable flag set to 0
        #                     Border Agent Locator TLV
        #                     Commissioner Session ID TLV
        #             - Active Timestamp TLV <70s>
        #             - Pending Timestamp TLV <20s>
        #             - Pending Operational Dataset TLV
        #                 - Active Timestamp TLV <30s>
        #                 - Delay Timer TLV <greater than 300s>
        #                 - Network Key TLV: New Network Key
        (pkts.filter_mle_cmd(MLE_DATA_RESPONSE)
         .filter_wpan_src64(leader)
         .filter_wpan_dst64(router)
         .filter(lambda p: _ROUTER_DATA_RESPONSE_TLVS <= set(p.mle.tlv.type) and
                 _has_commissioning_data(p) and
                 p.mle.tlv.active_tstamp == 70 and
                 p.mle.tlv.pending_tstamp == 20 and
                 p.thread_meshcop.tlv.delay_timer > 300000 and
                 p.thread_meshcop.tlv.master_key == KEY1 and
                 p.thread_meshcop.tlv.active_tstamp == 30)
         .must_next())

        # Step 17: The DUT MUST send an ICMPv6 Echo Reply using the new Network key
        _verify_ping_answered(pkts, router, leader_mleid, leader, router_mleid)


if __name__ == '__main__':
    unittest.main()