1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#

import copy
import unittest

import command
import config
import ipv6
import thread_cert
from pktverify.consts import (
    WIRESHARK_OVERRIDE_PREFS,
    ADDR_QRY_URI,
    ADDR_NTF_URI,
    NL_ML_EID_TLV,
    NL_RLOC16_TLV,
    NL_TARGET_EID_TLV,
)
from pktverify.packet_verifier import PacketVerifier
from pktverify.bytes import Bytes

# Node ids used as keys of TOPOLOGY and of self.nodes.
LEADER = 1
ROUTER1 = 2
DUT_ROUTER2 = 3
ROUTER3 = 4
SED1 = 5

# On-mesh prefixes the Leader advertises; GUA_1_START is the leading part of
# PREFIX_1 and is used to pick each node's PREFIX_1 address out of its
# address list during packet verification.
PREFIX_1 = '2001::/64'
GUA_1_START = '2001'
PREFIX_2 = '2002::/64'

# Test Purpose and Description:
# -----------------------------
# The purpose of this test case is to validate that the DUT is able to generate
# Address Query and Address Notification messages properly.
# The Leader is configured as a Border Router with DHCPv6 server for prefixes
# 2001:: & 2002::
#
# Test Topology:
# -------------
#  Router_1 - Leader
#             /    \
#      Router_3    Router_2(DUT)
#                      |
#                     SED
#
# DUT Types:
# ----------
#  Router


