1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import ipaddress
30import logging
31import re
32from collections import Counter
33from typing import Callable, List, Collection, Union, Tuple, Optional, Dict, Pattern, Any
34
35from . import connectors
36from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshCommandRunner, OtbrAdbTcpCommandRunner, OtbrAdbUsbCommandRunner
37from .connectors import Simulator
38from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError
39from .types import ChildId, Rloc16, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix, \
40    RouterTableEntry, NetifIdentifier
41from .utils import match_line, constant_property
42
43
44class OTCI(object):
45    """
46    This class represents an OpenThread Controller Interface instance that provides versatile interfaces to
47    manipulate an OpenThread device.
48    """
49
    # Default number of retries for a failed command: a command is attempted at most
    # DEFAULT_EXEC_COMMAND_RETRY + 1 times by execute_command().
    DEFAULT_EXEC_COMMAND_RETRY = 4  # A command is retried 4 times if failed.

    # Class-level default of the retry count; set_execute_command_retry() overrides it per instance.
    __exec_command_retry = DEFAULT_EXEC_COMMAND_RETRY
53
54    def __init__(self, otcmd: OTCommandHandler):
55        """
56        This method initializes an OTCI instance.
57
58        :param otcmd: An OpenThread Command Handler instance to execute OpenThread CLI commands.
59        """
60        self.__otcmd: OTCommandHandler = otcmd
61        self.__logger = logging.getLogger(name=str(self))
62
63    def __repr__(self):
64        """Gets the string representation of the OTCI instance."""
65        return repr(self.__otcmd)
66
67    def wait(self, duration: float, expect_line: Optional[Union[str, Pattern, Collection[Any]]] = None):
68        """Wait for a given duration.
69
70        :param duration: The duration (in seconds) wait for.
71        :param expect_line: The line expected to output if given.
72                            Raise ExpectLineTimeoutError if expect_line is not found within the given duration.
73        """
74        self.log('info', "wait for %.3f seconds", duration)
75        if expect_line is None:
76            self.__otcmd.wait(duration)
77        else:
78            success = False
79
80            while duration > 0:
81                output = self.__otcmd.wait(1)
82                if any(match_line(line, expect_line) for line in output):
83                    success = True
84                    break
85
86                duration -= 1
87
88            if not success:
89                raise ExpectLineTimeoutError(expect_line)
90
91    def close(self):
92        """Close the OTCI instance."""
93        self.__otcmd.close()
94
95    def execute_command(self,
96                        cmd: str,
97                        timeout: float = 10,
98                        silent: bool = False,
99                        already_is_ok: bool = True) -> List[str]:
100        for i in range(self.__exec_command_retry + 1):
101            try:
102                return self.__execute_command(cmd, timeout, silent, already_is_ok=already_is_ok)
103            except Exception:
104                self.wait(2)
105                if i == self.__exec_command_retry:
106                    raise
107        assert False
108
109    def __execute_command(self,
110                          cmd: str,
111                          timeout: float = 10,
112                          silent: bool = False,
113                          already_is_ok: bool = True) -> List[str]:
114        """Execute the OpenThread CLI command.
115
116        :param cmd: The command to execute.
117        :param timeout: The command timeout.
118        :param silent: Whether to run the command silent without logging.
119        :returns: The command output as a list of lines.
120        """
121        if not silent:
122            self.log('info', '> %s', cmd)
123
124        output = self.__otcmd.execute_command(cmd, timeout)
125
126        if not silent:
127            for line in output:
128                self.log('info', '%s', line)
129
130        if cmd in ('reset', 'factoryreset'):
131            return output
132
133        if output[-1] == 'Done' or (already_is_ok and output[-1] == 'Error 24: Already'):
134            output = output[:-1]
135            return output
136        else:
137            raise CommandError(cmd, output)
138
139    def execute_platform_command(self, cmd: str, timeout: float = 10, silent: bool = False) -> List[str]:
140        """Execute the platform command.
141
142        :param cmd: The command to execute.
143        :param timeout: The command timeout.
144        :param silent: Whether to run the command silent without logging.
145        :returns: The command output as a list of lines.
146        """
147        if not silent:
148            self.log('info', '> %s', cmd)
149
150        output = self.__otcmd.execute_platform_command(cmd, timeout)
151
152        if not silent:
153            for line in output:
154                self.log('info', '%s', line)
155
156        return output
157
158    def set_execute_command_retry(self, n: int):
159        assert n >= 0
160        self.__exec_command_retry = n
161
162    def shell(self, cmd: str, timeout: float = 10):
163        self.log('info', '# %s', cmd)
164        output = self.__otcmd.shell(cmd, timeout=timeout)
165        for line in output:
166            self.log('info', '%s', line)
167        return output
168
169    def set_logger(self, logger: logging.Logger):
170        """Set the logger for the OTCI instance, or None to disable logging."""
171        self.__logger = logger
172
173    def log(self, level, fmt, *args, **kwargs):
174        if self.__logger is not None:
175            getattr(self.__logger, level)('(%s) ' + fmt, repr(self), *args, **kwargs)
176
177    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
178        """Set the callback that will be called for each line output by the CLI."""
179        self.__otcmd.set_line_read_callback(callback)
180
181    #
182    # Constant properties
183    #
184    @constant_property
185    def version(self):
186        """Returns the firmware version. (e.g. "OPENTHREAD/20191113-01411-gb2d66e424-dirty; SIMULATION; Nov 14 2020 14:24:38")"""
187        return self.__parse_str(self.execute_command('version'))
188
189    @constant_property
190    def thread_version(self):
191        """Get the Thread Version number."""
192        return self.__parse_int(self.execute_command('thread version'))
193
194    @constant_property
195    def api_version(self):
196        """Get API version number."""
197        try:
198            return self.__parse_int(self.execute_command('version api'))
199        except ValueError:
200            # If the device does not have `version api` command, it will print the firmware version, which would lead to ValueError.
201            return 0
202
203    #
204    # Basic device operations
205    #
206    def ifconfig_up(self):
207        """Bring up the IPv6 interface."""
208        self.execute_command('ifconfig up')
209
210    def ifconfig_down(self):
211        """Bring down the IPv6 interface."""
212        self.execute_command('ifconfig down')
213
214    def get_ifconfig_state(self) -> bool:
215        """Get the status of the IPv6 interface."""
216        return self.__parse_values(self.execute_command('ifconfig'), up=True, down=False)
217
218    def thread_start(self):
219        """Enable Thread protocol operation and attach to a Thread network."""
220        self.execute_command('thread start')
221
222    def thread_stop(self):
223        """Disable Thread protocol operation and detach from a Thread network."""
224        self.execute_command('thread stop')
225
226    def reset(self):
227        """Signal a platform reset."""
228        self.execute_command('reset')
229
230    def factory_reset(self):
231        """Delete all stored settings, and signal a platform reset."""
232        self.execute_command('factoryreset')
233
234    #
235    # Network Operations
236    #
    # Matches the statistics summary line of the `ping` command output. The packet-loss part and the
    # round-trip part are both optional, e.g.:
    #   "1 packets transmitted, 1 packets received. Packet loss = 0.0%. Round-trip min/avg/max = 4/4.0/4 ms."
    _PING_STATISTICS_PATTERN = re.compile(
        r'^(?P<transmitted>\d+) packets transmitted, (?P<received>\d+) packets received.(?: Packet loss = (?P<loss>\d+\.\d+)%.)?(?: Round-trip min/avg/max = (?P<min>\d+)/(?P<avg>\d+\.\d+)/(?P<max>\d+) ms.)?$'
    )
240
241    def ping(self,
242             ip: Union[str, Ip6Addr],
243             size: int = 8,
244             count: int = 1,
245             interval: float = 1,
246             hoplimit: int = 64,
247             timeout: float = 3) -> Dict:
248        """Send an ICMPv6 Echo Request.
249        The default arguments are consistent with https://github.com/openthread/openthread/blob/main/src/core/utils/ping_sender.hpp.
250
251        :param ip: The target IPv6 address to ping.
252        :param size: The number of data bytes in the payload. Default is 8.
253        :param count: The number of ICMPv6 Echo Requests to be sent. Default is 1.
254        :param interval: The interval between two consecutive ICMPv6 Echo Requests in seconds. The value may have fractional form, for example 0.5. Default is 1.
255        :param hoplimit: The hoplimit of ICMPv6 Echo Request to be sent. Default is 64. See OPENTHREAD_CONFIG_IP6_HOP_LIMIT_DEFAULT in src/core/config/ip6.h.
256        :param timeout: The maximum duration in seconds for the ping command to wait after the final echo request is sent. Default is 3.
257        """
258        cmd = f'ping {ip} {size} {count} {interval} {hoplimit} {timeout}'
259
260        timeout_allowance = 3
261        lines = self.execute_command(cmd, timeout=(count - 1) * interval + timeout + timeout_allowance)
262
263        statistics = {}
264        for line in lines:
265            m = OTCI._PING_STATISTICS_PATTERN.match(line)
266            if m is not None:
267                if m.group('transmitted') is not None:
268                    statistics['transmitted_packets'] = int(m.group('transmitted'))
269                    statistics['received_packets'] = int(m.group('received'))
270                if m.group('loss') is not None:
271                    statistics['packet_loss'] = float(m.group('loss')) / 100
272                if m.group('min') is not None:
273                    statistics['round_trip_time'] = {
274                        'min': int(m.group('min')),
275                        'avg': float(m.group('avg')),
276                        'max': int(m.group('max'))
277                    }
278        return statistics
279
280    def ping_stop(self):
281        """Stop sending ICMPv6 Echo Requests."""
282        self.execute_command('ping stop')
283
284    def discover(self, channel: Optional[int] = None) -> List[Dict[str, Any]]:
285        """Perform an MLE Discovery operation."""
286        return self.__scan_networks('discover', channel)
287
288    def scan(self, channel: Optional[int] = None) -> List[Dict[str, Any]]:
289        """Perform an IEEE 802.15.4 Active Scan."""
290        return self.__scan_networks('scan', channel)
291
292    def __scan_networks(self, cmd: str, channel: Optional[int] = None) -> List[Dict[str, Any]]:
293        if channel is not None:
294            cmd += f' {channel}'
295
296        output = self.execute_command(cmd, timeout=10)
297        if len(output) < 2:
298            raise UnexpectedCommandOutput(output)
299
300        networks = []
301        for line in output[2:]:
302            fields = line.strip().split('|')
303
304            try:
305                _, J, netname, extpanid, panid, extaddr, ch, dbm, lqi, _ = fields
306            except Exception:
307                logging.warning('ignored output: %r', line)
308                continue
309
310            networks.append({
311                'joinable': bool(int(J)),
312                'network_name': netname.strip(),
313                'extpanid': extpanid,
314                'panid': int(panid, 16),
315                'extaddr': extaddr,
316                'channel': int(ch),
317                'dbm': int(dbm),
318                'lqi': int(lqi),
319            })
320
321        return networks
322
323    def scan_energy(self, duration: Optional[float] = None, channel: Optional[int] = None) -> Dict[int, int]:
324        """Perform an IEEE 802.15.4 Energy Scan."""
325        cmd = 'scan energy'
326        if duration is not None:
327            cmd += f' {duration * 1000:d}'
328
329        if channel is not None:
330            cmd += f' {channel}'
331
332        output = self.execute_command(cmd, timeout=10)
333        if len(output) < 2:
334            raise UnexpectedCommandOutput(output)
335
336        channels = {}
337        for line in output[2:]:
338            fields = line.strip().split('|')
339
340            _, Ch, RSSI, _ = fields
341            channels[int(Ch)] = int(RSSI)
342
343        return channels
344
345    def mac_send_data_request(self):
346        """Instruct an Rx-Off-When-Idle device to send a Data Request mac frame to its parent."""
347        self.execute_command('mac send datarequest')
348
349    def mac_send_empty_data(self):
350        """Instruct an Rx-Off-When-Idle device to send a Empty Data mac frame to its parent."""
351        self.execute_command('mac send emptydata')
352
353    # TODO: discover
354    # TODO: dns resolve <hostname> [DNS server IP] [DNS server port]
355    # TODO: fake /a/an <dst-ipaddr> <target> <meshLocalIid>
356    # TODO: sntp query
357
358    #
359    # Set or get device/network parameters
360    #
361
362    def get_mode(self) -> str:
363        """Get the Thread Device Mode value.
364
365            -: no flags set (rx-off-when-idle, minimal Thread device, stable network data)
366            r: rx-on-when-idle
367            d: Full Thread Device
368            n: Full Network Data
369        """
370        return self.__parse_str(self.execute_command('mode'))
371
372    def set_mode(self, mode: str):
373        """Set the Thread Device Mode value.
374
375            -: no flags set (rx-off-when-idle, minimal Thread device, stable network data)
376            r: rx-on-when-idle
377            d: Full Thread Device
378            n: Full Network Data
379        """
380        self.execute_command(f'mode {DeviceMode(mode)}')
381
382    def get_extaddr(self) -> str:
383        """Get the IEEE 802.15.4 Extended Address."""
384        return self.__parse_extaddr(self.execute_command('extaddr'))
385
386    def set_extaddr(self, extaddr: str):
387        """Set the IEEE 802.15.4 Extended Address."""
388        self.__validate_hex64b(extaddr)
389        self.execute_command(f'extaddr {extaddr}')
390
391    def get_eui64(self) -> str:
392        """Get the factory-assigned IEEE EUI-64."""
393        return self.__parse_eui64(self.execute_command('eui64'))
394
395    def set_extpanid(self, extpanid: str):
396        """Set the Thread Extended PAN ID value."""
397        self.__validate_extpanid(extpanid)
398        self.execute_command(f'extpanid {extpanid}')
399
400    def get_extpanid(self) -> str:
401        """Get the Thread Extended PAN ID value."""
402        return self.__parse_extpanid(self.execute_command('extpanid'))
403
404    def set_channel(self, ch):
405        """Set the IEEE 802.15.4 Channel value."""
406        self.execute_command('channel %d' % ch)
407
408    def get_channel(self):
409        """Get the IEEE 802.15.4 Channel value."""
410        return self.__parse_int(self.execute_command('channel'))
411
412    def get_preferred_channel_mask(self) -> int:
413        """Get preferred channel mask."""
414        return self.__parse_int(self.execute_command('channel preferred'))
415
416    def get_supported_channel_mask(self):
417        """Get supported channel mask."""
418        return self.__parse_int(self.execute_command('channel supported'))
419
420    def get_panid(self):
421        """Get the IEEE 802.15.4 PAN ID value."""
422        return self.__parse_int(self.execute_command('panid'), 16)
423
424    def set_panid(self, panid):
425        """Get the IEEE 802.15.4 PAN ID value."""
426        self.execute_command('panid %d' % panid)
427
428    def set_network_name(self, name):
429        """Set network name."""
430        self.execute_command('networkname %s' % self.__escape_escapable(name))
431
432    def get_network_name(self):
433        """Get network name."""
434        return self.__parse_str(self.execute_command('networkname'))
435
436    def get_network_key(self) -> str:
437        """Get the network key."""
438        return self.__parse_network_key(self.execute_command(self.__detect_networkkey_cmd()))
439
440    def set_network_key(self, networkkey: str):
441        """Set the network key."""
442        self.__validate_network_key(networkkey)
443        cmd = self.__detect_networkkey_cmd()
444        self.execute_command(f'{cmd} {networkkey}')
445
446    def get_key_sequence_counter(self) -> int:
447        """Get the Thread Key Sequence Counter."""
448        return self.__parse_int(self.execute_command('keysequence counter'))
449
450    def set_key_sequence_counter(self, counter: int):
451        """Set the Thread Key Sequence Counter."""
452        self.execute_command(f'keysequence counter {counter}')
453
454    def get_key_sequence_guard_time(self) -> int:
455        """Get Thread Key Switch Guard Time (in hours)."""
456        return self.__parse_int(self.execute_command('keysequence guardtime'))
457
458    def set_key_sequence_guard_time(self, hours: int):
459        """Set Thread Key Switch Guard Time (in hours) 0 means Thread Key Switch immediately if key index match."""
460        self.execute_command(f'keysequence guardtime {hours}')
461
462    def get_cca_threshold(self) -> int:
463        """Get the CCA threshold in dBm measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4."""
464        output = self.execute_command(f'ccathreshold')
465        val = self.__parse_str(output)
466        if not val.endswith(' dBm'):
467            raise UnexpectedCommandOutput(output)
468
469        return int(val[:-4])
470
471    def set_cca_threshold(self, val: int):
472        """Set the CCA threshold measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4."""
473        self.execute_command(f'ccathreshold {val}')
474
475    def get_promiscuous(self) -> bool:
476        """Get radio promiscuous property."""
477        return self.__parse_Enabled_or_Disabled(self.execute_command('promiscuous'))
478
479    def enable_promiscuous(self):
480        """Enable radio promiscuous operation and print raw packet content."""
481        self.execute_command('promiscuous enable')
482
483    def disable_promiscuous(self):
484        """Disable radio promiscuous operation."""
485        self.execute_command('promiscuous disable')
486
487    def get_txpower(self) -> int:
488        """Get the transmit power in dBm."""
489        line = self.__parse_str(self.execute_command('txpower'))
490        if not line.endswith(' dBm'):
491            raise UnexpectedCommandOutput([line])
492
493        return int(line.split()[0])
494
495    def set_txpower(self, val: int):
496        """Set the transmit power in dBm."""
497        self.execute_command(f'txpower {val}')
498
499    # TODO: fem
500    # TODO: fem lnagain
501    # TODO: fem lnagain <LNA gain>
502    # TODO: mac retries direct
    # TODO: mac retries direct <number>
