#
#  Copyright (c) 2019, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import logging
import sys
from operator import attrgetter
from typing import Optional, Callable, Tuple, Union

from pktverify import consts, errors
from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr
from pktverify.bytes import Bytes
from pktverify.consts import THREAD_ALLOWED_ICMPV6_TYPES
from pktverify.packet import Packet
from pktverify.utils import make_filter_func

# Positions inside an index tuple: index[WPAN] is the WPAN cursor, index[ETH] is the Ethernet cursor.
WPAN, ETH = 0, 1


class _SavedIndex(object):
    """
    Context manager that restores a PacketFilter's index on exit.

    The index is captured when this object is constructed (not in `__enter__`).
    """
    __slots__ = ('_pkts', '_saved_index')

    def __init__(self, pkts):
        self._pkts = pkts
        self._saved_index = pkts.index

    def __enter__(self):
        # Nothing to do: the index was already captured in __init__.
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore unconditionally; returning None lets any exception propagate.
        self._pkts.index = self._saved_index


def _always_true(p):
    """
    Default filter function that accepts every packet.
    """
    return True


class PacketFilter(object):
    """
    Represents a range of packets that are filtered by given filter

    Positions (`start`, `stop`, `index`) are 2-tuples `(wpan_idx, eth_idx)`: element 0 bounds
    WPAN packets and element 1 bounds Ethernet packets.
    """

    def __init__(self,
                 pkts,
                 start=(0, 0),
                 stop=None,
                 *,
                 index=None,
                 filter_func: Optional[Callable] = None,
                 parent: Optional['PacketFilter'] = None):
        """
        :param pkts: the underlying packet sequence (must support `len()` and integer indexing)
        :param start: the start index tuple
        :param stop: the stop index tuple, defaults to `(len(pkts), len(pkts))`
        :param index: the current index tuple, defaults to `start`
        :param filter_func: a callable taking a packet and returning a truthy value for a match,
                            defaults to accepting every packet
        :param parent: a PacketFilter whose index is also updated whenever this filter finds a packet
        """
        if stop is None:
            stop = (len(pkts), len(pkts))

        self._pkts = pkts
        self._start_index = start
        self._stop_index = stop
        self._index = index if index is not None else self._start_index
        # Plain packet index of the last packet found; -1 means nothing has been found yet.
        self._last_index = -1
        self._filter_func = filter_func or _always_true
        self._parent = parent
        self._check_type_ok()

    def _check_type_ok(self):
        """
        Assert the invariants: (0, 0) <= start <= index <= stop <= (len, len), element-wise.
        """
        assert self._last_index == -1 or 0 <= self._last_index < len(self._pkts)

        assert isinstance(self._start_index, tuple) and len(self._start_index) == 2, self._start_index
        assert isinstance(self._stop_index, tuple) and len(self._stop_index) == 2, self._stop_index
        assert isinstance(self._index, tuple) and len(self._index) == 2, self._index
        self._check_idx_range_ok((0, 0), self._start_index)
        self._check_idx_range_ok(self._start_index, self._index)
        self._check_idx_range_ok(self._index, self._stop_index)
        self._check_idx_range_ok(self._stop_index, (len(self._pkts), len(self._pkts)))

    def _check_idx_range_ok(self, start, stop):
        """
        Assert that `start` does not exceed `stop` in either element.
        """
        assert start[0] <= stop[0], (start, stop)
        assert start[1] <= stop[1], (start, stop)

    @property
    def index(self) -> Tuple[int, int]:
        """
        :return: the current index (which is a tuple)
        """
        return self._index

    @index.setter
    def index(self, index: Tuple[int, int]):
        """
        Set the current index
        :param index: the index tuple to set
        """
        assert isinstance(index, tuple) and len(index) == 2, index
        self._check_type_ok()
        self._index = index
        self._check_type_ok()

    def __len__(self):
        """
        :return: length of packets
        """
        return len(self._pkts)

    def save_index(self):
        """
        Save the current index to be restored.

        :return: a context that captures the current index now, and restores it when exiting
        """
        return _SavedIndex(self)

    @property
    def start_index(self) -> Tuple[int, int]:
        """
        :return: the start index tuple
        """
        return self._start_index

    @property
    def stop_index(self) -> Tuple[int, int]:
        """
        :return: the stop index tuple
        """
        return self._stop_index

    def filter(self, func, cascade=True, **vars) -> 'PacketFilter':
        """
        Create a new PacketFilter based on this packet filter with given filter func

        :param func: a callable that returns a bool (e.g. lambda p: xxx) or a filter string
        :param cascade: True if calling next in the new filter will also set index for this filter, False otherwise
        :param vars: variables for filter string
        :return: a new PacketFilter
        """
        # The stop index is a tuple, so it has to be compared with the (len, len) tuple to tell
        # whether this range runs to the end of the capture.
        end_index = (len(self._pkts), len(self._pkts))
        print('\n>>> filtering in range %s~%s%s:' %
              (self._index, self._stop_index, "<end>" if self._stop_index == end_index else "<stop>"),
              file=sys.stderr)

        func = make_filter_func(func, **vars)
        self._check_type_ok()
        # The new filter starts at the current index and matches only packets accepted by both
        # this filter's function and the new one.
        return PacketFilter(self._pkts,
                            self._index,
                            self._stop_index,
                            filter_func=lambda p: self._filter_func(p) and func(p),
                            parent=self if cascade else None)

    def filter_if(self, cond: bool, *args, **kwargs) -> 'PacketFilter':
        """
        Create a filter using given arguments if `cond` is true.

        :param cond: the condition to be checked
        :param args: arguments for filter func
        :param kwargs: arguments for filter func
        :return: a sub filter using given arguments if cond is true, or self otherwise
        """
        if cond:
            return self.filter(*args, **kwargs)
        else:
            return self

    @property
    def last_index(self) -> int:
        """
        :return: the packet index (not tuple!) of the last packet found, or -1 if none was found yet
        """
        return self._last_index

    def last(self) -> Packet:
        """
        :return: the last packet found
        :raises errors.PacketNotFound: if no packet has been found yet
        """
        if self._last_index >= 0:
            return self._pkts[self._last_index]
        else:
            raise errors.PacketNotFound(self.index, self._stop_index)

    def next(self) -> Optional[Packet]:
        """
        Find the next packet starting from the current index to the stop index that matches the current filter.

        :return: the next matching packet, or None if packet not found
        """
        self._check_type_ok()
        # Scan the union of the WPAN and Ethernet ranges; the per-type range is checked per packet below.
        idx = min(self._index)
        stop_idx = max(self._stop_index)

        while idx < stop_idx:
            p = self._pkts[idx]

            # Progress trace: packet numbers are 1-based, with a line break every 40 packets.
            sys.stderr.write('#%d %s' % (idx + 1, '\n' if idx % 40 == 39 else ''))
            if self._filter_func(p):
                if p.wpan and not (self._index[0] <= idx < self._stop_index[0]):  # wpan matched but not in range
                    pass
                elif p.eth and not (self._index[1] <= idx < self._stop_index[1]):  # eth matched but not in range
                    pass
                else:
                    self._on_found_next(idx, p)
                    print("\n>>> found packet at #%d!" % (idx + 1,), file=sys.stderr)
                    return p

            idx += 1

        return None

    def must_next(self) -> Packet:
        """
        Call .next(), raise error if packet is not found.

        :return: the next matching packet
        :raises errors.PacketNotFound: if no matching packet is found
        """
        p = self.next()
        if p is not None:
            return p
        else:
            raise errors.PacketNotFound(self.index, self._stop_index)

    def must_not_next(self) -> None:
        """
        Call .next(), raise error if packet is found

        :raises errors.UnexpectedPacketFound: if a matching packet is found
        """
        p = self.next()
        if p is None:
            return
        else:
            logging.error("Found unexpected packet at #%s", self.index)
            p.show()
            p.debug_fields()
            raise errors.UnexpectedPacketFound(self.index, p)

    def _on_found_next(self, idx: int, p: Packet):
        """
        Advance the index after `next()` matched packet `p` at packet index `idx`.

        The cursor of the matched packet's type moves just past the match. The cursor of the other
        type moves to the earliest packet of that type seen within `consts.AUTO_SEEK_BACK_MAX_DURATION`
        before the match, but never backwards from its current position.

        :param idx: the packet index of the matched packet
        :param p: the matched packet
        """
        assert self._pkts[idx] is p
        assert idx >= min(self._index)
        assert not p.wpan or idx >= self._index[0]
        assert not p.eth or idx >= self._index[1], (self._index, idx)

        if p.wpan:
            wpan_idx = idx + 1
            eth_idx = max(self._index[1],
                          self._find_prev_packet(idx + 1, p.sniff_timestamp - consts.AUTO_SEEK_BACK_MAX_DURATION, ETH))
        else:
            eth_idx = idx + 1
            wpan_idx = max(
                self._index[0],
                self._find_prev_packet(idx + 1, p.sniff_timestamp - consts.AUTO_SEEK_BACK_MAX_DURATION, WPAN))

        # make sure index never go back
        assert wpan_idx >= self._index[0]
        assert eth_idx >= self._index[1]

        print('\n>>>_on_found_next %d %s => %s' % (idx, self._index, (wpan_idx, eth_idx)), file=sys.stderr)
        self._set_found_index(idx, (wpan_idx, eth_idx))

    def _find_prev_packet(self, idx, min_sniff_timestamp, pkttype):
        """
        Walk backwards from `idx` while the preceding packet is not older than `min_sniff_timestamp`.

        :param idx: the packet index to start walking back from (exclusive)
        :param min_sniff_timestamp: the walk stops at the first packet with a smaller sniff timestamp
        :param pkttype: WPAN or ETH, the packet type to look for
        :return: the smallest visited packet index holding a packet of `pkttype`, or `idx` if there is none
        """
        assert pkttype in (WPAN, ETH)

        prev_idx = idx
        while idx > 0 and self._pkts[idx - 1].sniff_timestamp >= min_sniff_timestamp:
            idx -= 1
            if pkttype == WPAN and self._pkts[idx].wpan:
                prev_idx = idx
            elif pkttype == ETH and self._pkts[idx].eth:
                prev_idx = idx

        return prev_idx

    def __iter__(self):
        """
        Iterate over all underlying packets; the range and the filter function are not applied.
        """
        for pkt in self._pkts:
            yield pkt

    def range(self, start, stop=None, cascade=True) -> 'PacketFilter':
        """
        Create a new PacketFilter using the specified start and stop index tuples

        :param start: the new start index tuple
        :param stop: the new stop index tuple
        :param cascade: True if calling next in the new filter will also set index for this filter, False otherwise
        :return: a new PacketFilter with new start and stop range
        """

        if stop is None:
            stop = self._stop_index

        assert self._start_index <= start <= self._stop_index
        assert self._start_index <= stop <= self._stop_index
        return PacketFilter(self._pkts, start, stop, filter_func=self._filter_func, parent=self if cascade else None)

    def copy(self) -> 'PacketFilter':
        """
        :return: a copy of the current PacketFilter
        """
        # The copy starts at the current index and has no parent, so it never updates another filter.
        return PacketFilter(self._pkts, self._index, self._stop_index, filter_func=self._filter_func, parent=None)

    def __getitem__(self, index: int) -> Packet:
        """
        :param index: the packet index (not tuple!)
        :return: the packet at the specified index
        """
        assert isinstance(index, int), index
        return self._pkts[index]

    def seek_back(self, max_duration: float, *, eth=False, wpan=False) -> 'PacketFilter':
        """
        Move the current index back in time within the specified max duration. Either eth or wpan must be True.

        :param max_duration: the max duration to move back
        :param eth: True if eth index can be moved back
        :param wpan: True if wpan index can be moved back
        :return: self
        """
        assert eth or wpan, "must have eth or wpan"

        wpan_idx = self._index[0]
        if wpan and wpan_idx < len(self._pkts):
            wpan_idx = self._find_prev_packet(wpan_idx, self._pkts[wpan_idx].sniff_timestamp - max_duration, WPAN)
            # Never seek back past this filter's start.
            wpan_idx = max(self._start_index[0], wpan_idx)

        eth_idx = self._index[1]
        if eth and eth_idx < len(self._pkts):
            eth_idx = self._find_prev_packet(eth_idx, self._pkts[eth_idx].sniff_timestamp - max_duration, ETH)
            # Never seek back past this filter's start.
            eth_idx = max(self._start_index[1], eth_idx)

        print("\n>>> back %s wpan=%s, eth=%s: index %s => %s" % (max_duration, wpan, eth, self._index,
                                                                 (wpan_idx, eth_idx)),
              file=sys.stderr)
        self._index = (wpan_idx, eth_idx)
        self._check_type_ok()
        return self

    def _set_found_index(self, last_index: int, index: Tuple[int, int]):
        """
        Record a found packet on this filter and on every parent up the chain.

        :param last_index: the packet index (not tuple!) of the packet just found
        :param index: the new current index tuple
        """
        self._last_index = last_index
        self._index = index
        self._check_type_ok()

        if self._parent is not None:
            self._parent._set_found_index(last_index, index)

    def filter_mle_advertisement(self, role: str, **kwargs):
        """
        Create a new PacketFilter to filter MLE Advertisement packets of a given role.

        Matches packets sent to `consts.LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS` with MLE command
        `consts.MLE_ADVERTISEMENT`, IPv6 hop limit 255 and at least the Leader Data and Source Address
        TLVs (plus the Route64 TLV unless `role` is 'REED').

        :param role: one of 'Leader', 'Router' or 'REED'
        :param kwargs: Extra arguments for the final `filter`.
        :return: The new PacketFilter to filter MLE Advertisement packets.
        """
        assert role in ('Leader', 'Router', 'REED'), role

        tlv_set = {consts.LEADER_DATA_TLV, consts.SOURCE_ADDRESS_TLV}

        if role != 'REED':
            tlv_set.add(consts.ROUTE64_TLV)

        return self.filter_LLANMA().filter_mle_cmd(consts.MLE_ADVERTISEMENT).filter(
            lambda p: tlv_set <= set(p.mle.tlv.type) and p.ipv6.hlim == 255, **kwargs)

    def filter_coap(self, **kwargs):
        """
        Create a new PacketFilter to filter COAP packets.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter COAP packets.
        """
        return self.filter(attrgetter('coap'), **kwargs)

    def filter_coap_request(self, uri_path, port=None, confirmable=None, **kwargs):
        """
        Create a new PacketFilter to filter COAP Request packets.

        :param uri_path: The COAP URI path to filter.
        :param port: The UDP port to filter if specified.
        :param confirmable: If not None, require `coap.type` to be 0 when true and 1 when false.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter COAP Request packets.
        """
        assert isinstance(uri_path, str), uri_path
        assert port is None or isinstance(port, int), port
        return self.filter(
            lambda p: (p.coap.is_post and p.coap.opt.uri_path_recon == uri_path and
                       (confirmable is None or p.coap.type ==
                        (0 if confirmable else 1)) and (port is None or p.udp.dstport == port)), **kwargs)

    def filter_coap_ack(self, uri_path, port=None, **kwargs):
        """
        Create a new PacketFilter for filter COAP ACK packets.

        :param uri_path: The COAP URI path to filter.
        :param port: The UDP port to filter if specified.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter COAP ACK packets.
        """
        assert isinstance(uri_path, str), uri_path
        assert port is None or isinstance(port, int), port
        return self.filter(
            lambda p: (p.coap.is_ack and p.coap.opt.uri_path_recon == uri_path and
                       (port is None or p.udp.dstport == port)), **kwargs)

    def filter_backbone_answer(self,
                               target: str,
                               *,
                               eth_src: Optional[EthAddr] = None,
                               port: int = None,
                               confirmable: bool = None,
                               mliid=None):
        """
        Create a new PacketFilter to filter Ethernet COAP requests to '/b/ba' for a given target.

        :param target: The value `thread_bl.tlv.target_eid` must equal.
        :param eth_src: The Ethernet source address to filter if specified.
        :param port: The UDP port to filter if specified.
        :param confirmable: Passed through to `filter_coap_request`.
        :param mliid: The value `thread_bl.tlv.ml_eid` must equal, if specified.
        :return: The new PacketFilter.
        """
        filter_eth = self.filter_eth_src(eth_src) if eth_src else self.filter_eth()
        f = filter_eth.filter_coap_request('/b/ba', port=port,
                                           confirmable=confirmable).filter('thread_bl.tlv.target_eid == {target}',
                                                                           target=target)
        if mliid is not None:
            f = f.filter('thread_bl.tlv.ml_eid == {mliid}', mliid=mliid)

        return f

    def filter_backbone_query(self, target: str, *, eth_src: EthAddr, port: int = None) -> 'PacketFilter':
        """
        Create a new PacketFilter to filter non-confirmable COAP requests to '/b/bq' for a given target.

        :param target: The value `thread_bl.tlv.target_eid` must equal.
        :param eth_src: The Ethernet source address to filter.
        :param port: The UDP port to filter if specified.
        :return: The new PacketFilter.
        """
        return self.filter_eth_src(eth_src).filter_coap_request('/b/bq', port=port, confirmable=False).filter(
            'thread_bl.tlv.target_eid == {target}', target=target)

    def filter_wpan(self, **kwargs):
        """
        Create a new PacketFilter for filter WPAN packets.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(attrgetter('wpan'), **kwargs)

    def filter_wpan_ack(self, **kwargs):
        """
        Create a new PacketFilter for filter WPAN ACK packets.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.frame_type == consts.MAC_FRAME_TYPE_ACK, **kwargs)

    def filter_wpan_beacon(self, **kwargs):
        """
        Create a new PacketFilter for filter WPAN beacon.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.frame_type == consts.MAC_FRAME_TYPE_BEACON, **kwargs)

    def filter_wpan_data(self, **kwargs):
        """
        Create a new PacketFilter for filter WPAN data packets.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.frame_type == consts.MAC_FRAME_TYPE_DATA, **kwargs)

    def filter_wpan_seq(self, seq, **kwargs):
        """
        Create a new PacketFilter for filter WPAN packets of a sequence number.

        :param seq: The sequence number to filter.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.seq_no == seq, **kwargs)

    def filter_wpan_version(self, version: int, **kwargs):
        """
        Create a new PacketFilter for filter WPAN packets of a given version.

        :param version: The version to filter.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.version == version, **kwargs)

    def filter_wpan_channel(self, channel: int, **kwargs):
        """
        Create a new PacketFilter for filter WPAN packets of a given channel.

        :param channel: The channel to filter.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.channel == channel, **kwargs)

    def filter_wpan_src16(self, addr, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `wpan.src16` equals `addr`.
        """
        return self.filter(lambda p: p.wpan.src16 == addr, **kwargs)

    def filter_wpan_dst16(self, addr, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `wpan.dst16` equals `addr`.
        """
        return self.filter(lambda p: p.wpan.dst16 == addr, **kwargs)

    def filter_wpan_src16_dst16(self, src_addr, dst_addr, **kwargs):
        """
        Create a new PacketFilter to filter packets matching both `wpan.src16` and `wpan.dst16`.
        """
        return self.filter(lambda p: p.wpan.src16 == src_addr and p.wpan.dst16 == dst_addr, **kwargs)

    def filter_wpan_src64(self, addr, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `wpan.src64` equals `addr`.
        """
        assert isinstance(addr, (str, ExtAddr)), addr
        return self.filter(lambda p: p.wpan.src64 == addr, **kwargs)

    def filter_wpan_dst64(self, addr, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `wpan.dst64` equals `addr`.
        """
        assert isinstance(addr, (str, ExtAddr)), addr
        return self.filter(lambda p: p.wpan.dst64 == addr, **kwargs)

    def filter_dst16(self, rloc16: int, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `lowpan.mesh.dest16` or `wpan.dst16` equals `rloc16`.
        """
        return self.filter(lambda p: p.lowpan.mesh.dest16 == rloc16 or p.wpan.dst16 == rloc16, **kwargs)

    def filter_wpan_ie_present(self, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `wpan.ie_present` is 1.

        :param kwargs: Extra arguments for `filter`.
        """
        return self.filter(lambda p: p.wpan.ie_present == 1, **kwargs)

    def filter_wpan_ie_not_present(self, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `wpan.ie_present` is 0.

        :param kwargs: Extra arguments for `filter`.
        """
        return self.filter(lambda p: p.wpan.ie_present == 0, **kwargs)

    def filter_ping_request(self, identifier=None, **kwargs):
        """
        Create a new PacketFilter to filter ping requests.

        :param identifier: The echo identifier to filter if specified.
        :param kwargs: Extra arguments for `filter`.
        """
        return self.filter(
            lambda p: p.icmpv6.is_ping_request and (identifier is None or p.icmpv6.echo.identifier == identifier),
            **kwargs)

    def filter_ping_reply(self, **kwargs):
        """
        Create a new PacketFilter to filter ping replies.

        :param kwargs: Extra arguments for `filter`; an optional `identifier` keyword is consumed here
                       and, if not None, restricts the match to that echo identifier.
        """
        identifier = kwargs.pop('identifier', None)
        return self.filter(
            lambda p: (p.icmpv6.is_ping_reply and (identifier is None or p.icmpv6.echo.identifier == identifier)),
            **kwargs)

    def filter_eth(self, **kwargs):
        """
        Create a new PacketFilter to filter Ethernet packets.
        """
        return self.filter(attrgetter('eth'), **kwargs)

    def filter_eth_src(self, addr, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `eth.src` equals `addr`.
        """
        assert isinstance(addr, (str, EthAddr))
        return self.filter(lambda p: p.eth.src == addr, **kwargs)

    def filter_ipv6_dst(self, addr, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `ipv6.dst` equals `addr`.
        """
        assert isinstance(addr, (str, Ipv6Addr))
        return self.filter(lambda p: p.ipv6.dst == addr, **kwargs)

    def filter_ipv6_2dsts(self, addr1, addr2, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `ipv6.dst` equals `addr1` or `addr2`.
        """
        assert isinstance(addr1, (str, Ipv6Addr))
        assert isinstance(addr2, (str, Ipv6Addr))
        return self.filter(lambda p: p.ipv6.dst == addr1 or p.ipv6.dst == addr2, **kwargs)

    def filter_ipv6_src_dst(self, src_addr, dst_addr, **kwargs):
        """
        Create a new PacketFilter to filter packets matching both `ipv6.src` and `ipv6.dst`.
        """
        assert isinstance(src_addr, (str, Ipv6Addr))
        assert isinstance(dst_addr, (str, Ipv6Addr))
        return self.filter(lambda p: p.ipv6.src == src_addr and p.ipv6.dst == dst_addr, **kwargs)

    def filter_LLATNMA(self, **kwargs):
        """
        Filter packets sent to `consts.LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS`.
        """
        return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS, **kwargs)

    def filter_RLANMA(self, **kwargs):
        """
        Filter packets sent to `consts.REALM_LOCAL_ALL_NODES_ADDRESS`.
        """
        return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_ALL_NODES_ADDRESS, **kwargs)

    def filter_RLARMA(self, **kwargs):
        """
        Filter packets sent to `consts.REALM_LOCAL_ALL_ROUTERS_ADDRESS`.
        """
        return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_ALL_ROUTERS_ADDRESS, **kwargs)

    def filter_RLATNMA(self, **kwargs):
        """
        Filter packets sent to `consts.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS`.
        """
        return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS, **kwargs)

    def filter_LLANMA(self, **kwargs):
        """
        Filter packets sent to `consts.LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS`.
        """
        return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS, **kwargs)

    def filter_LLABMA(self, **kwargs):
        """
        Filter packets sent to `consts.LINK_LOCAL_ALL_BBRS_MULTICAST_ADDRESS`.
        """
        return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_ALL_BBRS_MULTICAST_ADDRESS, **kwargs)

    def filter_LLARMA(self, **kwargs):
        """
        Filter packets sent to `consts.LINK_LOCAL_ALL_ROUTERS_MULTICAST_ADDRESS`.
        """
        return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_ALL_ROUTERS_MULTICAST_ADDRESS, **kwargs)

    def filter_AMPLFMA(self, mpl_seed_id: Union[int, Ipv6Addr] = None, **kwargs):
        """
        Filter packets sent to `consts.ALL_MPL_FORWARDERS_MA`, optionally restricted by MPL seed.

        :param mpl_seed_id: If an int, the MPL option seed id must equal its two big-endian bytes.
                            Otherwise it is treated as an address: either the packet comes from that
                            address with the MPL S flag 0, or it comes from another address with the
                            S flag 1 and a seed id equal to the last two bytes of the address.
        :param kwargs: Extra arguments for the first `filter`.
        :return: The new PacketFilter.
        """
        f = self.filter(lambda p: p.ipv6.dst == consts.ALL_MPL_FORWARDERS_MA, **kwargs)
        if mpl_seed_id is not None:
            if isinstance(mpl_seed_id, int):
                mpl_seed_id = Bytes([mpl_seed_id >> 8, mpl_seed_id & 0xFF])
                f = f.filter(lambda p: p.ipv6.opt.mpl.seed_id == mpl_seed_id)
            else:
                rloc = mpl_seed_id
                rloc16 = bytes(rloc[-2:])
                f = f.filter(lambda p: (p.ipv6.src == rloc and p.ipv6.opt.mpl.flag.s == 0) or
                             (p.ipv6.src != rloc and p.ipv6.opt.mpl.flag.s == 1 and p.ipv6.opt.mpl.seed_id == rloc16))
        return f

    def filter_mle(self, **kwargs):
        """
        Create a new PacketFilter to filter MLE packets.
        """
        return self.filter(attrgetter('mle'), **kwargs)

    def filter_wpan_cmd(self, cmd, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `wpan.cmd` equals `cmd`.
        """
        assert isinstance(cmd, int), cmd
        return self.filter(lambda p: p.wpan.cmd == cmd, **kwargs)

    def filter_mle_cmd(self, cmd, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `mle.cmd` equals `cmd`.
        """
        assert isinstance(cmd, int), cmd
        return self.filter(lambda p: p.mle.cmd == cmd, **kwargs)

    def filter_mle_cmd2(self, cmd1, cmd2, **kwargs):
        """
        Create a new PacketFilter to filter packets whose `mle.cmd` equals `cmd1` or `cmd2`.
        """
        assert isinstance(cmd1, int), cmd1
        assert isinstance(cmd2, int), cmd2
        return self.filter(lambda p: p.mle.cmd == cmd1 or p.mle.cmd == cmd2, **kwargs)

    def filter_mle_has_tlv(self, *tlv_types, **kwargs):
        """
        Create a new PacketFilter to filter packets whose MLE TLV types include all of `tlv_types`.
        """
        return self.filter(lambda p: set(tlv_types) <= set(p.mle.tlv.type), **kwargs)

    def filter_icmpv6(self, **kwargs):
        """
        Create a new PacketFilter to filter ICMPv6 packets.
        """
        return self.filter(attrgetter('icmpv6'), **kwargs)

    def filter_icmpv6_nd_ns(self, target_address: Ipv6Addr):
        """
        Create a new PacketFilter to filter Neighbor Solicitations for `target_address`.
        """
        return self.filter(lambda p:
                           (p.icmpv6.is_neighbor_solicitation and p.icmpv6.nd.ns.target_address == target_address))

    def filter_icmpv6_nd_na(self, target_address: Ipv6Addr):
        """
        Create a new PacketFilter to filter Neighbor Advertisements for `target_address`.
        """
        return self.filter(lambda p:
                           (p.icmpv6.is_neighbor_advertisement and p.icmpv6.nd.na.target_address == target_address))

    def filter_icmpv6_nd_ra(self):
        """
        Create a new PacketFilter to filter Router Advertisements.
        """
        return self.filter(lambda p: p.icmpv6.is_router_advertisement)

    def filter_thread_unallowed_icmpv6(self):
        """
        Create a new PacketFilter to filter WPAN ICMPv6 packets whose type is not in
        `THREAD_ALLOWED_ICMPV6_TYPES`.
        """
        return self.filter('wpan and icmpv6 and icmpv6.type not in {THREAD_ALLOWED_ICMPV6_TYPES}',
                           THREAD_ALLOWED_ICMPV6_TYPES=THREAD_ALLOWED_ICMPV6_TYPES)

    def filter_has_bbr_dataset(self):
        """
        Create a new PacketFilter using the filter string below on `thread_nwd` TLV fields.
        """
        return self.filter("""
            thread_nwd.tlv.server.has('16')
            and thread_nwd.tlv.service.s_data.seqno is not null
            and thread_nwd.tlv.service.s_data.rrdelay is not null
            and thread_nwd.tlv.service.s_data.mlrtimeout is not null
        """)