1#!/usr/bin/env python3 2# 3# Copyright (c) 2019, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29import logging 30from typing import Tuple 31 32from pktverify import consts 33from pktverify.consts import MLE_CHILD_ID_REQUEST, MLE_ADVERTISEMENT, MLE_CHILD_ID_RESPONSE 34from pktverify.pcap_reader import PcapReader 35from pktverify.summary import Summary 36from pktverify.test_info import TestInfo 37from pktverify.verify_result import VerifyResult 38 39 40class PacketVerifier(object): 41 """ 42 Base class for packet verifiers that runs the packet verification process 43 """ 44 NET_NAME = "OpenThread" 45 MC_PORT = 49191 46 MM_PORT = 61631 47 BB_PORT = 61631 48 LLANMA = 'ff02::1' # Link-Local All Nodes multicast address 49 LLARMA = 'ff02::2' # Link-Local All Routers multicast address 50 RLANMA = 'ff03::1' # realm-local all-nodes multicast address 51 RLARMA = 'ff03::2' # realm-local all-routers multicast address 52 RLAMFMA = 'ff03::fc' # realm-local ALL_MPL_FORWARDERS address 53 LLABMA = 'ff32:40:fd00:7d03:7d03:7d03:0:3' # Link-Local All BBRs multicast address 54 55 def __init__(self, test_info_path, wireshark_prefs=None): 56 logging.basicConfig(level=logging.INFO, 57 format='File "%(pathname)s", line %(lineno)d, in %(funcName)s\n' 58 '%(asctime)s - %(levelname)s - %(message)s') 59 60 ti = TestInfo(test_info_path) 61 if wireshark_prefs is not None: 62 pkts = PcapReader.read(ti.pcap_path, wireshark_prefs) 63 else: 64 pkts = PcapReader.read(ti.pcap_path) 65 print('loaded %d packets from %s' % (len(pkts), ti.pcap_path)) 66 self.pkts = pkts 67 self.test_info = ti 68 69 self.summary = Summary(pkts, ti) 70 self._vars = {} 71 self._add_initial_vars() 72 73 def add_vars(self, **vars): 74 """ 75 Add new variables. 76 77 :param vars: The new variables. 78 """ 79 self._vars.update(vars) 80 81 @property 82 def vars(self): 83 """ 84 :return: the dict of all variables 85 """ 86 return self._vars 87 88 def add_common_vars(self): 89 """ 90 Add common variables that is needed by many test cases. 91 """ 92 self.add_vars( 93 NET_NAME=PacketVerifier.NET_NAME, 94 MM_PORT=PacketVerifier.MM_PORT, 95 MC_PORT=PacketVerifier.MC_PORT, 96 BB_PORT=PacketVerifier.BB_PORT, 97 LLANMA=PacketVerifier.LLANMA, # Link-Local All Nodes multicast address 98 LLARMA=PacketVerifier.LLARMA, # Link-Local All Routers multicast address 99 RLANMA=PacketVerifier.RLANMA, # realm-local all-nodes multicast address 100 RLARMA=PacketVerifier.RLARMA, # realm-local all-routers multicast address 101 RLAMFMA=PacketVerifier.RLAMFMA, # realm-local ALL_MPL_FORWARDERS address 102 LLABMA=PacketVerifier.LLABMA, # Link-Local All BBRs multicast address 103 MA1=consts.MA1, 104 MA2=consts.MA2, 105 MA3=consts.MA3, 106 MA4=consts.MA4, 107 MA5=consts.MA5, 108 MA6=consts.MA6, 109 MA1g=consts.MA1g, 110 MAe1=consts.MAe1, 111 MAe2=consts.MAe2, 112 MAe3=consts.MAe3, 113 ) 114 115 def _add_initial_vars(self): 116 for i, addr in self.test_info.extaddrs.items(): 117 name = self.test_info.get_node_name(i) 118 self._vars[name] = addr 119 120 for i, addr in self.test_info.ethaddrs.items(): 121 name = self.test_info.get_node_name(i) + '_ETH' 122 self._vars[name] = addr 123 124 for i, addrs in self.test_info.ipaddrs.items(): 125 name = self.test_info.get_node_name(i) 126 self._vars[name + '_IPADDRS'] = addrs 127 for addr in addrs: 128 if addr.is_dua: 129 key = name + '_DUA' 130 elif addr.is_backbone_gua: 131 key = name + '_BGUA' 132 elif addr.is_link_local and (name + '_BGUA') in self._vars: 133 # FIXME: assume the link-local address after Backbone GUA is the Backbone Link Local address 134 key = name + '_BLLA' 135 elif addr.is_link_local: 136 key = name + '_LLA' 137 else: 138 logging.warning("IPv6 address ignored: name=%s, addr=%s, is_global=%s, is_link_local=%s", name, 139 addr, addr.is_global, addr.is_link_local) 140 continue 141 142 if key in self._vars: 143 logging.warning("duplicate IPv6 address type: name=%s, addr=%s,%s", name, addr, self._vars[key]) 144 continue 145 146 self._vars[key] = addr 147 148 for i, addr in self.test_info.mleids.items(): 149 name = self.test_info.get_node_name(i) 150 self._vars[name + '_MLEID'] = addr 151 152 for i, rloc16 in self.test_info.rloc16s.items(): 153 key = self.test_info.get_node_name(i) + '_RLOC16' 154 self._vars[key] = rloc16 155 156 for i, rloc in self.test_info.rlocs.items(): 157 key = self.test_info.get_node_name(i) + '_RLOC' 158 self._vars[key] = rloc 159 160 for i, omr in self.test_info.omrs.items(): 161 key = self.test_info.get_node_name(i) + '_OMR' 162 self._vars[key] = omr 163 164 for i, dua in self.test_info.duas.items(): 165 key = self.test_info.get_node_name(i) + '_DUA' 166 self._vars[key] = dua 167 168 if self.test_info.leader_aloc: 169 self._vars['LEADER_ALOC'] = self.test_info.leader_aloc 170 171 for k, v in self.test_info.extra_vars.items(): 172 assert k not in self._vars, k 173 logging.info("add extra var: %s = %s", k, v) 174 self._vars[k] = v 175 176 for i, topo in self.test_info.topology.items(): 177 name = self.test_info.get_node_name(i) 178 if topo['version']: 179 self._vars[name + '_VERSION'] = {'1.1': 2, '1.2': 3, '1.3': 4, '1.4': 5}[topo['version']] 180 181 def verify_attached(self, child: str, parent: str = None, child_type: str = 'FTD', pkts=None) -> VerifyResult: 182 """ 183 Verify that the device attaches to the Thread network. 184 185 :param child: The child device name. 186 :param parent: The parent device name. 187 :param child_type: The child device type (FTD, FTD-ED, MTD). 188 """ 189 result = VerifyResult() 190 assert self.is_thread_device(child), child 191 assert child_type in ('FTD', 'FTD-ED', 'MTD'), child_type 192 pkts = pkts or self.pkts 193 child_extaddr = self.vars[child] 194 195 src_pkts = pkts.filter_wpan_src64(child_extaddr) 196 if parent: 197 assert self.is_thread_device(parent), parent 198 src_pkts = pkts.filter_wpan_src64(child_extaddr).\ 199 filter_wpan_dst64(self.vars[parent]) 200 src_pkts.filter_mle_cmd(MLE_CHILD_ID_REQUEST).must_next() # Child Id Request 201 result.record_last('child_id_request', pkts) 202 203 dst_pkts = pkts.filter_wpan_dst64(child_extaddr) 204 if parent: 205 dst_pkts = pkts.filter_wpan_src64(self.vars[parent]).\ 206 filter_wpan_dst64(child_extaddr) 207 dst_pkts.filter_mle_cmd(MLE_CHILD_ID_RESPONSE).must_next() # Child Id Response 208 result.record_last('child_id_response', pkts) 209 210 with pkts.save_index(): 211 if child_type == 'FTD': 212 src_pkts = pkts.filter_wpan_src64(child_extaddr) 213 src_pkts.filter_mle_cmd(MLE_ADVERTISEMENT).must_next() # MLE Advertisement 214 result.record_last('mle_advertisement', pkts) 215 logging.info(f"verify attached: d={child}, result={result}") 216 217 return result 218 219 def verify_ping(self, src: str, dst: str, bbr: str = None, pkts: 'PacketVerifier' = None) -> VerifyResult: 220 """ 221 Verify the ping process. 222 223 :param src: The source device name. 224 :param dst: The destination device name. 225 :param bbr: The Backbone Router name. 226 If specified, this method also verifies that the ping request and reply be forwarded by the Backbone Router. 227 :param pkts: The PacketFilter to search. 228 229 :return: The verification result. 230 """ 231 if bbr: 232 assert not (self.is_thread_device(src) and self.is_thread_device(dst)), \ 233 f"both {src} and {dst} are WPAN devices" 234 assert not (self.is_backbone_device(src) and self.is_backbone_device(dst)), \ 235 f"both {src} and {dst} are ETH devices" 236 237 if pkts is None: 238 pkts = self.pkts 239 240 src_dua = self.vars[src + '_DUA'] 241 dst_dua = self.vars[dst + '_DUA'] 242 if bbr: 243 bbr_ext = self.vars[bbr] 244 bbr_eth = self.vars[bbr + '_ETH'] 245 246 result = VerifyResult() 247 ping_req = pkts.filter_ping_request().filter_ipv6_dst(dst_dua) 248 if self.is_backbone_device(src): 249 p = ping_req.filter_eth_src(self.vars[src + '_ETH']).must_next() 250 else: 251 p = ping_req.filter_wpan_src64(self.vars[src]).must_next() 252 253 # pkts.last().show() 254 ping_id = p.icmpv6.echo.identifier 255 logging.info("verify_ping: ping_id=%x", ping_id) 256 result.record_last('ping_request', pkts) 257 ping_req = ping_req.filter(lambda p: p.icmpv6.echo.identifier == ping_id) 258 259 # BBR unicasts the ping packet to TD. 260 if bbr: 261 if self.is_backbone_device(src): 262 ping_req.filter_wpan_src64(bbr_ext).must_next() 263 else: 264 ping_req.filter_eth_src(bbr_eth).must_next() 265 266 ping_reply = pkts.filter_ping_reply().filter_ipv6_dst(src_dua).filter( 267 lambda p: p.icmpv6.echo.identifier == ping_id) 268 # TD receives ping packet and responds back to Host via SBBR. 269 if self.is_thread_device(dst): 270 ping_reply.filter_wpan_src64(self.vars[dst]).must_next() 271 else: 272 ping_reply.filter_eth_src(self.vars[dst + '_ETH']).must_next() 273 274 result.record_last('ping_reply', pkts) 275 276 if bbr: 277 # SBBR forwards the ping response packet to Host. 278 if self.is_thread_device(dst): 279 ping_reply.filter_eth_src(bbr_eth).must_next() 280 else: 281 ping_reply.filter_wpan_src64(bbr_ext).must_next() 282 283 return result 284 285 def is_thread_device(self, name: str) -> bool: 286 """ 287 Returns if the device is an WPAN device. 288 289 :param name: The device name. 290 291 Note that device can be both a WPAN device and an Ethernet device. 292 """ 293 assert isinstance(name, str), name 294 295 return name in self.vars 296 297 def is_backbone_device(self, name: str) -> bool: 298 """ 299 Returns if the device s an Ethernet device. 300 301 :param name: The device name. 302 303 Note that device can be both a WPAN device and an Ethernet device. 304 """ 305 assert isinstance(name, str), name 306 307 return f'{name}_ETH' in self.vars 308 309 def max_index(self, *indexes: Tuple[int, int]) -> Tuple[int, int]: 310 wpan_idx = 0 311 eth_idx = 0 312 for wi, ei in indexes: 313 wpan_idx = max(wpan_idx, wi) 314 eth_idx = max(eth_idx, ei) 315 316 return wpan_idx, eth_idx 317 318 def verify_dua_registration(self, src64, dua, *, pbbr_eth, sbbr_eth=None, pbbr_src64=None): 319 pv, pkts = self, self.pkts 320 MM = pv.vars['MM_PORT'] 321 BB = pv.vars['BB_PORT'] 322 323 # Router1 should send /n/dr for DUA registration 324 dr = pkts.filter_wpan_src64(src64).filter_coap_request('/n/dr', port=MM).filter( 325 'thread_nm.tlv.target_eid == {ROUTER1_DUA}', ROUTER1_DUA=dua).must_next() 326 327 # SBBR should not send /b/bq for Router1's DUA 328 if sbbr_eth is not None: 329 pkts.filter_backbone_query(dua, eth_src=sbbr_eth, port=BB).must_not_next() 330 331 # PBBR should respond to /n/dr 332 if pbbr_src64 is not None: 333 pkts.filter_wpan_src64(pbbr_src64).filter_coap_ack( 334 '/n/dr', port=MM).must_next().must_verify('thread_nm.tlv.status == 0') 335 336 # PBBR should send /b/bq for Router1's DUA (1st time) 337 bq1 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next() 338 bq1_index = pkts.index 339 340 assert bq1.sniff_timestamp - dr.sniff_timestamp <= 1.01, bq1.sniff_timestamp - dr.sniff_timestamp 341 342 # PBBR should send /b/bq for Router1's DUA (2nd time) 343 bq2 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next() 344 345 assert 0.9 < bq2.sniff_timestamp - bq1.sniff_timestamp < 1.1, bq2.sniff_timestamp - bq1.sniff_timestamp 346 347 # PBBR should send /b/bq for Router1's DUA (3rd time) 348 bq3 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next() 349 350 assert 0.9 < bq3.sniff_timestamp - bq2.sniff_timestamp < 1.1, bq3.sniff_timestamp - bq2.sniff_timestamp 351 352 # PBBR should send PRO_BB.ntf for Router's DUA when DAD completed 353 pkts.filter_eth_src(pbbr_eth).filter_backbone_answer(dua, port=BB, confirmable=False).must_next().show() 354 355 # PBBR should not recv /b/ba response from other BBRs during this period 356 pkts.range(bq1_index, pkts.index, 357 cascade=False).filter('eth.src != {PBBR_ETH}', 358 PBBR_ETH=pbbr_eth).filter_backbone_answer(dua, port=BB).must_not_next() 359