504    # TODO: mac retries indirect
505    # TODO: mac retries indirect <number>
506
507    #
508    # Basic Node states and properties
509    #
510
511    def get_state(self) -> ThreadState:
512        """Get the current Thread state."""
513        return ThreadState(self.__parse_str(self.execute_command('state')))
514
515    def set_state(self, state: str):
516        """Try to switch to state detached, child, router or leader."""
517        self.execute_command(f'state {state}')
518
519    def get_rloc16(self) -> Rloc16:
520        """Get the Thread RLOC16 value."""
521        return Rloc16(self.__parse_int(self.execute_command('rloc16'), 16))
522
523    def get_router_id(self) -> int:
524        """Get the Thread Router ID value."""
525        return self.get_rloc16() >> 10
526
527    def prefer_router_id(self, routerid: int):
528        """Prefer a Router ID when solicit router id from Leader."""
529        self.execute_command(f'preferrouterid {routerid}')
530
531    def is_singleton(self) -> bool:
532        return self.__parse_values(self.execute_command('singleton'), true=True, false=False)
533
534    #
535    # RCP related utilities
536    #
537
538    def get_rcp_version(self):
539        return self.__parse_str(self.execute_command('rcp version'))
540
541    #
542    # Unsecure port utilities
543    #
544
545    def get_unsecure_ports(self) -> List[int]:
546        """all ports from the allowed unsecured port list."""
547        return self.__parse_int_list(self.execute_command('unsecureport get'))
548
549    def add_unsecure_port(self, port: int):
550        """Add a port to the allowed unsecured port list."""
551        self.execute_command(f'unsecureport add {port}')
552
553    def remove_unsecure_port(self, port: int):
554        """Remove a port from the allowed unsecured port list."""
555        self.execute_command(f'unsecureport remove {port}')
556
557    def clear_unsecure_ports(self):
558        """Remove all ports from the allowed unsecured port list."""
559        self.execute_command('unsecureport remove all')
560
561    #
562    # Leader configurations
563    #
564
565    def get_preferred_partition_id(self) -> PartitionId:
566        """Get the preferred Thread Leader Partition ID."""
567        return PartitionId(self.__parse_int(self.execute_command(self.__get_partition_preferred_cmd())))
568
569    def set_preferred_partition_id(self, parid: int):
570        """Set the preferred Thread Leader Partition ID."""
571        self.execute_command(f'{self.__get_partition_preferred_cmd()} {parid}')
572
573    def __get_partition_preferred_cmd(self) -> str:
574        """"""
575        return 'partitionid preferred' if self.api_version >= 51 else 'leaderpartitionid'
576
577    def get_leader_weight(self) -> int:
578        """Get the Thread Leader Weight."""
579        return self.__parse_int(self.execute_command('leaderweight'))
580
581    def set_leader_weight(self, weight: int):
582        """Set the Thread Leader Weight."""
583        self.execute_command(f'leaderweight {weight}')
584
    # Maps the field names printed by the `leaderdata` command to the keys of the dict
    # returned by get_leader_data().
    __LEADER_DATA_KEY_MAP = {
        'Partition ID': 'partition_id',
        'Weighting': 'weight',
        'Data Version': 'data_ver',
        'Stable Data Version': 'stable_data_ver',
        'Leader Router ID': 'leader_id',
    }
592
593    def get_leader_data(self) -> Dict[str, int]:
594        """Get the Thread Leader Data."""
595        data = {}
596        output = self.execute_command('leaderdata')
597
598        try:
599            for line in output:
600                k, v = line.split(': ')
601                data[OTCI.__LEADER_DATA_KEY_MAP[k]] = int(v)
602        except KeyError:
603            raise UnexpectedCommandOutput(output)
604
605        return data
606
607    #
608    # Router configurations
609    #
610
611    def get_router_selection_jitter(self):
612        """Get the ROUTER_SELECTION_JITTER value."""
613        return self.__parse_int(self.execute_command('routerselectionjitter'))
614
615    def set_router_selection_jitter(self, jitter):
616        """Set the ROUTER_SELECTION_JITTER value."""
617        self.execute_command(f'routerselectionjitter {jitter}')
618
619    def get_network_id_timeout(self) -> int:
620        """Get the NETWORK_ID_TIMEOUT parameter used in the Router role."""
621        return self.__parse_int(self.execute_command('networkidtimeout'))
622
623    def set_network_id_timeout(self, timeout: int):
624        """Set the NETWORK_ID_TIMEOUT parameter used in the Router role."""
625        self.execute_command(f'networkidtimeout {timeout}')
626
627    def get_parent_priority(self) -> int:
628        """Get the assigned parent priority value, -2 means not assigned."""
629        return self.__parse_int(self.execute_command('parentpriority'))
630
631    def set_parent_priority(self, priority: int):
632        """Set the assigned parent priority value: 1, 0, -1 or -2."""
633        self.execute_command(f'parentpriority {priority}')
634
635    def get_router_upgrade_threshold(self) -> int:
636        """Get the ROUTER_UPGRADE_THRESHOLD value."""
637        return self.__parse_int(self.execute_command('routerupgradethreshold'))
638
639    def set_router_upgrade_threshold(self, threshold: int):
640        """Set the ROUTER_UPGRADE_THRESHOLD value."""
641        self.execute_command(f'routerupgradethreshold {threshold}')
642
643    def get_router_downgrade_threshold(self):
644        """Set the ROUTER_DOWNGRADE_THRESHOLD value."""
645        return self.__parse_int(self.execute_command('routerdowngradethreshold'))
646
647    def set_router_downgrade_threshold(self, threshold: int):
648        """Get the ROUTER_DOWNGRADE_THRESHOLD value."""
649        self.execute_command(f'routerdowngradethreshold {threshold}')
650
651    def get_router_eligible(self) -> bool:
652        """Indicates whether the router role is enabled or disabled."""
653        return self.__parse_Enabled_or_Disabled(self.execute_command('routereligible'))
654
655    def enable_router_eligible(self):
656        """Disable the router role."""
657        self.execute_command('routereligible enable')
658
659    def disable_router_eligible(self):
660        """Disable the router role."""
661        self.execute_command('routereligible disable')
662
663    def get_router_list(self) -> List[RouterId]:
664        """Get allocated Router IDs."""
665        line = self.__parse_str(self.execute_command('router list'))
666        return list(map(RouterId, line.strip().split()))
667
668    def get_router_table(self) -> Dict[RouterId, RouterTableEntry]:
669        """table of routers."""
670        output = self.execute_command('router table')
671        if len(output) < 2:
672            raise UnexpectedCommandOutput(output)
673
674        #
675        # Example output:
676        #
677        # | ID | RLOC16 | Next Hop | Path Cost | LQ In | LQ Out | Age | Extended MAC     |
678        # +----+--------+----------+-----------+-------+--------+-----+------------------+
679        # | 21 | 0x5400 |       21 |         0 |     3 |      3 |   5 | d28d7f875888fccb |
680        # | 56 | 0xe000 |       56 |         0 |     0 |      0 | 182 | f2d92a82c8d8fe43 |
681        # Done
682        #
683
684        headers = self.__split_table_row(output[0])
685
686        table = {}
687        for line in output[2:]:
688            line = line.strip()
689            if not line:
690                continue
691
692            fields = self.__split_table_row(line)
693            if len(fields) != len(headers):
694                raise UnexpectedCommandOutput(output)
695
696            col = lambda colname: self.__get_table_col(colname, headers, fields)
697            id = col('ID')
698
699            table[RouterId(id)] = router = RouterTableEntry({
700                'id': RouterId(id),
701                'rloc16': Rloc16(col('RLOC16'), 16),
702                'next_hop': int(col('Next Hop')),
703                'path_cost': int(col('Path Cost')),
704                'lq_in': int(col('LQ In')),
705                'lq_out': int(col('LQ Out')),
706                'age': int(col('Age')),
707                'extaddr': col('Extended MAC'),
708            })
709
710            if 'Link' in headers:
711                router['link'] = int(col('Link'))
712            else:
713                # support older version of OT which does not output `Link` field
714                router['link'] = self.get_router_info(router['id'], silent=True)['link']
715
716        return table
717
718    def get_router_info(self, id: int, silent: bool = False) -> RouterTableEntry:
719        cmd = f'router {id}'
720        info = {}
721        output = self.execute_command(cmd, silent=silent)
722        items = [line.strip().split(': ') for line in output]
723
724        headers = [h for h, _ in items]
725        fields = [f for _, f in items]
726        col = lambda colname: self.__get_table_col(colname, headers, fields)
727
728        return RouterTableEntry({
729            'id': RouterId(id),
730            'rloc16': Rloc16(col('Rloc'), 16),
731            'alloc': int(col('Alloc')),
732            'next_hop': int(col('Next Hop'), 16) >> 10,  # convert RLOC16 to Router ID
733            'link': int(col('Link')),
734        })
735
736    #
737    # Router utilities: Child management
738    #
739
740    def get_child_table(self) -> Dict[ChildId, Dict[str, Any]]:
741        """Get the table of attached children."""
742        output = self.execute_command('child table')
743        if len(output) < 2:
744            raise UnexpectedCommandOutput(output)
745
746        #
747        # Example output:
748        # | ID  | RLOC16 | Timeout    | Age        | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt| Extended MAC     |
749        # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+------------------+
750        # |   1 | 0xc801 |        240 |         24 |     3 |  131 |1|0|0|  3| 0 |     0 | 4ecede68435358ac |
751        # |   2 | 0xc802 |        240 |          2 |     3 |  131 |0|0|0|  3| 1 |     0 | a672a601d2ce37d8 |
752        # Done
753        #
754
755        headers = self.__split_table_row(output[0])
756
757        table = {}
758        for line in output[2:]:
759            line = line.strip()
760            if not line:
761                continue
762
763            fields = self.__split_table_row(line)
764            col = lambda colname: self.__get_table_col(colname, headers, fields)
765
766            id = int(col("ID"))
767            r, d, n = int(col("R")), int(col("D")), int(col("N"))
768
769            #
770            # Device mode flags:
771            #
772            # r: rx-on-when-idle
773            # d: Full Thread Device
774            # n: Full Network Data
775            # -: no flags set (rx-off-when-idle, minimal Thread device, stable network data)
776            mode = DeviceMode(
777                f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}{"-" if r == d == n == 0 else ""}')
778
779            child = {
780                'id': ChildId(id),
781                'rloc16': Rloc16(col('RLOC16'), 16),
782                'timeout': int(col('Timeout')),
783                'age': int(col('Age')),
784                'lq_in': int(col('LQ In')),
785                'c_vn': int(col('C_VN')),
786                'mode': mode,
787                'extaddr': col('Extended MAC')
788            }
789
790            if 'Ver' in headers:
791                child['ver'] = int(col('Ver'))
792
793            if 'CSL' in headers:
794                child['csl'] = bool(int(col('CSL')))
795
796            if 'QMsgCnt' in headers:
797                child['qmsgcnt'] = int(col('QMsgCnt'))
798
799            if 'Suprvsn' in headers:
800                child['suprvsn'] = int(col('Suprvsn'))
801
802            table[ChildId(id)] = child
803
804        return table
805
806    #
807    # DNS server & client utilities
808    #
809
    # Matches a bracketed server address followed by a port, i.e. `[<address>]:<port>`.
    # Group 1 is the text inside the brackets, group 2 is the decimal port.
    _IPV6_SERVER_PORT_PATTERN = re.compile(r'\[(.*)\]:(\d+)')
