1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import logging
30from typing import Tuple
31
32from pktverify import consts
33from pktverify.consts import MLE_CHILD_ID_REQUEST, MLE_ADVERTISEMENT, MLE_CHILD_ID_RESPONSE
34from pktverify.pcap_reader import PcapReader
35from pktverify.summary import Summary
36from pktverify.test_info import TestInfo
37from pktverify.verify_result import VerifyResult
38
39
40class PacketVerifier(object):
41    """
42    Base class for packet verifiers that runs the packet verification process
43    """
44    NET_NAME = "OpenThread"
45    MC_PORT = 49191
46    MM_PORT = 61631
47    BB_PORT = 61631
48    LLANMA = 'ff02::1'  # Link-Local All Nodes multicast address
49    LLARMA = 'ff02::2'  # Link-Local All Routers multicast address
50    RLANMA = 'ff03::1'  # realm-local all-nodes multicast address
51    RLARMA = 'ff03::2'  # realm-local all-routers multicast address
52    RLAMFMA = 'ff03::fc'  # realm-local ALL_MPL_FORWARDERS address
53    LLABMA = 'ff32:40:fd00:7d03:7d03:7d03:0:3'  # Link-Local All BBRs multicast address
54
55    def __init__(self, test_info_path, wireshark_prefs=None):
56        logging.basicConfig(level=logging.INFO,
57                            format='File "%(pathname)s", line %(lineno)d, in %(funcName)s\n'
58                            '%(asctime)s - %(levelname)s - %(message)s')
59
60        ti = TestInfo(test_info_path)
61        if wireshark_prefs is not None:
62            pkts = PcapReader.read(ti.pcap_path, wireshark_prefs)
63        else:
64            pkts = PcapReader.read(ti.pcap_path)
65        print('loaded %d packets from %s' % (len(pkts), ti.pcap_path))
66        self.pkts = pkts
67        self.test_info = ti
68
69        self.summary = Summary(pkts, ti)
70        self._vars = {}
71        self._add_initial_vars()
72
73    def add_vars(self, **vars):
74        """
75        Add new variables.
76
77        :param vars: The new variables.
78        """
79        self._vars.update(vars)
80
81    @property
82    def vars(self):
83        """
84        :return: the dict of all variables
85        """
86        return self._vars
87
88    def add_common_vars(self):
89        """
90        Add common variables that is needed by many test cases.
91        """
92        self.add_vars(
93            NET_NAME=PacketVerifier.NET_NAME,
94            MM_PORT=PacketVerifier.MM_PORT,
95            MC_PORT=PacketVerifier.MC_PORT,
96            BB_PORT=PacketVerifier.BB_PORT,
97            LLANMA=PacketVerifier.LLANMA,  # Link-Local All Nodes multicast address
98            LLARMA=PacketVerifier.LLARMA,  # Link-Local All Routers multicast address
99            RLANMA=PacketVerifier.RLANMA,  # realm-local all-nodes multicast address
100            RLARMA=PacketVerifier.RLARMA,  # realm-local all-routers multicast address
101            RLAMFMA=PacketVerifier.RLAMFMA,  # realm-local ALL_MPL_FORWARDERS address
102            LLABMA=PacketVerifier.LLABMA,  # Link-Local All BBRs multicast address
103            MA1=consts.MA1,
104            MA2=consts.MA2,
105            MA3=consts.MA3,
106            MA4=consts.MA4,
107            MA5=consts.MA5,
108            MA6=consts.MA6,
109            MA1g=consts.MA1g,
110            MAe1=consts.MAe1,
111            MAe2=consts.MAe2,
112            MAe3=consts.MAe3,
113        )
114
115    def _add_initial_vars(self):
116        for i, addr in self.test_info.extaddrs.items():
117            name = self.test_info.get_node_name(i)
118            self._vars[name] = addr
119
120        for i, addr in self.test_info.ethaddrs.items():
121            name = self.test_info.get_node_name(i) + '_ETH'
122            self._vars[name] = addr
123
124        for i, addrs in self.test_info.ipaddrs.items():
125            name = self.test_info.get_node_name(i)
126            self._vars[name + '_IPADDRS'] = addrs
127            for addr in addrs:
128                if addr.is_dua:
129                    key = name + '_DUA'
130                elif addr.is_backbone_gua:
131                    key = name + '_BGUA'
132                elif addr.is_link_local and (name + '_BGUA') in self._vars:
133                    # FIXME: assume the link-local address after Backbone GUA is the Backbone Link Local address
134                    key = name + '_BLLA'
135                elif addr.is_link_local:
136                    key = name + '_LLA'
137                else:
138                    logging.warning("IPv6 address ignored: name=%s, addr=%s, is_global=%s, is_link_local=%s", name,
139                                    addr, addr.is_global, addr.is_link_local)
140                    continue
141
142                if key in self._vars:
143                    logging.warning("duplicate IPv6 address type: name=%s, addr=%s,%s", name, addr, self._vars[key])
144                    continue
145
146                self._vars[key] = addr
147
148        for i, addr in self.test_info.mleids.items():
149            name = self.test_info.get_node_name(i)
150            self._vars[name + '_MLEID'] = addr
151
152        for i, rloc16 in self.test_info.rloc16s.items():
153            key = self.test_info.get_node_name(i) + '_RLOC16'
154            self._vars[key] = rloc16
155
156        for i, rloc in self.test_info.rlocs.items():
157            key = self.test_info.get_node_name(i) + '_RLOC'
158            self._vars[key] = rloc
159
160        for i, omr in self.test_info.omrs.items():
161            key = self.test_info.get_node_name(i) + '_OMR'
162            self._vars[key] = omr
163
164        for i, dua in self.test_info.duas.items():
165            key = self.test_info.get_node_name(i) + '_DUA'
166            self._vars[key] = dua
167
168        if self.test_info.leader_aloc:
169            self._vars['LEADER_ALOC'] = self.test_info.leader_aloc
170
171        for k, v in self.test_info.extra_vars.items():
172            assert k not in self._vars, k
173            logging.info("add extra var: %s = %s", k, v)
174            self._vars[k] = v
175
176        for i, topo in self.test_info.topology.items():
177            name = self.test_info.get_node_name(i)
178            if topo['version']:
179                self._vars[name + '_VERSION'] = {'1.1': 2, '1.2': 3, '1.3': 4, '1.4': 5}[topo['version']]
180
181    def verify_attached(self, child: str, parent: str = None, child_type: str = 'FTD', pkts=None) -> VerifyResult:
182        """
183        Verify that the device attaches to the Thread network.
184
185        :param child: The child device name.
186        :param parent: The parent device name.
187        :param child_type: The child device type (FTD, FTD-ED, MTD).
188        """
189        result = VerifyResult()
190        assert self.is_thread_device(child), child
191        assert child_type in ('FTD', 'FTD-ED', 'MTD'), child_type
192        pkts = pkts or self.pkts
193        child_extaddr = self.vars[child]
194
195        src_pkts = pkts.filter_wpan_src64(child_extaddr)
196        if parent:
197            assert self.is_thread_device(parent), parent
198            src_pkts = pkts.filter_wpan_src64(child_extaddr).\
199                filter_wpan_dst64(self.vars[parent])
200        src_pkts.filter_mle_cmd(MLE_CHILD_ID_REQUEST).must_next()  # Child Id Request
201        result.record_last('child_id_request', pkts)
202
203        dst_pkts = pkts.filter_wpan_dst64(child_extaddr)
204        if parent:
205            dst_pkts = pkts.filter_wpan_src64(self.vars[parent]).\
206                filter_wpan_dst64(child_extaddr)
207        dst_pkts.filter_mle_cmd(MLE_CHILD_ID_RESPONSE).must_next()  # Child Id Response
208        result.record_last('child_id_response', pkts)
209
210        with pkts.save_index():
211            if child_type == 'FTD':
212                src_pkts = pkts.filter_wpan_src64(child_extaddr)
213                src_pkts.filter_mle_cmd(MLE_ADVERTISEMENT).must_next()  # MLE Advertisement
214                result.record_last('mle_advertisement', pkts)
215            logging.info(f"verify attached: d={child}, result={result}")
216
217        return result
218
219    def verify_ping(self, src: str, dst: str, bbr: str = None, pkts: 'PacketVerifier' = None) -> VerifyResult:
220        """
221        Verify the ping process.
222
223        :param src: The source device name.
224        :param dst: The destination device name.
225        :param bbr: The Backbone Router name.
226                    If specified, this method also verifies that the ping request and reply be forwarded by the Backbone Router.
227        :param pkts: The PacketFilter to search.
228
229        :return: The verification result.
230        """
231        if bbr:
232            assert not (self.is_thread_device(src) and self.is_thread_device(dst)), \
233                f"both {src} and {dst} are WPAN devices"
234            assert not (self.is_backbone_device(src) and self.is_backbone_device(dst)), \
235                f"both {src} and {dst} are ETH devices"
236
237        if pkts is None:
238            pkts = self.pkts
239
240        src_dua = self.vars[src + '_DUA']
241        dst_dua = self.vars[dst + '_DUA']
242        if bbr:
243            bbr_ext = self.vars[bbr]
244            bbr_eth = self.vars[bbr + '_ETH']
245
246        result = VerifyResult()
247        ping_req = pkts.filter_ping_request().filter_ipv6_dst(dst_dua)
248        if self.is_backbone_device(src):
249            p = ping_req.filter_eth_src(self.vars[src + '_ETH']).must_next()
250        else:
251            p = ping_req.filter_wpan_src64(self.vars[src]).must_next()
252
253        # pkts.last().show()
254        ping_id = p.icmpv6.echo.identifier
255        logging.info("verify_ping: ping_id=%x", ping_id)
256        result.record_last('ping_request', pkts)
257        ping_req = ping_req.filter(lambda p: p.icmpv6.echo.identifier == ping_id)
258
259        # BBR unicasts the ping packet to TD.
260        if bbr:
261            if self.is_backbone_device(src):
262                ping_req.filter_wpan_src64(bbr_ext).must_next()
263            else:
264                ping_req.filter_eth_src(bbr_eth).must_next()
265
266        ping_reply = pkts.filter_ping_reply().filter_ipv6_dst(src_dua).filter(
267            lambda p: p.icmpv6.echo.identifier == ping_id)
268        # TD receives ping packet and responds back to Host via SBBR.
269        if self.is_thread_device(dst):
270            ping_reply.filter_wpan_src64(self.vars[dst]).must_next()
271        else:
272            ping_reply.filter_eth_src(self.vars[dst + '_ETH']).must_next()
273
274        result.record_last('ping_reply', pkts)
275
276        if bbr:
277            # SBBR forwards the ping response packet to Host.
278            if self.is_thread_device(dst):
279                ping_reply.filter_eth_src(bbr_eth).must_next()
280            else:
281                ping_reply.filter_wpan_src64(bbr_ext).must_next()
282
283        return result
284
285    def is_thread_device(self, name: str) -> bool:
286        """
287        Returns if the device is an WPAN device.
288
289        :param name: The device name.
290
291        Note that device can be both a WPAN device and an Ethernet device.
292        """
293        assert isinstance(name, str), name
294
295        return name in self.vars
296
297    def is_backbone_device(self, name: str) -> bool:
298        """
299        Returns if the device s an Ethernet device.
300
301        :param name: The device name.
302
303        Note that device can be both a WPAN device and an Ethernet device.
304        """
305        assert isinstance(name, str), name
306
307        return f'{name}_ETH' in self.vars
308
309    def max_index(self, *indexes: Tuple[int, int]) -> Tuple[int, int]:
310        wpan_idx = 0
311        eth_idx = 0
312        for wi, ei in indexes:
313            wpan_idx = max(wpan_idx, wi)
314            eth_idx = max(eth_idx, ei)
315
316        return wpan_idx, eth_idx
317
318    def verify_dua_registration(self, src64, dua, *, pbbr_eth, sbbr_eth=None, pbbr_src64=None):
319        pv, pkts = self, self.pkts
320        MM = pv.vars['MM_PORT']
321        BB = pv.vars['BB_PORT']
322
323        # Router1 should send /n/dr for DUA registration
324        dr = pkts.filter_wpan_src64(src64).filter_coap_request('/n/dr', port=MM).filter(
325            'thread_nm.tlv.target_eid == {ROUTER1_DUA}', ROUTER1_DUA=dua).must_next()
326
327        # SBBR should not send /b/bq for Router1's DUA
328        if sbbr_eth is not None:
329            pkts.filter_backbone_query(dua, eth_src=sbbr_eth, port=BB).must_not_next()
330
331        # PBBR should respond to /n/dr
332        if pbbr_src64 is not None:
333            pkts.filter_wpan_src64(pbbr_src64).filter_coap_ack(
334                '/n/dr', port=MM).must_next().must_verify('thread_nm.tlv.status == 0')
335
336        # PBBR should send /b/bq for Router1's DUA (1st time)
337        bq1 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next()
338        bq1_index = pkts.index
339
340        assert bq1.sniff_timestamp - dr.sniff_timestamp <= 1.01, bq1.sniff_timestamp - dr.sniff_timestamp
341
342        # PBBR should send /b/bq for Router1's DUA (2nd time)
343        bq2 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next()
344
345        assert 0.9 < bq2.sniff_timestamp - bq1.sniff_timestamp < 1.1, bq2.sniff_timestamp - bq1.sniff_timestamp
346
347        # PBBR should send /b/bq for Router1's DUA (3rd time)
348        bq3 = pkts.filter_backbone_query(dua, eth_src=pbbr_eth, port=BB).must_next()
349
350        assert 0.9 < bq3.sniff_timestamp - bq2.sniff_timestamp < 1.1, bq3.sniff_timestamp - bq2.sniff_timestamp
351
352        # PBBR should send PRO_BB.ntf for Router's DUA when DAD completed
353        pkts.filter_eth_src(pbbr_eth).filter_backbone_answer(dua, port=BB, confirmable=False).must_next().show()
354
355        # PBBR should not recv /b/ba response from other BBRs during this period
356        pkts.range(bq1_index, pkts.index,
357                   cascade=False).filter('eth.src != {PBBR_ETH}',
358                                         PBBR_ETH=pbbr_eth).filter_backbone_answer(dua, port=BB).must_not_next()
359