1""" 2 Copyright (c) 2024, The OpenThread Authors. 3 All rights reserved. 4 5 Redistribution and use in source and binary forms, with or without 6 modification, are permitted provided that the following conditions are met: 7 1. Redistributions of source code must retain the above copyright 8 notice, this list of conditions and the following disclaimer. 9 2. Redistributions in binary form must reproduce the above copyright 10 notice, this list of conditions and the following disclaimer in the 11 documentation and/or other materials provided with the distribution. 12 3. Neither the name of the copyright holder nor the 13 names of its contributors may be used to endorse or promote products 14 derived from this software without specific prior written permission. 15 16 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 20 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 POSSIBILITY OF SUCH DAMAGE. 27""" 28 29from cli.command import Command, CommandResultNone 30from dataset.dataset import ThreadDataset, initial_dataset 31from tlv.dataset_tlv import MeshcopTlvType 32from copy import deepcopy 33 34 35def handle_dataset_entry_command(type: MeshcopTlvType, args, context): 36 ds: ThreadDataset = context['dataset'] 37 if len(args) == 0: 38 ds.get_entry(type).print_content() 39 return CommandResultNone() 40 41 ds.set_entry(type, args) 42 print('Done.') 43 return CommandResultNone() 44 45 46class DatasetClearCommand(Command): 47 48 def get_help_string(self) -> str: 49 return 'Clear dataset.' 50 51 async def execute_default(self, args, context): 52 ds: ThreadDataset = context['dataset'] 53 ds.clear() 54 return CommandResultNone() 55 56 57class DatasetHelpCommand(Command): 58 59 def get_help_string(self) -> str: 60 return 'Display help message and return.' 61 62 async def execute_default(self, args, context): 63 indent_width = 4 64 indentation = ' ' * indent_width 65 commands: ThreadDataset = context['commands'] 66 ds_command: Command = commands['dataset'] 67 print(ds_command.get_help_string()) 68 print('Subcommands:') 69 for name, subcommand in ds_command._subcommands.items(): 70 print(f'{indentation}{name}') 71 print(f'{indentation}{" " * indent_width}{subcommand.get_help_string()}') 72 return CommandResultNone() 73 74 75class DatasetHexCommand(Command): 76 77 def get_help_string(self) -> str: 78 return 'Get or set dataset as hex-encoded TLVs.' 79 80 async def execute_default(self, args, context): 81 ds: ThreadDataset = context['dataset'] 82 if args: 83 try: 84 ds_tmp = deepcopy(ds) 85 tlvs_str: str = args[0] 86 tlvs_bytes = bytes.fromhex(tlvs_str) 87 ds_tmp.set_from_bytes(tlvs_bytes) 88 ds.clear() 89 ds.set_from_bytes(tlvs_bytes) 90 except Exception as e: 91 print(e) 92 print(ds.to_bytes().hex()) 93 return CommandResultNone() 94 95 96class ReloadDatasetCommand(Command): 97 98 def get_help_string(self) -> str: 99 return 'Reset dataset to the initial value.' 100 101 async def execute_default(self, args, context): 102 context['dataset'].set_from_bytes(initial_dataset) 103 return CommandResultNone() 104 105 106class ActiveTimestampCommand(Command): 107 108 def get_help_string(self) -> str: 109 return 'View and set ActiveTimestamp seconds. Arguments: [seconds (int)]' 110 111 async def execute_default(self, args, context): 112 return handle_dataset_entry_command(MeshcopTlvType.ACTIVETIMESTAMP, args, context) 113 114 115class PendingTimestampCommand(Command): 116 117 def get_help_string(self) -> str: 118 return 'View and set PendingTimestamp seconds. Arguments: [seconds (int)]' 119 120 async def execute_default(self, args, context): 121 return handle_dataset_entry_command(MeshcopTlvType.PENDINGTIMESTAMP, args, context) 122 123 124class NetworkKeyCommand(Command): 125 126 def get_help_string(self) -> str: 127 return 'View and set NetworkKey. Arguments: [nk (hexstring, len=32)]' 128 129 async def execute_default(self, args, context): 130 return handle_dataset_entry_command(MeshcopTlvType.NETWORKKEY, args, context) 131 132 133class NetworkNameCommand(Command): 134 135 def get_help_string(self) -> str: 136 return 'View and set NetworkName. Arguments: [nn (string, maxlen=16)]' 137 138 async def execute_default(self, args, context): 139 return handle_dataset_entry_command(MeshcopTlvType.NETWORKNAME, args, context) 140 141 142class ExtPanIDCommand(Command): 143 144 def get_help_string(self) -> str: 145 return 'View and set ExtPanID. Arguments: [extpanid (hexstring, len=16)]' 146 147 async def execute_default(self, args, context): 148 return handle_dataset_entry_command(MeshcopTlvType.EXTPANID, args, context) 149 150 151class MeshLocalPrefixCommand(Command): 152 153 def get_help_string(self) -> str: 154 return 'View and set MeshLocalPrefix. Arguments: [mlp (hexstring, len=16)]' 155 156 async def execute_default(self, args, context): 157 return handle_dataset_entry_command(MeshcopTlvType.MESHLOCALPREFIX, args, context) 158 159 160class DelayTimerCommand(Command): 161 162 def get_help_string(self) -> str: 163 return 'View and set DelayTimer delay. Arguments: [delay (int)]' 164 165 async def execute_default(self, args, context): 166 return handle_dataset_entry_command(MeshcopTlvType.DELAYTIMER, args, context) 167 168 169class PanIDCommand(Command): 170 171 def get_help_string(self) -> str: 172 return 'View and set PanID. Arguments: [panid (hexstring, len=4)]' 173 174 async def execute_default(self, args, context): 175 return handle_dataset_entry_command(MeshcopTlvType.PANID, args, context) 176 177 178class ChannelCommand(Command): 179 180 def get_help_string(self) -> str: 181 return 'View and set Channel. Arguments: [channel (int)]' 182 183 async def execute_default(self, args, context): 184 return handle_dataset_entry_command(MeshcopTlvType.CHANNEL, args, context) 185 186 187class ChannelMaskCommand(Command): 188 189 def get_help_string(self) -> str: 190 return 'View and set ChannelMask. Arguments: [mask (hexstring)]' 191 192 async def execute_default(self, args, context): 193 return handle_dataset_entry_command(MeshcopTlvType.CHANNELMASK, args, context) 194 195 196class PskcCommand(Command): 197 198 def get_help_string(self) -> str: 199 return 'View and set Pskc. Arguments: [pskc (hexstring, maxlen=32)]' 200 201 async def execute_default(self, args, context): 202 return handle_dataset_entry_command(MeshcopTlvType.PSKC, args, context) 203 204 205class SecurityPolicyCommand(Command): 206 207 def get_help_string(self) -> str: 208 return 'View and set SecurityPolicy. Arguments: '\ 209 '[<rotation_time (int)> [flags (string)] [version_threshold (int)]]' 210 211 async def execute_default(self, args, context): 212 return handle_dataset_entry_command(MeshcopTlvType.SECURITYPOLICY, args, context) 213 214 215class DatasetCommand(Command): 216 217 def __init__(self): 218 self._subcommands = { 219 'clear': DatasetClearCommand(), 220 'help': DatasetHelpCommand(), 221 'hex': DatasetHexCommand(), 222 'reload': ReloadDatasetCommand(), 223 'activetimestamp': ActiveTimestampCommand(), 224 'pendingtimestamp': PendingTimestampCommand(), 225 'networkkey': NetworkKeyCommand(), 226 'networkname': NetworkNameCommand(), 227 'extpanid': ExtPanIDCommand(), 228 'meshlocalprefix': MeshLocalPrefixCommand(), 229 'delay': DelayTimerCommand(), 230 'panid': PanIDCommand(), 231 'channel': ChannelCommand(), 232 'channelmask': ChannelMaskCommand(), 233 'pskc': PskcCommand(), 234 'securitypolicy': SecurityPolicyCommand() 235 } 236 237 def get_help_string(self) -> str: 238 return 'View and manipulate current dataset. ' \ 239 'Call without parameters to show current dataset.' 240 241 async def execute_default(self, args, context): 242 ds: ThreadDataset = context['dataset'] 243 ds.print_content() 244 return CommandResultNone() 245