811
812    def dns_get_config(self):
813        """Get DNS client query config."""
814        output = self.execute_command('dns config')
815        config = {}
816        for line in output:
817            k, v = line.split(': ')
818            if k == 'Server':
819                matched = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, v)
820                assert matched is not None
821                ip, port = matched.groups()
822                config['server'] = (Ip6Addr(ip), int(port))
823            elif k == 'ResponseTimeout':
824                config['response_timeout'] = int(v[:-3])
825            elif k == 'MaxTxAttempts':
826                config['max_tx_attempts'] = int(v)
827            elif k == 'RecursionDesired':
828                config['recursion_desired'] = (v == 'yes')
829            else:
830                logging.warning("dns config ignored: %s", line)
831
832        return config
833
834    def dns_set_config(self,
835                       server: Tuple[Union[str, ipaddress.IPv6Address], int],
836                       response_timeout: Optional[int] = None,
837                       max_tx_attempts: Optional[int] = None,
838                       recursion_desired: Optional[bool] = None):
839        """Set DNS client query config."""
840        cmd = f'dns config {str(server[0])} {server[1]}'
841        if response_timeout is not None:
842            cmd += f' {response_timeout}'
843
844        assert max_tx_attempts is None or response_timeout is not None, "must specify `response_timeout` if `max_tx_attempts` is specified."
845        if max_tx_attempts is not None:
846            cmd += f' {max_tx_attempts}'
847
848        assert recursion_desired is None or max_tx_attempts is not None, 'must specify `max_tx_attempts` if `recursion_desired` is specified.'
849        if recursion_desired is not None:
850            cmd += f' {1 if recursion_desired else 0}'
851
852        self.execute_command(cmd)
853
854    def dns_get_compression(self) -> bool:
855        """Get DNS compression mode."""
856        return self.__parse_Enabled_or_Disabled(self.execute_command('dns compression'))
857
858    def dns_enable_compression(self):
859        """Enable DNS compression mode."""
860        self.execute_command('dns compression enable')
861
862    def dns_disable_compression(self):
863        """Disable DNS compression mode."""
864        self.execute_command('dns compression disable')
865
866    def dns_browse(self, service: str) -> List[Dict]:
867        """Browse DNS service instances."""
868        cmd = f'dns browse {service}'
869        output = '\n'.join(self.execute_command(cmd, 30.0))
870
871        result = []
872        for ins, port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl in re.findall(
873                r'(.*?)\s+Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s*Host:(\S+)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)',
874                output):
875            result.append({
876                'instance': ins,
877                'service': service,
878                'port': int(port),
879                'priority': int(priority),
880                'weight': int(weight),
881                'host': hostname,
882                'address': Ip6Addr(address),
883                'txt': self.__parse_srp_server_service_txt(txt_data),
884                'srv_ttl': int(srv_ttl),
885                'txt_ttl': int(txt_ttl),
886                'aaaa_ttl': int(aaaa_ttl),
887            })
888
889        return result
890
891    def dns_resolve(self, hostname: str) -> List[Dict]:
892        """Resolve a DNS host name."""
893        cmd = f'dns resolve {hostname}'
894        output = self.execute_command(cmd, 30.0)
895        dns_resp = output[0]
896        addrs = dns_resp.strip().split(' - ')[1].split(' ')
897        ips = [Ip6Addr(item.strip()) for item in addrs[::2]]
898        ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
899
900        return [{
901            'address': ip,
902            'ttl': ttl,
903        } for ip, ttl in zip(ips, ttls)]
904
905    def dns_resolve_service(self, instance: str, service: str) -> Dict:
906        """Resolves aservice instance."""
907        instance = self.__escape_escapable(instance)
908        cmd = f'dns service {instance} {service}'
909        output = self.execute_command(cmd, 30.0)
910
911        m = re.match(
912            r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)',
913            '\t'.join(output))
914        if m:
915            port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups()
916            return {
917                'instance': instance,
918                'service': service,
919                'port': int(port),
920                'priority': int(priority),
921                'weight': int(weight),
922                'host': hostname,
923                'address': Ip6Addr(address),
924                'txt': self.__parse_srp_server_service_txt(txt_data),
925                'srv_ttl': int(srv_ttl),
926                'txt_ttl': int(txt_ttl),
927                'aaaa_ttl': int(aaaa_ttl),
928            }
929        else:
930            raise CommandError(cmd, output)
931
932    #
933    # SRP server & client utilities
934    #
935
936    def srp_server_get_state(self):
937        """Get the SRP server state"""
938        return self.__parse_str(self.execute_command('srp server state'))
939
940    def srp_server_enable(self):
941        """Enable SRP server."""
942        self.execute_command('srp server enable')
943
944    def srp_server_disable(self):
945        """Disable SRP server."""
946        self.execute_command('srp server disable')
947
948    def srp_server_get_domain(self) -> str:
949        """Get the SRP server domain."""
950        return self.__parse_str(self.execute_command('srp server domain'))
951
952    def srp_server_set_domain(self, domain: str):
953        """Set the SRP server domain."""
954        self.execute_command(f'srp server domain {domain}')
955
956    def srp_server_get_hosts(self) -> List[Dict]:
957        """Get SRP server registered hosts."""
958        return self.__parse_srp_server_hosts(self.execute_command('srp server host'))
959
960    def srp_server_get_services(self) -> List[Dict]:
961        """Get SRP server registered services."""
962        output = self.execute_command('srp server service')
963        return self.__parse_srp_server_services(output)
964
965    def __parse_srp_server_hosts(self, output: List[str]) -> List[Dict]:
966        result = []
967        info = None
968        for line in output:
969            if not line.startswith(' '):
970                info = {'host': line}
971                result.append(info)
972            else:
973                assert info is not None
974                k, v = line.strip().split(': ')
975                if k == 'deleted':
976                    if v not in ('true', 'false'):
977                        raise UnexpectedCommandOutput(output)
978
979                    info['deleted'] = (v == 'true')
980
981                elif k == 'addresses':
982                    if not v.startswith('[') or not v.endswith(']'):
983                        raise UnexpectedCommandOutput(output)
984
985                    v = v[1:-1]
986                    info['addresses'] = list(map(Ip6Addr, v.split(', ')))
987                else:
988                    raise UnexpectedCommandOutput(output)
989
990        return result
991
992    def __parse_srp_server_services(self, output: List[str]) -> List[Dict]:
993        result = []
994        info = None
995        for line in output:
996            if not line.startswith(' '):
997                info = {'instance': line}
998                result.append(info)
999            else:
1000                assert info is not None
1001                k, v = line.strip().split(': ')
1002                if k == 'deleted':
1003                    if v not in ('true', 'false'):
1004                        raise UnexpectedCommandOutput(output)
1005
1006                    info['deleted'] = (v == 'true')
1007
1008                elif k == 'addresses':
1009                    if not v.startswith('[') or not v.endswith(']'):
1010                        raise UnexpectedCommandOutput(output)
1011
1012                    v = v[1:-1]
1013                    info['addresses'] = list(map(Ip6Addr, v.split(', ')))
1014                elif k == 'subtypes':
1015                    info[k] = list() if v == '(null)' else list(v.split(','))
1016                elif k in ('port', 'weight', 'priority', 'ttl', 'lease', 'key-lease'):
1017                    info[k] = int(v)
1018                elif k in ('host',):
1019                    info[k] = v
1020                elif k == 'TXT':
1021                    info['txt'] = self.__parse_srp_server_service_txt(v)
1022                else:
1023                    raise UnexpectedCommandOutput(output)
1024
1025        return result
1026
1027    def __parse_srp_server_service_txt(self, txt: str) -> Dict[str, Union[bytes, bool]]:
1028        # example value: [txt11=76616c3131, txt12=76616c3132]
1029        assert txt.startswith('[') and txt.endswith(']')
1030        txt_dict = {}
1031        for entry in txt[1:-1].split(', '):
1032            if not entry:
1033                continue
1034
1035            equal_pos = entry.find('=')
1036
1037            if equal_pos != -1:
1038                k, v = entry[:equal_pos], entry[equal_pos + 1:]
1039                txt_dict[k] = bytes(int(v[i:i + 2], 16) for i in range(0, len(v), 2))
1040            else:
1041                txt_dict[entry] = True
1042
1043        return txt_dict
1044
1045    def srp_server_get_lease(self) -> Tuple[int, int, int, int]:
1046        """Get SRP server LEASE & KEY-LEASE range (in seconds)."""
1047        lines = self.execute_command(f'srp server lease')
1048        return tuple([int(line.split(':')[1].strip()) for line in lines])
1049
1050    def srp_server_set_lease(self, min_lease: int, max_lease: int, min_key_lease: int, max_key_lease: int):
1051        """Configure SRP server LEASE & KEY-LEASE range (in seconds)."""
1052        self.execute_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}')
1053
1054    def srp_client_get_state(self) -> bool:
1055        """Get SRP client state."""
1056        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client state'))
1057
1058    def srp_client_start(self, server_ip: Union[str, ipaddress.IPv6Address], server_port: int):
1059        """Start SRP client."""
1060        self.execute_command(f'srp client start {str(server_ip)} {server_port}')
1061
1062    def srp_client_stop(self):
1063        """Stop SRP client."""
1064        self.execute_command('srp client stop')
1065
1066    def srp_client_get_autostart(self) -> bool:
1067        """Get SRP client autostart mode."""
1068        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client autostart'))
1069
1070    def srp_client_enable_autostart(self):
1071        """Enable SRP client autostart mode."""
1072        self.execute_command('srp client autostart enable')
1073
1074    def srp_client_disable_autostart(self):
1075        """Disable SRP client autostart mode."""
1076        self.execute_command('srp client autostart disable')
1077
1078    def srp_client_get_callback(self) -> bool:
1079        """Get SRP client callback mode."""
1080        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client callback'))
1081
1082    def srp_client_enable_callback(self):
1083        """Enable SRP client callback mode."""
1084        self.execute_command('srp client callback enable')
1085
1086    def srp_client_disable_callback(self):
1087        """Disable SRP client callback mode."""
1088        self.execute_command('srp client callback disable')
1089
1090    def srp_client_set_host_name(self, name: str):
1091        """Set SRP client host name."""
1092        self.execute_command(f'srp client host name {name}')
1093
1094    def srp_client_get_host(self) -> Dict:
1095        """Get SRP client host."""
1096        output = self.__parse_str(self.execute_command('srp client host'))
1097        return self.__parse_srp_client_host(output)
1098
    # Matches an SRP client host line of the form `name:"<name>", state:<state>, addrs:[<addrs>]`,
    # where the name may also be the unquoted literal `(null)`.
    # Groups: 1 = quoted name or `(null)`, 2 = name without quotes, 3 = the literal `(null)`,
    # 4 = state, 5 = text between the `addrs:[...]` brackets.
    _SRP_CLIENT_HOST_PATTERN = re.compile(r'name:("(.*)"|(\(null\))), state:(\S+), addrs:\[(.*)\]')
1100
1101    def __parse_srp_client_host(self, line: str) -> Dict:
1102        m = re.match(OTCI._SRP_CLIENT_HOST_PATTERN, line)
1103        if not m:
1104            raise UnexpectedCommandOutput([line])
1105
1106        _, host, _, state, addrs = m.groups()
1107        return {
1108            'host': host or '',
1109            'state': state,
1110            'addresses': [Ip6Addr(ip) for ip in addrs.split(', ')] if addrs else [],
1111        }
1112
1113    def srp_client_get_host_name(self) -> str:
1114        """Get SRP client host name."""
1115        name = self.__parse_str(self.execute_command('srp client host name'))
1116        return name if name != '(null)' else ''
1117
1118    def srp_client_get_host_addresses(self) -> List[Ip6Addr]:
1119        """Get SRP client host addresses."""
1120        return self.__parse_ip6addr_list(self.execute_command('srp client host address'))
1121
1122    def srp_client_set_host_addresses(self, *addrs: Union[str, ipaddress.IPv6Address]):
1123        """Set SRP client host addresses."""
1124        self.execute_command(f'srp client host address {" ".join(map(str, addrs))}')
1125
1126    def srp_client_get_host_state(self):
1127        """Get SRP client host state."""
1128        return self.__parse_str(self.execute_command('srp client host state'))
1129
1130    def srp_client_remove_host(self, remove_key_lease=False):
1131        """Remove SRP client host."""
1132        cmd = 'srp client host remove'
1133        if remove_key_lease:
1134            cmd += ' 1'
1135
1136        self.execute_command(cmd)
1137
1138    def srp_client_get_services(self) -> List[Dict]:
1139        """Get SRP client services."""
1140        output = self.execute_command('srp client service')
1141        return [self.__parse_srp_client_service(line) for line in output]
1142
    # Matches one SRP client service line. Groups, in order: instance name, service name,
    # state, port, priority, weight.
    _SRP_CLIENT_SERVICE_PATTERN = re.compile(
        r'instance:"(.*)", name:"(.*)", state:(\S+), port:(\d+), priority:(\d+), weight:(\d+)')
