1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import logging
31import operator
32import sys
33
34from pktverify import consts
35from pktverify.test_info import TestInfo
36
37
38class NodeSummary(object):
39    """
40    Represents a summary of a node.
41    """
42
43    def __init__(self, role, extaddr):
44        self._role = role
45        self._extaddr = extaddr
46        self._ipaddrs = {}
47
48    @property
49    def role(self):
50        return self._role
51
52    @property
53    def extaddr(self):
54        return self._extaddr
55
56    @property
57    def ipaddr_link_local(self):
58        for a, _ in self._iter_ipaddrs_rev():
59            if a.is_link_local:
60                return a
61
62        return None
63
64    @property
65    def ipaddr_mleid(self):
66        for a, _ in self._iter_ipaddrs_rev():
67            if a.is_mleid:
68                return a
69
70        return None
71
72    def _iter_ipaddrs_rev(self):
73        return sorted(self._ipaddrs.items(), key=operator.itemgetter(1), reverse=True)
74
75    def add_ipaddr(self, ipaddr, index):
76        if ipaddr not in self._ipaddrs:
77            self._ipaddrs[ipaddr] = index
78
79    def __str__(self):
80        return "[node {role} extaddr {extaddr} ipaddrs {ipaddrs}]".format(
81            role=self._role,
82            extaddr=self.extaddr,
83            ipaddrs=", ".join(map(str, sorted(self._ipaddrs))),
84        )
85
86    __repr__ = __str__
87
88
89class Summary(object):
90    """
91    Represents a summary of the test.
92    """
93
94    def __init__(self, pkts, test_info: TestInfo):
95        self._pkts = pkts
96        self._test_info = test_info
97        self._leader_id = None
98        self._analyze()
99
100    def iterroles(self):
101        return self._role_to_node.items()
102
103    def _analyze(self):
104        self._analyze_test_info()
105
106        with self._pkts.save_index():
107            for f in [
108                    self._analyze_leader,
109                    self._analyze_packets,
110            ]:
111                self._pkts.index = (0, 0)
112                f()
113
114    def _analyze_test_info(self):
115        self._role_to_node = {}
116        self._extaddr_to_node = {}
117
118        for role, extaddr in self._test_info.extaddrs.items():
119            assert role not in self._role_to_node
120            assert extaddr not in self._extaddr_to_node
121
122            node = NodeSummary(role, extaddr)
123            self._role_to_node[role] = node
124            self._extaddr_to_node[extaddr] = node
125
126    def _analyze_leader(self):
127        for p in self._pkts:
128
129            if p.mle.cmd in [consts.MLE_DATA_RESPONSE, consts.MLE_ADVERTISEMENT]:
130
131                p.mle.__getattr__('tlv')
132                p.mle.__getattr__('tlv.leader_data')
133                p.mle.__getattr__('tlv.leader_data.router_id')
134
135                tlv = p.mle.tlv
136                if tlv.leader_data:
137                    self._leader_id = tlv.leader_data.router_id
138                    logging.info("leader found in pcap: %d", self._leader_id)
139                    break
140        else:
141            logging.warning("leader not found in pcap")
142
143    def _analyze_packets(self):
144        for i, p in enumerate(self._pkts):
145            extaddr, src = None, None
146            # each packet should be either wpan or eth
147            assert (p.wpan and not p.eth) or (p.eth and not p.wpan)
148            if p.wpan:
149                # it is a 802.15.4 packet
150                extaddr = p.wpan.src64
151
152            if p.ipv6:
153                # it is a IPv6 packet
154                src = p.ipv6.src
155
156            if extaddr and src:
157                if extaddr in self._extaddr_to_node:
158                    role_sum = self._extaddr_to_node[extaddr]
159                    role_sum.add_ipaddr(src, i)
160                else:
161                    logging.warn("Extaddr %s is not in the testbed", extaddr)
162
163    def show(self):
164        show_roles = "\n\t\t".join(map(str, self._role_to_node.values()))
165        sys.stderr.write("""{header}
166    Pcap Summary:
167        packets = {num_packets}
168        roles = {num_roles}
169            {show_roles}
170    {tailer}
171    """.format(
172            header='>' * 120,
173            num_packets=len(self._pkts),
174            num_roles=len(self._role_to_node),
175            show_roles=show_roles,
176            tailer='<' * 120,
177        ))
178
179    def ipaddr_mleid_by_role(self, role):
180        node = self._role_to_node[role]
181        return node.ipaddr_mleid
182
183    def ipaddr_link_local_by_role(self, role):
184        node = self._role_to_node[role]
185        return node.ipaddr_link_local
186
187    def role(self, r):
188        return self._role_to_node[r]
189