#!/usr/bin/env python3
#
#  Copyright (c) 2018, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#
import sys
import time

import wpan
from wpan import verify

# -----------------------------------------------------------------------------------------------------------------------
# Test description: Orphaned node attach through MLE Announcement

# Script name without a trailing '.py', used in the start/pass banners.
test_name = __file__[:-3] if __file__.endswith('.py') else __file__
print('-' * 120)
print('Starting \'{}\''.format(test_name))


def verify_channel(nodes, new_channel, wait_time=20):
    """
    Poll the channel of every node in `nodes` until all of them report `new_channel`.

    `nodes` is an iterable of objects exposing `get(property_name)`; `new_channel` is the
    expected channel as an int; `wait_time` is the maximum time to wait, in seconds.

    Returns None once every node reports `new_channel`. If `wait_time` elapses first, a
    diagnostic message is printed and the process exits with status 1.
    """
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    start_time = time.monotonic()

    # The channel property is read as a string; base 0 lets `int` accept decimal or prefixed (e.g. hex) values.
    while not all(new_channel == int(node.get(wpan.WPAN_CHANNEL), 0) for node in nodes):
        # Compute the elapsed time once so the reported value is the one that was compared.
        elapsed = time.monotonic() - start_time
        if elapsed > wait_time:
            print('Took too long to switch to channel {} ({}>{} sec)'.format(new_channel, elapsed, wait_time))
            sys.exit(1)
        time.sleep(0.1)


# -----------------------------------------------------------------------------------------------------------------------
# Creating `wpan.Nodes` instances

router = wpan.Node()
c1 = wpan.Node()
c2 = wpan.Node()

all_nodes = [router, c1, c2]

# -----------------------------------------------------------------------------------------------------------------------
# Init all nodes

wpan.Node.init_all_nodes()

# -----------------------------------------------------------------------------------------------------------------------
# Build network topology

router.form('announce-tst', channel=11)

c1.join_node(router, node_type=wpan.JOIN_TYPE_SLEEPY_END_DEVICE)
c2.join_node(router, node_type=wpan.JOIN_TYPE_SLEEPY_END_DEVICE)

c1.set(wpan.WPAN_POLL_INTERVAL, '500')
c2.set(wpan.WPAN_POLL_INTERVAL, '500')

c1.set(wpan.WPAN_THREAD_DEVICE_MODE, '5')
c2.set(wpan.WPAN_THREAD_DEVICE_MODE, '5')
# -----------------------------------------------------------------------------------------------------------------------
# Test implementation

# Reset c2 and keep it in detached state
c2.set('Daemon:AutoAssociateAfterReset', 'false')
c2.reset()

# Switch the rest of network to channel 26
router.set(wpan.WPAN_CHANNEL_MANAGER_NEW_CHANNEL, '26')
verify_channel([router, c1], 26)

# Now re-enable c2 and verify that it does attach to router and is on channel 26
# c2 would go through the MLE Announce recovery.

c2.set('Daemon:AutoAssociateAfterReset', 'true')
c2.reset()

# Right after the reset c2 must still report the channel the network was formed on (11).
verify(int(c2.get(wpan.WPAN_CHANNEL), 0) == 11)

# Wait up to 20s for c2 to be attached/associated


def check_c2_is_associated():
    """Condition polled by `wpan.verify_within`: passes `c2.is_associated()` to `verify`."""
    verify(c2.is_associated())


wpan.verify_within(check_c2_is_associated, 20)

# Check that c2 is now on channel 26.
verify(int(c2.get(wpan.WPAN_CHANNEL), 0) == 26)

# -----------------------------------------------------------------------------------------------------------------------
# Test finished

wpan.Node.finalize_all_nodes()

print('\'{}\' passed.'.format(test_name))