1145
1146    def __parse_srp_client_service(self, line: str) -> Dict:
1147        # e.g. instance:"ins2", name:"_meshcop._udp", state:ToAdd, port:2000, priority:2, weight:2
1148        m = OTCI._SRP_CLIENT_SERVICE_PATTERN.match(line)
1149        if m is None:
1150            raise UnexpectedCommandOutput([line])
1151
1152        instance, service, state, port, priority, weight = m.groups()
1153        port, priority, weight = int(port), int(priority), int(weight)
1154        return {
1155            'instance': instance,
1156            'service': service,
1157            'state': state,
1158            'port': port,
1159            'priority': priority,
1160            'weight': weight,
1161        }
1162
1163    def srp_client_add_service(self,
1164                               instance: str,
1165                               service: str,
1166                               port: int,
1167                               priority: int = 0,
1168                               weight: int = 0,
1169                               txt: Optional[Dict[str, Union[str, bytes, bool]]] = None):
1170        instance = self.__escape_escapable(instance)
1171        cmd = f'srp client service add {instance} {service} {port} {priority} {weight}'
1172        if txt:
1173            cmd += f' {self.__txt_to_hex(txt)}'
1174        self.execute_command(cmd)
1175
1176    def srp_client_remove_service(self, instance: str, service: str):
1177        """Remove a service from SRP client."""
1178        self.execute_command(f'srp client service remove {instance} {service}')
1179
1180    def srp_client_clear_service(self, instance: str, service: str):
1181        """Remove a service from SRP client without notifying the SRP server."""
1182        self.execute_command(f'srp client service clear {instance} {service}')
1183
1184    def srp_client_get_key_lease_interval(self) -> int:
1185        """Get SRP client key lease interval (in seconds)."""
1186        return self.__parse_int(self.execute_command('srp client keyleaseinterval'))
1187
1188    def srp_client_set_key_lease_interval(self, interval: int):
1189        """Set SRP client key lease interval (in seconds)."""
1190        self.execute_command(f'srp client keyleaseinterval {interval}')
1191
1192    def srp_client_get_lease_interval(self) -> int:
1193        """Get SRP client lease interval (in seconds)."""
1194        return self.__parse_int(self.execute_command('srp client leaseinterval'))
1195
1196    def srp_client_set_lease_interval(self, interval: int):
1197        """Set SRP client lease interval (in seconds)."""
1198        self.execute_command(f'srp client leaseinterval {interval}')
1199
1200    def srp_client_get_server(self) -> Tuple[Ip6Addr, int]:
1201        """Get the SRP server (IP, port)."""
1202        result = self.__parse_str(self.execute_command('srp client server'))
1203        matched = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, result)
1204        assert matched
1205        ip, port = matched.groups()
1206        return Ip6Addr(ip), int(port)
1207
1208    def srp_client_get_service_key(self) -> bool:
1209        """Get SRP client "service key record inclusion" mode."""
1210        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client service key'))
1211
1212    def srp_client_enable_service_key(self):
1213        """Enable SRP client "service key record inclusion" mode."""
1214        self.execute_command('srp client service key enable')
1215
1216    def srp_client_disable_service_key(self):
1217        """Disable SRP client "service key record inclusion" mode."""
1218        self.execute_command('srp client service key disable')
1219
1220    def __split_table_row(self, row: str) -> List[str]:
1221        if not (row.startswith('|') and row.endswith('|')):
1222            raise ValueError(row)
1223
1224        fields = row.split('|')
1225        fields = [x.strip() for x in fields[1:-1]]
1226        return fields
1227
1228    def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str:
1229        return fields[headers.index(colname)]
1230
1231    def get_child_list(self) -> List[ChildId]:
1232        """Get attached Child IDs."""
1233        line = self.__parse_str(self.execute_command(f'child list'))
1234        return [ChildId(id) for id in line.strip().split()]
1235
1236    def get_child_info(self, child: Union[ChildId, Rloc16]) -> Dict[str, Any]:
1237        output = self.execute_command(f'child {child}')
1238
1239        info = {}
1240
1241        for line in output:
1242            k, v = line.split(': ')
1243            if k == 'Child ID':
1244                info['id'] = int(v)
1245            elif k == 'Rloc':
1246                info['rloc16'] = int(v, 16)
1247            elif k == 'Ext Addr':
1248                info['extaddr'] = v
1249            elif k == 'Mode':
1250                info['mode'] = DeviceMode(v)
1251            elif k == 'Net Data':
1252                info['c_vn'] = int(v)
1253            elif k == 'Timeout':
1254                info['timeout'] = int(v)
1255            elif k == 'Age':
1256                info['age'] = int(v)
1257            elif k == 'Link Quality In':
1258                info['lq_in'] = int(v)
1259            elif k == 'RSSI':
1260                info['rssi'] = int(v)
1261            else:
1262                self.log('warning', "Child info %s: %s ignored", k, v)
1263
1264        return info
1265
1266    def get_child_ipaddrs(self) -> Dict[Rloc16, List[Ip6Addr]]:
1267        """Get the list of IP addresses stored for MTD children.
1268
1269        Note: Each MTD child might has multiple IP addresses.
1270        """
1271        output = self.execute_command('childip')
1272
1273        ipaddrs = {}
1274
1275        for line in output:
1276            rloc16, ip = line.split(': ')
1277            rloc16 = Rloc16(rloc16, 16)
1278            ipaddrs.setdefault(rloc16, []).append(Ip6Addr(ip.strip()))
1279
1280        return ipaddrs
1281
1282    #
1283    # Child configurations
1284    #
1285
1286    def get_max_children(self) -> int:
1287        """Get the Thread maximum number of allowed children."""
1288        return self.__parse_int(self.execute_command('childmax'))
1289
1290    def set_max_children(self, val: int):
1291        """Set the Thread maximum number of allowed children."""
1292        self.execute_command(f'childmax {val}')
1293
1294    def get_child_ip_max(self) -> int:
1295        """Get the maximum number of IP addresses that each MTD child may register with this device as parent."""
1296        return self.__parse_int(self.execute_command('childip max'))
1297
1298    def set_child_ip_max(self, val: int):
1299        """Get the maximum number of IP addresses that each MTD child may register with this device as parent."""
1300        self.execute_command(f'childip max {val}')
1301
1302    def get_child_timeout(self):
1303        """Get the Thread Child Timeout value."""
1304        return self.__parse_int(self.execute_command('childtimeout'))
1305
1306    def set_child_timeout(self, timeout):
1307        """Set the Thread Child Timeout value."""
1308        self.execute_command('childtimeout %d' % timeout)
1309
1310    def get_child_supervision_interval(self) -> int:
1311        """Get the Child Supervision Check Timeout value."""
1312        return self.__parse_int(self.execute_command('childsupervision interval'))
1313
1314    def set_child_supervision_interval(self, val: int):
1315        """Set the Child Supervision Interval value.
1316        This command can only be used with FTD devices.
1317        """
1318        self.execute_command(f'childsupervision interval {val}')
1319
1320    def get_child_supervision_check_timeout(self) -> int:
1321        """Get the Child Supervision Check Timeout value."""
1322        return self.__parse_int(self.execute_command('childsupervision checktimeout'))
1323
1324    def set_child_supervision_check_timeout(self, val: int):
1325        """Set the Child Supervision Check Timeout value."""
1326        self.execute_command(f'childsupervision checktimeout {val}')
1327
1328    #
1329    # Neighbor management
1330    #
1331
1332    def get_neighbor_list(self) -> List[Rloc16]:
1333        """Get a list of RLOC16 of neighbors"""
1334        line = self.__parse_str(self.execute_command('neighbor list')).strip()
1335        return [Rloc16(id, 16) for id in line.split()]
1336
1337    def get_neighbor_table(self) -> Dict[Rloc16, Dict[str, Any]]:
1338        output = self.execute_command('neighbor table')
1339        if len(output) < 2:
1340            raise UnexpectedCommandOutput(output)
1341
1342        #
1343        # Example output:
1344        #
1345        # | Role | RLOC16 | Age | Avg RSSI | Last RSSI |R|D|N| Extended MAC     |
1346        # +------+--------+-----+----------+-----------+-+-+-+------------------+
1347        # |   C  | 0xcc01 |  96 |      -46 |       -46 |1|1|1| 1eb9ba8a6522636b |
1348        # |   R  | 0xc800 |   2 |      -29 |       -29 |1|1|1| 9a91556102c39ddb |
1349        # |   R  | 0xf000 |   3 |      -28 |       -28 |1|1|1| 0ad7ed6beaa6016d |
1350        # Done
1351        #
1352
1353        headers = self.__split_table_row(output[0])
1354
1355        table = {}
1356        for line in output[2:]:
1357            line = line.strip()
1358            if not line:
1359                continue
1360
1361            fields = self.__split_table_row(line)
1362            col = lambda colname: self.__get_table_col(colname, headers, fields)
1363
1364            role = col('Role')
1365            is_router = role == 'R'
1366            r, d, n = int(col('R')), int(col('D')), int(col('N'))
1367            mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}')
1368
1369            rloc16 = Rloc16(col('RLOC16'), 16)
1370
1371            table[rloc16] = {
1372                'is_router': is_router,
1373                'rloc16': rloc16,
1374                'age': int(col('Age')),
1375                'avg_rssi': int(col('Avg RSSI')),
1376                'last_rssi': int(col('Last RSSI')),
1377                'mode': mode,
1378                'extaddr': col('Extended MAC'),
1379            }
1380
1381        return table
1382
1383    #
1384    # SED/SSED configuration
1385    #
1386
1387    def get_poll_period(self) -> int:
1388        """Get the customized data poll period of sleepy end device (milliseconds).
1389        Only for Reference Device."""
1390        return self.__parse_int(self.execute_command('pollperiod'))
1391
1392    def set_poll_period(self, poll_period: int):
1393        """Set the customized data poll period (in milliseconds) for sleepy end device.
1394
1395        Only for Reference Device."""
1396        self.execute_command(f'pollperiod {poll_period}')
1397
1398    # TODO: csl
1399    # TODO: csl channel <channel>
1400    # TODO: csl period <period>
1401    # TODO: csl timeout <timeout>
1402
    # Value formats of the `csl` output: a decimal number followed by its unit suffix
    # ('us' for the period, 's' for the timeout). Group 1 is the number.
    _CSL_PERIOD_PATTERN = re.compile(r'(\d+)us')
    _CSL_TIMEOUT_PATTERN = re.compile(r'(\d+)s')
1405
1406    def get_csl_config(self) -> Dict[str, int]:
1407        """Get the CSL configuration."""
1408        output = self.execute_command('csl')
1409
1410        cfg = {}
1411        for line in output:
1412            k, v = line.split(': ')
1413            if k == 'Channel':
1414                cfg['channel'] = int(v)
1415            elif k == 'Timeout':
1416                matched = OTCI._CSL_TIMEOUT_PATTERN.match(v)
1417                assert matched is not None
1418                cfg['timeout'] = int(matched.group(1))
1419            elif k == 'Period':
1420                matched = OTCI._CSL_PERIOD_PATTERN.match(v)
1421                assert matched is not None
1422                cfg['period'] = int(matched.group(1))
1423            else:
1424                logging.warning("Ignore unknown CSL parameter: %s: %s", k, v)
1425
1426        return cfg
1427
1428    def config_csl(self, channel: Optional[int] = None, period: Optional[int] = None, timeout: Optional[int] = None):
1429        """Configure CSL parameters.
1430
1431        :param channel: Set CSL channel.
1432        :param period: Set CSL period in usec. Disable CSL by setting this parameter to 0.
1433        :param timeout: Set the CSL timeout in seconds.
1434        """
1435
1436        if channel is None and period is None and timeout is None:
1437            raise InvalidArgumentsError("Please specify at least 1 parameter to configure.")
1438
1439        if channel is not None:
1440            self.execute_command(f'csl channel {channel}')
1441
1442        if period is not None:
1443            self.execute_command(f'csl period {period}')
1444
1445        if timeout is not None:
1446            self.execute_command(f'csl timeout {timeout}')
1447
1448    #
1449    # Leader utilities
1450    #
1451
1452    def get_context_id_reuse_delay(self) -> int:
1453        """Get the CONTEXT_ID_REUSE_DELAY value."""
1454        return self.__parse_int(self.execute_command('contextreusedelay'))
1455
1456    def set_context_id_reuse_delay(self, val: int):
1457        """Set the CONTEXT_ID_REUSE_DELAY value."""
1458        self.execute_command(f'contextreusedelay {val}')
1459
1460    def release_router_id(self, routerid: int):
1461        """Release a Router ID that has been allocated by the device in the Leader role."""
1462        self.execute_command(f'releaserouterid {routerid}')
1463
1464    # Time Sync utilities
1465    # TODO: networktime
1466    # TODO: networktime <timesyncperiod> <xtalthreshold>
1467    # TODO: delaytimermin
1468    # TODO: delaytimermin <delaytimermin>
1469
1470    #
    # Commissioner operations
