"""
  Copyright (c) 2024, The OpenThread Authors.
  All rights reserved.

  Redistribution and use in source and binary forms, with or without
  modification, are permitted provided that the following conditions are met:
  1. Redistributions of source code must retain the above copyright
     notice, this list of conditions and the following disclaimer.
  2. Redistributions in binary form must reproduce the above copyright
     notice, this list of conditions and the following disclaimer in the
     documentation and/or other materials provided with the distribution.
  3. Neither the name of the copyright holder nor the
     names of its contributors may be used to endorse or promote products
     derived from this software without specific prior written permission.

  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  POSSIBILITY OF SUCH DAMAGE.
"""

from abc import abstractmethod
from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \
    BBTC_RX_CHAR_UUID
from ble.ble_stream import BleStream
from ble.ble_stream_secure import BleStreamSecure
from ble import ble_scanner
from tlv.tlv import TLV
from tlv.tcat_tlv import TcatTLVType
from cli.command import Command, CommandResultNone, CommandResultTLV
from dataset.dataset import ThreadDataset
from utils import select_device_by_user_input
from os import path
from time import time
from secrets import token_bytes


class HelpCommand(Command):
    """List every registered command together with its help text."""

    def get_help_string(self) -> str:
        return 'Display help and return.'

    async def execute_default(self, args, context):
        """Print each command name from ``context['commands']`` followed by its help.

        ``args`` is ignored. Always returns ``CommandResultNone``.
        """
        commands = context['commands']
        for name, command in commands.items():
            print(f'{name}')
            command.print_help(indent=1)
        return CommandResultNone()


class BleCommand(Command):
    """Base class for commands that send a single request over the secure BLE stream.

    Subclasses provide the progress message (``get_log_string``) and the
    serialized request (``prepare_data``); ``execute_default`` does the
    round trip and wraps the reply in a ``CommandResultTLV``.
    """

    @abstractmethod
    def get_log_string(self) -> str:
        """Return the message printed right before the request is sent."""
        pass

    @abstractmethod
    def prepare_data(self, context):
        """Return the request bytes that are handed to ``send_with_resp``."""
        pass

    async def execute_default(self, args, context):
        """Send the prepared request and return the parsed response.

        Returns ``CommandResultTLV`` with the TLV parsed from the response, or
        ``CommandResultNone`` when there is no connected stream or the device
        sent back an empty/absent response. ``args`` is ignored.
        """
        bless: BleStreamSecure = context.get('ble_sstream')
        if bless is None:
            # The context entry is None until a scan has established a secure
            # channel; report that instead of failing on a None attribute access.
            print('TCAT device not connected.')
            return CommandResultNone()

        print(self.get_log_string())
        data = self.prepare_data(context)
        response = await bless.send_with_resp(data)
        if not response:
            # Same "nothing to show" result the other commands in this file use.
            return CommandResultNone()
        tlv_response = TLV.from_bytes(response)
        return CommandResultTLV(tlv_response)


