1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import config
33import thread_cert
34from pktverify.consts import MLE_CHILD_ID_RESPONSE, MGMT_ACTIVE_SET_URI, MGMT_ACTIVE_GET_URI
35from pktverify.packet_verifier import PacketVerifier
36from pktverify.bytes import Bytes
37
# Node identifiers: they key the TOPOLOGY mapping below and index self.nodes
# inside the test case.
COMMISSIONER, LEADER = 1, 2
40
41
class Cert_9_2_04_ActiveDataset(thread_cert.TestCase):
    """Active Operational Dataset management between a Commissioner and a Leader.

    ``test`` drives a two-node network: the Commissioner issues a series of
    MGMT_ACTIVE_SET requests (plus one MGMT_ACTIVE_GET) and, after each one,
    the Leader's network name is checked to see whether the request took
    effect.  ``verify`` then walks the packet capture and checks every
    request and the Leader's matching response.
    """

    SUPPORT_NCP = False

    # Both nodes start from the same active dataset (timestamp and network
    # key) and allow-list only each other.
    TOPOLOGY = {
        COMMISSIONER: {
            'name': 'COMMISSIONER',
            'active_dataset': {
                'timestamp': 10,
                'network_key': '00112233445566778899aabbccddeeff'
            },
            'mode': 'rdn',
            'allowlist': [LEADER]
        },
        LEADER: {
            'name': 'LEADER',
            'active_dataset': {
                'timestamp': 10,
                'network_key': '00112233445566778899aabbccddeeff'
            },
            'mode': 'rdn',
            'allowlist': [COMMISSIONER]
        },
    }

    def test(self):
        # Described with comments instead of a docstring so that the test
        # description reported by unittest is not altered.
        #
        # Flow: bring up the Leader and the Commissioner, then send one
        # MGMT_ACTIVE_SET per numbered step and check the Leader's resulting
        # network name.
        leader = self.nodes[LEADER]
        commissioner = self.nodes[COMMISSIONER]

        def set_dataset_and_expect_name(expected_name, **dataset_tlvs):
            # Send MGMT_ACTIVE_SET with the given TLVs, let the simulation run
            # for 5 seconds, then check the Leader's network name.
            commissioner.send_mgmt_active_set(**dataset_tlvs)
            self.simulator.go(5)
            self.assertEqual(leader.get_network_name(), expected_name)

        leader.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(leader.get_state(), 'leader')

        commissioner.start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(commissioner.get_state(), 'router')

        commissioner.commissioner_start()
        self.simulator.go(5)

        # Record the addresses that verify() later reads from pv.vars.
        self.collect_rlocs()
        self.collect_leader_aloc(LEADER)

        # Step 2
        # Valid request: the network name becomes 'GRL'.
        set_dataset_and_expect_name(
            'GRL',
            active_timestamp=101,
            channel_mask=0x7fff800,
            extended_panid='000db70000000000',
            network_name='GRL',
        )
        commissioner.send_mgmt_active_get()
        self.simulator.go(5)

        # Step 6
        # Attempt to set Channel TLV; the name must stay 'GRL'.
        set_dataset_and_expect_name(
            'GRL',
            active_timestamp=102,
            channel=18,
            channel_mask=0x7fff800,
            extended_panid='000db70000000001',
            network_name='threadcert',
        )

        # Step 8
        # Attempt to set Mesh Local Prefix TLV; the name must stay 'GRL'.
        set_dataset_and_expect_name(
            'GRL',
            active_timestamp=103,
            channel_mask=0x7fff800,
            extended_panid='000db70000000000',
            mesh_local='fd00:0db7::',
            network_name='UL',
        )

        # Step 10
        # Attempt to set Network Key TLV.
        set_dataset_and_expect_name(
            'GRL',
            active_timestamp=104,
            channel_mask=0x7fff800,
            extended_panid='000db70000000000',
            network_key='ffeeddccbbaa99887766554433221100',
            mesh_local='fd00:0db7::',
            network_name='GRL',
        )

        # Step 12
        # Attempt to set PAN ID TLV; the name must stay 'GRL'.
        set_dataset_and_expect_name(
            'GRL',
            active_timestamp=105,
            channel_mask=0x7fff800,
            extended_panid='000db70000000000',
            network_key='00112233445566778899aabbccddeeff',
            mesh_local='fd00:0db7::',
            network_name='UL',
            panid=0xafce,
        )

        # Step 14
        # Invalid Commissioner Session ID, supplied as a raw TLV that verify()
        # expects to see as commissioner_sess_id == 0xabcd.
        set_dataset_and_expect_name(
            'GRL',
            active_timestamp=106,
            channel_mask=0x7fff800,
            extended_panid='000db70000000000',
            network_name='UL',
            binary='0b02abcd',
        )

        # Step 16
        # Old Active Timestamp (101 was already used in step 2).
        set_dataset_and_expect_name(
            'GRL',
            active_timestamp=101,
            channel_mask=0x01fff800,
            extended_panid='000db70000000000',
            network_name='UL',
        )

        # Step 18
        # Unexpected Steering Data TLV (raw TLV); here the name DOES change,
        # to 'UL'.
        set_dataset_and_expect_name(
            'UL',
            active_timestamp=107,
            channel_mask=0x7fff800,
            extended_panid='000db70000000000',
            network_name='UL',
            binary='0806113320440000',
        )

        # Step 20
        # Undefined TLV (raw TLV); the name goes back to 'GRL'.
        set_dataset_and_expect_name(
            'GRL',
            active_timestamp=108,
            channel_mask=0x7fff800,
            extended_panid='000db70000000000',
            network_name='GRL',
            binary='8202aa55',
        )

        # Finally, the Leader must be able to ping every Commissioner address.
        for ipaddr in commissioner.get_addrs():
            self.assertTrue(leader.ping(ipaddr))

    def verify(self, pv):
        """Check the captured packets against the numbered steps of the test.

        Args:
            pv: packet verifier object; this method reads ``pv.pkts``,
                ``pv.summary`` and ``pv.vars``.

        Each check consumes the next matching packet via ``must_next()``, so
        the steps have to be verified in capture order.  Responses are
        matched on ``state == 1`` or ``state == -1``; in ``test`` the -1
        steps are those after which the Leader's network name is unchanged
        (presumably 1 is Accept and -1 is Reject -- confirm against the
        Thread specification).
        """
        pkts = pv.pkts
        pv.summary.show()

        leader = pv.vars['LEADER']
        leader_rloc = pv.vars['LEADER_RLOC']
        leader_aloc = pv.vars['LEADER_ALOC']
        commissioner = pv.vars['COMMISSIONER']
        commissioner_rloc = pv.vars['COMMISSIONER_RLOC']

        def expect_request(uri, matches=None):
            # Commissioner -> Leader RLOC or Leader ALOC, CoAP request on uri,
            # optionally narrowed by a packet predicate.
            candidates = pkts.filter_wpan_src64(commissioner).filter_ipv6_2dsts(
                leader_rloc, leader_aloc).filter_coap_request(uri)
            if matches is not None:
                candidates = candidates.filter(matches)
            candidates.must_next()

        def expect_response(uri, matches):
            # Leader -> Commissioner RLOC, CoAP ACK on uri, narrowed by a
            # packet predicate.
            pkts.filter_wpan_src64(leader).filter_ipv6_dst(commissioner_rloc).filter_coap_ack(uri).filter(
                matches).must_next()

        def expect_set_request(matches):
            expect_request(MGMT_ACTIVE_SET_URI, matches)

        def expect_set_response(state):
            expect_response(MGMT_ACTIVE_SET_URI, lambda p: p.thread_meshcop.tlv.state == state)

        # Step 1: Ensure the topology is formed correctly
        pkts.filter_wpan_src64(leader).filter_wpan_dst64(commissioner).filter_mle_cmd(
            MLE_CHILD_ID_RESPONSE).must_next()

        # Step 2: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Anycast Locator
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['GRL']
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'
                       and p.thread_meshcop.tlv.active_tstamp == 101))

        # Step 3: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(1)

        # Step 4: Commissioner sends MGMT_ACTIVE_GET.req to Leader
        expect_request(MGMT_ACTIVE_GET_URI)

        # Step 5: The Leader MUST send MGMT_ACTIVE_GET.rsp to the Commissioner
        expect_response(
            MGMT_ACTIVE_GET_URI,
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 101
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['GRL']
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'))

        # Step 6: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Anycast Locator
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 102
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000001'
                       and p.thread_meshcop.tlv.net_name == ['threadcert']
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'
                       and p.thread_meshcop.tlv.channel == [18]))

        # Step 7: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(-1)

        # Step 8: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Leader Anycast Locator
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 103
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['UL']
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'
                       and p.thread_meshcop.tlv.ml_prefix == 'fd000db700000000'))

        # Step 9: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(-1)

        # Step 10: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Leader Anycast Locator
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 104
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['GRL']
                       and p.thread_meshcop.tlv.master_key == 'ffeeddccbbaa99887766554433221100'
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'
                       and p.thread_meshcop.tlv.ml_prefix == 'fd000db700000000'))

        # Step 11: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(-1)

        # Step 12: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Leader Anycast Locator
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 105
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['UL']
                       and p.thread_meshcop.tlv.master_key == '00112233445566778899aabbccddeeff'
                       and p.thread_meshcop.tlv.pan_id == [0xafce]
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'
                       and p.thread_meshcop.tlv.ml_prefix == 'fd000db700000000'))

        # Step 13: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(-1)

        # Step 14: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Leader Anycast Locator
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 106
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['UL']
                       and p.thread_meshcop.tlv.commissioner_sess_id == 0xabcd
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'))

        # Step 15: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(-1)

        # Step 16: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Leader Anycast Locator
        # (this request was sent with a different channel mask, hence '001fff80')
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 101
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['UL']
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fff80'))

        # Step 17: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(-1)

        # Step 18: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Leader Anycast Locator
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 107
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['UL']
                       and p.thread_meshcop.tlv.steering_data == Bytes('113320440000')
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'))

        # Step 19: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(1)

        # Step 20: Commissioner sends MGMT_ACTIVE_SET.req to Leader RLOC or Leader Anycast Locator
        expect_set_request(
            lambda p: (p.thread_meshcop.tlv.active_tstamp == 108
                       and p.thread_meshcop.tlv.xpan_id == '000db70000000000'
                       and p.thread_meshcop.tlv.net_name == ['GRL']
                       and p.thread_meshcop.tlv.unknown == 'aa55'
                       and p.thread_meshcop.tlv.chan_mask_mask == '001fffe0'))

        # Step 21: Leader MUST send MGMT_ACTIVE_SET.rsp to the Commissioner
        expect_set_response(1)
317
# Script entry point: hand control to unittest's command-line runner, but only
# when this file is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()
320