1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import command
33import config
34import copy
35import mle
36import thread_cert
37from pktverify.consts import WIRESHARK_OVERRIDE_PREFS, ADDR_ERR_URI, ADDR_QRY_URI, ADDR_NTF_URI, MLE_CHILD_ID_REQUEST, MLE_CHILD_ID_RESPONSE, REALM_LOCAL_ALL_ROUTERS_ADDRESS, NL_TARGET_EID_TLV, NL_RLOC16_TLV, NL_ML_EID_TLV
38from pktverify.packet_verifier import PacketVerifier
39
40DUT_LEADER = 1
41ROUTER1 = 2
42ROUTER2 = 3
43MED1 = 4
44SED1 = 5
45MED2 = 6
46
47MTDS = [MED1, SED1, MED2]
48ON_MESH_PREFIX = '2001::/64'
49IPV6_ADDR = '2001::1234'
50
51# Test Purpose and Description:
52# -----------------------------
53# The purpose of this test case is to validate the DUT’s ability to perform
54# duplicate address detection.
55#
56# Test Topology:
57# -------------
58# MED_2 - Leader(DUT)
59#         /     \
60#  Router_1   Router_2
61#      |          |
62#    MED_1       SED
63#
64# DUT Types:
65# ----------
66#  Leader
67
68
69class Cert_5_3_7_DuplicateAddress(thread_cert.TestCase):
70    USE_MESSAGE_FACTORY = False
71
72    TOPOLOGY = {
73        DUT_LEADER: {
74            'name': 'LEADER',
75            'mode': 'rdn',
76            'allowlist': [ROUTER1, ROUTER2, MED2]
77        },
78        ROUTER1: {
79            'name': 'ROUTER_1',
80            'mode': 'rdn',
81            'allowlist': [DUT_LEADER, MED1]
82        },
83        ROUTER2: {
84            'name': 'ROUTER_2',
85            'mode': 'rdn',
86            'allowlist': [DUT_LEADER, SED1]
87        },
88        MED1: {
89            'name': 'MED_1',
90            'is_mtd': True,
91            'mode': 'rn',
92            'allowlist': [ROUTER1]
93        },
94        SED1: {
95            'name': 'SED',
96            'is_mtd': True,
97            'mode': '-',
98            'allowlist': [ROUTER2]
99        },
100        MED2: {
101            'name': 'MED_2',
102            'is_mtd': True,
103            'mode': 'rn',
104            'allowlist': [DUT_LEADER]
105        },
106    }
107
108    # override wireshark preferences with case needed parameters
109    CASE_WIRESHARK_PREFS = copy.deepcopy(WIRESHARK_OVERRIDE_PREFS)
110    CASE_WIRESHARK_PREFS['6lowpan.context1'] = ON_MESH_PREFIX
111
112    def test(self):
113        self.nodes[DUT_LEADER].start()
114        self.simulator.go(5)
115        self.assertEqual(self.nodes[DUT_LEADER].get_state(), 'leader')
116
117        for i in range(ROUTER1, MED2 + 1):
118            self.nodes[i].start()
119            self.simulator.go(5)
120
121        for i in [ROUTER1, ROUTER2]:
122            self.assertEqual(self.nodes[i].get_state(), 'router')
123
124        for i in MTDS:
125            self.assertEqual(self.nodes[i].get_state(), 'child')
126
127        self.collect_ipaddrs()
128        self.collect_rlocs()
129
130        self.nodes[ROUTER2].add_prefix(ON_MESH_PREFIX, 'paros')
131        self.nodes[ROUTER2].register_netdata()
132
133        self.nodes[MED1].add_ipaddr(IPV6_ADDR)
134        self.nodes[SED1].add_ipaddr(IPV6_ADDR)
135        self.simulator.go(5)
136
137        self.nodes[MED2].ping(IPV6_ADDR)
138        self.simulator.go(5)
139
140    def verify(self, pv):
141        pkts = pv.pkts
142        pv.summary.show()
143
144        LEADER = pv.vars['LEADER']
145        LEADER_RLOC = pv.vars['LEADER_RLOC']
146        ROUTER_1 = pv.vars['ROUTER_1']
147        ROUTER_2 = pv.vars['ROUTER_2']
148        MED_1 = pv.vars['MED_1']
149        MED_2 = pv.vars['MED_2']
150        SED = pv.vars['SED']
151        MM = pv.vars['MM_PORT']
152
153        # Step 1: Ensure topology is formed correctly
154        for i in (1, 2):
155            pv.verify_attached('ROUTER_%d' % i, 'LEADER')
156        pv.verify_attached('MED_1', 'ROUTER_1', 'MTD')
157        pv.verify_attached('SED', 'ROUTER_2', 'MTD')
158        pv.verify_attached('MED_2', 'LEADER', 'MTD')
159
160        # Step 5: MED_2 sends ICMPv6 Echo Request to address configured on MED_1
161        #         and SED_1 with Prefix 2001::
162        #         The DUT MUST multicast an Address Query message to the Realm-Local
163        #         All Routers address (FF03::2):
164        #             CoAP URI-Path
165        #                 - NON POST coap://<FF03::2>
166        #             CoAP Payload
167        #                 - Target EID TLV
168
169        pkts.filter_ping_request().\
170            filter_wpan_src64(MED_2). \
171            filter_ipv6_dst(IPV6_ADDR). \
172            must_next()
173        pkts.filter_wpan_src64(LEADER).\
174            filter_RLARMA().\
175            filter_coap_request(ADDR_QRY_URI, port=MM).\
176            filter(lambda p: p.thread_address.tlv.target_eid == IPV6_ADDR).\
177            must_next()
178
179        # Step 6: Router_1 & Router_2 respond with Address Notification message
180        #         with matching Target TLVs
181
182        for i in (1, 2):
183            with pkts.save_index():
184                pkts.filter_wpan_src64(pv.vars['ROUTER_%d' %i]).\
185                    filter_ipv6_dst(LEADER_RLOC).\
186                    filter_coap_request(ADDR_NTF_URI, port=MM).\
187                    filter(lambda p: {
188                                      NL_ML_EID_TLV,
189                                      NL_RLOC16_TLV,
190                                      NL_TARGET_EID_TLV
191                                      } <= set(p.coap.tlv.type) and\
192                           p.thread_address.tlv.target_eid == IPV6_ADDR
193                           ).\
194                   must_next()
195
196        # Step 7: The DUT MUST multicast an Address Error Notification to the Realm-Local
197        #         All Routers address (FF03::2):
198        #             CoAP URI-Path
199        #                 - NON POST coap://[<peer address>]:MM/a/ae
200        #             CoAP Payload
201        #                 - Target EID TLV
202        #                 - ML-EID TLV
203        #
204        #         The IPv6 Source address MUST be the RLOC of the originator
205
206        pkts.filter_ipv6_src_dst(LEADER_RLOC, REALM_LOCAL_ALL_ROUTERS_ADDRESS).\
207            filter_coap_request(ADDR_ERR_URI, port=MM).\
208            filter(lambda p: {
209                              NL_ML_EID_TLV,
210                              NL_TARGET_EID_TLV
211                              } == set(p.coap.tlv.type) and\
212                   p.thread_address.tlv.target_eid == IPV6_ADDR
213                   ).\
214            must_next()
215
216
217if __name__ == '__main__':
218    unittest.main()
219