class HelloCommand(BleCommand):
    """Send an APPLICATION TLV carrying the ASCII text 'Hello world!'."""

    def get_log_string(self) -> str:
        return 'Sending hello world...'

    def get_help_string(self) -> str:
        return 'Send round trip "Hello world!" message.'

    def prepare_data(self, context):
        return TLV(TcatTLVType.APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes()


class CommissionCommand(BleCommand):
    """Send the dataset stored in ``context['dataset']`` as an ACTIVE_DATASET TLV."""

    def get_log_string(self) -> str:
        return 'Commissioning...'

    def get_help_string(self) -> str:
        return 'Update the connected device with current dataset.'

    def prepare_data(self, context):
        dataset: ThreadDataset = context['dataset']
        dataset_bytes = dataset.to_bytes()
        return TLV(TcatTLVType.ACTIVE_DATASET.value, dataset_bytes).to_bytes()


class DecommissionCommand(BleCommand):
    """Send a DECOMMISSION TLV with an empty value."""

    def get_log_string(self) -> str:
        return 'Disabling Thread and decommissioning device...'

    def get_help_string(self) -> str:
        return 'Stop Thread interface and decommission device from current network.'

    def prepare_data(self, context):
        return TLV(TcatTLVType.DECOMMISSION.value, bytes()).to_bytes()


class GetDeviceIdCommand(BleCommand):
    """Send a GET_DEVICE_ID TLV with an empty value."""

    def get_log_string(self) -> str:
        return 'Retrieving device id.'

    def get_help_string(self) -> str:
        return 'Get unique identifier for the TCAT device.'

    def prepare_data(self, context):
        return TLV(TcatTLVType.GET_DEVICE_ID.value, bytes()).to_bytes()


class GetExtPanIDCommand(BleCommand):
    """Send a GET_EXT_PAN_ID TLV with an empty value."""

    def get_log_string(self) -> str:
        return 'Retrieving extended PAN ID.'

    def get_help_string(self) -> str:
        return 'Get extended PAN ID that is commissioned in the active dataset.'

    def prepare_data(self, context):
        return TLV(TcatTLVType.GET_EXT_PAN_ID.value, bytes()).to_bytes()


class GetProvisioningUrlCommand(BleCommand):
    """Send a GET_PROVISIONING_URL TLV with an empty value."""

    def get_log_string(self) -> str:
        return 'Retrieving provisioning url.'

    def get_help_string(self) -> str:
        return 'Get a URL for an application suited to commission the TCAT device.'

    def prepare_data(self, context):
        return TLV(TcatTLVType.GET_PROVISIONING_URL.value, bytes()).to_bytes()


class GetNetworkNameCommand(BleCommand):
    """Send a GET_NETWORK_NAME TLV with an empty value."""

    def get_log_string(self) -> str:
        return 'Retrieving network name.'

    def get_help_string(self) -> str:
        return 'Get the Thread network name that is commissioned in the active dataset.'

    def prepare_data(self, context):
        return TLV(TcatTLVType.GET_NETWORK_NAME.value, bytes()).to_bytes()


class PingCommand(Command):
    """Send a PING TLV with a random payload and report the round-trip time."""

    def get_help_string(self) -> str:
        return 'Send echo request to TCAT device.'

    async def execute_default(self, args, context):
        """Send random bytes in a PING TLV and time the response.

        ``args[0]``, when present, is the payload size in bytes (default 10,
        maximum 512). Returns ``CommandResultTLV`` with the parsed response, or
        ``CommandResultNone`` when the argument is rejected, no stream is
        connected, or no response arrives.
        """
        bless: BleStreamSecure = context.get('ble_sstream')
        if bless is None:
            # The context entry is None until a scan has established a secure channel.
            print('TCAT device not connected.')
            return CommandResultNone()

        payload_size = 10
        max_payload = 512
        if len(args) > 0:
            try:
                payload_size = int(args[0])
            except ValueError:
                print(f'Invalid payload size: {args[0]}')
                return CommandResultNone()
            if payload_size < 0:
                # token_bytes() rejects negative sizes; refuse them up front.
                print('Payload size must not be negative.')
                return CommandResultNone()
            if payload_size > max_payload:
                print(f'Payload size too large. Maximum supported value is {max_payload}')
                return CommandResultNone()

        to_send = token_bytes(payload_size)
        data = TLV(TcatTLVType.PING.value, to_send).to_bytes()
        start_time = time()
        response = await bless.send_with_resp(data)
        # Seconds -> milliseconds.
        elapsed_time = 1e3 * (time() - start_time)
        if not response:
            return CommandResultNone()

        tlv_response = TLV.from_bytes(response)
        # A mismatch is only reported; the timing and the response are still returned.
        if tlv_response.value != to_send:
            print("Received malformed response.")

        print(f"Roundtrip time: {elapsed_time} ms")

        return CommandResultTLV(tlv_response)


class ThreadStartCommand(BleCommand):
    """Send a THREAD_START TLV with an empty value."""

    def get_log_string(self) -> str:
        return 'Enabling Thread...'

    def get_help_string(self) -> str:
        return 'Enable thread interface.'

    def prepare_data(self, context):
        return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()


class ThreadStopCommand(BleCommand):
    """Send a THREAD_STOP TLV with an empty value."""

    def get_log_string(self) -> str:
        return 'Disabling Thread...'

    def get_help_string(self) -> str:
        return 'Disable thread interface.'

    def prepare_data(self, context):
        return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()


class ThreadStateCommand(Command):
    """Parent command holding the 'start' and 'stop' Thread subcommands."""

    def __init__(self):
        # NOTE(review): the base Command initializer is not called here;
        # confirm against cli.command that nothing else needs to be set up.
        self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()}

    def get_help_string(self) -> str:
        return 'Manipulate state of the Thread interface of the connected device.'

    async def execute_default(self, args, context):
        """Report that this command needs a subcommand; always returns ``CommandResultNone``."""
        print('Invalid usage. Provide a subcommand.')
        return CommandResultNone()


class ScanCommand(Command):
    """Scan for TCAT devices, connect to the selected one and set up the secure channel."""

    def get_help_string(self) -> str:
        return 'Perform scan for TCAT devices.'

    async def execute_default(self, args, context):
        """Scan, let the user pick a device, connect and perform the handshake.

        On a successful handshake the new secure stream is stored in
        ``context['ble_sstream']``; otherwise that entry is left as ``None``.
        ``args`` is ignored. Always returns ``CommandResultNone``.

        Exceptions raised while loading certificates or during the handshake
        propagate to the caller after the BLE link has been disconnected.
        """
        # Drop any previous stream but keep the key present: every command reads
        # context['ble_sstream'], so deleting the key would make the next command
        # (including a second scan) fail with KeyError.
        if context.get('ble_sstream') is not None:
            context['ble_sstream'] = None

        tcat_devices = await ble_scanner.scan_tcat_devices()
        device = select_device_by_user_input(tcat_devices)

        if device is None:
            return CommandResultNone()

        print(f'Connecting to {device}')
        ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
        ble_sstream = BleStreamSecure(ble_stream)
        cert_path = context['cmd_args'].cert_path if context['cmd_args'] else 'auth'
        try:
            ble_sstream.load_cert(
                certfile=path.join(cert_path, 'commissioner_cert.pem'),
                keyfile=path.join(cert_path, 'commissioner_key.pem'),
                cafile=path.join(cert_path, 'ca_cert.pem'),
            )
            print('Setting up secure channel...')
            established = await ble_sstream.do_handshake()
        except Exception:
            # Do not leave the BLE link open when certificate loading or the
            # handshake raises; the error itself is still reported to the caller.
            await ble_stream.disconnect()
            raise

        if established:
            print('Done')
            context['ble_sstream'] = ble_sstream
        else:
            print('Secure channel not established.')
            await ble_stream.disconnect()
        return CommandResultNone()