1472    #
1473
1474    def commissioner_start(self):
1475        """Start the Commissioner role."""
1476        self.execute_command('commissioner start')
1477
1478    def commissioner_stop(self):
1479        """Stop the Commissioner role."""
1480        self.execute_command('commissioner stop')
1481
1482    def get_commissioiner_state(self) -> str:
1483        """Get current Commissioner state (active or petitioning or disabled)."""
1484        return self.__parse_str(self.execute_command('commissioner state'))
1485
1486    def get_commissioner_session_id(self) -> int:
1487        """Get current commissioner session id."""
1488        return self.__parse_int(self.execute_command('commissioner sessionid'))
1489
1490    def commissioner_add_joiner(self, pskd, eui64=None, discerner=None, timeout=None):
1491        """Add a Joiner entry.
1492
1493        :param pskd: Pre-Shared Key for the Joiner.
1494        :param eui64: The IEEE EUI-64 of the Joiner or '*' to match any Joiner
1495        :param discerner: The Joiner discerner in format number/length.
1496        :param timeout: Joiner timeout in seconds.
1497        """
1498        if (eui64 is not None) == (discerner is not None):
1499            raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.")
1500
1501        if eui64 is not None and eui64 != '*':
1502            self.__validate_extaddr(eui64)
1503
1504        cmd = f'commissioner joiner add {eui64 or discerner} {pskd}'
1505
1506        if timeout is not None:
1507            cmd += f' {timeout}'
1508
1509        self.execute_command(cmd)
1510
1511    def commissioner_remove_jointer(self, eui64=None, discerner=None):
1512        if (eui64 is not None) == (discerner is not None):
1513            raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.")
1514
1515        if eui64 is not None and eui64 != '*':
1516            self.__validate_extaddr(eui64)
1517
1518        self.execute_command(f'commissioner joiner remove {eui64 or discerner}')
1519
1520    def set_commissioner_provisioning_url(self, url: str):
1521        self.execute_command(f'commissioner provisioningurl {url}')
1522
1523    # TODO: commissioner announce
1524    # TODO: commissioner energy
1525    # TODO: commissioner mgmtget
1526    # TODO: commissioner mgmtset
1527    # TODO: commissioner panid
1528
1529    #
1530    # Joiner operations
1531    #
1532    def joiner_start(self, psk: str, provisioning_url: Optional[str] = None):
1533        """Start the Joiner."""
1534        cmd = f'joiner start {psk}'
1535        if provisioning_url is not None:
1536            cmd += f' {provisioning_url}'
1537
1538        self.execute_command(cmd)
1539
1540    def joiner_stop(self):
1541        """Stop the Joiner role."""
1542        self.execute_command('joiner stop')
1543
1544    def get_joiner_id(self) -> str:
1545        """Get the Joiner ID."""
1546        return self.__parse_joiner_id(self.execute_command('joiner id'))
1547
1548    def get_joiner_port(self) -> int:
1549        """Get the Joiner port."""
1550        return self.__parse_int(self.execute_command(f'joinerport'))
1551
1552    def set_joiner_port(self, port: int):
1553        """Set the Joiner port."""
1554        self.execute_command(f'joinerport {port}')
1555
1556    # TODO: joiner discerner
1557
1558    #
1559    # Network Data utilities
1560    #
1561    def get_local_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1562        """Get prefixes from local Network Data."""
1563        output = self.execute_command('prefix')
1564        return self.__parse_prefixes(output)
1565
1566    def __parse_prefixes(self, output: List[str]) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1567        prefixes = []
1568
1569        for line in output:
1570            if line.startswith('- '):
1571                line = line[2:]
1572
1573            prefix, flags, prf, rloc16 = line.split()[:4]
1574            prefixes.append((Ip6Prefix(prefix), flags, prf, Rloc16(rloc16, 16)))
1575
1576        return prefixes
1577
1578    def add_prefix(self, prefix: str, flags='paosr', prf='med'):
1579        """Add a valid prefix to the Network Data."""
1580        self.execute_command(f'prefix add {prefix} {flags} {prf}')
1581
1582    def remove_prefix(self, prefix: str):
1583        """Invalidate a prefix in the Network Data."""
1584        self.execute_command(f'prefix remove {prefix}')
1585
1586    def register_network_data(self):
1587        self.execute_command('netdata register')
1588
1589    def get_network_data(self) -> Dict[str, List]:
1590        output = self.execute_command('netdata show')
1591
1592        netdata = {}
1593        if output.pop(0) != 'Prefixes:':
1594            raise UnexpectedCommandOutput(output)
1595
1596        prefixes_output = []
1597        while True:
1598            line = output.pop(0)
1599            if line == 'Routes:':
1600                break
1601            else:
1602                prefixes_output.append(line)
1603
1604        netdata['prefixes'] = self.__parse_prefixes(prefixes_output)
1605
1606        routes_output = []
1607        while True:
1608            line = output.pop(0)
1609            if line == 'Services:':
1610                break
1611            else:
1612                routes_output.append(line)
1613
1614        netdata['routes'] = self.__parse_routes(routes_output)
1615
1616        services_output = []
1617        while True:
1618            line = output.pop(0)
1619            if line == 'Contexts:':
1620                break
1621            else:
1622                services_output.append(line)
1623
1624        netdata['services'] = self.__parse_services(services_output)
1625
1626        return netdata
1627
1628    def get_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1629        """Get network prefixes from Thread Network Data."""
1630        network_data = self.get_network_data()
1631        return network_data['prefixes']
1632
1633    def get_routes(self) -> List[Tuple[str, bool, str, Rloc16]]:
1634        """Get routes from Thread Network Data."""
1635        network_data = self.get_network_data()
1636        return network_data['routes']
1637
1638    def get_services(self) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]:
1639        """Get services from Thread Network Data"""
1640        network_data = self.get_network_data()
1641        return network_data['services']
1642
1643    def __parse_services(self, output: List[str]) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]:
1644        services = []
1645        for line in output:
1646            line = line.split()
1647
1648            enterprise_number, service_data, server_data = line[:3]
1649            if line[3] == 's':
1650                stable, rloc16 = True, line[4]
1651            else:
1652                stable, rloc16 = False, line[3]
1653
1654            enterprise_number = int(enterprise_number)
1655            service_data = self.__hex_to_bytes(service_data)
1656            server_data = self.__hex_to_bytes(server_data)
1657            rloc16 = Rloc16(rloc16, 16)
1658
1659            services.append((enterprise_number, service_data, server_data, stable, rloc16))
1660
1661        return services
1662
1663    def get_network_data_bytes(self) -> bytes:
1664        """Get the raw Network Data."""
1665        hexstr = self.__parse_str(self.execute_command('netdata show -x'))
1666        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
1667
1668    def get_local_routes(self) -> List[Tuple[str, bool, str, Rloc16]]:
1669        """Get routes from local Network Data."""
1670        return self.__parse_routes(self.execute_command('route'))
1671
1672    def __parse_routes(self, output: List[str]) -> List[Tuple[str, bool, str, Rloc16]]:
1673        routes = []
1674        for line in output:
1675            line = line.split()
1676            if len(line) == 4:
1677                prefix, flags, prf, rloc16 = line
1678                stable = 's' in flags
1679            else:
1680                prefix, prf, rloc16 = line
1681                stable = False
1682
1683            rloc16 = Rloc16(rloc16, 16)
1684            routes.append((prefix, stable, prf, rloc16))
1685
1686        return routes
1687
1688    def add_route(self, prefix: str, stable=True, prf='med'):
1689        """Add a valid external route to the Network Data."""
1690        cmd = f'route add {prefix}'
1691        if stable:
1692            cmd += ' s'
1693
1694        cmd += f' {prf}'
1695        self.execute_command(cmd)
1696
1697    def remove_route(self, prefix: str):
1698        """Invalidate a external route in the Network Data."""
1699        self.execute_command(f'route remove {prefix}')
1700
1701    def add_service(self, enterprise_number: int, service_data: Union[str, bytes], server_data: Union[str, bytes]):
1702        """Add service to the Network Data.
1703
1704        enterpriseNumber: IANA enterprise number
1705        serviceData: hex-encoded binary service data
1706        serverData: hex-encoded binary server data
1707        """
1708        service_data = self.__validate_hex_or_bytes(service_data)
1709        server_data = self.__validate_hex_or_bytes(server_data)
1710        self.execute_command(f'service add {enterprise_number} {service_data} {server_data}')
1711
1712    def remove_service(self, enterprise_number, service_data):
1713        """Remove service from Network Data.
1714
1715        enterpriseNumber: IANA enterprise number
1716        serviceData: hext-encoded binary service data
1717        """
1718        service_data = self.__validate_hex_or_bytes(service_data)
1719        self.execute_command(f'service remove {enterprise_number} {service_data}')
1720
1721    #
1722    # Dataset management
1723    #
1724
1725    def dataset_init_buffer(self, get_active_dataset=False, get_pending_dataset=False):
1726        """Initialize operational dataset buffer."""
1727        if get_active_dataset and get_pending_dataset:
1728            raise InvalidArgumentsError("Can not specify both `get_active_dataset` and `get_pending_dataset`.")
1729
1730        if get_active_dataset:
1731            self.execute_command(f'dataset init active')
1732        elif get_pending_dataset:
1733            self.execute_command(f'dataset init pending')
1734        else:
1735            self.execute_command(f'dataset init new')
1736
1737    def dataset_commit_buffer(self, dataset: str):
1738        if dataset in ('active', 'pending'):
1739            cmd = f'dataset commit {dataset}'
1740        else:
1741            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1742
1743        self.execute_command(cmd)
1744
1745    def dataset_clear_buffer(self):
1746        """Reset operational dataset buffer."""
1747        self.execute_command('dataset clear')
1748
1749    def get_dataset(self, dataset: str = 'buffer'):
1750        if dataset in ('active', 'pending'):
1751            cmd = f'dataset {dataset}'
1752        elif dataset == 'buffer':
1753            cmd = 'dataset'
1754        else:
1755            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1756
1757        output = self.execute_command(cmd)
1758        return self.__parse_dataset(output)
1759
1760    def __parse_dataset(self, output: List[str]) -> Dict[str, Any]:
1761        # Example output:
1762        #
1763        # Active Timestamp: 1
1764        # Channel: 22
1765        # Channel Mask: 0x07fff800
1766        # Ext PAN ID: 5c93ae980ff22d35
1767        # Mesh Local Prefix: fdc7:55fe:6363:bd01::/64
1768        # Network Key: d1a8348d59fb1fac1d6c4f95007d487a
1769        # Network Name: OpenThread-7caa
1770        # PAN ID: 0x7caa
1771        # PSKc: 167d89fd169e439ca0b8266de248090f
1772        # Security Policy: 672 onrc 0
1773
1774        dataset = {}
1775
1776        for line in output:
1777            line = line.split(': ')
1778            key, val = line[0], ': '.join(line[1:])
1779
1780            if key == 'Active Timestamp':
1781                dataset['active_timestamp'] = int(val)
1782            elif key == 'Channel':
1783                dataset['channel'] = int(val)
1784            elif key == 'Channel Mask':
1785                dataset['channel_mask'] = int(val, 16)
1786            elif key == 'Ext PAN ID':
1787                dataset['extpanid'] = val
1788            elif key == 'Mesh Local Prefix':
1789                dataset['mesh_local_prefix'] = val
1790            elif key in ('Network Key', 'Master Key'):
1791                dataset['networkkey'] = val
1792            elif key == 'Network Name':
1793                dataset['network_name'] = val
1794            elif key == 'PAN ID':
1795                dataset['panid'] = int(val, 16)
1796            elif key == 'PSKc':
1797                dataset['pskc'] = val
1798            elif key == 'Security Policy':
1799                rotation_time, flags, version_threshold = val.split(' ')
1800                rotation_time = int(rotation_time)
1801                dataset['security_policy'] = SecurityPolicy(rotation_time, flags)
1802            else:
1803                raise UnexpectedCommandOutput(output)
1804
1805        return dataset
1806
1807    def get_dataset_bytes(self, dataset: str) -> bytes:
1808        if dataset in ('active', 'pending'):
1809            cmd = f'dataset {dataset} -x'
1810        else:
1811            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1812
1813        hexstr = self.__parse_str(self.execute_command(cmd))
1814        return self.__hex_to_bytes(hexstr)
1815
1816    def set_dataset_bytes(self, dataset: str, data: bytes) -> None:
1817        if dataset in ('active', 'pending'):
1818            cmd = f'dataset set {dataset} {self.__bytes_to_hex(data)}'
1819        else:
1820            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1821
1822        self.execute_command(cmd)
1823
1824    def get_dataset_tlvs_bytes(self) -> bytes:
1825        """Gets bytes of the Operational Dataset TLVs"""
1826        hexstr = self.__parse_str(self.execute_command('dataset tlvs'))
1827        return self.__hex_to_bytes(hexstr)
1828
1829    def dataset_set_buffer(self,
1830                           active_timestamp: Optional[int] = None,
1831                           channel: Optional[int] = None,
1832                           channel_mask: Optional[int] = None,
1833                           extpanid: Optional[str] = None,
1834                           mesh_local_prefix: Optional[str] = None,
1835                           network_key: Optional[str] = None,
1836                           network_name: Optional[str] = None,
1837                           panid: Optional[int] = None,
1838                           pskc: Optional[str] = None,
1839                           security_policy: Optional[tuple] = None,
1840                           pending_timestamp: Optional[int] = None):
1841        if active_timestamp is not None:
1842            self.execute_command(f'dataset activetimestamp {active_timestamp}')
1843
1844        if channel is not None:
1845            self.execute_command(f'dataset channel {channel}')
1846
1847        if channel_mask is not None:
1848            self.execute_command(f'dataset channelmask {channel_mask}')
1849
1850        if extpanid is not None:
1851            self.execute_command(f'dataset extpanid {extpanid}')
1852
1853        if mesh_local_prefix is not None:
1854            self.execute_command(f'dataset meshlocalprefix {mesh_local_prefix}')
1855
1856        if network_key is not None:
1857            nwk_cmd = self.__detect_networkkey_cmd()
1858            self.execute_command(f'dataset {nwk_cmd} {network_key}')
1859
1860        if network_name is not None:
1861            self.execute_command(f'dataset networkname {self.__escape_escapable(network_name)}')
1862
1863        if panid is not None:
1864            self.execute_command(f'dataset panid {panid}')
1865
1866        if pskc is not None:
1867            self.execute_command(f'dataset pskc {pskc}')
1868
1869        if security_policy is not None:
1870            rotation_time, flags = security_policy
1871            self.execute_command(f'dataset securitypolicy {rotation_time} {flags}')
1872
1873        if pending_timestamp is not None:
1874            self.execute_command(f'dataset pendingtimestamp {pending_timestamp}')
1875
1876    # TODO: dataset mgmtgetcommand
1877    # TODO: dataset mgmtsetcommand
1878    # TODO: dataset set <active|pending> <dataset>
1879
1880    #
1881    # Allowlist management
1882    #
1883
1884    def enable_allowlist(self):
1885        self.execute_command(f'macfilter addr {self.__detect_allowlist_cmd()}')
1886
1887    def disable_allowlist(self):
1888        self.execute_command('macfilter addr disable')
1889
1890    def add_allowlist(self, addr: str, rssi: Optional[int] = None):
1891        cmd = f'macfilter addr add {addr}'
1892
1893        if rssi is not None:
1894            cmd += f' {rssi}'
1895
1896        self.execute_command(cmd)
1897
1898    def remove_allowlist(self, addr: str):
1899        self.execute_command(f'macfilter addr remove {addr}')
1900
1901    def clear_allowlist(self):
1902        self.execute_command('macfilter addr clear')
1903
1904    def set_allowlist(self, allowlist: Collection[Union[str, Tuple[str, int]]]):
1905        self.clear_allowlist()
1906
1907        if allowlist is None:
1908            self.disable_allowlist()
1909        else:
1910            self.enable_allowlist()
1911            for item in allowlist:
1912                if isinstance(item, str):
1913                    self.add_allowlist(item)
1914                else:
1915                    addr, rssi = item[0], item[1]
1916                    self.add_allowlist(addr, rssi)
1917
1918    # TODO: denylist
1919    # TODO: macfilter rss
1920    # TODO: macfilter rss add <extaddr> <rss>
1921    # TODO: macfilter rss add-lqi <extaddr> <lqi>
1922    # TODO: macfilter rss remove <extaddr>
1923    # TODO: macfilter rss clear
1924
1925    def __detect_allowlist_cmd(self):
1926        if self.api_version >= 28:
1927            return 'allowlist'
1928        else:
1929            return '\x77\x68\x69\x74\x65\x6c\x69\x73\x74'
1930
1931    def __detect_networkkey_cmd(self) -> str:
1932        return 'networkkey' if self.api_version >= 126 else 'masterkey'
1933
1934    #
1935    # Unicast Addresses management
1936    #
1937    def add_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1938        """Add an IPv6 address to the Thread interface."""
1939        self.execute_command(f'ipaddr add {ip}')
1940
1941    def del_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1942        """Delete an IPv6 address from the Thread interface."""
1943        self.execute_command(f'ipaddr del {ip}')
1944
1945    def get_ipaddrs(self) -> Tuple[Ip6Addr]:
1946        """Get all IPv6 addresses assigned to the Thread interface."""
1947        return tuple(map(Ip6Addr, self.execute_command('ipaddr')))
1948
1949    def has_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1950        """Check if a IPv6 address was added to the Thread interface."""
1951        return ip in self.get_ipaddrs()
1952
1953    def get_ipaddr_mleid(self) -> Ip6Addr:
1954        """Get Thread Mesh Local EID address."""
1955        return self.__parse_ip6addr(self.execute_command('ipaddr mleid'))
1956
1957    def get_ipaddr_linklocal(self) -> Ip6Addr:
1958        """Get Thread link-local IPv6 address."""
1959        return self.__parse_ip6addr(self.execute_command('ipaddr linklocal'))
1960
1961    def get_ipaddr_rloc(self) -> Ip6Addr:
1962        """Get Thread Routing Locator (RLOC) address."""
1963        return self.__parse_ip6addr(self.execute_command('ipaddr rloc'))
1964
1965    #
1966    # Multicast Addresses management
1967    #
1968
1969    def add_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1970        """Subscribe the Thread interface to the IPv6 multicast address."""
1971        self.execute_command(f'ipmaddr add {ip}')
1972
1973    def del_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1974        """Unsubscribe the Thread interface to the IPv6 multicast address."""
1975        self.execute_command(f'ipmaddr del {ip}')
1976
1977    def get_ipmaddrs(self) -> Tuple[Ip6Addr]:
1978        """Get all IPv6 multicast addresses subscribed to the Thread interface."""
1979        return tuple(map(Ip6Addr, self.execute_command('ipmaddr')))
1980
1981    def has_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1982        """Check if a IPv6 multicast address was subscribed by the Thread interface."""
1983        return ip in self.get_ipmaddrs()
1984
1985    def get_ipmaddr_llatn(self) -> Ip6Addr:
1986        """Get Link Local All Thread Nodes Multicast Address"""
1987        return self.__parse_ip6addr(self.execute_command('ipmaddr llatn'))
1988
1989    def get_ipmaddr_rlatn(self) -> Ip6Addr:
1990        """Get Realm Local All Thread Nodes Multicast Address"""
1991        return self.__parse_ip6addr(self.execute_command('ipmaddr rlatn'))
1992
1993    #
1994    # Backbone Router Utilities
1995    #
1996
1997    # TODO: bbr mgmt ...
1998
1999    def enable_backbone_router(self):
2000        """Enable Backbone Router Service for Thread 1.2 FTD.
2001
2002        SRV_DATA.ntf would be triggered for attached device if there is no Backbone Router Service in Thread Network Data.
2003        """
2004        self.execute_command('bbr enable')
2005
2006    def disable_backbone_router(self):
2007        """Disable Backbone Router Service for Thread 1.2 FTD.
2008
2009        SRV_DATA.ntf would be triggered if Backbone Router is Primary state.
2010        """
2011        self.execute_command('bbr disable')
2012
2013    def get_backbone_router_state(self) -> str:
2014        """Get local Backbone state (Disabled or Primary or Secondary) for Thread 1.2 FTD."""
2015        return self.__parse_str(self.execute_command('bbr state'))
2016
2017    def get_primary_backbone_router_info(self) -> Optional[dict]:
2018        """Show current Primary Backbone Router information for Thread 1.2 device."""
2019        output = self.execute_command('bbr')
2020
2021        if len(output) < 1:
2022            raise UnexpectedCommandOutput(output)
2023
2024        line = output[0]
2025        if line == 'BBR Primary: None':
2026            return None
2027
2028        if line != 'BBR Primary:':
2029            raise UnexpectedCommandOutput(output)
2030
2031        # Example output:
2032        # BBR Primary:
2033        # server16: 0xE400
2034        # seqno:    10
2035        # delay:    120 secs
2036        # timeout:  300 secs
2037
2038        dataset = {}
2039
2040        for line in output[1:]:
2041            key, val = line.split(':')
2042            key, val = key.strip(), val.strip()
2043            if key == 'server16':
2044                dataset[key] = int(val, 16)
2045            elif key == 'seqno':
2046                dataset[key] = int(val)
2047            elif key == 'delay':
2048                if not val.endswith(' secs'):
2049                    raise UnexpectedCommandOutput(output)
2050                dataset[key] = int(val.split()[0])
2051            elif key == 'timeout':
2052                if not val.endswith(' secs'):
2053                    raise UnexpectedCommandOutput(output)
2054                dataset[key] = int(val.split()[0])
2055            else:
2056                raise UnexpectedCommandOutput(output)
2057
2058        return dataset
2059
2060    def register_backbone_router_dataset(self):
2061        """Register Backbone Router Service for Thread 1.2 FTD.
2062
2063        SRV_DATA.ntf would be triggered for attached device.
2064        """
2065        self.execute_command('bbr register')
2066
2067    def get_backbone_router_config(self) -> dict:
2068        """Show local Backbone Router configuration for Thread 1.2 FTD."""
2069        output = self.execute_command('bbr config')
2070        # Example output:
2071        # seqno:    10
2072        # delay:    120 secs
2073        # timeout:  300 secs
2074
2075        config = {}
2076
2077        for line in output:
2078            key, val = line.split(':')
2079            key, val = key.strip(), val.strip()
2080            if key == 'seqno':
2081                config[key] = int(val)
2082            elif key in ('delay', 'timeout'):
2083                if not line.endswith(' secs'):
2084                    raise UnexpectedCommandOutput(output)
2085                config[key] = int(val.split()[0])
2086            else:
2087                raise UnexpectedCommandOutput(output)
2088
2089        return config
2090
2091    def set_backbone_router_config(self,
2092                                   seqno: Optional[int] = None,
2093                                   delay: Optional[int] = None,
2094                                   timeout: Optional[int] = None):
2095        """Configure local Backbone Router configuration for Thread 1.2 FTD.
2096
2097        Call register_backbone_router_dataset() to explicitly register Backbone Router service to Leader for Secondary Backbone Router.
2098        """
2099        if seqno is None and delay is None and timeout is None:
2100            raise InvalidArgumentsError("Please specify seqno or delay or timeout")
2101
2102        cmd = 'bbr config'
2103        if seqno is not None:
2104            cmd += f' seqno {seqno}'
2105
2106        if delay is not None:
2107            cmd += f' delay {delay}'
2108
2109        if timeout is not None:
2110            cmd += f' timeout {timeout}'
2111
2112        self.execute_command(cmd)
2113
2114    def get_backbone_router_jitter(self) -> int:
2115        """Get jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD."""
2116        return self.__parse_int(self.execute_command('bbr jitter'))
2117
2118    def set_backbone_router_jitter(self, val: int):
2119        """Set jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD."""
2120        self.execute_command(f'bbr jitter {val}')
2121
2122    def backbone_router_get_multicast_listeners(self) -> List[Tuple[Ip6Addr, int]]:
2123        """Get Backbone Router Multicast Listeners."""
2124        listeners = []
2125        for line in self.execute_command('bbr mgmt mlr listener'):
2126            ip, timeout = line.split()
2127            listeners.append((Ip6Addr(ip), int(timeout)))
2128
2129        return listeners
2130
2131    #
2132    # Thread 1.2 and DUA/MLR utilities
2133    #
2134
2135    def get_domain_name(self) -> str:
2136        """Get the Thread Domain Name for Thread 1.2 device."""
2137        return self.__parse_str(self.execute_command('domainname'))
2138
2139    def set_domain_name(self, name: str):
2140        """Set the Thread Domain Name for Thread 1.2 device."""
2141        self.execute_command('domainname %s' % self.__escape_escapable(name))
2142
2143    # TODO: dua iid
2144    # TODO: dua iid <iid>
2145    # TODO: dua iid clear
2146    # TODO: mlr reg <ipaddr> ... [timeout]
2147
2148    #
2149    # Link metrics management
2150    #
2151
2152    def linkmetrics_config_enhanced_ack_clear(self, peer_addr: Union[str, Ip6Addr]) -> bool:
2153        output = self.execute_command(f'linkmetrics config {peer_addr} enhanced-ack clear')
2154        return self.__parse_linkmetrics_mgmt_response(peer_addr, output)
2155
2156    def linkmetrics_config_enhanced_ack_register(self,
2157                                                 peer_addr: Union[str, Ip6Addr],
2158                                                 link_metrics_flags: str,
2159                                                 reference: bool = False) -> bool:
2160        if self.__valid_flags(link_metrics_flags, 'qmr') is False:
2161            raise ValueError(link_metrics_flags)
2162
2163        output = self.execute_command(
2164            f'linkmetrics config {peer_addr} enhanced-ack register {link_metrics_flags} {"r" if reference else ""}')
2165        return self.__parse_linkmetrics_mgmt_response(peer_addr, output)
2166
2167    def linkmetrics_config_forward(self, peer_addr: Union[str, Ip6Addr], seriesid: int, series_flags: str,
2168                                   link_metrics_flags: str) -> bool:
2169        if self.__valid_flags(series_flags, 'ldraX') is False:
2170            raise ValueError(series_flags)
2171
2172        if self.__valid_flags(link_metrics_flags, 'pqmr') is False:
2173            raise ValueError(link_metrics_flags)
2174
2175        output = self.execute_command(
2176            f'linkmetrics config {peer_addr} forward {seriesid} {series_flags} {link_metrics_flags}')
2177        return self.__parse_linkmetrics_mgmt_response(peer_addr, output)
2178
2179    def linkmetrics_probe(self, peer_addr: Union[str, Ip6Addr], seriesid: int, length: int):
2180        if length < 0 or length > 64:
2181            raise ValueError(length)
2182
2183        self.execute_command(f'linkmetrics probe {peer_addr} {seriesid} {length}')
2184
2185    def linkmetrics_request_single(self, peer_addr: Union[str, Ip6Addr], link_metrics_flags: str) -> Dict[str, int]:
2186        if self.__valid_flags(link_metrics_flags, 'pqmr') is False:
2187            raise ValueError(link_metrics_flags)
2188
2189        output = self.execute_command(f'linkmetrics request {peer_addr} single {link_metrics_flags}')
2190        return self.__parse_linkmetrics_report(peer_addr, output)
2191
2192    def linkmetrics_request_forward(self, peer_addr: Union[str, Ip6Addr], seriesid: int) -> Dict[str, int]:
2193        output = self.execute_command(f'linkmetrics request {peer_addr} forward {seriesid}')
2194        return self.__parse_linkmetrics_report(peer_addr, output)
2195
2196    def __parse_linkmetrics_mgmt_response(self, peer_addr: Union[str, Ip6Addr], output: List[str]) -> bool:
2197        #
2198        # Example output:
2199        #
2200        # Received Link Metrics Management Response from: fe80:0:0:0:3092:f334:1455:1ad2
2201        # Status: Success
2202        # Done
2203        #
2204
2205        status = ''
2206        report_received = False
2207        ret = False
2208
2209        for line in output:
2210            if 'Received Link Metrics Management Response from' in line:
2211                address = line.split(': ')[1].strip()
2212                report_received = address == peer_addr
2213            elif 'Status' in line:
2214                status = line.split(':')[1].strip()
2215
2216        return report_received and status == 'Success'
2217
2218    def __parse_linkmetrics_report(self, peer_addr: Union[str, Ip6Addr], output: List[str]) -> Dict[str, int]:
2219        #
2220        # Example output:
2221        #
2222        # Received Link Metrics Report from: fe80:0:0:0:3092:f334:1455:1ad2
2223        #
2224        # - PDU Counter: 2 (Count/Summation)
2225        # - LQI: 76 (Exponential Moving Average)
2226        # - Margin: 82 (dB) (Exponential Moving Average)
2227        # - RSSI: -18 (dBm) (Exponential Moving Average)
2228        # Done
2229        #
2230
2231        results = {}
2232        report_received = False
2233
2234        for line in output:
2235            if 'Received Link Metrics Report' in line:
2236                address = line.split(': ')[1].strip()
2237                report_received = address == peer_addr
2238            elif 'Received Link Metrics data in Enh Ack from neighbor' in line:
2239                # If the Enhanced-ACK Based Probing is enabled, the CLI will output the following
2240                # link metrics info after executing the `linkmetrics request` command. This case is
2241                # used to skip these Enhanced-ACK related link metrics info.
2242                #
2243                # Received Link Metrics data in Enh Ack from neighbor, short address:0x3400 , extended address:c6a24d6514cf9178
2244                # - LQI: 224 (Exponential Moving Average)
2245                # - Margin: 0 (dB) (Exponential Moving Average)
2246                #
2247                # Received Link Metrics Report from: fe80:0:0:0:3092:f334:1455:1ad2
2248                #
2249                # - PDU Counter: 2 (Count/Summation)
2250                # - LQI: 76 (Exponential Moving Average)
2251                # - Margin: 82 (dB) (Exponential Moving Average)
2252                # - RSSI: -18 (dBm) (Exponential Moving Average)
2253                # Done
2254                #
2255                report_received = False
2256
2257            if not report_received:
2258                continue
2259
2260            if '- LQI' in line:
2261                results['lqi'] = self.__parse_numbers(line)[0]
2262            elif '- Margin' in line:
2263                results['margin'] = self.__parse_numbers(line)[0]
2264            elif '- RSSI' in line:
2265                results['rssi'] = self.__parse_numbers(line)[0]
2266            elif '- PDU Counter' in line:
2267                results['pdu_counter'] = self.__parse_numbers(line)[0]
2268
2269        return results
2270
2271    def __parse_numbers(self, line: str) -> List[int]:
2272        values = re.findall("\-?\d+", line)
2273        return list(map(int, values))
2274
2275    def __valid_flags(self, flags: str, flags_set: str):
2276        # check for duplicate chars
2277        if len(flags) != len(set(flags)):
2278            return False
2279
2280        return set(flags).issubset(set(flags_set))
2281
2282    #
2283    # Logging
2284    #
2285
2286    def get_log_level(self) -> int:
2287        """Get the log level."""
2288        return self.__parse_int(self.execute_command('log level'))
2289
2290    def set_log_level(self, level: int):
2291        """Set the log level."""
2292        self.execute_command(f'log level {level}')
2293
2294    #
2295    # Device performance related information
2296    #
2297
2298    def get_message_buffer_info(self) -> dict:
2299        """Get the current message buffer information."""
2300        output = self.execute_command('bufferinfo')
2301
2302        info = {}
2303
2304        def _parse_val(val):
2305            vals = val.split()
2306            return int(vals[0]) if len(vals) == 1 else tuple(map(int, vals))
2307
2308        for line in output:
2309            key, val = line.split(':')
2310            key, val = key.strip(), val.strip()
2311            info[key.replace(' ', '_')] = _parse_val(val)
2312
2313        return info
2314
2315    @constant_property
2316    def counter_names(self):
2317        """Get the supported counter names."""
2318        return tuple(self.execute_command('counters'))
2319
2320    def get_counter(self, name: str) -> Counter:
2321        """Reset the counter value."""
2322        output = self.execute_command(f'counters {name}')
2323
2324        counter = Counter()
2325        for line in output:
2326            k, v = line.strip().split(': ')
2327            counter[k] = int(v)
2328
2329        return counter
2330
    def reset_counter(self, name: str):
        """Reset the counter value by running `counters <name> reset`.

        :param name: the counter name.
        """
        self.execute_command(f'counters {name} reset')
