1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import command
33import mesh_cop
34import thread_cert
35from pktverify.consts import MLE_DATA_RESPONSE, LEAD_PET_URI, LEAD_KA_URI, MGMT_COMMISSIONER_SET_URI, NM_CHANNEL_TLV, NM_COMMISSIONER_ID_TLV, NM_COMMISSIONER_SESSION_ID_TLV, NM_STATE_TLV, NM_STEERING_DATA_TLV, NM_BORDER_AGENT_LOCATOR_TLV, LEADER_DATA_TLV, NETWORK_DATA_TLV, ACTIVE_TIMESTAMP_TLV, SOURCE_ADDRESS_TLV, NWD_COMMISSIONING_DATA_TLV, MESHCOP_ACCEPT, MESHCOP_REJECT, LEADER_ALOC
36from pktverify.packet_verifier import PacketVerifier
37from pktverify.bytes import Bytes
38
39COMMISSIONER = 1
40LEADER = 2
41
42# Test Purpose and Description:
43# -----------------------------
44# The purpose of this test case is to verify Leader's and active Commissioner's behavior via
45# MGMT_COMMISSIONER_SET request and response
46#
47# Test Topology:
48# -------------
49# Commissioner
50#    |
51#  Leader
52#
53# DUT Types:
54# ----------
55#  Leader
56#  Commissioner
57
58
59class Cert_9_2_02_MGMTCommissionerSet(thread_cert.TestCase):
60    SUPPORT_NCP = False
61
62    TOPOLOGY = {
63        COMMISSIONER: {
64            'name': 'COMMISSIONER',
65            'mode': 'rdn',
66            'allowlist': [LEADER]
67        },
68        LEADER: {
69            'name': 'LEADER',
70            'mode': 'rdn',
71            'allowlist': [COMMISSIONER]
72        },
73    }
74
75    def test(self):
76        self.nodes[LEADER].start()
77        self.simulator.go(5)
78        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
79
80        self.nodes[COMMISSIONER].start()
81        self.simulator.go(5)
82        self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'router')
83        self.simulator.get_messages_sent_by(LEADER)
84
85        self.collect_rlocs()
86        self.collect_rloc16s()
87
88        self.nodes[COMMISSIONER].commissioner_start()
89        self.simulator.go(3)
90
91        leader_messages = self.simulator.get_messages_sent_by(LEADER)
92        msg = leader_messages.next_coap_message('2.04', assert_enabled=True)
93        commissioner_session_id_tlv = command.get_sub_tlv(msg.coap.payload, mesh_cop.CommissionerSessionId)
94
95        steering_data_tlv = mesh_cop.SteeringData(bytes([0xff]))
96        self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([steering_data_tlv])
97        self.simulator.go(5)
98
99        self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([steering_data_tlv, commissioner_session_id_tlv])
100        self.simulator.go(5)
101
102        border_agent_locator_tlv = mesh_cop.BorderAgentLocator(0x0400)
103        self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs(
104            [commissioner_session_id_tlv, border_agent_locator_tlv])
105        self.simulator.go(5)
106
107        self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([
108            steering_data_tlv,
109            commissioner_session_id_tlv,
110            border_agent_locator_tlv,
111        ])
112        self.simulator.go(5)
113
114        self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs(
115            [mesh_cop.CommissionerSessionId(0xffff), steering_data_tlv])
116        self.simulator.go(5)
117
118        self.nodes[COMMISSIONER].commissioner_mgmtset_with_tlvs([
119            commissioner_session_id_tlv,
120            steering_data_tlv,
121            mesh_cop.Channel(0x0, 0x0),
122        ])
123        self.simulator.go(5)
124
125        leader_rloc = self.nodes[LEADER].get_rloc()
126        commissioner_rloc = self.nodes[COMMISSIONER].get_rloc()
127        self.assertTrue(self.nodes[COMMISSIONER].ping(leader_rloc))
128        self.simulator.go(1)
129        self.assertTrue(self.nodes[LEADER].ping(commissioner_rloc))
130
131    def verify(self, pv):
132        pkts = pv.pkts
133        pv.summary.show()
134
135        LEADER = pv.vars['LEADER']
136        LEADER_RLOC = pv.vars['LEADER_RLOC']
137        LEADER_RLOC16 = pv.vars['LEADER_RLOC16']
138        COMMISSIONER = pv.vars['COMMISSIONER']
139        COMMISSIONER_RLOC = pv.vars['COMMISSIONER_RLOC']
140
141        # Step 1: Ensure topology is formed correctly
142        pv.verify_attached('COMMISSIONER', 'LEADER')
143
144        # Step 2: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
145        #         to Leader Anycast or Routing Locator:
146        #         CoAP Request URI
147        #             CON POST coap://<L>:MM/c/cs
148        #         CoAP Payload
149        #             (missing Commissioner Session ID TLV)
150        #             Steering Data TLV (0xFF)
151        _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
152            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
153            filter_coap_request(MGMT_COMMISSIONER_SET_URI).\
154            filter(lambda p:
155                   [NM_STEERING_DATA_TLV] == p.coap.tlv.type and\
156                   p.thread_meshcop.tlv.steering_data == Bytes('ff')
157                   ).\
158           must_next()
159
160        # Step 3: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
161        #         Commissioner:
162        #         CoAP Response Code
163        #             2.04 Changed
164        #         CoAP Payload
165        #             State TLV (value = Reject)
166        pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\
167            filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\
168            filter(lambda p:
169                   [NM_STATE_TLV] == p.coap.tlv.type and\
170                   p.thread_meshcop.tlv.state == MESHCOP_REJECT
171                   ).\
172           must_next()
173
174        # Step 4: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
175        #         to Leader Anycast or Routing Locator:
176        #         CoAP Request URI
177        #             CON POST coap://<L>:MM/c/cs
178        #         CoAP Payload
179        #             Commissioner Session ID TLV
180        #             Steering Data TLV (0xFF)
181        _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
182            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
183            filter_coap_request(MGMT_COMMISSIONER_SET_URI).\
184            filter(lambda p: {
185                              NM_COMMISSIONER_SESSION_ID_TLV,
186                              NM_STEERING_DATA_TLV
187                             } == set(p.thread_meshcop.tlv.type) and\
188                   p.thread_meshcop.tlv.steering_data == Bytes('ff')
189                   ).\
190           must_next()
191
192        # Step 5: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
193        #         Commissioner:
194        #         CoAP Response Code
195        #             2.04 Changed
196        #         CoAP Payload
197        #             State TLV (value = Accept)
198        pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\
199            filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\
200            filter(lambda p:
201                   [NM_STATE_TLV] == p.coap.tlv.type and\
202                   p.thread_meshcop.tlv.state == MESHCOP_ACCEPT
203                   ).\
204           must_next()
205
206        # Step 6: Leader sends a MLE Data Response to the network with the
207        #         following TLVs:
208        #             - Active Timestamp TLV
209        #             - Leader Data TLV
210        #             - Network Data TLV
211        #             - Source Address TLV
212        pkts.filter_wpan_src64(LEADER).\
213            filter_LLANMA().\
214            filter_mle_cmd(MLE_DATA_RESPONSE).\
215            filter(lambda p: {
216                              NETWORK_DATA_TLV,
217                              SOURCE_ADDRESS_TLV,
218                              ACTIVE_TIMESTAMP_TLV,
219                              LEADER_DATA_TLV
220                             } == set(p.mle.tlv.type) and\
221                             {
222                              NWD_COMMISSIONING_DATA_TLV
223                             } == set(p.thread_nwd.tlv.type) and\
224                             {
225                              NM_BORDER_AGENT_LOCATOR_TLV,
226                              NM_COMMISSIONER_SESSION_ID_TLV,
227                              NM_STEERING_DATA_TLV
228                             } == set(p.thread_meshcop.tlv.type) and\
229                   p.thread_nwd.tlv.stable == [0]
230                   ).\
231            must_next()
232
233        # Step 7: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
234        #         to Leader Anycast or Routing Locator:
235        #         CoAP Request URI
236        #             CON POST coap://<L>:MM/c/cs
237        #         CoAP Payload
238        #             Commissioner Session ID TLV
239        #             Border Agent Locator TLV (0x0400) (not allowed TLV)
240        _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
241            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
242            filter_coap_request(MGMT_COMMISSIONER_SET_URI).\
243            filter(lambda p: {
244                              NM_COMMISSIONER_SESSION_ID_TLV,
245                              NM_BORDER_AGENT_LOCATOR_TLV
246                             } == set(p.thread_meshcop.tlv.type) and\
247                   p.thread_meshcop.tlv.ba_locator == 0x0400
248                   ).\
249           must_next()
250
251        # Step 8: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
252        #         Commissioner:
253        #         CoAP Response Code
254        #             2.04 Changed
255        #         CoAP Payload
256        #             State TLV (value = Reject)
257        pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\
258            filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\
259            filter(lambda p:
260                   [NM_STATE_TLV] == p.coap.tlv.type and\
261                   p.thread_meshcop.tlv.state == MESHCOP_REJECT
262                   ).\
263           must_next()
264
265        # Step 9: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
266        #         to Leader Anycast or Routing Locator:
267        #         CoAP Request URI
268        #             CON POST coap://<L>:MM/c/cs
269        #         CoAP Payload
270        #             Commissioner Session ID TLV
271        #             Steering Data TLV (0xFF)
272        #             Border Agent Locator TLV (0x0400) (not allowed TLV)
273        _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
274            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
275            filter_coap_request(MGMT_COMMISSIONER_SET_URI).\
276            filter(lambda p: {
277                              NM_COMMISSIONER_SESSION_ID_TLV,
278                              NM_STEERING_DATA_TLV,
279                              NM_BORDER_AGENT_LOCATOR_TLV
280                             } == set(p.thread_meshcop.tlv.type) and\
281                   p.thread_meshcop.tlv.ba_locator == 0x0400 and\
282                   p.thread_meshcop.tlv.steering_data == Bytes('ff')
283                   ).\
284           must_next()
285
286        # Step 10: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
287        #         Commissioner:
288        #         CoAP Response Code
289        #             2.04 Changed
290        #         CoAP Payload
291        #             State TLV (value = Reject)
292        pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\
293            filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\
294            filter(lambda p:
295                   [NM_STATE_TLV] == p.coap.tlv.type and\
296                   p.thread_meshcop.tlv.state == MESHCOP_REJECT
297                   ).\
298           must_next()
299
300        # Step 11: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
301        #         to Leader Anycast or Routing Locator:
302        #         CoAP Request URI
303        #             CON POST coap://<L>:MM/c/cs
304        #         CoAP Payload
305        #             Commissioner Session ID TLV (0xFFFF) (invalid value)
306        #             Steering Data TLV (0xFF)
307        _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
308            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
309            filter_coap_request(MGMT_COMMISSIONER_SET_URI).\
310            filter(lambda p: {
311                              NM_COMMISSIONER_SESSION_ID_TLV,
312                              NM_STEERING_DATA_TLV
313                             } == set(p.thread_meshcop.tlv.type) and\
314                   p.thread_meshcop.tlv.commissioner_sess_id == 0xFFFF and\
315                   p.thread_meshcop.tlv.steering_data == Bytes('ff')
316                   ).\
317           must_next()
318
319        # Step 12: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
320        #         Commissioner:
321        #         CoAP Response Code
322        #             2.04 Changed
323        #         CoAP Payload
324        #             State TLV (value = Reject)
325        pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\
326            filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\
327            filter(lambda p:
328                   [NM_STATE_TLV] == p.coap.tlv.type and\
329                   p.thread_meshcop.tlv.state == MESHCOP_REJECT
330                   ).\
331           must_next()
332
333        # Step 13: Commissioner sends a Set Commissioner Dataset Request (MGMT_COMMISSIONER_SET.req)
334        #         to Leader Anycast or Routing Locator:
335        #         CoAP Request URI
336        #             CON POST coap://<L>:MM/c/cs
337        #         CoAP Payload
338        #             Commissioner Session ID TLV
339        #             Steering Data TLV (0xFF)
340        #             Channel TLV (not allowed TLV)
341        _mgmt_set_pkt = pkts.filter_wpan_src64(COMMISSIONER).\
342            filter_ipv6_2dsts(LEADER_ALOC, LEADER_RLOC).\
343            filter_coap_request(MGMT_COMMISSIONER_SET_URI).\
344            filter(lambda p: {
345                              NM_COMMISSIONER_SESSION_ID_TLV,
346                              NM_STEERING_DATA_TLV,
347                              NM_CHANNEL_TLV
348                             } == set(p.thread_meshcop.tlv.type) and\
349                   p.thread_meshcop.tlv.steering_data == Bytes('ff')
350                   ).\
351           must_next()
352
353        # Step 14: Leader sends a Set Commissioner Dataset Response (MGMT_COMMISSIONER_SET.rsp) to
354        #         Commissioner:
355        #         CoAP Response Code
356        #             2.04 Changed
357        #         CoAP Payload
358        #             State TLV (value = Accept)
359        pkts.filter_ipv6_src_dst(_mgmt_set_pkt.ipv6.dst, COMMISSIONER_RLOC).\
360            filter_coap_ack(MGMT_COMMISSIONER_SET_URI).\
361            filter(lambda p:
362                   [NM_STATE_TLV] == p.coap.tlv.type and\
363                   p.thread_meshcop.tlv.state == MESHCOP_ACCEPT
364                   ).\
365           must_next()
366
367        # Step 15: Verify connectivity by sending an ICMPv6 Echo Request to the DUT mesh local address
368        _pkt = pkts.filter_ping_request().\
369            filter_ipv6_src_dst(COMMISSIONER_RLOC, LEADER_RLOC).\
370            must_next()
371        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
372            filter_ipv6_src_dst(LEADER_RLOC, COMMISSIONER_RLOC).\
373            must_next()
374
375        _pkt = pkts.filter_ping_request().\
376            filter_ipv6_src_dst(LEADER_RLOC, COMMISSIONER_RLOC).\
377            must_next()
378        pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
379            filter_ipv6_src_dst(COMMISSIONER_RLOC, LEADER_RLOC).\
380            must_next()
381
382
383if __name__ == '__main__':
384    unittest.main()
385