class Cert_5_3_09_AddressQuery(thread_cert.TestCase):
    """Certification test 5.3.9: Address Query / Address Notification.

    ``test`` drives the simulated network (Leader, three Routers and one
    sleepy end device attached to the DUT, Router_2) through a series of
    pings; ``verify`` then checks the captured packets for the Address Query
    and Address Notification messages the DUT is expected to (or not to) send.
    """

    USE_MESSAGE_FACTORY = False

    # Per-node configuration; 'allowlist' restricts which nodes each node may
    # link with, which yields the topology sketched in the comment above.
    TOPOLOGY = {
        LEADER: {
            'name': 'LEADER',
            'mode': 'rdn',
            'allowlist': [ROUTER1, DUT_ROUTER2, ROUTER3]
        },
        ROUTER1: {
            'name': 'ROUTER_1',
            'mode': 'rdn',
            'allowlist': [LEADER]
        },
        DUT_ROUTER2: {
            'name': 'ROUTER_2',
            'mode': 'rdn',
            'allowlist': [LEADER, SED1]
        },
        ROUTER3: {
            'name': 'ROUTER_3',
            'mode': 'rdn',
            'allowlist': [LEADER]
        },
        SED1: {
            'name': 'SED',
            'is_mtd': True,
            'mode': '-',
            'timeout': config.DEFAULT_CHILD_TIMEOUT,
            'allowlist': [DUT_ROUTER2]
        },
    }

    # override wireshark preferences with case needed parameters
    # (deep copy so the shared WIRESHARK_OVERRIDE_PREFS mapping is not mutated)
    CASE_WIRESHARK_PREFS = copy.deepcopy(WIRESHARK_OVERRIDE_PREFS)
    CASE_WIRESHARK_PREFS['6lowpan.context1'] = PREFIX_1
    CASE_WIRESHARK_PREFS['6lowpan.context2'] = PREFIX_2

    # NOTE: documented with comments rather than a docstring on purpose:
    # unittest prints the first docstring line of a test method instead of its
    # name in verbose output, which would change how this test is reported.
    #
    # Brings the network up node by node, then runs the ping exchanges of
    # steps 3-7 whose resulting traffic is inspected later by verify().
    def test(self):
        # 1 & 2 ALL: Build and verify the topology
        self.nodes[LEADER].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')

        # Configure the LEADER to be a DHCPv6 Border Router for prefixes
        self.nodes[LEADER].add_prefix(PREFIX_1, 'pdros')
        self.nodes[LEADER].add_prefix(PREFIX_2, 'pdro')
        self.nodes[LEADER].register_netdata()

        self.nodes[ROUTER1].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[ROUTER1].get_state(), 'router')

        self.nodes[DUT_ROUTER2].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[DUT_ROUTER2].get_state(), 'router')

        self.nodes[ROUTER3].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[ROUTER3].get_state(), 'router')

        self.nodes[SED1].start()
        self.simulator.go(5)
        self.assertEqual(self.nodes[SED1].get_state(), 'child')

        # Record addressing information now, while every node is still up;
        # presumably this is what populates the *_RLOC, *_RLOC16 and *_IPADDRS
        # entries that verify() reads from pv.vars -- TODO confirm.
        self.collect_rlocs()
        self.collect_rloc16s()
        self.collect_ipaddrs()

        # 3 SED1: The SED1 sends an ICMPv6 Echo Request to ROUTER3 using GUA
        # PREFIX_1 address
        router3_addr = self.nodes[ROUTER3].get_addr(PREFIX_1)
        self.assertIsNotNone(router3_addr)
        self.assertTrue(self.nodes[SED1].ping(router3_addr))

        # 4 ROUTER1: ROUTER1 sends an ICMPv6 Echo Request to the SED1 using GUA
        # PREFIX_1 address
        sed1_addr = self.nodes[SED1].get_addr(PREFIX_1)
        self.assertIsNotNone(sed1_addr)
        self.assertTrue(self.nodes[ROUTER1].ping(sed1_addr))
        self.simulator.go(1)

        # 5 SED1: SED1 sends an ICMPv6 Echo Request to the ROUTER3 using GUA
        # PREFIX_1 address
        self.assertTrue(self.nodes[SED1].ping(router3_addr))
        self.simulator.go(1)

        # 6 DUT_ROUTER2: Power off ROUTER3 and wait 580s to allow LEADER to
        # expire its Router ID
        self.nodes[ROUTER3].stop()
        self.simulator.go(580)

        # The SED1 sends an ICMPv6 Echo Request to ROUTER3 GUA PREFIX_1 address
        # (expected to fail: ROUTER3 is powered off)
        self.assertFalse(self.nodes[SED1].ping(router3_addr))
        self.simulator.go(1)

        # 7 SED1: Power off SED1 and wait to allow DUT_ROUTER2 to timeout the
        # child
        self.nodes[SED1].stop()
        self.simulator.go(3)
        self.simulator.go(config.DEFAULT_CHILD_TIMEOUT)

        # ROUTER1 sends two ICMPv6 Echo Requests to SED1 GUA PREFIX_1 address
        # (both expected to fail: SED1 is powered off)
        self.assertFalse(self.nodes[ROUTER1].ping(sed1_addr))
        self.assertFalse(self.nodes[ROUTER1].ping(sed1_addr))

    def verify(self, pv):
        """Check the captured traffic against the expected message sequence.

        Args:
            pv: Packet verifier object. This method uses its ``pkts`` (packet
                filter cursor), ``vars`` (per-node addresses and the ``MM_PORT``
                value), ``summary`` and ``verify_attached`` members; presumably
                a ``pktverify`` ``PacketVerifier`` -- confirm against the caller.

        The ``must_next()`` / ``must_not_next()`` calls are the actual checks.
        They are consumed in capture order, so the statements below must stay
        in the same order as the steps executed by ``test``.
        """
        pkts = pv.pkts
        pv.summary.show()

        ROUTER_1 = pv.vars['ROUTER_1']
        ROUTER_1_RLOC = pv.vars['ROUTER_1_RLOC']
        ROUTER_2 = pv.vars['ROUTER_2']
        ROUTER_2_RLOC = pv.vars['ROUTER_2_RLOC']
        ROUTER_2_RLOC16 = pv.vars['ROUTER_2_RLOC16']
        ROUTER_3 = pv.vars['ROUTER_3']
        SED = pv.vars['SED']
        SED_RLOC16 = pv.vars['SED_RLOC16']
        MM = pv.vars['MM_PORT']
        GUA1 = {}

        # Map node name -> that node's address starting with GUA_1_START
        # (i.e. its PREFIX_1 address).
        for node in ('ROUTER_1', 'ROUTER_3', 'SED'):
            for addr in pv.vars['%s_IPADDRS' % node]:
                if addr.startswith(Bytes(GUA_1_START)):
                    GUA1[node] = addr

        # Step 1: Build the topology as described

        for i in range(1, 4):
            pv.verify_attached('ROUTER_%d' % i, 'LEADER')
        pv.verify_attached('SED', 'ROUTER_2', 'MTD')

        # Step 2: SED sends an ICMPv6 Echo Request to Router_3 using GUA 2001::
        #         address
        #         The DUT MUST generate an Address Query Request on SED’s behalf
        #         to find each node’s RLOC.
        #         The Address Query Request MUST be sent to the Realm-Local
        #         All-Routers address (FF03::2)
        #             CoAP URI-Path
        #                 - NON POST coap://<FF03::2>
        #             CoAP Payload
        #                 - Target EID TLV
        #         The DUT MUST receive and process the incoming Address Query
        #         Response and forward the ICMPv6 Echo Reply packet to SED

        _pkt = pkts.filter_ping_request().\
            filter_wpan_src64(SED).\
            filter_ipv6_dst(GUA1['ROUTER_3']).\
            must_next()
        pkts.filter_wpan_src64(ROUTER_2).\
            filter_RLARMA().\
            filter_coap_request(ADDR_QRY_URI, port=MM).\
            filter(lambda p: p.thread_address.tlv.target_eid == GUA1['ROUTER_3']).\
            must_next()
        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
            filter_wpan_src64(ROUTER_3).\
            filter_ipv6_dst(GUA1['SED']).\
            must_next()
        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
            filter_wpan_src64(ROUTER_2).\
            filter_wpan_dst16(SED_RLOC16).\
            must_next()

        # Step 3: Router_1 sends an ICMPv6 Echo Request to SED using GUA 2001::
        #         address
        #         The DUT MUST respond to the Address Query Request with a properly
        #         formatted Address Notification Message:
        #             CoAP URI-Path
        #                 - CON POST coap://[<Address Query Source>]:MM/a/an
        #             CoAP Payload
        #                 - ML-EID TLV
        #                 - RLOC16 TLV
        #                 - Target EID TLV
        #         The IPv6 Source address MUST be the RLOC of the originator
        #         The IPv6 Destination address MUST be the RLOC of the destination

        pkts.filter_wpan_src64(ROUTER_1).\
            filter_RLARMA().\
            filter_coap_request(ADDR_QRY_URI, port=MM).\
            filter(lambda p: p.thread_address.tlv.target_eid == GUA1['SED']).\
            must_next()
        pkts.filter_ipv6_src_dst(ROUTER_2_RLOC, ROUTER_1_RLOC).\
            filter_coap_request(ADDR_NTF_URI, port=MM).\
            filter(lambda p: {
                              NL_ML_EID_TLV,
                              NL_RLOC16_TLV,
                              NL_TARGET_EID_TLV
                              } <= set(p.coap.tlv.type) and\
                   p.thread_address.tlv.target_eid == GUA1['SED'] and\
                   p.thread_address.tlv.rloc16 == ROUTER_2_RLOC16
                   ).\
            must_next()
        _pkt = pkts.filter_ping_request().\
            filter_wpan_src64(ROUTER_1).\
            filter_ipv6_dst(GUA1['SED']).\
            must_next()
        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
            filter_wpan_src64(SED).\
            filter_ipv6_dst(GUA1['ROUTER_1']).\
            must_next()

        # Step 5: SED sends an ICMPv6 Echo Request to Router_3 using GUA 2001::
        #         address
        #         The DUT MUST not send an Address Query as Router_3 address should
        #         be cached.
        #         The DUT MUST forward the ICMPv6 Echo Reply to SED

        _pkt = pkts.filter_ping_request().\
            filter_wpan_src64(SED).\
            filter_ipv6_dst(GUA1['ROUTER_3']).\
            must_next()
        # Remember the capture window of this ping exchange so the "no Address
        # Query" check below only looks at packets inside it.
        lstart = pkts.index
        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
            filter_wpan_src64(ROUTER_3).\
            filter_ipv6_dst(GUA1['SED']).\
            must_next()
        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
            filter_wpan_src64(ROUTER_2).\
            filter_wpan_dst16(SED_RLOC16).\
            must_next()
        lend = pkts.index
        pkts.range(lstart, lend).filter_wpan_src64(ROUTER_2).\
            filter_RLARMA().\
            filter_coap_request(ADDR_QRY_URI, port=MM).\
            must_not_next()

        # Step 6: SED sends an ICMPv6 Echo Request to Router_3 using GUA 2001::
        #         address
        #         The DUT MUST update its address cache and remove all entries
        #         based on Router_3’s Router ID.
        #         The DUT MUST send an Address Query to discover Router_3’s RLOC address.
        pkts.filter_ping_request().\
            filter_wpan_src64(SED).\
            filter_ipv6_dst(GUA1['ROUTER_3']).\
            must_next()
        pkts.filter_wpan_src64(ROUTER_2).\
            filter_RLARMA().\
            filter_coap_request(ADDR_QRY_URI, port=MM).\
            filter(lambda p: p.thread_address.tlv.target_eid == GUA1['ROUTER_3']).\
            must_next()

        # Step 7: Router_1 sends two ICMPv6 Echo Requests to SED using GUA 2001::
        #         address
        #         The DUT MUST NOT respond with an Address Notification message

        pkts.filter_wpan_src64(ROUTER_2).\
            filter_ipv6_dst(ROUTER_1_RLOC).\
            filter_coap_request(ADDR_NTF_URI, port=MM).\
            must_not_next()
        pkts.filter_ping_request().\
            filter_wpan_src64(ROUTER_1).\
            filter_ipv6_dst(GUA1['SED']).\
            must_next()


if __name__ == '__main__':
    unittest.main()