#
#  Copyright (c) 2019, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import logging
import sys
from operator import attrgetter
from typing import Optional, Callable, Tuple, Union

from pktverify import consts, errors
from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr
from pktverify.bytes import Bytes
from pktverify.packet import Packet
from pktverify.utils import make_filter_func

# Positions of the WPAN and ETH components inside an index tuple.
WPAN, ETH = 0, 1


class _SavedIndex(object):
    """
    Context manager that restores the index of a PacketFilter on exit.

    The index is captured when this object is constructed (not on `__enter__`).
    """
    __slots__ = ('_pkts', '_saved_index')

    def __init__(self, pkts):
        self._pkts = pkts
        self._saved_index = pkts.index

    def __enter__(self):
        # Nothing to do: the index was already captured in __init__.
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore unconditionally; returning None lets any exception propagate.
        self._pkts.index = self._saved_index


def _always_true(p):
    """Default filter function: accept every packet."""
    return True


class PacketFilter(object):
    """
    Represents a range of packets that are filtered by given filter

    Indexes are `(wpan_index, eth_index)` tuples: WPAN and Ethernet packets share one packet list but each
    type has its own position in it.
    """

    def __init__(self,
                 pkts,
                 start=(0, 0),
                 stop=None,
                 *,
                 index=None,
                 filter_func: Optional[Callable] = None,
                 parent: Optional['PacketFilter'] = None):
        """
        :param pkts: the underlying packet sequence (must support `len()` and integer indexing)
        :param start: the start index tuple
        :param stop: the stop index tuple, defaults to the end of `pkts` for both components
        :param index: the current index tuple, defaults to `start`
        :param filter_func: a callable taking a packet and returning a truthy value when it matches,
                            defaults to matching every packet
        :param parent: the filter whose index is also updated whenever this filter finds a packet
        """
        if stop is None:
            stop = (len(pkts), len(pkts))

        self._pkts = pkts
        self._start_index = start
        self._stop_index = stop
        self._index = index if index is not None else self._start_index
        # Plain packet index of the most recently found packet, -1 if none was found yet.
        self._last_index = -1
        self._filter_func = filter_func or _always_true
        self._parent = parent
        self._check_type_ok()

    def _check_type_ok(self):
        """Assert that the indexes are well-formed tuples ordered as (0, 0) <= start <= index <= stop <= end."""
        assert self._last_index == -1 or 0 <= self._last_index < len(self._pkts)

        assert isinstance(self._start_index, tuple) and len(self._start_index) == 2, self._start_index
        assert isinstance(self._stop_index, tuple) and len(self._stop_index) == 2, self._stop_index
        assert isinstance(self._index, tuple) and len(self._index) == 2, self._index
        self._check_idx_range_ok((0, 0), self._start_index)
        self._check_idx_range_ok(self._start_index, self._index)
        self._check_idx_range_ok(self._index, self._stop_index)
        self._check_idx_range_ok(self._stop_index, (len(self._pkts), len(self._pkts)))

    def _check_idx_range_ok(self, start, stop):
        """Assert that `start` does not exceed `stop` in either component."""
        assert start[0] <= stop[0], (start, stop)
        assert start[1] <= stop[1], (start, stop)

    @property
    def index(self) -> Tuple[int, int]:
        """
        :return: the current index (which is a tuple)
        """
        return self._index

    @index.setter
    def index(self, index: Tuple[int, int]):
        """
        Set the current index
        :param index: the index tuple to set
        """
        assert isinstance(index, tuple) and len(index) == 2, index
        self._check_type_ok()
        self._index = index
        self._check_type_ok()

    def __len__(self):
        """
        :return: length of packets
        """
        return len(self._pkts)

    def save_index(self):
        """
        Save the current index to be restored.

        :return: a context that captures the current index when created, and restores it when exiting
        """
        return _SavedIndex(self)

    @property
    def start_index(self) -> Tuple[int, int]:
        """
        :return: the start index tuple
        """
        return self._start_index

    @property
    def stop_index(self) -> Tuple[int, int]:
        """
        :return: the stop index tuple
        """
        return self._stop_index

    def filter(self, func, cascade=True, **vars) -> 'PacketFilter':
        """
        Create a new PacketFilter based on this packet filter with given filter func

        The new filter starts at this filter's current index, stops at this filter's stop index, and only
        matches packets accepted by both this filter's function and `func`.

        :param func: a callable that returns a bool (e.g. lambda p: xxx) or a filter string
        :param cascade: True if calling next in the new filter will also set index for this filter, False otherwise
        :param vars: variables for filter string
        :return: a new PacketFilter
        """
        # The stop index is a tuple, so it has to be compared with the end *tuple* to detect "<end>".
        end_index = (len(self._pkts), len(self._pkts))
        print('\n>>> filtering in range %s~%s%s:' %
              (self._index, self._stop_index, "<end>" if self._stop_index == end_index else "<stop>"),
              file=sys.stderr)

        func = make_filter_func(func, **vars)
        self._check_type_ok()
        return PacketFilter(self._pkts,
                            self._index,
                            self._stop_index,
                            filter_func=lambda p: self._filter_func(p) and func(p),
                            parent=self if cascade else None)

    def filter_if(self, cond: bool, *args, **kwargs) -> 'PacketFilter':
        """
        Create a filter using given arguments if `cond` is true.

        :param cond: the condition to be checked
        :param args: arguments for filter func
        :param kwargs: arguments for filter func
        :return: a sub filter using given arguments if cond is true, or self otherwise
        """
        if cond:
            return self.filter(*args, **kwargs)
        else:
            return self

    @property
    def last_index(self) -> int:
        """
        :return: the packet index (not tuple!) of the last packet found, or -1 if no packet was found yet
        """
        return self._last_index

    def last(self) -> Packet:
        """
        :return: the last packet found
        :raises errors.PacketNotFound: if no packet was found yet
        """
        if self._last_index >= 0:
            return self._pkts[self._last_index]
        else:
            raise errors.PacketNotFound(self.index, self._stop_index)

    def next(self) -> Optional[Packet]:
        """
        Find the next packet starting from the current index to the stop index that matches the current filter.

        On success the current index (and that of cascading parents) is advanced past the found packet.

        :return: the next matching packet, or None if packet not found
        """
        self._check_type_ok()
        # Scan the union of the WPAN and ETH ranges; the per-type range is enforced for each match below.
        idx = min(self._index)
        stop_idx = max(self._stop_index)

        while idx < stop_idx:
            p = self._pkts[idx]

            # Progress trace: 40 packet numbers per line.
            sys.stderr.write('#%d %s' % (idx + 1, '\n' if idx % 40 == 39 else ''))
            if self._filter_func(p):
                if p.wpan and not (self._index[0] <= idx < self._stop_index[0]):  # wpan matched but not in range
                    pass
                elif p.eth and not (self._index[1] <= idx < self._stop_index[1]):  # eth matched but not in range
                    pass
                else:
                    self._on_found_next(idx, p)
                    print("\n>>> found packet at #%d!" % (idx + 1,), file=sys.stderr)
                    return p

            idx += 1

        return None

    def must_next(self) -> Packet:
        """
        Call .next(), raise error if packet is not found.

        :return: the next matching packet
        :raises errors.PacketNotFound: if no matching packet is found
        """
        p = self.next()
        if p is not None:
            return p
        else:
            raise errors.PacketNotFound(self.index, self._stop_index)

    def must_not_next(self) -> None:
        """
        Call .next(), raise error if packet is found

        :raises errors.UnexpectedPacketFound: if a matching packet is found
        """
        p = self.next()
        if p is None:
            return
        else:
            logging.error("Found unexpected packet at #%s", self.index)
            p.show()
            p.debug_fields()
            raise errors.UnexpectedPacketFound(self.index, p)

    def _on_found_next(self, idx: int, p: Packet):
        """
        Advance the index after packet `p` was found at packet index `idx`.

        The index component of the found packet's type moves just past `idx`. The other component is moved
        to the earliest packet of the other type seen within `consts.AUTO_SEEK_BACK_MAX_DURATION` before
        `p`, but never behind its current value.
        """
        assert self._pkts[idx] is p
        assert idx >= min(self._index)
        assert not p.wpan or idx >= self._index[0]
        assert not p.eth or idx >= self._index[1], (self._index, idx)

        if p.wpan:
            wpan_idx = idx + 1
            eth_idx = max(self._index[1],
                          self._find_prev_packet(idx + 1, p.sniff_timestamp - consts.AUTO_SEEK_BACK_MAX_DURATION, ETH))
        else:
            eth_idx = idx + 1
            wpan_idx = max(
                self._index[0],
                self._find_prev_packet(idx + 1, p.sniff_timestamp - consts.AUTO_SEEK_BACK_MAX_DURATION, WPAN))

        # make sure index never go back
        assert wpan_idx >= self._index[0]
        assert eth_idx >= self._index[1]

        print('\n>>>_on_found_next %d %s => %s' % (idx, self._index, (wpan_idx, eth_idx)), file=sys.stderr)
        self._set_found_index(idx, (wpan_idx, eth_idx))

    def _find_prev_packet(self, idx, min_sniff_timestamp, pkttype):
        """
        Walk backwards from `idx` while the preceding packet is not older than `min_sniff_timestamp`.

        :param idx: the packet index to start from (exclusive)
        :param min_sniff_timestamp: the walk stops at the first packet with a smaller sniff timestamp
        :param pkttype: WPAN or ETH
        :return: the smallest index of a `pkttype` packet visited, or `idx` unchanged if none was visited
        """
        assert pkttype in (WPAN, ETH)

        prev_idx = idx
        while idx > 0 and self._pkts[idx - 1].sniff_timestamp >= min_sniff_timestamp:
            idx -= 1
            if pkttype == WPAN and self._pkts[idx].wpan:
                prev_idx = idx
            elif pkttype == ETH and self._pkts[idx].eth:
                prev_idx = idx

        return prev_idx

    def __iter__(self):
        """Iterate over all underlying packets (the range and the filter function are not applied)."""
        yield from self._pkts

    def range(self, start, stop=None, cascade=True) -> 'PacketFilter':
        """
        Create a new PacketFilter using the specified start and stop index tuples

        :param start: the new start index tuple
        :param stop: the new stop index tuple
        :param cascade: True if calling next in the new filter will also set index for this filter, False otherwise
        :return: a new PacketFilter with new start and stop range
        """

        if stop is None:
            stop = self._stop_index

        # NOTE: these are plain tuple comparisons, i.e. lexicographic rather than per-component.
        assert self._start_index <= start <= self._stop_index
        assert self._start_index <= stop <= self._stop_index
        return PacketFilter(self._pkts, start, stop, filter_func=self._filter_func, parent=self if cascade else None)

    def copy(self) -> 'PacketFilter':
        """
        :return: a copy of the current PacketFilter, starting at the current index and without a parent
        """
        return PacketFilter(self._pkts, self._index, self._stop_index, filter_func=self._filter_func, parent=None)

    def __getitem__(self, index: int) -> Packet:
        """
        :param index: the packet index (not tuple!)
        :return: the packet at the specified index
        """
        assert isinstance(index, int), index
        return self._pkts[index]

    def seek_back(self, max_duration: float, *, eth=False, wpan=False) -> 'PacketFilter':
        """
        Move the current index back in time within the specified max duration. Either eth or wpan must be True.

        The index is never moved before the start index.

        :param max_duration: the max duration to move back
        :param eth: True if eth index can be moved back
        :param wpan: True if wpan index can be moved back
        :return: self
        """
        assert eth or wpan, "must have eth or wpan"

        wpan_idx = self._index[0]
        # An index at the very end has no packet to take a reference timestamp from, so it stays put.
        if wpan and wpan_idx < len(self._pkts):
            wpan_idx = self._find_prev_packet(wpan_idx, self._pkts[wpan_idx].sniff_timestamp - max_duration, WPAN)
            wpan_idx = max(self._start_index[0], wpan_idx)

        eth_idx = self._index[1]
        if eth and eth_idx < len(self._pkts):
            eth_idx = self._find_prev_packet(eth_idx, self._pkts[eth_idx].sniff_timestamp - max_duration, ETH)
            eth_idx = max(self._start_index[1], eth_idx)

        print("\n>>> back %s wpan=%s, eth=%s: index %s => %s" % (max_duration, wpan, eth, self._index,
                                                                 (wpan_idx, eth_idx)),
              file=sys.stderr)
        self._index = (wpan_idx, eth_idx)
        self._check_type_ok()
        return self

    def _set_found_index(self, last_index: int, index: Tuple[int, int]):
        """
        Record a found packet and propagate the new index up the parent chain.

        :param last_index: the packet index (not tuple!) of the found packet
        :param index: the new current index tuple
        """
        self._last_index = last_index
        self._index = index
        self._check_type_ok()

        if self._parent is not None:
            self._parent._set_found_index(last_index, index)

    def filter_mle_advertisement(self, role: str, **kwargs):
        """
        Create a new PacketFilter to filter MLE Advertisement packets sent by a device of the given role.

        Matching packets are sent to the link-local all-nodes multicast address with hop limit 255 and carry
        exactly the Leader Data and Source Address TLVs, plus the Route64 TLV unless `role` is 'REED'.

        :param role: one of 'Leader', 'Router' or 'REED'
        :param kwargs: Extra arguments for the final `filter`.
        :return: The new PacketFilter to filter MLE Advertisement packets.
        """
        assert role in ('Leader', 'Router', 'REED'), role

        tlv_set = {consts.LEADER_DATA_TLV, consts.SOURCE_ADDRESS_TLV}

        if role != 'REED':
            tlv_set.add(consts.ROUTE64_TLV)

        return self.filter_LLANMA(). \
            filter_mle_cmd(consts.MLE_ADVERTISEMENT). \
            filter(lambda p: tlv_set == set(p.mle.tlv.type) and p.ipv6.hlim == 255, **kwargs)

    def filter_coap(self, **kwargs):
        """
        Create a new PacketFilter to filter COAP packets.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter COAP packets.
        """
        return self.filter(attrgetter('coap'), **kwargs)

    def filter_coap_request(self, uri_path, port=None, confirmable=None, **kwargs):
        """
        Create a new PacketFilter to filter COAP Request packets (only POST requests are matched).

        :param uri_path: The COAP URI path to filter.
        :param port: The UDP destination port to filter if specified.
        :param confirmable: If specified, match only COAP type 0 when true, or only COAP type 1 when false.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter COAP Request packets.
        """
        assert isinstance(uri_path, str), uri_path
        assert port is None or isinstance(port, int), port
        return self.filter(
            lambda p: (p.coap.is_post and p.coap.opt.uri_path_recon == uri_path and
                       (confirmable is None or p.coap.type ==
                        (0 if confirmable else 1)) and (port is None or p.udp.dstport == port)), **kwargs)

    def filter_coap_ack(self, uri_path, port=None, **kwargs):
        """
        Create a new PacketFilter for filter COAP ACK packets.

        :param uri_path: The COAP URI path to filter.
        :param port: The UDP destination port to filter if specified.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter COAP ACK packets.
        """
        assert isinstance(uri_path, str), uri_path
        assert port is None or isinstance(port, int), port
        return self.filter(
            lambda p: (p.coap.is_ack and p.coap.opt.uri_path_recon == uri_path and
                       (port is None or p.udp.dstport == port)), **kwargs)

    def filter_backbone_answer(self,
                               target: str,
                               *,
                               eth_src: Optional[EthAddr] = None,
                               port: int = None,
                               confirmable: bool = None,
                               mliid=None):
        """
        Create a new PacketFilter to filter COAP requests to '/b/ba' on Ethernet for the given target.

        :param target: The value `thread_bl.tlv.target_eid` must equal.
        :param eth_src: The Ethernet source address to filter if specified.
        :param port: The UDP destination port to filter if specified.
        :param confirmable: Passed through to `filter_coap_request`.
        :param mliid: The value `thread_bl.tlv.ml_eid` must equal if specified.
        :return: The new PacketFilter.
        """
        filter_eth = self.filter_eth_src(eth_src) if eth_src else self.filter_eth()
        f = filter_eth.filter_coap_request('/b/ba', port=port,
                                           confirmable=confirmable).filter('thread_bl.tlv.target_eid == {target}',
                                                                           target=target)
        if mliid is not None:
            f = f.filter('thread_bl.tlv.ml_eid == {mliid}', mliid=mliid)

        return f

    def filter_backbone_query(self, target: str, *, eth_src: EthAddr, port: int = None) -> 'PacketFilter':
        """
        Create a new PacketFilter to filter non-confirmable COAP requests to '/b/bq' for the given target.

        :param target: The value `thread_bl.tlv.target_eid` must equal.
        :param eth_src: The Ethernet source address to filter.
        :param port: The UDP destination port to filter if specified.
        :return: The new PacketFilter.
        """
        return self.filter_eth_src(eth_src).filter_coap_request('/b/bq', port=port, confirmable=False).filter(
            'thread_bl.tlv.target_eid == {target}', target=target)

    def filter_wpan(self, **kwargs):
        """
        Create a new PacketFilter for filter WPAN packets.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(attrgetter('wpan'), **kwargs)

    def filter_wpan_ack(self, **kwargs):
        """
        Create a new PacketFilter for filter WPAN ACK packets.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.frame_type == consts.MAC_FRAME_TYPE_ACK, **kwargs)

    def filter_wpan_beacon(self, **kwargs):
        """
        Create a new PacketFilter for filter WPAN beacon.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.frame_type == consts.MAC_FRAME_TYPE_BEACON, **kwargs)

    def filter_wpan_data(self, **kwargs):
        """
        Create a new PacketFilter for filter WPAN data packets.

        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.frame_type == consts.MAC_FRAME_TYPE_DATA, **kwargs)

    def filter_wpan_seq(self, seq, **kwargs):
        """
        Create a new PacketFilter for filter WPAN packets of a sequence number.

        :param seq: The sequence number to filter.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.seq_no == seq, **kwargs)

    def filter_wpan_version(self, version: int, **kwargs):
        """
        Create a new PacketFilter for filter WPAN packets of a given version.

        :param version: The version to filter.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.version == version, **kwargs)

    def filter_wpan_channel(self, channel: int, **kwargs):
        """
        Create a new PacketFilter for filter WPAN packets of a given channel.

        :param channel: The channel to filter.
        :param kwargs: Extra arguments for `filter`.
        :return: The new PacketFilter to filter WPAN packets.
        """
        return self.filter(lambda p: p.wpan.channel == channel, **kwargs)

    def filter_wpan_src16(self, addr, **kwargs):
        """Filter packets whose `wpan.src16` equals `addr`."""
        return self.filter(lambda p: p.wpan.src16 == addr, **kwargs)

    def filter_wpan_dst16(self, addr, **kwargs):
        """Filter packets whose `wpan.dst16` equals `addr`."""
        return self.filter(lambda p: p.wpan.dst16 == addr, **kwargs)

    def filter_wpan_src16_dst16(self, src_addr, dst_addr, **kwargs):
        """Filter packets whose `wpan.src16` equals `src_addr` and `wpan.dst16` equals `dst_addr`."""
        return self.filter(lambda p: p.wpan.src16 == src_addr and p.wpan.dst16 == dst_addr, **kwargs)

    def filter_wpan_src64(self, addr, **kwargs):
        """Filter packets whose `wpan.src64` equals `addr`."""
        assert isinstance(addr, (str, ExtAddr)), addr
        return self.filter(lambda p: p.wpan.src64 == addr, **kwargs)

    def filter_wpan_dst64(self, addr, **kwargs):
        """Filter packets whose `wpan.dst64` equals `addr`."""
        assert isinstance(addr, (str, ExtAddr)), addr
        return self.filter(lambda p: p.wpan.dst64 == addr, **kwargs)

    def filter_dst16(self, rloc16: int, **kwargs):
        """Filter packets whose `lowpan.mesh.dest16` or `wpan.dst16` equals `rloc16`."""
        return self.filter(lambda p: p.lowpan.mesh.dest16 == rloc16 or p.wpan.dst16 == rloc16, **kwargs)

    def filter_wpan_ie_present(self, **kwargs):
        """
        Filter packets whose `wpan.ie_present` is 1.

        :param kwargs: Extra arguments for `filter`.
        """
        return self.filter(lambda p: p.wpan.ie_present == 1, **kwargs)

    def filter_wpan_ie_not_present(self, **kwargs):
        """
        Filter packets whose `wpan.ie_present` is 0.

        :param kwargs: Extra arguments for `filter`.
        """
        return self.filter(lambda p: p.wpan.ie_present == 0, **kwargs)

    def filter_ping_request(self, identifier=None, **kwargs):
        """
        Filter ICMPv6 ping requests.

        :param identifier: The echo identifier to filter if specified.
        :param kwargs: Extra arguments for `filter`.
        """
        return self.filter(
            lambda p: p.icmpv6.is_ping_request and (identifier is None or p.icmpv6.echo.identifier == identifier),
            **kwargs)

    def filter_ping_reply(self, **kwargs):
        """
        Filter ICMPv6 ping replies.

        :param kwargs: Extra arguments for `filter`; an optional `identifier` entry is consumed here and
                       restricts the match to that echo identifier.
        """
        identifier = kwargs.pop('identifier', None)
        return self.filter(
            lambda p: (p.icmpv6.is_ping_reply and (identifier is None or p.icmpv6.echo.identifier == identifier)),
            **kwargs)

    def filter_eth(self, **kwargs):
        """Filter packets that have a truthy `eth` attribute."""
        return self.filter(attrgetter('eth'), **kwargs)

    def filter_eth_src(self, addr, **kwargs):
        """Filter packets whose `eth.src` equals `addr`."""
        assert isinstance(addr, (str, EthAddr))
        return self.filter(lambda p: p.eth.src == addr, **kwargs)

    def filter_ipv6_dst(self, addr, **kwargs):
        """Filter packets whose `ipv6.dst` equals `addr`."""
        assert isinstance(addr, (str, Ipv6Addr))
        return self.filter(lambda p: p.ipv6.dst == addr, **kwargs)

    def filter_ipv6_2dsts(self, addr1, addr2, **kwargs):
        """Filter packets whose `ipv6.dst` equals `addr1` or `addr2`."""
        assert isinstance(addr1, (str, Ipv6Addr))
        assert isinstance(addr2, (str, Ipv6Addr))
        return self.filter(lambda p: p.ipv6.dst == addr1 or p.ipv6.dst == addr2, **kwargs)

    def filter_ipv6_src_dst(self, src_addr, dst_addr, **kwargs):
        """Filter packets whose `ipv6.src` equals `src_addr` and `ipv6.dst` equals `dst_addr`."""
        assert isinstance(src_addr, (str, Ipv6Addr))
        assert isinstance(dst_addr, (str, Ipv6Addr))
        return self.filter(lambda p: p.ipv6.src == src_addr and p.ipv6.dst == dst_addr, **kwargs)

    def filter_LLATNMA(self, **kwargs):
        """Filter packets sent to `consts.LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS`."""
        return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS, **kwargs)

    def filter_RLANMA(self, **kwargs):
        """Filter packets sent to `consts.REALM_LOCAL_ALL_NODES_ADDRESS`."""
        return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_ALL_NODES_ADDRESS, **kwargs)

    def filter_RLARMA(self, **kwargs):
        """Filter packets sent to `consts.REALM_LOCAL_ALL_ROUTERS_ADDRESS`."""
        return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_ALL_ROUTERS_ADDRESS, **kwargs)

    def filter_RLATNMA(self, **kwargs):
        """Filter packets sent to `consts.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS`."""
        return self.filter(lambda p: p.ipv6.dst == consts.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS, **kwargs)

    def filter_LLANMA(self, **kwargs):
        """Filter packets sent to `consts.LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS`."""
        return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS, **kwargs)

    def filter_LLABMA(self, **kwargs):
        """Filter packets sent to `consts.LINK_LOCAL_ALL_BBRS_MULTICAST_ADDRESS`."""
        return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_ALL_BBRS_MULTICAST_ADDRESS, **kwargs)

    def filter_LLARMA(self, **kwargs):
        """Filter packets sent to `consts.LINK_LOCAL_ALL_ROUTERS_MULTICAST_ADDRESS`."""
        return self.filter(lambda p: p.ipv6.dst == consts.LINK_LOCAL_ALL_ROUTERS_MULTICAST_ADDRESS, **kwargs)

    def filter_AMPLFMA(self, mpl_seed_id: Union[int, Ipv6Addr] = None, **kwargs):
        """
        Filter packets sent to `consts.ALL_MPL_FORWARDERS_MA`, optionally restricted to an MPL seed.

        :param mpl_seed_id: If an int, it is encoded as two bytes (high byte first) and must equal the MPL
                            option seed id. Otherwise it is treated as an address: a packet matches when it is
                            sent from that address with the MPL `s` flag 0, or sent from another address with
                            the `s` flag 1 and a seed id equal to the last two bytes of that address.
        :param kwargs: Extra arguments for the destination address `filter`.
        :return: The new PacketFilter.
        """
        f = self.filter(lambda p: p.ipv6.dst == consts.ALL_MPL_FORWARDERS_MA, **kwargs)
        if mpl_seed_id is not None:
            if isinstance(mpl_seed_id, int):
                mpl_seed_id = Bytes([mpl_seed_id >> 8, mpl_seed_id & 0xFF])
                f = f.filter(lambda p: p.ipv6.opt.mpl.seed_id == mpl_seed_id)
            else:
                rloc = mpl_seed_id
                rloc16 = bytes(rloc[-2:])
                f = f.filter(lambda p: (p.ipv6.src == rloc and p.ipv6.opt.mpl.flag.s == 0) or
                             (p.ipv6.src != rloc and p.ipv6.opt.mpl.flag.s == 1 and p.ipv6.opt.mpl.seed_id == rloc16))
        return f

    def filter_mle(self, **kwargs):
        """Filter packets that have a truthy `mle` attribute."""
        return self.filter(attrgetter('mle'), **kwargs)

    def filter_wpan_cmd(self, cmd, **kwargs):
        """Filter packets whose `wpan.cmd` equals `cmd`."""
        assert isinstance(cmd, int), cmd
        return self.filter(lambda p: p.wpan.cmd == cmd, **kwargs)

    def filter_mle_cmd(self, cmd, **kwargs):
        """Filter packets whose `mle.cmd` equals `cmd`."""
        assert isinstance(cmd, int), cmd
        return self.filter(lambda p: p.mle.cmd == cmd, **kwargs)

    def filter_mle_cmd2(self, cmd1, cmd2, **kwargs):
        """Filter packets whose `mle.cmd` equals `cmd1` or `cmd2`."""
        assert isinstance(cmd1, int), cmd1
        assert isinstance(cmd2, int), cmd2
        return self.filter(lambda p: p.mle.cmd == cmd1 or p.mle.cmd == cmd2, **kwargs)

    def filter_mle_has_tlv(self, *tlv_types, **kwargs):
        """Filter packets whose MLE TLV types include every one of `tlv_types`."""
        return self.filter(lambda p: set(tlv_types) <= set(p.mle.tlv.type), **kwargs)

    def filter_icmpv6(self, **kwargs):
        """Filter packets that have a truthy `icmpv6` attribute."""
        return self.filter(attrgetter('icmpv6'), **kwargs)

    def filter_icmpv6_nd_ns(self, target_address: Ipv6Addr):
        """Filter ICMPv6 Neighbor Solicitations for `target_address`."""
        return self.filter(lambda p:
                           (p.icmpv6.is_neighbor_solicitation and p.icmpv6.nd.ns.target_address == target_address))

    def filter_icmpv6_nd_na(self, target_address: Ipv6Addr):
        """Filter ICMPv6 Neighbor Advertisements for `target_address`."""
        return self.filter(lambda p:
                           (p.icmpv6.is_neighbor_advertisement and p.icmpv6.nd.na.target_address == target_address))

    def filter_icmpv6_nd_ra(self):
        """Filter ICMPv6 Router Advertisements."""
        return self.filter(lambda p: p.icmpv6.is_router_advertisement)

    def filter_has_bbr_dataset(self):
        """Filter packets matching the filter string below (server TLV '16' plus non-null seqno, rrdelay, mlrtimeout)."""
        return self.filter("""
            thread_nwd.tlv.server.has('16')
            and thread_nwd.tlv.service.s_data.seqno is not null
            and thread_nwd.tlv.service.s_data.rrdelay is not null
            and thread_nwd.tlv.service.s_data.mlrtimeout is not null
        """)