#!/usr/bin/env python3
#
#  Copyright (c) 2019, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import logging
import operator
import sys

from pktverify import consts
from pktverify.test_info import TestInfo


class NodeSummary(object):
    """
    Represents a summary of a node.

    A node is identified by its role and extended address, and accumulates
    the IPv6 source addresses recorded for it through `add_ipaddr`.
    """

    def __init__(self, role, extaddr):
        """
        Create an empty summary for one node.

        :param role: The role key of the node (as given by the test info).
        :param extaddr: The extended address of the node.
        """
        self._role = role
        self._extaddr = extaddr
        # Maps each recorded address to the index it was first recorded with.
        self._ipaddrs = {}

    @property
    def role(self):
        """The role this node was created with."""
        return self._role

    @property
    def extaddr(self):
        """The extended address this node was created with."""
        return self._extaddr

    @property
    def ipaddr_link_local(self):
        """
        The recorded address with the highest index whose `is_link_local`
        attribute is truthy, or None if there is no such address.
        """
        return next((addr for addr, _ in self._iter_ipaddrs_rev() if addr.is_link_local), None)

    @property
    def ipaddr_mleid(self):
        """
        The recorded address with the highest index whose `is_mleid`
        attribute is truthy, or None if there is no such address.
        """
        return next((addr for addr, _ in self._iter_ipaddrs_rev() if addr.is_mleid), None)

    def _iter_ipaddrs_rev(self):
        """Return the `(address, index)` pairs sorted by index, highest first."""
        return sorted(self._ipaddrs.items(), key=operator.itemgetter(1), reverse=True)

    def add_ipaddr(self, ipaddr, index):
        """
        Record an address for this node.

        Only the first index seen for a given address is kept; recording the
        same address again is a no-op.

        :param ipaddr: The address to record (must be hashable).
        :param index: The index to associate with the address.
        """
        if ipaddr not in self._ipaddrs:
            self._ipaddrs[ipaddr] = index

    def __str__(self):
        return "[node {role} extaddr {extaddr} ipaddrs {ipaddrs}]".format(
            role=self._role,
            extaddr=self.extaddr,
            ipaddrs=", ".join(map(str, sorted(self._ipaddrs))),
        )

    __repr__ = __str__


class Summary(object):
    """
    Represents a summary of the test.

    Built from the captured packets and the test info; it maps every role in
    the test info to a `NodeSummary` and fills in the addresses each node
    used as an IPv6 source.
    """

    def __init__(self, pkts, test_info: TestInfo):
        """
        Analyze the packets immediately on construction.

        :param pkts: The packets to analyze. Must be iterable, support
                     `len()`, and provide `save_index()` and a settable
                     `index` attribute.
        :param test_info: The test info; its `extaddrs` mapping (role to
                          extended address) defines the known nodes.
        """
        self._pkts = pkts
        self._test_info = test_info
        self._leader_id = None
        self._analyze()

    def iterroles(self):
        """Return the `(role, NodeSummary)` items of all known nodes."""
        return self._role_to_node.items()

    def _analyze(self):
        """Build the node tables, then run every packet analysis pass."""
        self._analyze_test_info()

        # Each pass walks the packets from the start; `save_index()` wraps the
        # passes so the caller's packet position is handled by the packet
        # container rather than left wherever the last pass stopped.
        with self._pkts.save_index():
            for f in [
                    self._analyze_leader,
                    self._analyze_packets,
            ]:
                self._pkts.index = (0, 0)
                f()

    def _analyze_test_info(self):
        """Create one `NodeSummary` per role listed in the test info."""
        self._role_to_node = {}
        self._extaddr_to_node = {}

        for role, extaddr in self._test_info.extaddrs.items():
            # Roles and extended addresses must both be unique in the test info.
            assert role not in self._role_to_node
            assert extaddr not in self._extaddr_to_node

            node = NodeSummary(role, extaddr)
            self._role_to_node[role] = node
            self._extaddr_to_node[extaddr] = node

    def _analyze_leader(self):
        """
        Find the leader's router id and store it in `self._leader_id`.

        Stops at the first MLE Data Response or MLE Advertisement whose
        leader data is truthy; logs a warning if no such packet exists.
        """
        for p in self._pkts:

            if p.mle.cmd in [consts.MLE_DATA_RESPONSE, consts.MLE_ADVERTISEMENT]:

                # NOTE(review): these lookups discard their results; they
                # presumably make the layer resolve the nested fields before
                # the attribute access below -- confirm against the layer
                # implementation before removing them.
                p.mle.__getattr__('tlv')
                p.mle.__getattr__('tlv.leader_data')
                p.mle.__getattr__('tlv.leader_data.router_id')

                tlv = p.mle.tlv
                if tlv.leader_data:
                    self._leader_id = tlv.leader_data.router_id
                    logging.info("leader found in pcap: %d", self._leader_id)
                    break
        else:
            # NOTE(review): the collapsed source does not show which statement
            # this `else` belongs to; it is attached to the `for` because it
            # directly follows the `break` -- confirm.
            # Runs only when the loop finished without hitting `break`.
            logging.warning("leader not found in pcap")

    def _analyze_packets(self):
        """
        Record, for every known node, the IPv6 source addresses it sent from.

        A packet contributes only when it has both an 802.15.4 extended source
        address and an IPv6 source address. Packets from extended addresses
        that are not in the test info are reported with a warning.
        """
        for i, p in enumerate(self._pkts):
            extaddr, src = None, None
            # each packet should be either wpan or eth
            assert (p.wpan and not p.eth) or (p.eth and not p.wpan)
            if p.wpan:
                # it is a 802.15.4 packet
                extaddr = p.wpan.src64

            if p.ipv6:
                # it is a IPv6 packet
                src = p.ipv6.src

            if extaddr and src:
                if extaddr in self._extaddr_to_node:
                    role_sum = self._extaddr_to_node[extaddr]
                    role_sum.add_ipaddr(src, i)
                else:
                    # `logging.warn` is a deprecated alias; use `warning`.
                    logging.warning("Extaddr %s is not in the testbed", extaddr)

    def show(self):
        """Write a human-readable summary of packets and nodes to stderr."""
        show_roles = "\n\t\t".join(map(str, self._role_to_node.values()))
        # NOTE(review): the exact whitespace inside this template could not be
        # recovered from the collapsed source; the indentation below is a
        # reconstruction -- confirm against the upstream file.
        sys.stderr.write("""{header}
Pcap Summary:
    packets = {num_packets}
    roles = {num_roles}
        {show_roles}
{tailer}
""".format(
            header='>' * 120,
            num_packets=len(self._pkts),
            num_roles=len(self._role_to_node),
            show_roles=show_roles,
            tailer='<' * 120,
        ))

    def ipaddr_mleid_by_role(self, role):
        """
        Return the `ipaddr_mleid` of the node with the given role.

        :raises KeyError: If the role is not in the test info.
        """
        node = self._role_to_node[role]
        return node.ipaddr_mleid

    def ipaddr_link_local_by_role(self, role):
        """
        Return the `ipaddr_link_local` of the node with the given role.

        :raises KeyError: If the role is not in the test info.
        """
        node = self._role_to_node[role]
        return node.ipaddr_link_local

    def role(self, r):
        """
        Return the `NodeSummary` of the node with role `r`.

        :raises KeyError: If the role is not in the test info.
        """
        return self._role_to_node[r]