1#!/usr/bin/env python3 2# 3# Copyright (c) 2020, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS' 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#

import unittest

import config
import mle
import thread_cert
from pktverify import consts

LEADER = 1
SSED_1 = 2


# Scenario with one leader and one MTD child (SSED_1). The child's CSL
# parameters are changed step by step while the test checks reachability by
# ping and inspects the MLE / data-poll messages the child has sent.
#
# Documentation is kept in comments (not docstrings) so that ``__doc__`` of
# the class and of ``test`` stays exactly as before.
class SSED_CslTransmission(thread_cert.TestCase):
    TOPOLOGY = {
        LEADER: {'version': '1.2'},
        SSED_1: {'version': '1.2', 'is_mtd': True, 'mode': '-'},
    }
    """All nodes are created with default configurations"""

    def test(self):
        # Local handles; the node objects and the simulator are not replaced
        # anywhere in this method, so these refer to the same objects as the
        # repeated ``self.nodes[...]`` / ``self.simulator`` lookups would.
        leader = self.nodes[LEADER]
        ssed = self.nodes[SSED_1]
        sim = self.simulator
        child_update_request = mle.CommandType.CHILD_UPDATE_REQUEST

        def leader_ping_ssed():
            # The child's RLOC is queried on every call, right before the ping.
            return leader.ping(ssed.get_rloc())

        def ssed_sent():
            # Messages recorded by the simulator as sent by SSED_1.
            return sim.get_messages_sent_by(SSED_1)

        # --- CSL is configured on the child before it is started.
        ssed.set_csl_period(consts.CSL_DEFAULT_PERIOD)
        ssed.set_csl_timeout(consts.CSL_DEFAULT_TIMEOUT)
        ssed.get_csl_info()  # result is not inspected

        # --- Bring up the network: leader first, then the child.
        leader.start()
        sim.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(leader.get_state(), 'leader')

        ssed.start()
        sim.go(7)
        self.assertEqual(ssed.get_state(), 'child')

        print('SSED rloc:%s' % ssed.get_rloc())
        self.assertTrue(leader_ping_ssed())
        sim.go(5)

        # The returned message is not used here.
        # NOTE(review): presumably next_mle_message() itself fails the test
        # when no matching message exists - confirm against its definition.
        ssed_sent().next_mle_message(child_update_request)

        # --- Select an explicit CSL channel: the next Child Update Request
        # must carry a CslChannel TLV and the child must stay reachable.
        ssed.set_csl_channel(consts.CSL_DEFAULT_CHANNEL)
        sim.go(1)
        update_request = ssed_sent().next_mle_message(child_update_request)
        update_request.assertMleMessageContainsTlv(mle.CslChannel)
        self.assertTrue(leader_ping_ssed())
        sim.go(5)

        # --- Set the CSL channel to 0; the child must still answer pings.
        ssed.set_csl_channel(0)
        sim.go(1)
        self.assertTrue(leader_ping_ssed())
        sim.go(5)

        ssed_sent().next_mle_message(child_update_request)

        # --- With the CSL period set to 0 the leader's ping is expected to fail.
        ssed.set_csl_period(0)
        self.assertFalse(leader_ping_ssed())
        sim.go(2)

        # --- Toggle the poll period (1000, then back to 0).
        ssed.set_pollperiod(1000)
        sim.go(2)
        ssed.set_pollperiod(0)
        sim.go(5)

        # No data poll is expected when a data packet carrying the CSL IE has
        # been sent to the parent.
        ssed.set_csl_period(consts.CSL_DEFAULT_PERIOD)
        half_csl_timeout = consts.CSL_DEFAULT_TIMEOUT / 2
        sim.go(half_csl_timeout)
        self.assertTrue(leader_ping_ssed())
        self.flush_all()
        sim.go(half_csl_timeout)
        self.assertIsNone(ssed_sent().next_data_poll())

        # A data poll is expected to keep the parent in sync before the CSL
        # timeout elapses.
        self.assertTrue(leader_ping_ssed())
        self.flush_all()
        sim.go(consts.CSL_DEFAULT_TIMEOUT)
        self.assertIsNotNone(ssed_sent().next_data_poll())

        # A data poll is expected to resync with the parent after a
        # retransmission; the leader is stopped so the child's ping fails.
        leader_rloc = leader.get_rloc()
        leader.stop()
        self.flush_all()
        self.assertFalse(ssed.ping(leader_rloc))
        sim.go(2)
        self.assertIsNotNone(ssed_sent().next_data_poll())

        # The child must be able to resynchronize with the parent after the
        # parent has been gone for longer than the timeout.
        leader.start()
        sim.go(config.LEADER_RESET_DELAY)
        ssed.set_csl_timeout(8)
        ssed.set_timeout(10)
        sim.go(2)
        leader.stop()
        sim.go(25)
        self.flush_all()
        leader.start()
        sim.go(config.LEADER_RESET_DELAY)
        self.assertEqual(leader.get_state(), 'leader')
        sim.go(5)
        self.assertEqual(ssed.get_state(), 'child')
        self.assertIsNotNone(ssed_sent().next_mle_message(child_update_request))


if __name__ == '__main__':
    unittest.main()