1#!/usr/bin/env python3
2#
3#  Copyright (c) 2022, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import ipaddress
31import unittest
32
33import command
34import config
35import thread_cert
36
37# Test description:
38#
#   This test verifies how MLE handles a received message that carries a
#   larger key sequence, depending on the MLE message class (authoritative
#   or peer).
42#
43#
44# Topology:
45#
46#   leader ---  router
47#    |    \
48#    |     \
49#  child   reed
50#
51
# Node ids used as keys into TOPOLOGY and `self.nodes` (values 1 through 4).
LEADER, CHILD, REED, ROUTER = range(1, 5)
56
57
class MleMsgKeySeqJump(thread_cert.TestCase):
    """Exercise MLE handling of key sequence counter jumps.

    Four nodes (LEADER, CHILD, REED, ROUTER) are started and attached, then
    the key sequence counter is moved forward on different nodes in turn.
    After each jump the test checks which nodes report the larger value.
    """
    USE_MESSAGE_FACTORY = False
    SUPPORT_NCP = False

    TOPOLOGY = {
        LEADER: dict(name='LEADER', mode='rdn'),
        CHILD: dict(name='CHILD', is_mtd=True, mode='rn'),
        REED: dict(name='REED', mode='rn'),
        ROUTER: dict(name='ROUTER', mode='rdn'),
    }

    def _expect_key_seq(self, expected, *nodes):
        """Assert, in the given order, that every node reports `expected`."""
        for node in nodes:
            self.assertEqual(node.get_key_sequence_counter(), expected)

    def _bump_key_seq(self, node, value):
        """Set the key seq counter on `node` and verify it reads back."""
        node.set_key_sequence_counter(value)
        self._expect_key_seq(value, node)

    def _expect_roles(self, *pairs):
        """Assert, in the given order, each `(node, state)` pair."""
        for node, state in pairs:
            self.assertEqual(node.get_state(), state)

    def test(self):
        """Run the full key sequence jump scenario step by step."""
        leader, child, reed, router = (self.nodes[node_id] for node_id in (LEADER, CHILD, REED, ROUTER))
        everyone = (leader, child, reed, router)

        # Simulated seconds to wait so that an MLE Advertisement has been
        # sent (the "max time between advertisements" wait used below).
        adv_wait = 52

        #-------------------------------------------------------------------
        # Bring up the network: every node starts from key seq 0.

        for node in everyone:
            node.set_key_sequence_counter(0)

        leader.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self._expect_roles((leader, 'leader'))

        child.start()
        reed.start()
        self.simulator.go(5)
        self._expect_roles((child, 'child'), (reed, 'child'))

        router.start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self._expect_roles((router, 'router'))

        #-------------------------------------------------------------------
        # All nodes must still report the initial key seq counter.

        self._expect_key_seq(0, *everyone)

        #-------------------------------------------------------------------
        # Jump the key seq on child, then change its MLE mode so that it
        # sends a "Child Update Request" to its parent (leader). The parent
        # notices the jump, which triggers an authoritative MLE Child Update
        # exchange (with challenge and response TLVs), and the parent is
        # expected to adopt the larger key seq as well.

        self._bump_key_seq(child, 5)

        child.set_mode('r')
        self.simulator.go(1)

        self._expect_key_seq(5, child, leader)

        #-------------------------------------------------------------------
        # Give MLE Advertisements time to go out. Reed and router should
        # notice the jump and re-establish their links using authoritative
        # exchanges, so every node ends up on the new key seq.

        self.simulator.go(adv_wait)
        self._expect_key_seq(5, *everyone)

        #-------------------------------------------------------------------
        # Jump the key seq on leader and wait for the max time between
        # advertisements. Router and reed should notice the jump,
        # re-establish the link (link request/accept exchange) and adopt
        # the new key seq.

        self._bump_key_seq(leader, 10)

        self.simulator.go(adv_wait)

        self._expect_key_seq(10, router, reed)

        #-------------------------------------------------------------------
        # A mode change on child triggers a "Child Update Request" exchange,
        # which should bring the child to the new key seq too.

        child.set_mode('rn')
        self.simulator.go(5)
        self._expect_key_seq(10, child)

        #-------------------------------------------------------------------
        # Stop everything but the leader, move the leader key seq forward,
        # then restart the other nodes. Router, reed and child must all
        # re-attach and adopt the higher key sequence.

        for node in (router, reed, child):
            node.stop()

        self._bump_key_seq(leader, 15)

        for node in (child, reed, router):
            node.start()
        self.simulator.go(5)

        self._expect_roles((child, 'child'), (reed, 'child'), (router, 'router'))
        self._expect_key_seq(15, *everyone)

        #-------------------------------------------------------------------
        # Stop everything but the leader again. This time move the child
        # key seq forward and restart only the child. It must re-attach to
        # the leader, and the leader must adopt the higher key seq counter.

        for node in (router, reed, child):
            node.stop()

        self._bump_key_seq(child, 20)

        child.start()
        self.simulator.go(5)

        self._expect_roles((child, 'child'))
        self._expect_key_seq(20, leader)

        #-------------------------------------------------------------------
        # Bring router and reed back; both must re-attach and pick up the
        # higher key seq counter.

        router.start()
        reed.start()

        self.simulator.go(5)
        self._expect_roles((router, 'router'), (reed, 'child'))
        self._expect_key_seq(20, router, reed)

        #-------------------------------------------------------------------
        # Advance the key seq counter by one on router and wait for the max
        # time between advertisements. Leader and reed must adopt the
        # higher value; after a mode change on child, so must the child.

        self._bump_key_seq(router, 21)

        self.simulator.go(adv_wait)
        self._expect_key_seq(21, leader, reed)

        child.set_mode('r')
        self.simulator.go(2)
        self._expect_key_seq(21, child)

        #-------------------------------------------------------------------
        # Force the child to re-attach with a higher key seq counter, in a
        # setup intended to make the leader generate a fragmented Child Id
        # Response. The child must be attached on the first attempt and the
        # leader must adopt the higher counter value.

        router.stop()
        reed.stop()

        child.factory_reset()
        self._expect_roles((child, 'disabled'))

        # Re-provision the child from the leader's current parameters; the
        # three getters are queried in the same order as the keyword list.
        leader_channel = leader.get_channel()
        leader_network_key = leader.get_networkkey()
        leader_panid = leader.get_panid()
        child.set_active_dataset(channel=leader_channel, network_key=leader_network_key, panid=leader_panid)
        self._bump_key_seq(child, 25)

        child.start()
        self.simulator.go(2)

        self._expect_roles((child, 'child'))
        self._expect_key_seq(25, leader)
263
# Script entry point: when this file is executed directly (not imported),
# hand control to unittest's command-line runner.
if __name__ == '__main__':
    unittest.main()
266