1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import unittest 31import copy 32 33import config 34import thread_cert 35from pktverify.consts import MLE_PARENT_REQUEST, MLE_DATA_RESPONSE, MLE_DATA_REQUEST, MLE_CHILD_UPDATE_REQUEST, MGMT_PENDING_SET_URI, ACTIVE_OPERATION_DATASET_TLV, ACTIVE_TIMESTAMP_TLV, PENDING_TIMESTAMP_TLV, TLV_REQUEST_TLV, NETWORK_DATA_TLV, NM_BORDER_AGENT_LOCATOR_TLV, NM_COMMISSIONER_SESSION_ID_TLV, NM_DELAY_TIMER_TLV, PENDING_OPERATION_DATASET_TLV, NWD_COMMISSIONING_DATA_TLV, LEADER_ALOC 36from pktverify.packet_verifier import PacketVerifier 37from pktverify.null_field import nullField 38 39COMMISSIONER = 1 40LEADER = 2 41DUT = 3 42 43CHANNEL_INIT = 19 44PANID_INIT = 0xface 45LEADER_ACTIVE_TIMESTAMP = 10 46 47COMMISSIONER_PENDING_TIMESTAMP = 20 48COMMISSIONER_ACTIVE_TIMESTAMP = 70 49COMMISSIONER_DELAY_TIMER = 60000 50COMMISSIONER_PENDING_CHANNEL = 20 51COMMISSIONER_PENDING_PANID = 0xafce 52 53# Test Purpose and Description: 54# ----------------------------- 55# The purpose of this test case is to verify that after a reset, the DUT 56# reattaches to the test network using parameters set in Active/Pending 57# Operational Datasets. 58# 59# Test Topology: 60# ------------- 61# Commissioner 62# | 63# Leader 64# | 65# DUT 66# 67# DUT Types: 68# ---------- 69# Router 70# ED 71# SED 72 73 74class Cert_9_2_8_PersistentDatasets_Base(thread_cert.TestCase): 75 USE_MESSAGE_FACTORY = False 76 SUPPORT_NCP = False 77 78 TOPOLOGY = { 79 COMMISSIONER: { 80 'name': 'COMMISSIONER', 81 'mode': 'rdn', 82 'panid': PANID_INIT, 83 'channel': CHANNEL_INIT, 84 'allowlist': [LEADER] 85 }, 86 LEADER: { 87 'name': 'LEADER', 88 'active_dataset': { 89 'timestamp': LEADER_ACTIVE_TIMESTAMP, 90 'panid': PANID_INIT, 91 'channel': CHANNEL_INIT 92 }, 93 'mode': 'rdn', 94 'allowlist': [COMMISSIONER, DUT] 95 }, 96 DUT: { 97 'name': 'DUT', 98 'panid': PANID_INIT, 99 'channel': CHANNEL_INIT, 100 'allowlist': [LEADER] 101 }, 102 } 103 104 def _setUpDUT(self): 105 self.nodes[DUT].add_allowlist(self.nodes[LEADER].get_addr64()) 106 self.nodes[DUT].enable_allowlist() 107 if self.TOPOLOGY[DUT]['mode'] == 'rdn': 108 self.nodes[DUT].set_router_selection_jitter(1) 109 else: 110 self.nodes[DUT].set_timeout(config.DEFAULT_CHILD_TIMEOUT) 111 112 def test(self): 113 self.nodes[LEADER].start() 114 self.simulator.go(config.LEADER_STARTUP_DELAY) 115 self.assertEqual(self.nodes[LEADER].get_state(), 'leader') 116 117 self.nodes[COMMISSIONER].start() 118 self.simulator.go(config.ROUTER_STARTUP_DELAY) 119 self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router') 120 121 self.nodes[DUT].start() 122 self.simulator.go(config.ROUTER_STARTUP_DELAY) 123 if self.TOPOLOGY[DUT]['mode'] == 'rdn': 124 self.assertEqual(self.nodes[DUT].get_state(), 'router') 125 else: 126 self.assertEqual(self.nodes[DUT].get_state(), 'child') 127 128 self.collect_rlocs() 129 130 self.nodes[COMMISSIONER].commissioner_start() 131 self.simulator.go(3) 132 133 self.nodes[COMMISSIONER].send_mgmt_pending_set( 134 pending_timestamp=COMMISSIONER_PENDING_TIMESTAMP, 135 active_timestamp=COMMISSIONER_ACTIVE_TIMESTAMP, 136 delay_timer=COMMISSIONER_DELAY_TIMER, 137 channel=COMMISSIONER_PENDING_CHANNEL, 138 panid=COMMISSIONER_PENDING_PANID, 139 ) 140 self.simulator.go(5) 141 142 # power down the DUT for 60 seconds 143 self.nodes[DUT].reset() 144 self.simulator.go(60) 145 146 # the network moves to COMMISSIONER_PENDING_PANID 147 self.assertEqual(self.nodes[LEADER].get_panid(), COMMISSIONER_PENDING_PANID) 148 self.assertEqual(self.nodes[COMMISSIONER].get_panid(), COMMISSIONER_PENDING_PANID) 149 self.assertEqual(self.nodes[LEADER].get_channel(), COMMISSIONER_PENDING_CHANNEL) 150 self.assertEqual(self.nodes[COMMISSIONER].get_channel(), COMMISSIONER_PENDING_CHANNEL) 151 152 # restart the DUT to attach to COMMISSIONER_PENDING_CHANNEL 153 self.nodes[DUT].reset() 154 self._setUpDUT() 155 self.nodes[DUT].start() 156 157 self.assertEqual(self.nodes[DUT].get_panid(), PANID_INIT) 158 self.assertEqual(self.nodes[DUT].get_channel(), CHANNEL_INIT) 159 self.simulator.go(60) 160 161 self.assertEqual(self.nodes[DUT].get_panid(), COMMISSIONER_PENDING_PANID) 162 self.assertEqual(self.nodes[DUT].get_channel(), COMMISSIONER_PENDING_CHANNEL) 163 self.collect_ipaddrs() 164 165 ipaddr = self.nodes[DUT].get_ip6_address(config.ADDRESS_TYPE.ML_EID) 166 self.assertTrue(self.nodes[COMMISSIONER].ping(ipaddr)) 167 168 def verify(self, pv): 169 pkts = pv.pkts 170 pv.summary.show() 171 172 LEADER = pv.vars['LEADER'] 173 LEADER_RLOC = pv.vars['LEADER_RLOC'] 174 COMMISSIONER = pv.vars['COMMISSIONER'] 175 COMMISSIONER_MLEID = pv.vars['COMMISSIONER_MLEID'] 176 COMMISSIONER_RLOC = pv.vars['COMMISSIONER_RLOC'] 177 DUT_EXTADDR = pv.vars['DUT'] 178 DUT_MLEID = pv.vars['DUT_MLEID'] 179 180 # Step 1: Ensure the topology is formed correctly 181 pv.verify_attached('COMMISSIONER', 'LEADER') 182 if self.TOPOLOGY[DUT]['mode'] == 'rdn': 183 pv.verify_attached('DUT', 'LEADER') 184 else: 185 pv.verify_attached('DUT', 'LEADER', 'MTD') 186 _pkt = pkts.last() 187 188 # Step 2: Commissioner to send MGMT_PENDING_SET.req to the Leader Anycast 189 # or Routing Locator: 190 # CoAP Request URI 191 # coap://[<L>]:MM/c/ps 192 # CoAP Payload 193 # - valid Commissioner Session ID TLV 194 # - Pending Timestamp TLV : 20s 195 # - Active Timestamp TLV : 70s 196 # - Delay Timer TLV : 60s 197 # - Channel TLV : 20 198 # - PAN ID TLV: 0xAFCE 199 pkts.filter_coap_request(MGMT_PENDING_SET_URI).\ 200 filter_ipv6_2dsts(LEADER_RLOC, LEADER_ALOC).\ 201 filter(lambda p: p.thread_meshcop.tlv.active_tstamp == 202 COMMISSIONER_ACTIVE_TIMESTAMP and\ 203 p.thread_meshcop.tlv.pending_tstamp == 204 COMMISSIONER_PENDING_TIMESTAMP and\ 205 p.thread_meshcop.tlv.delay_timer == 206 COMMISSIONER_DELAY_TIMER and\ 207 p.thread_meshcop.tlv.channel == 208 [COMMISSIONER_PENDING_CHANNEL] and\ 209 p.thread_meshcop.tlv.pan_id == 210 [COMMISSIONER_PENDING_PANID] and\ 211 NM_COMMISSIONER_SESSION_ID_TLV in p.thread_meshcop.tlv.type 212 ).\ 213 must_next() 214 215 # Step 3: Leader sends MGMT_PENDING_SET.rsq to the Commissioner: 216 # CoAP Response Code 217 # 2.04 Changed 218 # CoAP Payload 219 # - State TLV (value = Accept) 220 pkts.filter_coap_ack(MGMT_PENDING_SET_URI).\ 221 filter_wpan_src64(LEADER).\ 222 filter_ipv6_dst(COMMISSIONER_RLOC).\ 223 must_next().\ 224 must_verify(lambda p: p.thread_meshcop.tlv.state == 1) 225 226 if self.TOPOLOGY[DUT]['mode'] != '-': 227 # Step 4: Leader sends a multicast MLE Data Response with the new network data, 228 # including the following TLVs: 229 # - Leader Data TLV: 230 # Data Version field incremented 231 # Stable Version field incremented 232 # - Network Data TLV: 233 # - Commissioner Data TLV: 234 # Stable flag set to 0 235 # Border Agent Locator TLV 236 # Commissioner Session ID TLV 237 # - Active Timestamp TLV: 70s 238 # - Pending Timestamp TLV: 20s 239 pkts.filter_mle_cmd(MLE_DATA_RESPONSE).\ 240 filter_wpan_src64(LEADER).\ 241 filter_LLANMA().\ 242 filter(lambda p: p.mle.tlv.active_tstamp == 243 LEADER_ACTIVE_TIMESTAMP and\ 244 p.mle.tlv.pending_tstamp == 245 COMMISSIONER_PENDING_TIMESTAMP and\ 246 p.mle.tlv.leader_data.data_version != 247 (_pkt.mle.tlv.leader_data.data_version + 1) % 256 and\ 248 p.mle.tlv.leader_data.stable_data_version != 249 (_pkt.mle.tlv.leader_data.stable_data_version + 1) % 256 and\ 250 p.thread_nwd.tlv.stable == [0] and\ 251 NWD_COMMISSIONING_DATA_TLV in p.thread_nwd.tlv.type and\ 252 NM_COMMISSIONER_SESSION_ID_TLV in p.thread_meshcop.tlv.type and\ 253 NM_BORDER_AGENT_LOCATOR_TLV in p.thread_meshcop.tlv.type 254 ).\ 255 must_next() 256 else: 257 # Step 5: Leader MUST send a MLE Child Update Request or MLE Data 258 # Response to SED, including the following TLVs: 259 # - Leader Data TLV: 260 # Data Version field incremented 261 # Stable Version field incremented 262 # - Network Data TLV: 263 # - Active Timestamp TLV: 70s 264 # - Pending Timestamp TLV: 20s 265 pkts.filter_wpan_src64(LEADER).\ 266 filter_wpan_dst64(DUT_EXTADDR).\ 267 filter_mle_cmd2(MLE_CHILD_UPDATE_REQUEST, MLE_DATA_RESPONSE).\ 268 filter(lambda p: p.mle.tlv.active_tstamp == 269 LEADER_ACTIVE_TIMESTAMP and\ 270 p.mle.tlv.pending_tstamp == 271 COMMISSIONER_PENDING_TIMESTAMP and\ 272 p.mle.tlv.leader_data.data_version != 273 (_pkt.mle.tlv.leader_data.data_version + 1) % 256 and\ 274 p.mle.tlv.leader_data.stable_data_version != 275 (_pkt.mle.tlv.leader_data.stable_data_version + 1) % 256 and\ 276 NETWORK_DATA_TLV in p.mle.tlv.type 277 ).\ 278 must_next() 279 280 # Step 6: The DUT MUST send a MLE Data Request to the Leader and include its current 281 # Active Timestamp 282 pkts.filter_mle_cmd(MLE_DATA_REQUEST).\ 283 filter_wpan_src64(DUT_EXTADDR).\ 284 filter_wpan_dst64(LEADER).\ 285 filter(lambda p: { 286 TLV_REQUEST_TLV, 287 NETWORK_DATA_TLV 288 } < set(p.mle.tlv.type) and\ 289 p.thread_nwd.tlv.type is nullField and\ 290 p.mle.tlv.active_tstamp == LEADER_ACTIVE_TIMESTAMP 291 ).\ 292 must_next() 293 294 # Step 6: Leader sends a MLE Data Response including the following TLVs: 295 # - Active Timestamp TLV 296 # - Pending Timestamp TLV 297 # - Pending Operational Dataset TLV 298 pkts.filter_mle_cmd(MLE_DATA_RESPONSE).\ 299 filter_wpan_src64(LEADER).\ 300 filter_wpan_dst64(DUT_EXTADDR).\ 301 filter(lambda p: { 302 ACTIVE_TIMESTAMP_TLV, 303 PENDING_TIMESTAMP_TLV, 304 PENDING_OPERATION_DATASET_TLV 305 } < set(p.mle.tlv.type) 306 ).\ 307 must_next() 308 309 # Step 9: The DUT MUST attempt to reattach by sending Parent Request using the parameters 310 # from Active Operational Dataset (Channel ='Primary', PANID: 0xFACE) 311 # The DUT MUST then attach using the parameters from the Pending Operational 312 # Dataset (Channel = 'Secondary', PANID:0xAFCE) 313 for pan_id in (PANID_INIT, COMMISSIONER_PENDING_PANID): 314 pkts.filter_mle_cmd(MLE_PARENT_REQUEST).\ 315 filter_wpan_src64(DUT_EXTADDR).\ 316 filter_LLARMA().\ 317 filter(lambda p: p.wpan.dst_pan == pan_id).\ 318 must_next() 319 320 # Step 10: The DUT MUST respond with an ICMPv6 Echo Reply 321 _pkt = pkts.filter_ping_request().\ 322 filter_wpan_src64(COMMISSIONER).\ 323 filter_ipv6_dst(DUT_MLEID).\ 324 must_next() 325 pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\ 326 filter_wpan_src64(DUT_EXTADDR).\ 327 filter_ipv6_dst(COMMISSIONER_MLEID).\ 328 must_next() 329 330 331class Cert_9_2_8_PersistentDatasets_ROUTER(Cert_9_2_8_PersistentDatasets_Base): 332 TOPOLOGY = copy.deepcopy(Cert_9_2_8_PersistentDatasets_Base.TOPOLOGY) 333 TOPOLOGY[DUT]['mode'] = 'rdn' 334 335 336class Cert_9_2_8_PersistentDatasets_ED(Cert_9_2_8_PersistentDatasets_Base): 337 TOPOLOGY = copy.deepcopy(Cert_9_2_8_PersistentDatasets_Base.TOPOLOGY) 338 TOPOLOGY[DUT]['mode'] = 'rn' 339 TOPOLOGY[DUT]['is_mtd'] = True 340 TOPOLOGY[DUT]['timeout'] = config.DEFAULT_CHILD_TIMEOUT 341 342 343class Cert_9_2_8_PersistentDatasets_SED(Cert_9_2_8_PersistentDatasets_Base): 344 TOPOLOGY = copy.deepcopy(Cert_9_2_8_PersistentDatasets_Base.TOPOLOGY) 345 TOPOLOGY[DUT]['mode'] = '-' 346 TOPOLOGY[DUT]['is_mtd'] = True 347 TOPOLOGY[DUT]['timeout'] = config.DEFAULT_CHILD_TIMEOUT 348 349 350del (Cert_9_2_8_PersistentDatasets_Base) 351 352if __name__ == '__main__': 353 unittest.main() 354