2334
2335    def get_eidcache(self) -> Dict[Ip6Addr, Rloc16]:
2336        """Get the EID-to-RLOC cache entries."""
2337        output = self.execute_command('eidcache')
2338        cache = {}
2339
2340        for line in output:
2341            ip, rloc16, _ = line.split(" ", 2)
2342
2343            cache[Ip6Addr(ip)] = Rloc16(rloc16, 16)
2344
2345        return cache
2346
2347    #
2348    # UDP utilities
2349    #
2350
    def udp_open(self):
        """Opens the example socket by running `udp open`."""
        self.execute_command('udp open')
2354
    def udp_close(self):
        """Closes the example socket by running `udp close`."""
        self.execute_command('udp close')
2358
2359    def udp_bind(self, ip: str, port: int, netif: NetifIdentifier = NetifIdentifier.THERAD):
2360        """Assigns a name (i.e. IPv6 address and port) to the example socket.
2361
2362        :param ip: the IPv6 address or the unspecified IPv6 address (::).
2363        :param port: the UDP port
2364        """
2365        bindarg = ''
2366        if netif == NetifIdentifier.UNSPECIFIED:
2367            bindarg += ' -u'
2368        elif netif == NetifIdentifier.BACKBONE:
2369            bindarg += ' -b'
2370
2371        self.execute_command(f'udp bind{bindarg} {ip} {port}')
2372
    def udp_connect(self, ip: str, port: int):
        """Specifies the peer with which the socket is to be associated.

        Runs `udp connect <ip> <port>`.

        :param ip: the peer's IPv6 address.
        :param port: the peer's UDP port.
        """
        self.execute_command(f'udp connect {ip} {port}')
2380
2381    def udp_send(self,
2382                 ip: Optional[Union[str, Ip6Addr]] = None,
2383                 port: Optional[int] = None,
2384                 text: Optional[str] = None,
2385                 random_bytes: Optional[int] = None,
2386                 hex: Optional[str] = None):
2387        """Send a few bytes over UDP.
2388
2389        ip: the IPv6 destination address.
2390        port: the UDP destination port.
2391        type: the type of the message: _ -t: text payload in the value, same as without specifying the type. _ -s: autogenerated payload with specified length indicated in the value.
2392        * -x: binary data in hexadecimal representation in the value.
2393        """
2394        if (ip is None) != (port is None):
2395            raise InvalidArgumentsError("Please specify both `ip` and `port`.")
2396
2397        if (text is not None) + (random_bytes is not None) + (hex is not None) != 1:
2398            raise InvalidArgumentsError("Please specify `text` or `random_bytes` or `hex`.")
2399
2400        cmd = 'udp send'
2401
2402        if ip is not None:
2403            cmd += f' {ip} {port}'
2404
2405        if text is not None:
2406            cmd += f' -t {text}'
2407        elif random_bytes is not None:
2408            cmd += f' -s {random_bytes}'
2409        elif hex is not None:
2410            self.__validate_hex(hex)
2411            cmd += f' -x {hex}'
2412
2413        self.execute_command(cmd)
2414
    def udp_get_link_security(self) -> bool:
        """Gets whether the link security is enabled or disabled.

        Runs `udp linksecurity`; returns True for `Enabled`, False for `Disabled`.
        """
        return self.__parse_Enabled_or_Disabled(self.execute_command('udp linksecurity'))
