1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28
29import wpan
30from wpan import verify
31
32# -----------------------------------------------------------------------------------------------------------------------
33# Test description: Address Cache Table
34#
35# This test verifies that address cache entry associated with a SED child
36# addresses is removed from new parent node ensuring we would not have a
37# routing loop.
38
39test_name = __file__[:-3] if __file__.endswith('.py') else __file__
40print('-' * 120)
41print('Starting \'{}\''.format(test_name))
42
43# -----------------------------------------------------------------------------------------------------------------------
44# Creating `wpan.Nodes` instances
45
46speedup = 4
47wpan.Node.set_time_speedup_factor(speedup)
48
49r1 = wpan.Node()
50r2 = wpan.Node()
51r3 = wpan.Node()
52c = wpan.Node()
53c3 = wpan.Node()
54
55# -----------------------------------------------------------------------------------------------------------------------
56# Init all nodes
57
58wpan.Node.init_all_nodes()
59
60# -----------------------------------------------------------------------------------------------------------------------
61# Build network topology
62#
63#   r3 ---- r1 ---- r2
64#   |               |
65#   |               |
66#   c3              c
67#
68# c is initially attached to r2 but it switches parent during test to r1 and then r3
69# c3 is just added to make sure r3 become router quickly (not involved in test)
70
71PREFIX = "fd00:1234::"
72POLL_INTERVAL = 400
73
74r1.form("addr-cache")
75
76r1.add_prefix(PREFIX, stable=True, on_mesh=True, slaac=True, preferred=True)
77
78r1.allowlist_node(r2)
79r2.allowlist_node(r1)
80r2.join_node(r1, wpan.JOIN_TYPE_ROUTER)
81
82c.set(wpan.WPAN_POLL_INTERVAL, str(POLL_INTERVAL))
83c.allowlist_node(r2)
84r2.allowlist_node(c)
85c.join_node(r2, wpan.JOIN_TYPE_SLEEPY_END_DEVICE)
86
87r3.allowlist_node(r1)
88r1.allowlist_node(r3)
89r3.join_node(r1, wpan.JOIN_TYPE_ROUTER)
90
91c3.allowlist_node(r3)
92r3.allowlist_node(c3)
93c3.join_node(r3, wpan.JOIN_TYPE_END_DEVICE)
94
95# -----------------------------------------------------------------------------------------------------------------------
96# Test implementation
97#
98
99ROUTER_TABLE_WAIT_TIME = 30 / speedup + 5
100
101INVALID_ROUTER_ID = 63
102
103verify(r1.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_LEADER)
104verify(r2.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_ROUTER)
105verify(r3.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_ROUTER)
106verify(c.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_SLEEPY_END_DEVICE)
107verify(c3.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_END_DEVICE)
108
109r1_address = r1.find_ip6_address_with_prefix(PREFIX)
110r2_address = r2.find_ip6_address_with_prefix(PREFIX)
111c_address = c.find_ip6_address_with_prefix(PREFIX)
112
113# Send a single UDP message from r1 to c
114#
115# This adds an address cache entry on r1 for c pointing to r2 (the current parent of c).
116
117sender = r1.prepare_tx(r1_address, c_address, "Hi from r1 to c")
118recver = c.prepare_rx(sender)
119wpan.Node.perform_async_tx_rx()
120verify(sender.was_successful and recver.was_successful)
121
122# Force c to switch its parent from r2 to r1
123#
124#   r3 ---- r1 ---- r2
125#   |       |
126#   |       |
127#   c3      c
128
129CHILD_SUPERVISION_CHECK_TIMEOUT = 2
130PARENT_SUPERVISION_INTERVAL = 1
131
132REATTACH_WAIT_TIME = CHILD_SUPERVISION_CHECK_TIMEOUT / speedup + 6
133
134c.set(wpan.WPAN_CHILD_SUPERVISION_CHECK_TIMEOUT, str(CHILD_SUPERVISION_CHECK_TIMEOUT))
135r2.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, str(PARENT_SUPERVISION_INTERVAL))
136r1.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, str(PARENT_SUPERVISION_INTERVAL))
137r3.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, str(PARENT_SUPERVISION_INTERVAL))
138
139r2.un_allowlist_node(c)
140r1.allowlist_node(c)
141c.allowlist_node(r1)
142
143# Wait for c to detach from r2 and attach to r1.
144
145
146def check_c_is_removed_from_r2_child_table():
147    child_table = wpan.parse_list(r2.get(wpan.WPAN_THREAD_CHILD_TABLE))
148    verify(len(child_table) == 0)
149
150
151wpan.verify_within(check_c_is_removed_from_r2_child_table, REATTACH_WAIT_TIME)
152
153# check that c is now a child of r1
154child_table = wpan.parse_list(r1.get(wpan.WPAN_THREAD_CHILD_TABLE))
155verify(len(child_table) == 1)
156
157# Send a single UDP message from r2 to c
158#
159# This adds an address cache entry on r2 for c pointing to r1 (the current parent of c).
160
161sender = r2.prepare_tx(r2_address, c_address, "Hi from r2 to c")
162recver = c.prepare_rx(sender)
163wpan.Node.perform_async_tx_rx()
164verify(sender.was_successful and recver.was_successful)
165
166# Force c to switch its parent from r1 to r3
167#
168#   r3 ---- r1 ---- r2
169#   | \
170#   |  \
171#   c3  c
172
173r1.un_allowlist_node(c)
174r3.allowlist_node(c)
175c.allowlist_node(r3)
176
177# Wait for c to detach from r1 and attach to r3.
178
179
180def check_c_is_removed_from_r1_child_table():
181    child_table = wpan.parse_list(r1.get(wpan.WPAN_THREAD_CHILD_TABLE))
182    verify(len(child_table) == 0)
183
184
185wpan.verify_within(check_c_is_removed_from_r1_child_table, REATTACH_WAIT_TIME)
186
187# check that c is now a child of r3 (r3 should have two child, c and c3)
188child_table = wpan.parse_list(r3.get(wpan.WPAN_THREAD_CHILD_TABLE))
189verify(len(child_table) == 2)
190
191# Send a single UDP message from r1 to c
192#
193# If the r1 address cache entry is not cleared when c attached to r1,
194# r1 will still have an entry pointing to r2, and r2 will have an entry
195# pointing to r1, thus creating a loop (the msg will not be delivered to r3)
196
197sender = r1.prepare_tx(r1_address, c_address, "Hi from r1 to c")
198recver = c.prepare_rx(sender)
199wpan.Node.perform_async_tx_rx()
200verify(sender.was_successful and recver.was_successful)
201
202# -----------------------------------------------------------------------------------------------------------------------
203# Test finished
204
205wpan.Node.finalize_all_nodes()
206
207print('\'{}\' passed.'.format(test_name))
208