1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import copy
31import unittest
32
33import command
34import config
35import copy
36import ipv6
37import thread_cert
38from pktverify.consts import WIRESHARK_OVERRIDE_PREFS, ADDR_QRY_URI, ADDR_NTF_URI, NL_ML_EID_TLV, NL_RLOC16_TLV, NL_TARGET_EID_TLV
39from pktverify.packet_verifier import PacketVerifier
40from pktverify.bytes import Bytes
41
42LEADER = 1
43ROUTER1 = 2
44DUT_ROUTER2 = 3
45ROUTER3 = 4
46SED1 = 5
47PREFIX_1 = '2001::/64'
48GUA_1_START = '2001'
49PREFIX_2 = '2002::/64'
50
51# Test Purpose and Description:
52# -----------------------------
53# The purpose of this test case is to validate that the DUT is able to generate
54# Address Query and Address Notification messages properly.
55# The Leader is configured as a Border Router with DHCPv6 server for prefixes
56# 2001:: & 2002::
57#
58# Test Topology:
59# -------------
60# Router_1 - Leader
61#           /     \
62#     Router_3    Router_2(DUT)
63#                    |
64#                   SED
65#
66# DUT Types:
67# ----------
68#  Router
69
70
71class Cert_5_3_09_AddressQuery(thread_cert.TestCase):
72    USE_MESSAGE_FACTORY = False
73
74    TOPOLOGY = {
75        LEADER: {
76            'name': 'LEADER',
77            'mode': 'rdn',
78            'allowlist': [ROUTER1, DUT_ROUTER2, ROUTER3]
79        },
80        ROUTER1: {
81            'name': 'ROUTER_1',
82            'mode': 'rdn',
83            'allowlist': [LEADER]
84        },
85        DUT_ROUTER2: {
86            'name': 'ROUTER_2',
87            'mode': 'rdn',
88            'allowlist': [LEADER, SED1]
89        },
90        ROUTER3: {
91            'name': 'ROUTER_3',
92            'mode': 'rdn',
93            'allowlist': [LEADER]
94        },
95        SED1: {
96            'name': 'SED',
97            'is_mtd': True,
98            'mode': '-',
99            'timeout': config.DEFAULT_CHILD_TIMEOUT,
100            'allowlist': [DUT_ROUTER2]
101        },
102    }
103
104    # override wireshark preferences with case needed parameters
105    CASE_WIRESHARK_PREFS = copy.deepcopy(WIRESHARK_OVERRIDE_PREFS)
106    CASE_WIRESHARK_PREFS['6lowpan.context1'] = PREFIX_1
107    CASE_WIRESHARK_PREFS['6lowpan.context2'] = PREFIX_2
108
109    def test(self):
110        # 1 & 2 ALL: Build and verify the topology
111        self.nodes[LEADER].start()
112        self.simulator.go(config.LEADER_STARTUP_DELAY)
113        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
114
115        # Configure the LEADER to be a DHCPv6 Border Router for prefixes
116        self.nodes[LEADER].add_prefix(PREFIX_1, 'pdros')
117        self.nodes[LEADER].add_prefix(PREFIX_2, 'pdro')
118        self.nodes[LEADER].register_netdata()
119
120        self.nodes[ROUTER1].start()
121        self.simulator.go(config.ROUTER_STARTUP_DELAY)
122        self.assertEqual(self.nodes[ROUTER1].get_state(), 'router')
123
124        self.nodes[DUT_ROUTER2].start()
125        self.simulator.go(config.ROUTER_STARTUP_DELAY)
126        self.assertEqual(self.nodes[DUT_ROUTER2].get_state(), 'router')
127
128        self.nodes[ROUTER3].start()
129        self.simulator.go(config.ROUTER_STARTUP_DELAY)
130        self.assertEqual(self.nodes[ROUTER3].get_state(), 'router')
131
132        self.nodes[SED1].start()
133        self.simulator.go(5)
134        self.assertEqual(self.nodes[SED1].get_state(), 'child')
135
136        self.collect_rlocs()
137        self.collect_rloc16s()
138        self.collect_ipaddrs()
139
140        # 3 SED1: The SED1 sends an ICMPv6 Echo Request to ROUTER3 using GUA
141        # PREFIX_1 address
142        router3_addr = self.nodes[ROUTER3].get_addr(PREFIX_1)
143        self.assertTrue(router3_addr is not None)
144        self.assertTrue(self.nodes[SED1].ping(router3_addr))
145
146        # 4 ROUTER1: ROUTER1 sends an ICMPv6 Echo Request to the SED1 using GUA
147        # PREFIX_1 address
148        sed1_addr = self.nodes[SED1].get_addr(PREFIX_1)
149        self.assertTrue(sed1_addr is not None)
150        self.assertTrue(self.nodes[ROUTER1].ping(sed1_addr))
151        self.simulator.go(1)
152
153        # 5 SED1: SED1 sends an ICMPv6 Echo Request to the ROUTER3 using GUA
154        # PREFIX_1 address
155        self.assertTrue(self.nodes[SED1].ping(router3_addr))
156        self.simulator.go(1)
157
158        # 6 DUT_ROUTER2: Power off ROUTER3 and wait 580s to allow LEADER to
159        # expire its Router ID
160        self.nodes[ROUTER3].stop()
161        self.simulator.go(580)
162
163        # The SED1 sends an ICMPv6 Echo Request to ROUTER3 GUA PREFIX_1 address
164        self.assertFalse(self.nodes[SED1].ping(router3_addr))
165        self.simulator.go(1)
166
167        # 7 SED1: Power off SED1 and wait to allow DUT_ROUTER2 to timeout the
168        # child
169        self.nodes[SED1].stop()
170        self.simulator.go(3)
171        self.simulator.go(config.DEFAULT_CHILD_TIMEOUT)
172
173        # ROUTER1 sends two ICMPv6 Echo Requests to SED1 GUA PREFIX_1 address
174        self.assertFalse(self.nodes[ROUTER1].ping(sed1_addr))
175        self.assertFalse(self.nodes[ROUTER1].ping(sed1_addr))
176
177    def verify(self, pv):
178        pkts = pv.pkts
179        pv.summary.show()
180
181        LEADER = pv.vars['LEADER']
182        ROUTER_1 = pv.vars['ROUTER_1']
183        ROUTER_1_RLOC = pv.vars['ROUTER_1_RLOC']
184        ROUTER_2 = pv.vars['ROUTER_2']
185        ROUTER_2_RLOC = pv.vars['ROUTER_2_RLOC']
186        ROUTER_2_RLOC16 = pv.vars['ROUTER_2_RLOC16']
187        ROUTER_3 = pv.vars['ROUTER_3']
188        SED = pv.vars['SED']
189        SED_RLOC16 = pv.vars['SED_RLOC16']
190        MM = pv.vars['MM_PORT']
191        GUA1 = {}
192
193        for node in ('ROUTER_1', 'ROUTER_3', 'SED'):
194            for addr in pv.vars['%s_IPADDRS' % node]:
195                if addr.startswith(Bytes(GUA_1_START)):
196                    GUA1[node] = addr
197
198        # Step 1: Build the topology as described
199
200        for i in range(1, 4):
201            pv.verify_attached('ROUTER_%d' % i, 'LEADER')
202        pv.verify_attached('SED', 'ROUTER_2', 'MTD')
203
204        # Step 2: SED sends an ICMPv6 Echo Request to Router_3 using GUA 2001::
205        #         address
206        #         The DUT MUST generate an Address Query Request on SED’s behalf
207        #         to find each node’s RLOC.
208        #         The Address Query Request MUST be sent to the Realm-Local
209        #         All-Routers address (FF03::2)
210        #             CoAP URI-Path
211        #                 - NON POST coap://<FF03::2>
212        #             CoAP Payload
213        #                 - Target EID TLV
214        #         The DUT MUST receive and process the incoming Address Query
215        #         Response and forward the ICMPv6 Echo Reply packet to SED
216
217        _pkt = pkts.filter_ping_request().\
218            filter_wpan_src64(SED).\
219            filter_ipv6_dst(GUA1['ROUTER_3']).\
220            must_next()
221        pkts.filter_wpan_src64(ROUTER_2).\
222            filter_RLARMA().\
223            filter_coap_request(ADDR_QRY_URI, port=MM).\
224            filter(lambda p: p.thread_address.tlv.target_eid == GUA1['ROUTER_3']).\
225            must_next()
226        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
227            filter_wpan_src64(ROUTER_3).\
228            filter_ipv6_dst(GUA1['SED']).\
229            must_next()
230        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
231            filter_wpan_src64(ROUTER_2).\
232            filter_wpan_dst16(SED_RLOC16).\
233            must_next()
234
235        # Step 3: Router_1 sends an ICMPv6 Echo Request to SED using GUA 2001::
236        #         address
237        #         The DUT MUST respond to the Address Query Request with a properly
238        #         formatted Address Notification Message:
239        #             CoAP URI-Path
240        #                 - CON POST coap://[<Address Query Source>]:MM/a/an
241        #             CoAP Payload
242        #                 - ML-EID TLV
243        #                 - RLOC16 TLV
244        #                 - Target EID TLV
245        #         The IPv6 Source address MUST be the RLOC of the originator
246        #         The IPv6 Destination address MUST be the RLOC of the destination
247
248        pkts.filter_wpan_src64(ROUTER_1).\
249            filter_RLARMA().\
250            filter_coap_request(ADDR_QRY_URI, port=MM).\
251            filter(lambda p: p.thread_address.tlv.target_eid == GUA1['SED']).\
252            must_next()
253        pkts.filter_ipv6_src_dst(ROUTER_2_RLOC, ROUTER_1_RLOC).\
254            filter_coap_request(ADDR_NTF_URI, port=MM).\
255            filter(lambda p: {
256                                 NL_ML_EID_TLV,
257                                 NL_RLOC16_TLV,
258                                 NL_TARGET_EID_TLV
259                             } <= set(p.coap.tlv.type) and\
260                             p.thread_address.tlv.target_eid == GUA1['SED'] and\
261                             p.thread_address.tlv.rloc16 == ROUTER_2_RLOC16
262                   ).\
263            must_next()
264        _pkt = pkts.filter_ping_request().\
265            filter_wpan_src64(ROUTER_1).\
266            filter_ipv6_dst(GUA1['SED']).\
267            must_next()
268        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
269            filter_wpan_src64(SED).\
270            filter_ipv6_dst(GUA1['ROUTER_1']).\
271            must_next()
272
273        # Step 5: SED sends an ICMPv6 Echo Request to Router_3 using GUA 2001::
274        #         address
275        #         The DUT MUST not send an Address Query as Router_3 address should
276        #         be cached.
277        #         The DUT MUST forward the ICMPv6 Echo Reply to SED
278
279        _pkt = pkts.filter_ping_request().\
280            filter_wpan_src64(SED).\
281            filter_ipv6_dst(GUA1['ROUTER_3']).\
282            must_next()
283        lstart = pkts.index
284        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
285            filter_wpan_src64(ROUTER_3).\
286            filter_ipv6_dst(GUA1['SED']).\
287            must_next()
288        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
289            filter_wpan_src64(ROUTER_2).\
290            filter_wpan_dst16(SED_RLOC16).\
291            must_next()
292        lend = pkts.index
293        pkts.range(lstart, lend).filter_wpan_src64(ROUTER_2).\
294            filter_RLARMA().\
295            filter_coap_request(ADDR_QRY_URI, port=MM).\
296            must_not_next()
297
298        # Step 6: SED sends an ICMPv6 Echo Request to Router_3 using GUA 2001::
299        #         address
300        #         The DUT MUST update its address cache and remove all entries
301        #         based on Router_3’s Router ID.
302        #         The DUT MUST send an Address Query to discover Router_3’s RLOC address.
303        pkts.filter_ping_request().\
304            filter_wpan_src64(SED).\
305            filter_ipv6_dst(GUA1['ROUTER_3']).\
306            must_next()
307        pkts.filter_wpan_src64(ROUTER_2).\
308            filter_RLARMA().\
309            filter_coap_request(ADDR_QRY_URI, port=MM).\
310            filter(lambda p: p.thread_address.tlv.target_eid == GUA1['ROUTER_3']).\
311            must_next()
312
313        # Step 7: Router_1 sends two ICMPv6 Echo Requests to SED using GUA 2001::
314        #         address
315        #         The DUT MUST NOT respond with an Address Notification message
316
317        pkts.filter_wpan_src64(ROUTER_2).\
318            filter_ipv6_dst(ROUTER_1_RLOC).\
319            filter_coap_request(ADDR_NTF_URI, port=MM).\
320            must_not_next()
321        pkts.filter_ping_request().\
322            filter_wpan_src64(ROUTER_1).\
323            filter_ipv6_dst(GUA1['SED']).\
324            must_next()
325
326
327if __name__ == '__main__':
328    unittest.main()
329