1#!/usr/bin/env python3
2#
3#  Copyright (c) 2021, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import os
31import unittest
32
33import config
34import thread_cert
35
36from pktverify.packet_filter import PacketFilter
37
38# Test description:
# The purpose of this test is to verify that a MED informs its previous parent when it re-attaches to another parent.
40#
41# Initial Topology:
42#
43#  LEADER ----- ROUTER
44#    |
45#   MED
46#
47
# Node ids: the keys of TestReset.TOPOLOGY and the indices used with self.nodes.
LEADER, ROUTER, MED = 1, 2, 3

# Child timeout assigned to the MED in TOPOLOGY; the test later replaces it
# with config.DEFAULT_CHILD_TIMEOUT.
LONG_CHILD_TIMEOUT = 120
53
54
class TestReset(thread_cert.TestCase):
    """Check that a MED tells its previous parent when it re-attaches elsewhere.

    The MED first attaches to LEADER. The allowlists are then rewritten so
    that the MED and LEADER no longer list each other while the MED and
    ROUTER do, and the test confirms the MED ends up as a child of ROUTER
    and is gone from LEADER's child table.
    """
    SUPPORT_NCP = False
    USE_MESSAGE_FACTORY = False

    # Initial links: LEADER <-> ROUTER and LEADER <-> MED only.
    TOPOLOGY = {
        LEADER: {
            'name': 'LEADER',
            'mode': 'rdn',
            'allowlist': [ROUTER, MED]
        },
        ROUTER: {
            'name': 'ROUTER',
            'mode': 'rdn',
            'allowlist': [LEADER]
        },
        MED: {
            'name': 'MED',
            'is_mtd': True,
            'mode': 'rn',
            'allowlist': [LEADER],
            'timeout': LONG_CHILD_TIMEOUT,
        },
    }

    def test(self):
        # This scenario is skipped when OT_CLI_PATH points at a posix build.
        cli_path = os.getenv('OT_CLI_PATH', '')
        if 'posix' in cli_path:
            self.skipTest("skip for posix tests")

        leader = self.nodes[LEADER]
        router = self.nodes[ROUTER]
        med = self.nodes[MED]

        # Start the nodes one at a time and check the role each one reaches.
        leader.start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(leader.get_state(), 'leader')

        router.start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(router.get_state(), 'router')

        med.start()
        self.simulator.go(7)
        self.assertEqual(med.get_state(), 'child')

        # With the initial allowlists the MED can only have picked LEADER.
        self.assertIsChildOf(MED, LEADER)

        # Remove the LEADER <-> MED allowlist entries in both directions ...
        leader.remove_allowlist(med.get_addr64())
        med.remove_allowlist(leader.get_addr64())

        # ... and add ROUTER <-> MED entries in both directions instead.
        router.add_allowlist(med.get_addr64())
        med.add_allowlist(router.get_addr64())

        # Replace LONG_CHILD_TIMEOUT with the default child timeout, then let
        # the simulation run for twice that long before checking the result.
        med.set_timeout(config.DEFAULT_CHILD_TIMEOUT)
        self.simulator.go(config.DEFAULT_CHILD_TIMEOUT * 2)
        self.assertIsChildOf(MED, ROUTER)

        # Verify MED is not in the LEADER's Child Table.
        med_extaddr = med.get_addr64()
        still_leader_child = any(
            entry['extaddr'] == med_extaddr for entry in leader.get_child_table().values())
        self.assertFalse(still_leader_child)

        self.collect_ipaddrs()
        self.collect_rlocs()

    def verify(self, pv):
        """Check the captured packets for both attaches and the notification.

        Args:
            pv: packet verifier object; this method uses its ``pkts``,
                ``summary``, ``vars`` and ``verify_attached`` members.
        """
        pkts: PacketFilter = pv.pkts
        pv.summary.show()

        med_src64 = pv.vars['MED']
        leader_rloc = pv.vars['LEADER_RLOC']

        # MED should attach to LEADER first.
        pv.verify_attached('MED', 'LEADER', child_type='MTD')
        # MED should re-attach to ROUTER.
        pv.verify_attached('MED', 'ROUTER', child_type='MTD')

        # MED should send empty IPv6 message to inform previous parent (LEADER).
        # The filter matches on next-header 0x3b, the IPv6 "No Next Header" value.
        sent_by_med = pkts.filter_wpan_src64(med_src64)
        empty_to_leader = sent_by_med.filter('lowpan.dst == {LEADER_RLOC} and lowpan.next == 0x3b',
                                             LEADER_RLOC=leader_rloc)
        empty_to_leader.must_next()

    def assertIsChildOf(self, childid, parentid):
        """Assert that node ``childid`` is currently a child of node ``parentid``.

        Two things are checked: the RLOC16 values are consistent with a
        parent/child pair, and the parent's child table contains an entry
        with the child's extended address.

        Args:
            childid: key into ``self.nodes`` for the expected child.
            parentid: key into ``self.nodes`` for the expected parent.
        """
        child = self.nodes[childid]
        child_rloc16 = child.get_addr16()
        parent = self.nodes[parentid]
        parent_rloc16 = parent.get_addr16()

        prefix_mask = 0xfc00
        # The parent's RLOC16 must have no bits set outside the mask ...
        self.assertEqual(parent_rloc16 & prefix_mask, parent_rloc16)
        # ... and the child's masked RLOC16 must equal the parent's RLOC16.
        self.assertEqual(child_rloc16 & prefix_mask, parent_rloc16)

        child_extaddr = child.get_addr64()
        listed_by_parent = any(
            entry['extaddr'] == child_extaddr for entry in parent.get_child_table().values())
        self.assertTrue(listed_by_parent)
139
140
# Run the test case through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
143