1#!/usr/bin/env python3
2#
3#  Copyright (c) 2021, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import logging
31import unittest
32
33import pktverify
34from pktverify import packet_verifier, packet_filter, consts
35from pktverify.consts import MA1, MA2
36import config
37import thread_cert
38
39# Test description:
# The purpose of this test case is to verify that a Primary BBR correctly
# decrements the Hop Limit field by 1 when a packet is forwarded with MPL, and
# when it is forwarded from the Thread Network (using MPL) to the LAN. This
# test also verifies that the BR drops a packet with Hop Limit 0. It also
# checks the use of IPv6 packets that span multiple 6LoWPAN fragments.
45#
46# Topology:
47#    ----------------(eth)------------------
48#           |                        |
49#         BR_1 (Leader)              |
50#           |                       HOST
51#           |
52#          ROUTER_1
53#
54
# Node identifiers; they key the TOPOLOGY dict and self.nodes below.
BR_1, ROUTER_1, HOST = 1, 2, 3

# Added to the ping payload size when matching the IPv6 payload length.
ICMP_HEADER_LEN = 8
59
60
class MATN_12_HopLimitProcessing(thread_cert.TestCase):
    """Hop Limit processing test for multicast traffic crossing BR_1.

    ``test`` brings up BR_1, Router_1 and Host and sends pings with various
    Hop Limit values (Host -> MA1 over the backbone, Router_1 -> MA2 over
    Thread). ``verify`` then inspects the captured packets for the expected
    forwarding decisions and Hop Limit values.
    """
    USE_MESSAGE_FACTORY = False

    TOPOLOGY = {
        BR_1: {
            'name': 'BR_1',
            'is_otbr': True,
            'allowlist': [ROUTER_1],
            'version': '1.2',
        },
        ROUTER_1: {
            'name': 'Router_1',
            'allowlist': [BR_1],
            'version': '1.2',
        },
        HOST: {
            'name': 'Host',
            'is_host': True,
        },
    }

    def test(self):
        """Form the topology and send the pings that ``verify`` examines."""
        br1 = self.nodes[BR_1]
        router1 = self.nodes[ROUTER_1]
        host = self.nodes[HOST]

        br1.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual('leader', br1.get_state())
        self.assertTrue(br1.is_primary_backbone_router)

        router1.start()
        self.simulator.go(5)
        self.assertEqual('router', router1.get_state())

        host.start(start_radvd=False)
        self.simulator.go(10)

        # Router_1 registers a multicast address, MA1.
        router1.add_ipmaddr(MA1)
        self.simulator.go(5)

        # 1. Host multicasts a ping packet to the multicast address, MA1, with
        # the IPv6 Hop Limit field set to 59. The size of the payload is 130
        # bytes.
        self.assertTrue(
            host.ping(MA1,
                      backbone=True,
                      ttl=59,
                      size=130,
                      interface=host.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)[0]))
        self.simulator.go(5)

        # 4. Host multicasts a ping packet to the multicast address, MA1, with
        # the IPv6 Hop Limit field set to 1. The size of the payload is 130
        # bytes. BR_1 must not forward it (see step 5 in verify), so no reply
        # is expected and the ping must fail.
        self.assertFalse(
            host.ping(MA1,
                      backbone=True,
                      ttl=1,
                      size=130,
                      interface=host.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)[0]))
        self.simulator.go(5)

        # 6. Router_1 sends a ping packet encapsulated in an MPL packet to the
        # multicast address, MA2, with the Hop Limit field of the inner packet
        # set to 159. The size of the payload is 130 bytes.
        self.assertFalse(router1.ping(MA2, hoplimit=159, size=130))
        self.simulator.go(5)

        # 8. Router_1 sends a ping packet encapsulated in an MPL multicast
        # packet to the multicast address, MA2, with the Hop Limit field of
        # the inner (encapsulated) packet set to 2. The size of the payload is
        # 130 bytes.
        self.assertFalse(router1.ping(MA2, hoplimit=2, size=130))
        self.simulator.go(5)

        # 10. Router_1 sends a ping packet encapsulated in an MPL multicast
        # packet to the multicast address, MA2, with the Hop Limit field of the
        # inner packet set to 1. The size of the payload is 130 bytes.
        self.assertFalse(router1.ping(MA2, hoplimit=1, size=130))
        self.simulator.go(5)

        # 12. Router_1 sends a ping packet encapsulated in an MPL multicast
        # packet to the multicast address, MA2, with the Hop Limit field of the
        # inner packet set to 0. The size of the payload is 130 bytes.
        self.assertFalse(router1.ping(MA2, hoplimit=0, size=130))
        self.simulator.go(5)

        self.collect_ipaddrs()
        self.collect_rloc16s()
        self.collect_rlocs()
        self.collect_extra_vars()

    def verify(self, pv: pktverify.packet_verifier.PacketVerifier):
        """Check the captured packets against the numbered test steps.

        Args:
            pv: packet verifier giving access to the captured packets
                (``pv.pkts``) and the collected test variables (``pv.vars``).
        """
        pkts = pv.pkts
        # Named test_vars (not vars) so the builtin vars() is not shadowed.
        test_vars = pv.vars
        pv.summary.show()
        logging.info('vars = %s', test_vars)

        # IPv6 payload length of every ping sent by test(): 130 bytes of ping
        # payload plus the ICMP header.
        ping_plen = 130 + ICMP_HEADER_LEN

        def host_ping_to_ma1(hop_limit):
            # Next ping request from Host to MA1 on the LAN carrying hop_limit.
            return pkts.filter_eth_src(test_vars['Host_ETH']) \
                .filter_ipv6_dst(MA1) \
                .filter_ping_request() \
                .filter(lambda p: p.ipv6.hlim == hop_limit and p.ipv6.plen == ping_plen) \
                .must_next()

        def br1_mpl_ping(request):
            # Filter for BR_1's MPL-encapsulated copies of request on Thread.
            return pkts.filter_wpan_src64(test_vars['BR_1']) \
                .filter_AMPLFMA(mpl_seed_id=test_vars['BR_1_RLOC']) \
                .filter_ping_request(identifier=request.icmpv6.echo.identifier)

        def router1_mpl_ping_to_ma2(hop_limit):
            # Next MPL-encapsulated ping request from Router_1 to MA2 whose
            # inner IPv6 header carries hop_limit.
            return pkts.filter_wpan_src64(test_vars['Router_1']) \
                .filter_AMPLFMA(mpl_seed_id=test_vars['Router_1_RLOC']) \
                .filter_ping_request() \
                .filter(lambda p: p.ipv6inner.dst == MA2 and
                                  p.ipv6inner.hlim == hop_limit and
                                  p.ipv6inner.plen == ping_plen) \
                .must_next()

        def br1_lan_ping(request):
            # Filter for copies of request sent by BR_1 onto the LAN.
            return pkts.filter_eth_src(test_vars['BR_1_ETH']) \
                .filter_ping_request(identifier=request.icmpv6.echo.identifier)

        # Ensure the topology is formed correctly
        pv.verify_attached('Router_1', 'BR_1')

        # 1. Host multicasts a ping packet to the multicast address, MA1, with
        # the IPv6 Hop Limit field set to 59. The size of the payload is 130
        # bytes.
        _pkt = host_ping_to_ma1(59)

        # 2. BR_1 forwards the ping packet to Router_1 as an MPL packet
        # encapsulating the IPv6 packet with the Hop Limit field of the inner
        # packet set to 58.
        br1_mpl_ping(_pkt) \
            .filter(lambda p: p.ipv6inner.hlim == 58 and p.ipv6inner.plen == ping_plen) \
            .must_next()

        # 3. Router_1 receives the multicast ping packet (checked through the
        # ping reply it sends back to the source of the request).
        pkts.filter_wpan_src64(test_vars['Router_1']) \
            .filter_ipv6_dst(_pkt.ipv6.src) \
            .filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier) \
            .must_next()

        # 4. Host multicasts a ping packet to the multicast address, MA1, with
        # the IPv6 Hop Limit field set to 1. The size of the payload is 130
        # bytes.
        _pkt = host_ping_to_ma1(1)

        # 5. BR_1 does not forward the ping packet. The MPL seed id must be
        # BR_1's RLOC (as in step 2); any other value would match nothing and
        # make this negative check pass vacuously.
        br1_mpl_ping(_pkt).must_not_next()

        # 6. Router_1 sends a ping packet encapsulated in an MPL packet to the
        # multicast address, MA2, with the Hop Limit field of the inner
        # packet set to 159. The size of the payload is 130 bytes.
        _pkt = router1_mpl_ping_to_ma2(159)

        # 7. BR_1 forwards the multicast ping packet to the LAN with the Hop
        # Limit field set to 158.
        br1_lan_ping(_pkt) \
            .filter(lambda p: p.ipv6.hlim == 158) \
            .must_next()

        # 8. Router_1 sends a ping packet encapsulated in an MPL multicast
        # packet to the multicast address, MA2, with the Hop Limit field of the
        # inner packet set to 2. The size of the payload is 130 bytes
        _pkt = router1_mpl_ping_to_ma2(2)

        # 9. BR_1 forwards the multicast packet to the LAN with the Hop Limit
        # field set to 1.
        br1_lan_ping(_pkt) \
            .filter(lambda p: p.ipv6.hlim == 1) \
            .must_next()

        # 10. Router_1 sends a ping packet encapsulated in an MPL multicast
        # packet to the multicast address, MA2, with the Hop Limit field of the
        # inner packet set to 1. The size of the payload is 130 bytes.
        _pkt = router1_mpl_ping_to_ma2(1)

        # 11. BR_1 does not forward the ping packet to the LAN.
        br1_lan_ping(_pkt).must_not_next()

        # 12. Router_1 sends a ping packet encapsulated in an MPL multicast
        # packet to the multicast address, MA2, with the Hop Limit field of the
        # inner packet set to 0.
        _pkt = router1_mpl_ping_to_ma2(0)

        # 13. BR_1 does not forward the ping packet to the LAN.
        br1_lan_ping(_pkt).must_not_next()
274
# Run the test case through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
277