2418
    def udp_enable_link_security(self):
        """Enable link security by running `udp linksecurity enable`."""
        self.execute_command('udp linksecurity enable')
2422
    def udp_disable_link_security(self):
        """Disable link security by running `udp linksecurity disable`."""
        self.execute_command('udp linksecurity disable')
2426
2427    def netstat(self) -> List[Tuple[Tuple[Ip6Addr, int], Tuple[Ip6Addr, int]]]:
2428        cmd = 'netstat'
2429        output = self.execute_command(cmd)
2430        if len(output) < 2:
2431            raise UnexpectedCommandOutput(output)
2432
2433        socks = []
2434        for line in output[2:]:
2435            _, sock_addr, peer_addr = line.strip().split('|')[:3]
2436            sock_addr = self.__parse_socket_addr(sock_addr.strip())
2437            peer_addr = self.__parse_socket_addr(peer_addr.strip())
2438            socks.append((sock_addr, peer_addr))
2439
2440        return socks
2441
2442    @staticmethod
2443    def __parse_socket_addr(addr: str) -> Tuple[Ip6Addr, int]:
2444        addr, port = addr.rsplit(':', 1)
2445        if addr.startswith('[') and addr.endswith(']'):
2446            addr = addr[1:-1]
2447
2448        return Ip6Addr(addr), int(port) if port != '*' else 0
2449
2450    #
2451    # CoAP CLI (test) utilities
2452    #
    def coap_start(self):
        """Starts the application coap service by running `coap start`."""
        self.execute_command('coap start')
2456
    def coap_stop(self):
        """Stops the application coap service by running `coap stop`."""
        self.execute_command('coap stop')
