1"""
2  Copyright (c) 2024, The OpenThread Authors.
3  All rights reserved.
4
5  Redistribution and use in source and binary forms, with or without
6  modification, are permitted provided that the following conditions are met:
7  1. Redistributions of source code must retain the above copyright
8     notice, this list of conditions and the following disclaimer.
9  2. Redistributions in binary form must reproduce the above copyright
10     notice, this list of conditions and the following disclaimer in the
11     documentation and/or other materials provided with the distribution.
12  3. Neither the name of the copyright holder nor the
13     names of its contributors may be used to endorse or promote products
14     derived from this software without specific prior written permission.
15
16  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  POSSIBILITY OF SUCH DAMAGE.
27"""
28
29from cli.command import Command, CommandResultNone
30from dataset.dataset import ThreadDataset, initial_dataset
31from tlv.dataset_tlv import MeshcopTlvType
32from copy import deepcopy
33
34
def handle_dataset_entry_command(type: MeshcopTlvType, args, context):
    """Show or update a single entry of the dataset held in ``context``.

    When ``args`` is empty the entry selected by ``type`` is printed.
    Otherwise ``args`` is passed to the dataset to set that entry and
    'Done.' is printed.

    Args:
        type: TLV type identifying the dataset entry.
        args: Remaining command arguments; an empty sequence means "view".
        context: Mapping holding the current dataset under the 'dataset' key.

    Returns:
        A ``CommandResultNone`` instance in both the view and the set case.
    """
    dataset: ThreadDataset = context['dataset']

    if len(args) != 0:
        # Arguments supplied: update the entry and acknowledge.
        dataset.set_entry(type, args)
        print('Done.')
    else:
        # No arguments: just display the current value of the entry.
        dataset.get_entry(type).print_content()

    return CommandResultNone()
44
45
class DatasetClearCommand(Command):
    """Subcommand that clears the dataset stored in the command context."""

    def get_help_string(self) -> str:
        help_text = 'Clear dataset.'
        return help_text

    async def execute_default(self, args, context):
        # Arguments are ignored; the dataset is cleared unconditionally.
        dataset: ThreadDataset = context['dataset']
        dataset.clear()
        result = CommandResultNone()
        return result
55
56
class DatasetHelpCommand(Command):
    """Subcommand that prints help for 'dataset' and all of its subcommands."""

    def get_help_string(self) -> str:
        return 'Display help message and return.'

    async def execute_default(self, args, context):
        """Print the 'dataset' help string followed by one block per subcommand.

        Each subcommand name is indented by four spaces and its help string by
        eight. ``args`` is ignored.

        Returns:
            A ``CommandResultNone`` instance.
        """
        indent_width = 4
        indentation = ' ' * indent_width
        # Help strings sit one level deeper than the subcommand names.
        nested_indentation = indentation + ' ' * indent_width

        # context['commands'] is looked up by command name and yields Command
        # objects; it is not a ThreadDataset.
        commands: dict = context['commands']
        ds_command: Command = commands['dataset']

        print(ds_command.get_help_string())
        print('Subcommands:')
        for name, subcommand in ds_command._subcommands.items():
            print(f'{indentation}{name}')
            print(f'{nested_indentation}{subcommand.get_help_string()}')
        return CommandResultNone()
73
74
class DatasetHexCommand(Command):
    """Subcommand that prints the dataset as hex TLVs, optionally replacing it first."""

    def get_help_string(self) -> str:
        return 'Get or set dataset as hex-encoded TLVs.'

    async def execute_default(self, args, context):
        """Optionally load the dataset from ``args[0]``, then print it as hex.

        Any exception raised while loading is printed instead of propagated;
        the hex dump of the dataset is printed afterwards in every case.

        Returns:
            A ``CommandResultNone`` instance.
        """
        dataset: ThreadDataset = context['dataset']

        if args:
            try:
                # Trial run on a deep copy: if the input cannot be parsed or
                # applied, the exception fires before the real dataset is
                # cleared below.
                scratch = deepcopy(dataset)
                raw_tlvs = bytes.fromhex(args[0])
                scratch.set_from_bytes(raw_tlvs)

                # The trial succeeded; replace the real dataset's content.
                dataset.clear()
                dataset.set_from_bytes(raw_tlvs)
            except Exception as err:
                print(err)

        encoded = dataset.to_bytes()
        print(encoded.hex())
        return CommandResultNone()
94
95
class ReloadDatasetCommand(Command):
    """Subcommand that restores the dataset from ``initial_dataset``."""

    def get_help_string(self) -> str:
        return 'Reset dataset to the initial value.'

    async def execute_default(self, args, context):
        """Clear the dataset and load ``initial_dataset`` into it.

        ``args`` is ignored.

        Returns:
            A ``CommandResultNone`` instance.
        """
        dataset: ThreadDataset = context['dataset']
        # Clear first, mirroring the clear-then-set sequence the 'hex'
        # subcommand uses when replacing the dataset, so that entries not
        # present in the initial dataset do not survive the reset.
        # NOTE(review): assumes set_from_bytes() only writes the entries found
        # in the byte string and leaves others in place -- confirm.
        dataset.clear()
        dataset.set_from_bytes(initial_dataset)
        return CommandResultNone()
