1"""
2  Copyright (c) 2024, The OpenThread Authors.
3  All rights reserved.
4
5  Redistribution and use in source and binary forms, with or without
6  modification, are permitted provided that the following conditions are met:
7  1. Redistributions of source code must retain the above copyright
8     notice, this list of conditions and the following disclaimer.
9  2. Redistributions in binary form must reproduce the above copyright
10     notice, this list of conditions and the following disclaimer in the
11     documentation and/or other materials provided with the distribution.
12  3. Neither the name of the copyright holder nor the
13     names of its contributors may be used to endorse or promote products
14     derived from this software without specific prior written permission.
15
16  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  POSSIBILITY OF SUCH DAMAGE.
27"""
28
29from abc import abstractmethod
30from ble.ble_connection_constants import BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, \
31    BBTC_RX_CHAR_UUID
32from ble.ble_stream import BleStream
33from ble.ble_stream_secure import BleStreamSecure
34from ble import ble_scanner
35from tlv.tlv import TLV
36from tlv.tcat_tlv import TcatTLVType
37from cli.command import Command, CommandResultNone, CommandResultTLV
38from dataset.dataset import ThreadDataset
39from utils import select_device_by_user_input
40from os import path
41from time import time
42from secrets import token_bytes
43
44
45class HelpCommand(Command):
46
47    def get_help_string(self) -> str:
48        return 'Display help and return.'
49
50    async def execute_default(self, args, context):
51        commands = context['commands']
52        for name, command in commands.items():
53            print(f'{name}')
54            command.print_help(indent=1)
55        return CommandResultNone()
56
57
58class BleCommand(Command):
59
60    @abstractmethod
61    def get_log_string(self) -> str:
62        pass
63
64    @abstractmethod
65    def prepare_data(self, context):
66        pass
67
68    async def execute_default(self, args, context):
69        bless: BleStreamSecure = context['ble_sstream']
70
71        print(self.get_log_string())
72        data = self.prepare_data(context)
73        response = await bless.send_with_resp(data)
74        if not response:
75            return
76        tlv_response = TLV.from_bytes(response)
77        return CommandResultTLV(tlv_response)
78
79
80class HelloCommand(BleCommand):
81
82    def get_log_string(self) -> str:
83        return 'Sending hello world...'
84
85    def get_help_string(self) -> str:
86        return 'Send round trip "Hello world!" message.'
87
88    def prepare_data(self, context):
89        return TLV(TcatTLVType.APPLICATION.value, bytes('Hello world!', 'ascii')).to_bytes()
90
91
92class CommissionCommand(BleCommand):
93
94    def get_log_string(self) -> str:
95        return 'Commissioning...'
96
97    def get_help_string(self) -> str:
98        return 'Update the connected device with current dataset.'
99
100    def prepare_data(self, context):
101        dataset: ThreadDataset = context['dataset']
102        dataset_bytes = dataset.to_bytes()
103        return TLV(TcatTLVType.ACTIVE_DATASET.value, dataset_bytes).to_bytes()
104
105
106class DecommissionCommand(BleCommand):
107
108    def get_log_string(self) -> str:
109        return 'Disabling Thread and decommissioning device...'
110
111    def get_help_string(self) -> str:
112        return 'Stop Thread interface and decommission device from current network.'
113
114    def prepare_data(self, context):
115        return TLV(TcatTLVType.DECOMMISSION.value, bytes()).to_bytes()
116
117
118class GetDeviceIdCommand(BleCommand):
119
120    def get_log_string(self) -> str:
121        return 'Retrieving device id.'
122
123    def get_help_string(self) -> str:
124        return 'Get unique identifier for the TCAT device.'
125
126    def prepare_data(self, context):
127        return TLV(TcatTLVType.GET_DEVICE_ID.value, bytes()).to_bytes()
128
129
130class GetExtPanIDCommand(BleCommand):
131
132    def get_log_string(self) -> str:
133        return 'Retrieving extended PAN ID.'
134
135    def get_help_string(self) -> str:
136        return 'Get extended PAN ID that is commissioned in the active dataset.'
137
138    def prepare_data(self, context):
139        return TLV(TcatTLVType.GET_EXT_PAN_ID.value, bytes()).to_bytes()
140
141
142class GetProvisioningUrlCommand(BleCommand):
143
144    def get_log_string(self) -> str:
145        return 'Retrieving provisioning url.'
146
147    def get_help_string(self) -> str:
148        return 'Get a URL for an application suited to commission the TCAT device.'
149
150    def prepare_data(self, context):
151        return TLV(TcatTLVType.GET_PROVISIONING_URL.value, bytes()).to_bytes()
152
153
154class GetNetworkNameCommand(BleCommand):
155
156    def get_log_string(self) -> str:
157        return 'Retrieving network name.'
158
159    def get_help_string(self) -> str:
160        return 'Get the Thread network name that is commissioned in the active dataset.'
161
162    def prepare_data(self, context):
163        return TLV(TcatTLVType.GET_NETWORK_NAME.value, bytes()).to_bytes()
164
165
166class PingCommand(Command):
167
168    def get_help_string(self) -> str:
169        return 'Send echo request to TCAT device.'
170
171    async def execute_default(self, args, context):
172        bless: BleStreamSecure = context['ble_sstream']
173        payload_size = 10
174        max_payload = 512
175        if len(args) > 0:
176            payload_size = int(args[0])
177            if payload_size > max_payload:
178                print(f'Payload size too large. Maximum supported value is {max_payload}')
179                return
180        to_send = token_bytes(payload_size)
181        data = TLV(TcatTLVType.PING.value, to_send).to_bytes()
182        elapsed_time = time()
183        response = await bless.send_with_resp(data)
184        elapsed_time = 1e3 * (time() - elapsed_time)
185        if not response:
186            return
187
188        tlv_response = TLV.from_bytes(response)
189        if tlv_response.value != to_send:
190            print("Received malformed response.")
191
192        print(f"Roundtrip time: {elapsed_time} ms")
193
194        return CommandResultTLV(tlv_response)
195
196
197class ThreadStartCommand(BleCommand):
198
199    def get_log_string(self) -> str:
200        return 'Enabling Thread...'
201
202    def get_help_string(self) -> str:
203        return 'Enable thread interface.'
204
205    def prepare_data(self, context):
206        return TLV(TcatTLVType.THREAD_START.value, bytes()).to_bytes()
207
208
209class ThreadStopCommand(BleCommand):
210
211    def get_log_string(self) -> str:
212        return 'Disabling Thread...'
213
214    def get_help_string(self) -> str:
215        return 'Disable thread interface.'
216
217    def prepare_data(self, context):
218        return TLV(TcatTLVType.THREAD_STOP.value, bytes()).to_bytes()
219
220
221class ThreadStateCommand(Command):
222
223    def __init__(self):
224        self._subcommands = {'start': ThreadStartCommand(), 'stop': ThreadStopCommand()}
225
226    def get_help_string(self) -> str:
227        return 'Manipulate state of the Thread interface of the connected device.'
228
229    async def execute_default(self, args, context):
230        print('Invalid usage. Provide a subcommand.')
231        return CommandResultNone()
232
233
234class ScanCommand(Command):
235
236    def get_help_string(self) -> str:
237        return 'Perform scan for TCAT devices.'
238
239    async def execute_default(self, args, context):
240        if not (context['ble_sstream'] is None):
241            del context['ble_sstream']
242
243        tcat_devices = await ble_scanner.scan_tcat_devices()
244        device = select_device_by_user_input(tcat_devices)
245
246        if device is None:
247            return CommandResultNone()
248
249        ble_sstream = None
250
251        print(f'Connecting to {device}')
252        ble_stream = await BleStream.create(device.address, BBTC_SERVICE_UUID, BBTC_TX_CHAR_UUID, BBTC_RX_CHAR_UUID)
253        ble_sstream = BleStreamSecure(ble_stream)
254        cert_path = context['cmd_args'].cert_path if context['cmd_args'] else 'auth'
255        ble_sstream.load_cert(
256            certfile=path.join(cert_path, 'commissioner_cert.pem'),
257            keyfile=path.join(cert_path, 'commissioner_key.pem'),
258            cafile=path.join(cert_path, 'ca_cert.pem'),
259        )
260        print('Setting up secure channel...')
261        if await ble_sstream.do_handshake():
262            print('Done')
263            context['ble_sstream'] = ble_sstream
264        else:
265            print('Secure channel not established.')
266            await ble_stream.disconnect()
267