#!/usr/bin/env python3
#
# Copyright (c) 2018, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import time
import wpan
from wpan import verify

# -----------------------------------------------------------------------------------------------------------------------
# Test description: verifies `ChannelManager` channel change process

test_name = __file__[:-3] if __file__.endswith('.py') else __file__
print('-' * 120)
print('Starting \'{}\''.format(test_name))


def verify_channel(nodes, new_channel, wait_time=20):
    """
    Poll a list of `nodes` until every one of them reports `new_channel` as its channel.

    `nodes` is a list of node objects; each is queried with `node.get(wpan.WPAN_CHANNEL)`.
    `new_channel` is the expected channel (as int).
    `wait_time` is the maximum time to wait (int, in seconds).

    The channels are re-checked every 0.1 second. If the nodes have not all switched within
    `wait_time`, a message is printed and the script terminates with exit status 1
    (by raising `SystemExit`).
    """
    # A monotonic clock is used so that a wall-clock adjustment during the test can neither
    # shorten nor extend the timeout.
    start_time = time.monotonic()

    # The generator lets `all()` stop at the first node still on another channel instead of
    # querying every node on each polling round.
    while not all(new_channel == int(node.get(wpan.WPAN_CHANNEL), 0) for node in nodes):
        elapsed = time.monotonic() - start_time
        if elapsed > wait_time:
            print('Took too long to switch to channel {} ({}>{} sec)'.format(new_channel, elapsed, wait_time))
            # `exit()` is a helper injected by the `site` module for interactive sessions;
            # raising `SystemExit` directly does not depend on it being present.
            raise SystemExit(1)
        time.sleep(0.1)


# -----------------------------------------------------------------------------------------------------------------------
# Creating `wpan.Nodes` instances

speedup = 4
wpan.Node.set_time_speedup_factor(speedup)

r1 = wpan.Node()
r2 = wpan.Node()
r3 = wpan.Node()
sc1 = wpan.Node()
ec1 = wpan.Node()
sc2 = wpan.Node()
sc3 = wpan.Node()

all_nodes = [r1, r2, r3, sc1, ec1, sc2, sc3]

# -----------------------------------------------------------------------------------------------------------------------
# Init all nodes

wpan.Node.init_all_nodes()

# -----------------------------------------------------------------------------------------------------------------------
# Build network topology

for node in all_nodes:
    node.set(wpan.WPAN_OT_LOG_LEVEL, '0')

# Routers r2 and r3 are each allowlisted with r1 in both directions.
r1.allowlist_node(r2)
r2.allowlist_node(r1)
r1.allowlist_node(r3)
r3.allowlist_node(r1)

# Each router allowlists the end devices that later join through it.
r1.allowlist_node(sc1)
r1.allowlist_node(ec1)
r2.allowlist_node(sc2)
r3.allowlist_node(sc3)
r1.form('channel-manager', channel=12)

# (joining node, node passed to `join_node`, join type) -- processed strictly in this order.
join_plan = (
    (r2, r1, wpan.JOIN_TYPE_ROUTER),
    (r3, r1, wpan.JOIN_TYPE_ROUTER),
    (sc1, r1, wpan.JOIN_TYPE_SLEEPY_END_DEVICE),
    (ec1, r1, wpan.JOIN_TYPE_END_DEVICE),
    (sc2, r2, wpan.JOIN_TYPE_SLEEPY_END_DEVICE),
    (sc3, r3, wpan.JOIN_TYPE_SLEEPY_END_DEVICE),
)
for joiner, target, join_type in join_plan:
    joiner.join_node(target, node_type=join_type)

# All three sleepy end devices get the same poll interval value.
for sleepy_node in (sc1, sc2, sc3):
    sleepy_node.set(wpan.WPAN_POLL_INTERVAL, '500')

# -----------------------------------------------------------------------------------------------------------------------
# Test implementation

# The channel manager delay is set from "openthread-core-toranj-config.h".
# Verify that it is 2 seconds.

configured_delay = int(r1.get(wpan.WPAN_CHANNEL_MANAGER_DELAY), 0)
verify(configured_delay == 2)

# Every node should be on the channel the network was formed on.
verify_channel(all_nodes, 12)

# Request a channel change to channel 13 from router r1

r1.set(wpan.WPAN_CHANNEL_MANAGER_NEW_CHANNEL, '13')
requested_channel = int(r1.get(wpan.WPAN_CHANNEL_MANAGER_NEW_CHANNEL), 0)
verify(requested_channel == 13)
verify_channel(all_nodes, 13)

# Request same channel change on multiple routers at the same time

for router in (r1, r2, r3):
    router.set(wpan.WPAN_CHANNEL_MANAGER_NEW_CHANNEL, '14')
verify_channel(all_nodes, 14)

# Request different channel changes from same router (router r1).

new_channel_key = wpan.WPAN_CHANNEL_MANAGER_NEW_CHANNEL

# Right after channel 15 is requested the nodes are expected to still report channel 14;
# channel 16 is then requested and is the one the nodes must end up on.
r1.set(new_channel_key, '15')
verify_channel(all_nodes, 14)
r1.set(new_channel_key, '16')
verify_channel(all_nodes, 16)

# Request different channels from two routers (r1 and r2)

r1.set(wpan.WPAN_CHANNEL_MANAGER_DELAY, '20')  # increase the time to ensure r1 change is in process
r1.set(new_channel_key, '17')

# Wait (scaled down by the speedup factor) before checking that the nodes are still on channel 16.
pause_before_check = 10.5 / speedup
time.sleep(pause_before_check)
verify_channel(all_nodes, 16)

# r2 now requests a different channel; all nodes must end up on r2's channel.
r2.set(new_channel_key, '18')
verify_channel(all_nodes, 18)

# -----------------------------------------------------------------------------------------------------------------------
# Test finished

wpan.Node.finalize_all_nodes()

print('\'{}\' passed.'.format(test_name))