#!/usr/bin/env python3
#
# Copyright (c) 2019, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
from typing import Tuple

from pktverify import consts
from pktverify.consts import MLE_CHILD_ID_REQUEST, MLE_ADVERTISEMENT, MLE_CHILD_ID_RESPONSE
from pktverify.pcap_reader import PcapReader
from pktverify.summary import Summary
from pktverify.test_info import TestInfo
from pktverify.verify_result import VerifyResult


class PacketVerifier(object):
    """
    Base class for packet verifiers that runs the packet verification process.

    On construction it loads the test info file, reads the pcap that the test
    info points to, and builds a dictionary of named variables (node addresses,
    RLOCs, versions, ...) that verification code can look up via ``vars``.
    """
    NET_NAME = "OpenThread"
    MC_PORT = 49191
    MM_PORT = 61631
    BB_PORT = 61631
    LLANMA = 'ff02::1'  # Link-Local All Nodes multicast address
    LLARMA = 'ff02::2'  # Link-Local All Routers multicast address
    RLANMA = 'ff03::1'  # realm-local all-nodes multicast address
    RLARMA = 'ff03::2'  # realm-local all-routers multicast address
    RLAMFMA = 'ff03::fc'  # realm-local ALL_MPL_FORWARDERS address
    LLABMA = 'ff32:40:fd00:7d03:7d03:7d03:0:3'  # Link-Local All BBRs multicast address

    def __init__(self, test_info_path, wireshark_prefs=None):
        """
        Load the test info and its pcap, then populate the initial variables.

        :param test_info_path: Path handed to ``TestInfo``.
        :param wireshark_prefs: Optional preferences forwarded to ``PcapReader.read``;
                                when None, the reader is called without them.
        """
        # Note: this configures the root logger as a side effect of construction.
        logging.basicConfig(level=logging.INFO,
                            format='File "%(pathname)s", line %(lineno)d, in %(funcName)s\n'
                            '%(asctime)s - %(levelname)s - %(message)s')

        ti = TestInfo(test_info_path)
        if wireshark_prefs is not None:
            pkts = PcapReader.read(ti.pcap_path, wireshark_prefs)
        else:
            pkts = PcapReader.read(ti.pcap_path)
        print('loaded %d packets from %s' % (len(pkts), ti.pcap_path))
        self.pkts = pkts
        self.test_info = ti

        self.summary = Summary(pkts, ti)
        self._vars = {}
        self._add_initial_vars()

    def add_vars(self, **vars):
        """
        Add new variables.

        Existing variables with the same name are overwritten.

        :param vars: The new variables.
        """
        self._vars.update(vars)

    @property
    def vars(self):
        """
        :return: the dict of all variables
        """
        return self._vars

    def add_common_vars(self):
        """
        Add common variables that are needed by many test cases.
        """
        self.add_vars(
            NET_NAME=PacketVerifier.NET_NAME,
            MM_PORT=PacketVerifier.MM_PORT,
            MC_PORT=PacketVerifier.MC_PORT,
            BB_PORT=PacketVerifier.BB_PORT,
            LLANMA=PacketVerifier.LLANMA,  # Link-Local All Nodes multicast address
            LLARMA=PacketVerifier.LLARMA,  # Link-Local All Routers multicast address
            RLANMA=PacketVerifier.RLANMA,  # realm-local all-nodes multicast address
            RLARMA=PacketVerifier.RLARMA,  # realm-local all-routers multicast address
            RLAMFMA=PacketVerifier.RLAMFMA,  # realm-local ALL_MPL_FORWARDERS address
            LLABMA=PacketVerifier.LLABMA,  # Link-Local All BBRs multicast address
            MA1=consts.MA1,
            MA2=consts.MA2,
            MA3=consts.MA3,
            MA4=consts.MA4,
            MA5=consts.MA5,
            MA6=consts.MA6,
            MA1g=consts.MA1g,
            MAe1=consts.MAe1,
            MAe2=consts.MAe2,
            MAe3=consts.MAe3,
        )

    def _add_initial_vars(self):
        """
        Populate ``self._vars`` from the loaded test info.

        For a node named ``NAME`` the following keys may be created:
        ``NAME`` (extended address), ``NAME_ETH``, ``NAME_IPADDRS``,
        ``NAME_DUA``, ``NAME_BGUA``, ``NAME_BLLA``, ``NAME_LLA``, ``NAME_MLEID``,
        ``NAME_RLOC16``, ``NAME_RLOC`` and ``NAME_VERSION``; plus ``LEADER_ALOC``
        and any extra variables supplied by the test info.
        """
        node_name = self.test_info.get_node_name

        for i, addr in self.test_info.extaddrs.items():
            self._vars[node_name(i)] = addr

        for i, addr in self.test_info.ethaddrs.items():
            self._vars[node_name(i) + '_ETH'] = addr

        for i, addrs in self.test_info.ipaddrs.items():
            name = node_name(i)
            self._vars[name + '_IPADDRS'] = addrs
            for addr in addrs:
                if addr.is_dua:
                    key = name + '_DUA'
                elif addr.is_backbone_gua:
                    key = name + '_BGUA'
                elif addr.is_link_local and (name + '_BGUA') in self._vars:
                    # FIXME: assume the link-local address after Backbone GUA is the Backbone Link Local address
                    key = name + '_BLLA'
                elif addr.is_link_local:
                    key = name + '_LLA'
                else:
                    logging.warning("IPv6 address ignored: name=%s, addr=%s, is_global=%s, is_link_local=%s", name,
                                    addr, addr.is_global, addr.is_link_local)
                    continue

                # The first address seen for each type wins; later ones are only reported.
                if key in self._vars:
                    logging.warning("duplicate IPv6 address type: name=%s, addr=%s,%s", name, addr, self._vars[key])
                    continue

                self._vars[key] = addr

        for i, addr in self.test_info.mleids.items():
            self._vars[node_name(i) + '_MLEID'] = addr

        for i, rloc16 in self.test_info.rloc16s.items():
            self._vars[node_name(i) + '_RLOC16'] = rloc16

        for i, rloc in self.test_info.rlocs.items():
            self._vars[node_name(i) + '_RLOC'] = rloc

        if self.test_info.leader_aloc:
            self._vars['LEADER_ALOC'] = self.test_info.leader_aloc

        for k, v in self.test_info.extra_vars.items():
            # Extra variables must not clobber anything derived above.
            assert k not in self._vars, k
            logging.info("add extra var: %s = %s", k, v)
            self._vars[k] = v

        for i, topo in self.test_info.topology.items():
            name = node_name(i)
            if topo['version']:
                # Only '1.1' and '1.2' are mapped; any other non-empty version string raises KeyError.
                self._vars[name + '_VERSION'] = {'1.1': 2, '1.2': 3}[topo['version']]

    def verify_attached(self, child: str, parent: str = None, child_type: str = 'FTD', pkts=None) -> VerifyResult:
        """
        Verify that the device attaches to the Thread network.

        Looks for a Child Id Request from the child, then a Child Id Response to
        the child, and (for ``child_type == 'FTD'`` only) an MLE Advertisement
        from the child. The advertisement search runs under ``pkts.save_index()``.

        :param child: The child device name.
        :param parent: The parent device name. If given, the request/response must
                       additionally be exchanged with this device.
        :param child_type: The child device type (FTD, FTD-ED, MTD).
        :param pkts: The packets to search; defaults to ``self.pkts`` when None.

        :return: The verification result.
        """
        result = VerifyResult()
        assert self.is_thread_device(child), child
        assert child_type in ('FTD', 'FTD-ED', 'MTD'), child_type
        # Compare against None (as verify_ping does) so that a caller-supplied
        # filter is never silently replaced just because it evaluates as falsy.
        if pkts is None:
            pkts = self.pkts
        child_extaddr = self.vars[child]

        src_pkts = pkts.filter_wpan_src64(child_extaddr)
        if parent:
            assert self.is_thread_device(parent), parent
            src_pkts = pkts.filter_wpan_src64(child_extaddr).\
                filter_wpan_dst64(self.vars[parent])
        src_pkts.filter_mle_cmd(MLE_CHILD_ID_REQUEST).must_next()  # Child Id Request
        result.record_last('child_id_request', pkts)

        dst_pkts = pkts.filter_wpan_dst64(child_extaddr)
        if parent:
            dst_pkts = pkts.filter_wpan_src64(self.vars[parent]).\
                filter_wpan_dst64(child_extaddr)
        dst_pkts.filter_mle_cmd(MLE_CHILD_ID_RESPONSE).must_next()  # Child Id Response
        result.record_last('child_id_response', pkts)

        with pkts.save_index():
            if child_type == 'FTD':
                src_pkts = pkts.filter_wpan_src64(child_extaddr)
                src_pkts.filter_mle_cmd(MLE_ADVERTISEMENT).must_next()  # MLE Advertisement
                result.record_last('mle_advertisement', pkts)
            logging.info(f"verify attached: d={child}, result={result}")

        return result

    def verify_ping(self, src: str, dst: str, bbr: str = None, pkts: 'PacketFilter' = None) -> VerifyResult:
        """
        Verify the ping process.

        :param src: The source device name.
        :param dst: The destination device name.
        :param bbr: The Backbone Router name.
                    If specified, this method also verifies that the ping request and reply be forwarded by the Backbone Router.
        :param pkts: The PacketFilter to search; defaults to ``self.pkts`` when None.

        :return: The verification result.
        """
        if bbr:
            # Forwarding through a BBR only makes sense between one WPAN-side
            # and one Ethernet-side device.
            assert not (self.is_thread_device(src) and self.is_thread_device(dst)), \
                f"both {src} and {dst} are WPAN devices"
            assert not (self.is_backbone_device(src) and self.is_backbone_device(dst)), \
                f"both {src} and {dst} are ETH devices"

        if pkts is None:
            pkts = self.pkts

        src_dua = self.vars[src + '_DUA']
        dst_dua = self.vars[dst + '_DUA']
        if bbr:
            bbr_ext = self.vars[bbr]
            bbr_eth = self.vars[bbr + '_ETH']

        result = VerifyResult()
        ping_req = pkts.filter_ping_request().filter_ipv6_dst(dst_dua)
        if self.is_backbone_device(src):
            p = ping_req.filter_eth_src(self.vars[src + '_ETH']).must_next()
        else:
            p = ping_req.filter_wpan_src64(self.vars[src]).must_next()

        # pkts.last().show()
        ping_id = p.icmpv6.echo.identifier
        logging.info("verify_ping: ping_id=%x", ping_id)
        result.record_last('ping_request', pkts)
        # From here on, only consider packets that belong to this ping exchange.
        ping_req = ping_req.filter(lambda p: p.icmpv6.echo.identifier == ping_id)

        # BBR unicasts the ping packet to TD.
        if bbr:
            if self.is_backbone_device(src):
                ping_req.filter_wpan_src64(bbr_ext).must_next()
            else:
                ping_req.filter_eth_src(bbr_eth).must_next()

        ping_reply = pkts.filter_ping_reply().filter_ipv6_dst(src_dua).filter(
            lambda p: p.icmpv6.echo.identifier == ping_id)
        # TD receives ping packet and responds back to Host via SBBR.
        if self.is_thread_device(dst):
            ping_reply.filter_wpan_src64(self.vars[dst]).must_next()
        else:
            ping_reply.filter_eth_src(self.vars[dst + '_ETH']).must_next()

        result.record_last('ping_reply', pkts)

        if bbr:
            # SBBR forwards the ping response packet to Host.
            if self.is_thread_device(dst):
                ping_reply.filter_eth_src(bbr_eth).must_next()
            else:
                ping_reply.filter_wpan_src64(bbr_ext).must_next()

        return result

    def is_thread_device(self, name: str) -> bool:
        """
        Returns whether the device is a WPAN device.

        The check is whether ``name`` itself is a key in ``vars``.

        :param name: The device name.

        Note that a device can be both a WPAN device and an Ethernet device.
        """
        assert isinstance(name, str), name

        return name in self.vars

    def is_backbone_device(self, name: str) -> bool:
        """
        Returns whether the device is an Ethernet device.

        The check is whether ``<name>_ETH`` is a key in ``vars``.

        :param name: The device name.

        Note that a device can be both a WPAN device and an Ethernet device.
        """
        assert isinstance(name, str), name

        return f'{name}_ETH' in self.vars

    def max_index(self, *indexes: Tuple[int, int]) -> Tuple[int, int]:
        """
        Return the component-wise maximum of the given (wpan, eth) index pairs.

        Both components start from 0, so the result is ``(0, 0)`` when no
        indexes are given and is never negative.

        :param indexes: Any number of ``(wpan_index, eth_index)`` pairs.

        :return: The ``(max_wpan_index, max_eth_index)`` pair.
        """
        wpan_idx = 0
        eth_idx = 0
        for wi, ei in indexes:
            wpan_idx = max(wpan_idx, wi)
            eth_idx = max(eth_idx, ei)

        return wpan_idx, eth_idx

    def verify_dua_registration(self, src64, dua, *, pbbr_eth, sbbr_eth=None, pbbr_src64=None):
        """
        Verify the DUA registration sequence for ``dua`` as seen in ``self.pkts``.

        Requires the ``MM_PORT`` and ``BB_PORT`` variables to be present in
        ``vars`` (``add_common_vars`` adds them); otherwise a KeyError is raised.

        :param src64: The extended address of the device that registers the DUA.
        :param dua: The DUA being registered.
        :param pbbr_eth: The Ethernet address of the Primary BBR.
        :param sbbr_eth: The Ethernet address of the Secondary BBR. If given, it must not send /b/bq for the DUA.
        :param pbbr_src64: The extended address of the Primary BBR. If given, its /n/dr ack is verified.
        """
        pkts = self.pkts
        MM = self.vars['MM_PORT']
        BB = self.vars['BB_PORT']

        # Router1 should send /n/dr for DUA registration
        dr = pkts.filter_wpan_src64(src64).filter_coap_request('/n/dr', port=MM).filter(
            'thread_nm.tlv.target_eid == {ROUTER1_DUA}', ROUTER1_DUA=dua).must_next()

        # SBBR should not send /b/bq for Router1's DUA
        if sbbr_eth is not None:
            pkts.filter_backbone_query(dua, eth_src=sbbr_eth, port=BB).must_not_next()

        # PBBR should respond to /n/dr
        if pbbr_src64 is not None:
            pkts.filter_wpan_src64(pbbr_src64).filter_coap_ack(
                '/n/dr', port=MM).must_next().must_verify('thread_nm.tlv.status == 0')

        # PBBR should send /b/bq for Router1's DUA (1st time)
        bq1 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next()
        # Remember where the first query was found; used below to bound the /b/ba check.
        bq1_index = pkts.index

        assert bq1.sniff_timestamp - dr.sniff_timestamp <= 1.01, bq1.sniff_timestamp - dr.sniff_timestamp

        # PBBR should send /b/bq for Router1's DUA (2nd time)
        bq2 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next()

        assert 0.9 < bq2.sniff_timestamp - bq1.sniff_timestamp < 1.1, bq2.sniff_timestamp - bq1.sniff_timestamp

        # PBBR should send /b/bq for Router1's DUA (3rd time)
        bq3 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next()

        assert 0.9 < bq3.sniff_timestamp - bq2.sniff_timestamp < 1.1, bq3.sniff_timestamp - bq2.sniff_timestamp

        # PBBR should send PRO_BB.ntf for Router's DUA when DAD completed
        pkts.filter_eth_src(pbbr_eth).filter_backbone_answer(dua, port=BB, confirmable=False).must_next().show()

        # PBBR should not recv /b/ba response from other BBRs during this period
        pkts.range(bq1_index, pkts.index,
                   cascade=False).filter('eth.src != {PBBR_ETH}',
                                         PBBR_ETH=pbbr_eth).filter_backbone_answer(dua, port=BB).must_not_next()
