1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
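# This test verifies DUA Duplicate Address Detection (DAD) across Backbone Routers:
# extending ADDR.qry to BB.qry, handling of a duplicate DUA, and handling of PRO_BB.ntf.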
30#
31import ipaddress
32import re
33import unittest
34
35import config
36import thread_cert
37from pktverify.packet_verifier import PacketVerifier
38
# Two channels are used by the topology (see TestDuaDad.TOPOLOGY)
CH1 = 11
CH2 = 22

# Node ids
PBBR = 1
SBBR = 2
ROUTER1 = 3
HOST = 4
PBBR2 = 5
ROUTER2 = 6

REREG_DELAY = 5  # Seconds
MLR_TIMEOUT = 300  # Seconds
WAIT_REDUNDANCE = 3

# The address that both ROUTER1 and ROUTER2 are forced to use; it must match
# the Domain Prefix pattern from config.
DUPLICATE_DUA = 'fd00:7d03:7d03:7d03:11:2233:4455:6677'
assert re.match(config.DOMAIN_PREFIX_REGEX_PATTERN, DUPLICATE_DUA)

# Interface identifier: the low 8 bytes of the address as lowercase hex.
DUPLICATE_IID = ipaddress.IPv6Address(DUPLICATE_DUA).packed[-8:].hex()

print('DUPLICATE_DUA: ', DUPLICATE_DUA)
print('DUPLICATE_IID: ', DUPLICATE_IID)
61
62
class TestDuaDad(thread_cert.TestCase):
    """DUA Duplicate Address Detection (DAD) scenario spanning two channels.

    `test` brings up the topology below, pings ROUTER1's DUA from ROUTER2,
    forces ROUTER2 and then ROUTER1 onto the same DUA IID, and finally lets
    PBBR2 send a scripted proactive backbone notification for ROUTER1's DUA.
    `verify` then checks the captured packets for each of these phases.
    """
    USE_MESSAGE_FACTORY = False

    # Topology:
    #    ------(eth)----------------------
    #       |        |       |      |
    #      PBBR----SBBR    HOST   PBBR2
    #        \ CH1 /               | CH2
    #        ROUTER1             ROUTER2
    #
    # PBBR2 is in the secondary channel
    #
    TOPOLOGY = {
        PBBR: {
            'name': 'PBBR',
            'allowlist': [SBBR, ROUTER1],
            'is_otbr': True,
            'version': '1.2',
            'channel': CH1,
            'prefer_router_id': 0x1,  # Use prefer_router_id to avoid Router ID conflicts in the two channels.
        },
        SBBR: {
            'name': 'SBBR',
            'allowlist': [PBBR, ROUTER1],
            'is_otbr': True,
            'version': '1.2',
            'channel': CH1,
            'prefer_router_id': 0x2,
        },
        ROUTER1: {
            'name': 'ROUTER1',
            'allowlist': [PBBR, SBBR],
            'version': '1.2',
            'channel': CH1,
            'prefer_router_id': 0x3,
        },
        HOST: {
            'name': 'HOST',
            'is_host': True
        },
        PBBR2: {
            'name': 'PBBR2',
            'is_otbr': True,
            'version': '1.2',
            'channel': CH2,
            'prefer_router_id': 0x4,
        },
        ROUTER2: {
            'name': 'ROUTER2',
            'version': '1.2',
            'channel': CH2,
            'prefer_router_id': 0x5,
        },
    }

    def _bootstrap(self):
        """Start all nodes in order, asserting each node's role and that it has a DUA."""
        # Bring up HOST
        self.nodes[HOST].start()

        # Bring up PBBR
        self.nodes[PBBR].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual('leader', self.nodes[PBBR].get_state())
        self.wait_node_state(PBBR, 'leader', 10)

        self.nodes[PBBR].set_backbone_router(reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT)
        self.nodes[PBBR].enable_backbone_router()
        self.nodes[PBBR].set_domain_prefix(config.DOMAIN_PREFIX, 'prosD')
        self.simulator.go(5)
        self.assertTrue(self.nodes[PBBR].is_primary_backbone_router)
        self.assertIsNotNone(self.nodes[PBBR].get_ip6_address(config.ADDRESS_TYPE.DUA))

        # Bring up SBBR
        self.nodes[SBBR].start()
        self.simulator.go(5)
        self.assertEqual('router', self.nodes[SBBR].get_state())

        self.nodes[SBBR].set_backbone_router(reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT)
        self.nodes[SBBR].enable_backbone_router()
        self.simulator.go(5)
        # PBBR is already primary, so SBBR must stay secondary
        self.assertFalse(self.nodes[SBBR].is_primary_backbone_router)
        self.assertIsNotNone(self.nodes[SBBR].get_ip6_address(config.ADDRESS_TYPE.DUA))

        # Bring up ROUTER1
        self.nodes[ROUTER1].start()
        self.simulator.go(5)
        self.assertEqual('router', self.nodes[ROUTER1].get_state())
        self.simulator.go(config.DUA_DAD_DELAY + WAIT_REDUNDANCE)
        self.assertIsNotNone(self.nodes[ROUTER1].get_ip6_address(config.ADDRESS_TYPE.DUA))

        # Bring up PBBR2
        self.nodes[PBBR2].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual('leader', self.nodes[PBBR2].get_state())
        self.wait_node_state(PBBR2, 'leader', 10)

        self.nodes[PBBR2].set_backbone_router(reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT)
        self.nodes[PBBR2].enable_backbone_router()
        self.nodes[PBBR2].set_domain_prefix(config.DOMAIN_PREFIX, 'prosD')
        self.simulator.go(config.DUA_DAD_DELAY)
        self.assertTrue(self.nodes[PBBR2].is_primary_backbone_router)
        self.assertIsNotNone(self.nodes[PBBR2].get_ip6_address(config.ADDRESS_TYPE.DUA))

        # Bring up ROUTER2
        self.nodes[ROUTER2].start()
        self.simulator.go(5)
        self.assertEqual('router', self.nodes[ROUTER2].get_state())
        self.simulator.go(config.DUA_DAD_DELAY + WAIT_REDUNDANCE)
        self.assertIsNotNone(self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.DUA))

    def test(self):
        """Run the scenario: bootstrap, collect node addresses, then the three test phases."""
        self._bootstrap()

        self.collect_ipaddrs()
        self.collect_rloc16s()
        self.collect_rlocs()

        self._test_extend_backbone_query()

        self._test_dad_duplicate()

        self._send_fake_pro_bb_ntf()

    def _test_extend_backbone_query(self):
        """Ping ROUTER1's DUA from ROUTER2; the resulting packets are checked in `verify`."""
        self.nodes[ROUTER2].ping(self.nodes[ROUTER1].get_ip6_address(config.ADDRESS_TYPE.DUA))

    def _test_dad_duplicate(self):
        """Give ROUTER2 and then ROUTER1 the same DUA IID and check ROUTER1 moves off DUPLICATE_DUA."""
        # ROUTER2 takes the IID first, so ROUTER1 is the one that ends up with the duplicate
        self.nodes[ROUTER2].set_dua_iid(DUPLICATE_IID)
        self.simulator.go(config.DUA_DAD_DELAY + WAIT_REDUNDANCE)
        self.nodes[ROUTER1].set_dua_iid(DUPLICATE_IID)
        self.simulator.go(config.DUA_DAD_DELAY + WAIT_REDUNDANCE)

        # now Router1 should have generated new DUA
        self.assertNotEqual(DUPLICATE_DUA, self.nodes[ROUTER1].get_ip6_address(config.ADDRESS_TYPE.DUA))

    def _send_fake_pro_bb_ntf(self):
        """Have PBBR2 send a scripted proactive backbone notification for ROUTER1's current DUA."""
        router1_dua = self.nodes[ROUTER1].get_ip6_address(config.ADDRESS_TYPE.DUA)
        self.nodes[PBBR2].send_proactive_backbone_notification(router1_dua, DUPLICATE_IID, 0)
        self.simulator.go(config.DUA_DAD_DELAY + WAIT_REDUNDANCE)

    def verify(self, pv: PacketVerifier):
        """Check the captured packets for the three phases run by `test`.

        Args:
            pv: the packet verifier holding the captured packets and the
                per-node variables.
        """
        pkts = pv.pkts
        pv.add_common_vars()
        pv.summary.show()

        # Note: inside this method these names shadow the module-level node ids
        # of the same name; here they hold the values stored in pv.vars.
        PBBR = pv.vars['PBBR']
        ROUTER1 = pv.vars['ROUTER1']
        ROUTER1_DUA = pv.vars['ROUTER1_DUA']
        ROUTER2 = pv.vars['ROUTER2']
        ROUTER2_DUA = pv.vars['ROUTER2_DUA']
        PBBR_ETH = pv.vars['PBBR_ETH']
        SBBR_ETH = pv.vars['SBBR_ETH']
        PBBR2 = pv.vars['PBBR2']
        PBBR2_ETH = pv.vars['PBBR2_ETH']

        MM = pv.vars['MM_PORT']
        BB = pv.vars['BB_PORT']

        # Verify the whole DAD process for ROUTER1
        pv.verify_dua_registration(ROUTER1, ROUTER1_DUA, pbbr_eth=PBBR_ETH, pbbr_src64=PBBR, sbbr_eth=SBBR_ETH)

        # Verify the whole DAD process for ROUTER2
        pv.verify_dua_registration(ROUTER2, ROUTER2_DUA, pbbr_eth=PBBR2_ETH, pbbr_src64=PBBR2)

        ###############################################################################################################
        # Now we verify Extending ADDR.qry to BB.qry works for the Ping Request from Router2 to Router1
        ###############################################################################################################

        # Router2 should send ADDR.qry in the Thread network for Router1's DUA
        pkts.filter_wpan_src64(ROUTER2).filter_coap_request('/a/aq', port=MM).filter(
            'thread_address.tlv.target_eid == {ROUTER1_DUA}', ROUTER1_DUA=ROUTER1_DUA).must_next()
        # PBBR2 should extend ADDR.qry to BB.qry
        pkts.filter_backbone_query(ROUTER1_DUA, eth_src=PBBR2_ETH, port=BB).must_next()
        # SBBR should not answer with BB.ans
        # (this must filter on backbone *answers*; filtering on queries would
        # not check for a BB.ans from SBBR at all)
        pkts.filter_backbone_answer(ROUTER1_DUA, eth_src=SBBR_ETH, port=BB).must_not_next()
        # PBBR1 should answer with BB.ans
        pkts.filter_backbone_answer(ROUTER1_DUA, eth_src=PBBR_ETH, port=BB).must_next()
        # PBBR2 should send ADDR.ntf to Router2
        pkts.filter_wpan_src64(PBBR2).filter_coap_request('/a/an', port=MM).filter(
            'thread_address.tlv.target_eid == {ROUTER1_DUA}', ROUTER1_DUA=ROUTER1_DUA).must_next()

        # Now, Router2 should send the Ping Request to PBBR2
        pkts.filter_wpan_src64(ROUTER2).filter_ping_request().filter_ipv6_dst(ROUTER1_DUA).must_next()

        ###############################################################################################################
        # Now we start to verify that DAD duplicate is handled correctly
        ###############################################################################################################

        # PBBR should send /b/bq for DUPLICATE_DUA
        pkts.filter_backbone_query(DUPLICATE_DUA, eth_src=PBBR_ETH, port=BB).must_next()

        # PBBR2 should send /b/ba for DUPLICATE_DUA
        ba = pkts.filter_backbone_answer(DUPLICATE_DUA, eth_src=PBBR2_ETH, port=BB).must_next()
        ba.must_verify("""
            set(thread_bl.tlv.type) == {tlvs}
            and thread_bl.tlv.last_transaction_time > 0
            and thread_meshcop.tlv.net_name == [{NET_NAME}]
        """,
                       tlvs={0, 3, 6, 12},
                       **pv.vars)

        # PBBR should NOT send /b/bq for DUPLICATE_DUA anymore
        pkts.filter_backbone_query(DUPLICATE_DUA, eth_src=PBBR_ETH, port=BB).must_not_next()

        # PBBR should send ADDR_ERR.ntf to Router1, carrying the ML-EID taken from PBBR2's /b/ba
        ROUTER1_RLOC = pv.vars['ROUTER1_RLOC']
        pkts.filter_wpan_src64(PBBR).filter_ipv6_dst(ROUTER1_RLOC).filter_coap_request('/a/ae', port=MM).must_next() \
            .must_verify("""
                thread_address.tlv.target_eid == {DUPLICATE_DUA}
                and thread_address.tlv.ml_eid == {ml_eid}
            """, DUPLICATE_DUA=DUPLICATE_DUA, ml_eid=ba.thread_bl.tlv.ml_eid)

        # Router1 should generate and register a new DUA
        # NOTE(review): save_index() presumably restores the packet index on exit so that
        # verify_dua_registration below can match this same /n/dr again - confirm against pktverify.
        with pkts.save_index():
            new_dr = pkts.filter_wpan_src64(ROUTER1).filter_coap_request('/n/dr', port=MM).must_next().must_verify(
                """
                thread_nm.tlv.target_eid != {DUPLICATE_DUA}
            """,
                DUPLICATE_DUA=DUPLICATE_DUA)
            router1_dua2 = new_dr.thread_nm.tlv.target_eid

        # Verify the DAD process for the new DUA
        pv.verify_dua_registration(ROUTER1, router1_dua2, pbbr_eth=PBBR_ETH, pbbr_src64=PBBR, sbbr_eth=SBBR_ETH)

        ###############################################################################################################
        # Now we start to verify that PRO_BB.ntf is handled correctly
        ###############################################################################################################

        # Scripted: PBBR2 should send PRO_BB.ntf for Router1's DUA
        pkts.filter_eth_src(PBBR2_ETH).filter_backbone_answer(router1_dua2,
                                                              port=BB,
                                                              confirmable=False,
                                                              mliid=DUPLICATE_IID).must_next()

        # PBBR should broadcast /a/ae to the Thread network
        pkts.filter_wpan_src64(PBBR).filter_coap_request('/a/ae', port=MM,
                                                         confirmable=False).filter_RLARMA().must_next()

        # Router1 should generate and register a new DUA
        with pkts.save_index():
            new_dr = pkts.filter_wpan_src64(ROUTER1).filter_coap_request('/n/dr', port=MM).must_next()
            router1_dua3 = new_dr.thread_nm.tlv.target_eid

        # Verify the DAD process for the new DUA
        pv.verify_dua_registration(ROUTER1, router1_dua3, pbbr_eth=PBBR_ETH, pbbr_src64=PBBR, sbbr_eth=SBBR_ETH)
310
# Run the test case through unittest when this file is executed directly.
if __name__ == '__main__':
    unittest.main()
313