2460
    def coap_get(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con"):
        """Run `coap get <addr> <uri_path> <type>`.

        :param addr: the address placed in the command.
        :param uri_path: the URI path placed in the command.
        :param type: the message type argument (default "con").
        """
        cmd = f'coap get {addr} {uri_path} {type}'
        self.execute_command(cmd)
2464
2465    def coap_put(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
2466        cmd = f'coap put {addr} {uri_path} {type}'
2467
2468        if payload is not None:
2469            cmd += f' {payload}'
2470
2471        self.execute_command(cmd)
2472
2473    def coap_post(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
2474        cmd = f'coap post {addr} {uri_path} {type}'
2475
2476        if payload is not None:
2477            cmd += f' {payload}'
2478
2479        self.execute_command(cmd)
2480
2481    def coap_delete(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
2482        cmd = f'coap delete {addr} {uri_path} {type}'
2483
2484        if payload is not None:
2485            cmd += f' {payload}'
2486
2487        self.execute_command(cmd)
2488
    def coap_get_test_resource_path(self) -> str:
        """Gets the URI path for the test resource.

        Runs `coap resource` and returns its single output line.
        """
        return self.__parse_str(self.execute_command('coap resource'))
2492
    def coap_set_test_resource_path(self, path: str):
        """Sets the URI path for the test resource by running `coap resource <path>`.

        :param path: the URI path to set.
        """
        self.execute_command(f'coap resource {path}')
2496
    def coap_test_set_resource_content(self, content: str):
        """Sets the content sent by the test resource. If a CoAP client is observing the resource, a notification is sent to that client.

        Runs `coap set <content>`.

        :param content: the new content of the test resource.
        """
        self.execute_command(f'coap set {content}')
2500
2501    # TODO: coap observe <address> <uri-path> [type]
2502    # TODO: coap cancel
2503    # TODO: coap parameters <type> ["default"|<ack_timeout> <ack_random_factor_numerator> <ack_random_factor_denominator> <max_retransmit>]
2504    # TODO: CoAP Secure utilities
2505
2506    #
2507    # Diag Utilities
2508    #
    def diag_start(self):
        """Start diagnostics mode by running `diag start`."""
        self.execute_command('diag start')
2512
    def diag_stop(self):
        """Stop diagnostics mode by running `diag stop`."""
        self.execute_command('diag stop')
2516
    def diag_set_channel(self, channel: int):
        """Set the IEEE 802.15.4 Channel value for diagnostics module.

        Runs `diag channel <channel>`.

        :param channel: the channel to set.
        """
        self.execute_command(f'diag channel {channel}')
2520
2521    def diag_get_channel(self) -> int:
2522        """Get the IEEE 802.15.4 Channel value for diagnostics module."""
2523        line = self.__parse_str(self.execute_command('diag channel'))
2524        return int(line.split()[1])
2525
    def diag_set_power(self, power: int):
        """Set the tx power value(dBm) for diagnostics module.

        Runs `diag power <power>`.

        :param power: the tx power to set.
        """
        self.execute_command(f'diag power {power}')
2529
2530    def diag_get_power(self) -> int:
2531        """Get the tx power value(dBm) for diagnostics module."""
2532        line = self.__parse_str(self.execute_command('diag power'))
2533        if not line.endswith(' dBm'):
2534            raise UnexpectedCommandOutput([line])
2535
2536        return int(line.split()[2])
2537
    def diag_cw_start(self):
        """Start transmitting continuous carrier wave by running `diag cw start`."""
        self.execute_command('diag cw start')
2541
    def diag_cw_stop(self):
        """Stop transmitting continuous carrier wave by running `diag cw stop`."""
        self.execute_command('diag cw stop')
2545
    def diag_frame(self, frame: str):
        """Set the frame (hex encoded) to be used by `diag send` and `diag repeat`.

        Runs `diag frame <frame>`; the value is passed through unchanged.

        :param frame: the hex encoded frame.
        """
        self.execute_command(f'diag frame {frame}')
2549
    def diag_stream_start(self):
        """Start transmitting a stream of characters by running `diag stream start`."""
        self.execute_command('diag stream start')
2553
    def diag_stream_stop(self):
        """Stop transmitting a stream of characters by running `diag stream stop`."""
        self.execute_command('diag stream stop')
2557
2558    def diag_send(self, packets: int, length: Optional[int] = None):
2559        """Transmit a fixed number of packets."""
2560        if length is None:
2561            command = f'diag send {packets}'
2562        else:
2563            command = f'diag send {packets} {length}'
2564        self.execute_command(command)
2565
2566    def diag_repeat(self, delay: int, length: Optional[int] = None):
2567        """Transmit packets repeatedly with a fixed interval."""
2568        if length is None:
2569            command = f'diag repeat {delay}'
2570        else:
2571            command = f'diag repeat {delay} {length}'
2572        self.execute_command(command)
2573
    def diag_repeat_stop(self):
        """Stop repeated packet transmission by running `diag repeat stop`."""
        self.execute_command('diag repeat stop')
2577
    def diag_radio_sleep(self):
        """Enter radio sleep mode by running `diag radio sleep`."""
        self.execute_command('diag radio sleep')
2581
    def diag_radio_receive(self):
        """Set radio to receive mode by running `diag radio receive`."""
        self.execute_command('diag radio receive')
2585
    def diag_get_radio_state(self) -> str:
        """Get the state of the radio.

        Runs `diag radio state` and returns its single output line unchanged.
        """
        return self.__parse_str(self.execute_command('diag radio state'))
2589
2590    def diag_get_stats(self) -> Dict[str, int]:
2591        """Get statistics during diagnostics mode."""
2592        output = self.execute_command('diag stats')
2593        if len(output) < 4:
2594            raise UnexpectedCommandOutput(output)
2595
2596        result = {}
2597
2598        result['received_packets'] = int(output[0].split(":")[1])
2599        result['sent_packets'] = int(output[1].split(":")[1])
2600
2601        values = re.findall("\-?\d+", output[2])
2602        result['first_received_packet_rssi'] = int(values[0])
2603        result['first_received_packet_lqi'] = int(values[1])
2604
2605        values = re.findall("\-?\d+", output[3])
2606        result['last_received_packet_rssi'] = int(values[0])
2607        result['last_received_packet_lqi'] = int(values[1])
2608
2609        return result
2610
    def diag_stats_clear(self):
        """Clear statistics during diagnostics mode by running `diag stats clear`."""
        self.execute_command('diag stats clear')
2614
    def diag_set_gpio_value(self, gpio: int, value: int):
        """Set the gpio value by running `diag gpio set <gpio> <value>`.

        :param gpio: the gpio number.
        :param value: the value to set.
        """
        self.execute_command(f'diag gpio set {gpio} {value}')
2618
    def diag_get_gpio_value(self, gpio: int) -> int:
        """Get the gpio value.

        Runs `diag gpio get <gpio>` and converts its single output line to int.

        :param gpio: the gpio number.
        """
        return int(self.__parse_str(self.execute_command(f'diag gpio get {gpio}')))
2622
    def diag_set_gpio_mode(self, gpio: int, mode: str):
        """Set the gpio mode by running `diag gpio mode <gpio> <mode>`.

        :param gpio: the gpio number.
        :param mode: the mode argument, passed through unchanged.
        """
        self.execute_command(f'diag gpio mode {gpio} {mode}')
2626
    def diag_get_gpio_mode(self, gpio: int) -> str:
        """Get the gpio mode.

        Runs `diag gpio mode <gpio>` and returns its single output line unchanged.

        :param gpio: the gpio number.
        """
        return self.__parse_str(self.execute_command(f'diag gpio mode {gpio}'))
2630
    def diag_echo(self, message: str) -> str:
        """RCP echoes the given message.

        Runs `diag echo <message>` and returns its single output line.

        :param message: the message to echo.
        """
        return self.__parse_str(self.execute_command(f'diag echo {message}'))
2634
    def diag_echo_number(self, number: int) -> str:
        """Run `diag echo -n <number>` and return its single output line.

        :param number: the value passed to the `-n` option.
        """
        # NOTE(review): presumably `-n` asks the RCP to echo a generated message
        # of `number` characters — confirm against the CLI reference.
        return self.__parse_str(self.execute_command(f'diag echo -n {number}'))
2638
2639    def diag_get_powersettings(self) -> List[Dict[str, Any]]:
2640        """Get the currently used power settings table."""
2641        result = []
2642        output = self.execute_command(f'diag powersettings')
2643
2644        if len(output) < 3:
2645            raise UnexpectedCommandOutput(output)
2646
2647        if not output[-1].startswith('Done'):
2648            raise UnexpectedCommandOutput(output)
2649
2650        for line in output[2:-1]:
2651            data = line.split('|')
2652
2653            result.append({
2654                'channel_start': int(data[1]),
2655                'channel_end': int(data[2]),
2656                'target_power': int(data[3]),
2657                'actual_power': int(data[4]),
2658                'raw_power_setting': self.__hex_to_bytes(data[5].lstrip().rstrip()),
2659            })
2660
2661        return result
2662
2663    def diag_get_channel_powersettings(self, channel: int) -> Dict[str, Any]:
2664        """Gets the currently used power settings for the given channel."""
2665        result = {}
2666        output = self.execute_command(f'diag powersettings {channel}')
2667
2668        if len(output) != 4:
2669            raise UnexpectedCommandOutput(output)
2670
2671        if not output[-1].startswith('Done'):
2672            raise UnexpectedCommandOutput(output)
2673
2674        result['target_power'] = int(output[0].split(':')[1])
2675        result['actual_power'] = int(output[1].split(':')[1])
2676        result['raw_power_setting'] = self.__hex_to_bytes(output[2].split(':')[1].lstrip().rstrip())
2677
2678        return result
2679
    def diag_get_rawpowersetting(self) -> str:
        """Get the raw power setting.

        Runs `diag rawpowersetting` and returns its single output line unchanged.
        """
        return self.__parse_str(self.execute_command('diag rawpowersetting'))
2683
    def diag_set_rawpowersetting(self, rawpowersetting: str):
        """Set the raw power setting by running `diag rawpowersetting <rawpowersetting>`.

        :param rawpowersetting: the raw power setting, passed through unchanged.
        """
        self.execute_command(f'diag rawpowersetting {rawpowersetting}')
2687
    def diag_enable_rawpowersetting(self):
        """Enable the raw power setting by running `diag rawpowersetting enable`."""
        self.execute_command('diag rawpowersetting enable')
2691
    def diag_disable_rawpowersetting(self):
        """Disable the raw power setting by running `diag rawpowersetting disable`."""
        self.execute_command('diag rawpowersetting disable')
2695
2696    def is_command_supported(self, command: str) -> bool:
2697        """Check whether the the given command is supported by the device."""
2698        output = self.__otcmd.execute_command(command, timeout=10)
2699
2700        if re.match("Error \d+: \w*", output[-1]):
2701            return False
2702
2703        return True
2704
2705    #
2706    # Network management utilities
2707    #
    def create_dataset(self,
                       active_timestamp: Optional[int] = None,
                       channel: Optional[int] = None,
                       channel_mask: Optional[int] = None,
                       extpanid: Optional[str] = None,
                       mesh_local_prefix: Optional[str] = None,
                       network_key: Optional[str] = None,
                       network_name: Optional[str] = None,
                       panid: Optional[int] = None,
                       pskc: Optional[str] = None,
                       security_policy: Optional[tuple] = None,
                       pending_timestamp: Optional[int] = None) -> bytes:
        """Creates a new Operational Dataset with given parameters.

        Clears and re-initializes the dataset buffer, applies the given fields
        through `dataset_set_buffer` and returns the result of
        `get_dataset_tlvs_bytes`. All parameters are forwarded positionally, in
        declaration order, to `dataset_set_buffer`.

        :return: the dataset as returned by `get_dataset_tlvs_bytes`.
        """
        # Start from a clean buffer so earlier buffer edits do not leak in.
        self.dataset_clear_buffer()
        self.dataset_init_buffer()
        self.dataset_set_buffer(active_timestamp, channel, channel_mask, extpanid, mesh_local_prefix, network_key,
                                network_name, panid, pskc, security_policy, pending_timestamp)
        return self.get_dataset_tlvs_bytes()
2726
    def join(self, dataset: bytes) -> None:
        """Joins to a Thread network with given Active Operational Dataset.

        Installs `dataset` as the active dataset, then calls `ifconfig_up`
        and `thread_start`, in that order.

        :param dataset: the Active Operational Dataset bytes.
        """
        self.set_dataset_bytes('active', dataset)
        self.ifconfig_up()
        self.thread_start()
2732
    def leave(self) -> None:
        """Leaves from the Thread network.

        Calls `thread_stop` and then `ifconfig_down` (the reverse order of `join`).
        """
        self.thread_stop()
        self.ifconfig_down()
2737
2738    def wait_for(self, command: str, expect_line: Optional[Union[str, Pattern, Collection[Any]]], timeout: float = 60):
2739        """Wait for the expected output by periodically executing the given command."""
2740        success = False
2741
2742        while timeout > 0:
2743            output = self.execute_command(command)
2744            if any(match_line(line, expect_line) for line in output):
2745                success = True
2746                break
2747
2748            self.__otcmd.wait(1)
2749            timeout -= 1
2750
2751        if not success:
2752            raise ExpectLineTimeoutError(expect_line)
2753
2754    #
2755    # Other TODOs
2756    #
2757    # TODO: netstat
2758    # TODO: networkdiagnostic get <addr> <type> ..
2759    # TODO: networkdiagnostic reset <addr> <type> ..
2760    # TODO: parent
2761    # TODO: pskc [-p] <key>|<passphrase>
2762    #
2763
2764    #
2765    # Platform Commands Utilities
2766    #
2767    def support_iperf3(self) -> bool:
2768        """Check whether the platform supports iperf3."""
2769        #
2770        # Command example:
2771        #
2772        # $ command -v iperf3
2773        # /usr/bin/iperf3
2774        #
2775        ret = False
2776        output = self.execute_platform_command('command -v iperf3')
2777        if len(output) > 0 and 'iperf3' in output[0]:
2778            ret = True
2779
2780        return ret
2781
2782    def iperf3_client(self,
2783                      host: Union[str, Ip6Addr],
2784                      ipv6: bool = True,
2785                      udp: bool = True,
2786                      bind_address: Optional[Union[str, Ip6Addr]] = None,
2787                      bitrate: int = 10000,
2788                      interval: int = 10,
2789                      transmit_time: int = 10,
2790                      length: Optional[int] = None) -> Dict[str, Dict[str, Any]]:
2791        """Run iperf3 in client mode.
2792
2793        :param host: The host IPv6 address to send iperf3 traffic.
2794        :param ipv6: True to use IPv6, False to use IPv4 (default IPv6).
2795        :param udp: True to use UDP, False to use TCP (default UDP).
2796        :param bind_address: The local address to be bound.
2797        :param bitrate: The target bitrate in bits/sec (default 10000 bit/sec).
2798        :param interval: Seconds between periodic throughput reports (default 10 sec).
2799        :param transmit_time: Time in seconds to transmit for (default 10 secs)
2800        :param length: Length of buffer to read or write (default None).
2801        """
2802        #
2803        # Iperf3 client example:
2804        #
2805        # $ iperf3 -6 -c fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a -u -b 90000 -i 20 -t 10 -l 1232 -f k
2806        # Connecting to host fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a, port 5201
2807        # [  5] local fdd6:f5cf:d32d:8d88:0:ff:fe00:fc00 port 59495 connected to fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a port 5201
2808        # [ ID] Interval           Transfer     Bitrate         Total Datagrams
2809        # [  5]   0.00-10.00  sec   111 KBytes  90.7 Kbits/sec  92
2810        # - - - - - - - - - - - - - - - - - - - - - - - - -
2811        # [ ID] Interval           Transfer     Bitrate         Jitter    Lost/Total Datagrams
2812        # [  5]   0.00-10.00  sec   111 KBytes  90.7 Kbits/sec  0.000 ms  0/92 (0%)  sender
2813        # [  5]   0.00-10.96  sec  99.9 KBytes  74.7 Kbits/sec  30.157 ms  9/92 (9.8%)  receiver
2814        #
2815        # iperf Done.
2816        #
2817
2818        wait_time = 10
2819        client_option = f'-c {host}'
2820        version_option = "-6" if ipv6 else "-4"
2821        udp_option = '-u' if udp else ''
2822        bind_option = f'-B {bind_address}' if bind_address else ''
2823        bitrate_option = f'-b {bitrate}'
2824        interval_option = f'-i {interval}'
2825        time_option = f'-t {transmit_time}'
2826        length_option = f'-l {length}' if length else ''
2827        format_option = '-f k'
2828
2829        cmd = f'iperf3 {version_option} {client_option} {udp_option} {bitrate_option} {interval_option} {time_option} {length_option} {format_option}'
2830        output = self.execute_platform_command(cmd, timeout=transmit_time + wait_time)
2831
2832        results = {}
2833        for line in output:
2834            fields = line.split()
2835            if len(fields) != 13:
2836                continue
2837
2838            if fields[-1] == 'sender':
2839                results['sender'] = self.__parse_iperf3_report(line)
2840            elif fields[-1] == 'receiver':
2841                results['receiver'] = self.__parse_iperf3_report(line)
2842
2843        return results
2844
2845    def iperf3_server(self,
2846                      bind_address: Optional[Union[str, Ip6Addr]] = None,
2847                      interval: int = 10,
2848                      timeout: int = 60) -> Dict[str, Any]:
2849        """Run iperf3 in server mode.
2850
2851        :param bind_address: The local address to be bound.
2852        :param interval: Seconds between periodic throughput reports (default 10 sec).
2853        :param timeout: Timeout in seconds to abort the program (default 60 secs)
2854        """
2855        #
2856        # Iperf3 server example:
2857        #
2858        # $ iperf3 -s -1 -B fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a -i 50 -f k
2859        # -----------------------------------------------------------
2860        # Server listening on 5201
2861        # -----------------------------------------------------------
2862        # Accepted connection from fdd6:f5cf:d32d:8d88:0:ff:fe00:fc00, port 44080
2863        # [  5] local fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a port 5201 connected to fdd6:f5cf:d32d:8d88:0:ff:fe00:fc00 port 59495
2864        # [ ID] Interval           Transfer     Bitrate         Jitter    Lost/Total Datagrams
2865        # [  5]   0.00-10.96  sec  99.9 KBytes  74.7 Kbits/sec  30.157 ms  9/92 (9.8%)
2866        # - - - - - - - - - - - - - - - - - - - - - - - - -
2867        # [ ID] Interval           Transfer     Bitrate         Jitter    Lost/Total Datagrams
2868        # [  5]   0.00-10.96  sec  99.9 KBytes  74.7 Kbits/sec  30.157 ms  9/92 (9.8%)  receiver
2869        #
2870
2871        bind_option = f'-B {bind_address}' if bind_address else ''
2872        interval_option = f'-i {interval}'
2873        format_option = '-f k'
2874
2875        cmd = f'iperf3 -s -1 {bind_option} {interval_option} {format_option}'
2876        output = self.execute_platform_command(cmd, timeout)
2877
2878        results = {}
2879        for line in output:
2880            fields = line.split()
2881            if len(fields) == 13 and fields[-1] == 'receiver':
2882                results = self.__parse_iperf3_report(line)
2883
2884        return results
2885
2886    def __parse_iperf3_report(self, line: str) -> Dict[str, Any]:
2887        results = {}
2888        fields = line.split()
2889        format_unit = 1000
2890
2891        if len(fields) == 13 and (fields[-1] == 'sender' or fields[-1] == 'receiver'):
2892            results['id'] = int(fields[1].replace(']', ''))
2893            results['interval_start'] = float(fields[2].split('-')[0])
2894            results['interval_end'] = float(fields[2].split('-')[1])
2895            results['transfer'] = int(float(fields[4]) * format_unit)
2896            results['bitrate'] = int(float(fields[6]) * format_unit)
2897            results['jitter'] = float(fields[8])
2898            results['lossrate'] = float(fields[11].replace('(', '').replace(')', '').replace('%', '')) / 100
2899            results['datagrams'] = fields[12]
2900
2901        return results
2902
2903    #
2904    # Private methods
2905    #
2906
2907    def __parse_str(self, output: List[str]) -> str:
2908        if len(output) != 1:
2909            raise UnexpectedCommandOutput(output)
2910
2911        return output[0]
2912
2913    def __parse_int_list(self, output: List[str]) -> List[int]:
2914        line = self.__parse_str(output)
2915        return list(map(int, line.strip().split()))
2916
    def __parse_ip6addr(self, output: List[str]) -> Ip6Addr:
        """Parse a single-line output as an `Ip6Addr`."""
        return Ip6Addr(self.__parse_str(output))
2919
    def __parse_ip6addr_list(self, output: List[str]) -> List[Ip6Addr]:
        """Parse every output line as an `Ip6Addr`, keeping the line order."""
        return [Ip6Addr(line) for line in output]
2922
2923    def __parse_int(self, output: List[str], base=10) -> int:
2924        if len(output) != 1:
2925            raise UnexpectedCommandOutput(output)
2926
2927        return int(output[0], base)
2928
2929    def __parse_network_key(self, output: List[str]) -> str:
2930        networkkey = self.__parse_str(output)
2931
2932        try:
2933            self.__validate_network_key(networkkey)
2934        except ValueError:
2935            raise UnexpectedCommandOutput(output)
2936
2937        return networkkey
2938
2939    def __validate_network_key(self, networkkey: str):
2940        if len(networkkey) != 32:
2941            raise ValueError(networkkey)
2942
2943        int(networkkey, 16)
2944
2945    def __parse_hex64b(self, output: List[str]) -> str:
2946        extaddr = self.__parse_str(output)
2947
2948        try:
2949            self.__validate_hex64b(extaddr)
2950        except ValueError:
2951            raise UnexpectedCommandOutput(output)
2952
2953        return extaddr
2954
    # Extended addresses, extended PAN IDs, EUI-64s and joiner IDs are all
    # parsed as 64-bit hex strings, so these names alias the same parser.
    __parse_extaddr = __parse_hex64b
    __parse_extpanid = __parse_hex64b
    __parse_eui64 = __parse_hex64b
    __parse_joiner_id = __parse_hex64b
2959
2960    def __validate_hex64b(self, extaddr: str):
2961        if len(extaddr) != 16:
2962            raise ValueError(extaddr)
2963
2964        self.__validate_hex(extaddr)
2965
2966    def __validate_hex(self, hexstr: str):
2967        if len(hexstr) % 2 != 0:
2968            raise ValueError(hexstr)
2969
2970        for i in range(0, len(hexstr), 2):
2971            int(hexstr[i:i + 2], 16)
2972
    # Extended addresses and extended PAN IDs are both validated as 64-bit
    # hex strings, so these names alias the same validator.
    __validate_extaddr = __validate_hex64b
    __validate_extpanid = __validate_hex64b
2975
    def __parse_Enabled_or_Disabled(self, output: List[str]) -> bool:
        """Map a single-line `Enabled` / `Disabled` output to True / False.

        :raises UnexpectedCommandOutput: for any other output.
        """
        return self.__parse_values(output, Enabled=True, Disabled=False)
2978
2979    def __parse_values(self, output: List[str], **vals) -> Any:
2980        val = self.__parse_str(output)
2981        if val not in vals:
2982            raise UnexpectedCommandOutput(output)
2983
2984        return vals[val]
2985
2986    def __validate_hex_or_bytes(self, data: Union[str, bytes]) -> str:
2987        if isinstance(data, bytes):
2988            return ''.join('%02x' % c for c in data)
2989        else:
2990            self.__validate_hex(data)
2991            return data
2992
2993    def __hex_to_bytes(self, hexstr: str) -> bytes:
2994        self.__validate_hex(hexstr)
2995        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
2996
2997    def __bytes_to_hex(self, data: bytes) -> str:
2998        return ''.join('%02x' % b for b in data)
2999
3000    def __escape_escapable(self, s: str) -> str:
3001        """Escape CLI escapable characters in the given string.
3002        """
3003        escapable_chars = '\\ \t\r\n'
3004        for char in escapable_chars:
3005            s = s.replace(char, '\\%s' % char)
3006        return s
3007
3008    def __txt_to_hex(self, txt: Dict[str, Union[str, bytes, bool]]) -> str:
3009        txt_bin = b''
3010        for k, v in txt.items():
3011            assert '=' not in k, 'TXT key must not contain `=`'
3012
3013            if isinstance(v, str):
3014                entry = f'{k}={v}'.encode('utf8')
3015            elif isinstance(v, bytes):
3016                entry = f'{k}='.encode('utf8') + v
3017            else:
3018                assert v is True, 'TXT val must be str or bytes or True'
3019                entry = k.encode('utf8')
3020
3021            assert len(entry) <= 255, 'TXT entry is too long'
3022
3023            txt_bin += bytes([len(entry)])
3024            txt_bin += entry
3025
3026        return ''.join('%02x' % b for b in txt_bin)
3027
3028
def connect_cli_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI:
    """Create an OTCI driving a `connectors.OtCliSim` through `OtCliCommandRunner`.

    :param executable: forwarded to `connectors.OtCliSim`.
    :param nodeid: forwarded to `connectors.OtCliSim`.
    :param simulator: optional simulator forwarded to `connectors.OtCliSim`.
    """
    cli_handler = connectors.OtCliSim(executable, nodeid, simulator=simulator)
    cmd_handler = OtCliCommandRunner(cli_handler)
    return OTCI(cmd_handler)
3033
3034
def connect_cli_serial(dev: str, baudrate: int = 115200) -> OTCI:
    """Create an OTCI driving a `connectors.OtCliSerial` through `OtCliCommandRunner`.

    :param dev: forwarded to `connectors.OtCliSerial`.
    :param baudrate: forwarded to `connectors.OtCliSerial` (default 115200).
    """
    cli_handler = connectors.OtCliSerial(dev, baudrate)
    cmd_handler = OtCliCommandRunner(cli_handler)
    return OTCI(cmd_handler)
3039
3040
def connect_ncp_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI:
    """Create an OTCI driving a `connectors.OtNcpSim` through `OtCliCommandRunner`.

    The command runner is created with `is_spinel_cli=True`.

    :param executable: forwarded to `connectors.OtNcpSim`.
    :param nodeid: forwarded to `connectors.OtNcpSim`.
    :param simulator: optional simulator forwarded to `connectors.OtNcpSim`.
    """
    ncp_handler = connectors.OtNcpSim(executable, nodeid, simulator=simulator)
    cmd_handler = OtCliCommandRunner(ncp_handler, is_spinel_cli=True)
    return OTCI(cmd_handler)
3045
3046
def connect_otbr_ssh(host: str, port: int = 22, username: str = 'pi', password: str = 'raspberry', sudo: bool = True) -> OTCI:
    """Create an OTCI backed by an `OtbrSshCommandRunner`.

    :param host: forwarded to `OtbrSshCommandRunner`.
    :param port: forwarded to `OtbrSshCommandRunner` (default 22).
    :param username: forwarded to `OtbrSshCommandRunner` (default 'pi').
    :param password: forwarded to `OtbrSshCommandRunner` (default 'raspberry').
    :param sudo: forwarded to `OtbrSshCommandRunner` as `sudo=` (default True).
    """
    cmd_handler = OtbrSshCommandRunner(host, port, username, password, sudo=sudo)
    return OTCI(cmd_handler)
3050
3051
def connect_otbr_adb_tcp(host: str, port: int = 5555) -> OTCI:
    """Create an OTCI backed by an `OtbrAdbTcpCommandRunner`.

    :param host: forwarded to `OtbrAdbTcpCommandRunner`.
    :param port: forwarded to `OtbrAdbTcpCommandRunner` (default 5555).
    """
    cmd_handler = OtbrAdbTcpCommandRunner(host, port)
    return OTCI(cmd_handler)
3055
3056
def connect_otbr_adb_usb(serial: str) -> OTCI:
    """Create an OTCI backed by an `OtbrAdbUsbCommandRunner`.

    :param serial: forwarded to `OtbrAdbUsbCommandRunner`.
    """
    cmd_handler = OtbrAdbUsbCommandRunner(serial)
    return OTCI(cmd_handler)
3060
3061
def connect_cmd_handler(cmd_handler: OTCommandHandler) -> OTCI:
    """Create an OTCI on top of an already constructed command handler.

    :param cmd_handler: the command handler the OTCI will use.
    """
    return OTCI(cmd_handler)
3064