104
105
class ActiveTimestampCommand(Command):
    """Subcommand for viewing or setting the ActiveTimestamp dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set ActiveTimestamp seconds. Arguments: [seconds (int)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.ACTIVETIMESTAMP
        return handle_dataset_entry_command(tlv_type, args, context)
113
114
class PendingTimestampCommand(Command):
    """Subcommand for viewing or setting the PendingTimestamp dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set PendingTimestamp seconds. Arguments: [seconds (int)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.PENDINGTIMESTAMP
        return handle_dataset_entry_command(tlv_type, args, context)
122
123
class NetworkKeyCommand(Command):
    """Subcommand for viewing or setting the NetworkKey dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set NetworkKey. Arguments: [nk (hexstring, len=32)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.NETWORKKEY
        return handle_dataset_entry_command(tlv_type, args, context)
131
132
class NetworkNameCommand(Command):
    """Subcommand for viewing or setting the NetworkName dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set NetworkName. Arguments: [nn (string, maxlen=16)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.NETWORKNAME
        return handle_dataset_entry_command(tlv_type, args, context)
140
141
class ExtPanIDCommand(Command):
    """Subcommand for viewing or setting the ExtPanID dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set ExtPanID. Arguments: [extpanid (hexstring, len=16)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.EXTPANID
        return handle_dataset_entry_command(tlv_type, args, context)
149
150
class MeshLocalPrefixCommand(Command):
    """Subcommand for viewing or setting the MeshLocalPrefix dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set MeshLocalPrefix. Arguments: [mlp (hexstring, len=16)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.MESHLOCALPREFIX
        return handle_dataset_entry_command(tlv_type, args, context)
158
159
class DelayTimerCommand(Command):
    """Subcommand for viewing or setting the DelayTimer dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set DelayTimer delay. Arguments: [delay (int)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.DELAYTIMER
        return handle_dataset_entry_command(tlv_type, args, context)
167
168
class PanIDCommand(Command):
    """Subcommand for viewing or setting the PanID dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set PanID. Arguments: [panid (hexstring, len=4)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.PANID
        return handle_dataset_entry_command(tlv_type, args, context)
176
177
class ChannelCommand(Command):
    """Subcommand for viewing or setting the Channel dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set Channel. Arguments: [channel (int)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.CHANNEL
        return handle_dataset_entry_command(tlv_type, args, context)
185
186
class ChannelMaskCommand(Command):
    """Subcommand for viewing or setting the ChannelMask dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set ChannelMask. Arguments: [mask (hexstring)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.CHANNELMASK
        return handle_dataset_entry_command(tlv_type, args, context)
194
195
class PskcCommand(Command):
    """Subcommand for viewing or setting the Pskc dataset entry."""

    def get_help_string(self) -> str:
        help_text = 'View and set Pskc. Arguments: [pskc (hexstring, maxlen=32)]'
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.PSKC
        return handle_dataset_entry_command(tlv_type, args, context)
203
204
class SecurityPolicyCommand(Command):
    """Subcommand for viewing or setting the SecurityPolicy dataset entry."""

    def get_help_string(self) -> str:
        # Adjacent literals are joined into one string at compile time.
        help_text = (
            'View and set SecurityPolicy. Arguments: '
            '[<rotation_time (int)> [flags (string)] [version_threshold (int)]]'
        )
        return help_text

    async def execute_default(self, args, context):
        # Delegate to the shared view/set handler for this TLV type.
        tlv_type = MeshcopTlvType.SECURITYPOLICY
        return handle_dataset_entry_command(tlv_type, args, context)
213
214
class DatasetCommand(Command):
    """Top-level 'dataset' command: prints the dataset and hosts its subcommands."""

    def __init__(self):
        # Run the base-class initialisation before installing the subcommand
        # table. The sibling Command subclasses below are created with no
        # arguments through the inherited constructor, so this call is valid.
        super().__init__()
        # Subcommand name -> handler instance. Insertion order is the order in
        # which the 'help' subcommand lists them.
        self._subcommands = {
            'clear': DatasetClearCommand(),
            'help': DatasetHelpCommand(),
            'hex': DatasetHexCommand(),
            'reload': ReloadDatasetCommand(),
            'activetimestamp': ActiveTimestampCommand(),
            'pendingtimestamp': PendingTimestampCommand(),
            'networkkey': NetworkKeyCommand(),
            'networkname': NetworkNameCommand(),
            'extpanid': ExtPanIDCommand(),
            'meshlocalprefix': MeshLocalPrefixCommand(),
            'delay': DelayTimerCommand(),
            'panid': PanIDCommand(),
            'channel': ChannelCommand(),
            'channelmask': ChannelMaskCommand(),
            'pskc': PskcCommand(),
            'securitypolicy': SecurityPolicyCommand(),
        }

    def get_help_string(self) -> str:
        return 'View and manipulate current dataset. ' \
            'Call without parameters to show current dataset.'

    async def execute_default(self, args, context):
        """Print the content of the dataset stored under context['dataset'].

        ``args`` is ignored.

        Returns:
            A ``CommandResultNone`` instance.
        """
        ds: ThreadDataset = context['dataset']
        ds.print_content()
        return CommandResultNone()
245