1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import ipaddress
30import logging
31import re
32from collections import Counter
33from typing import Callable, List, Collection, Union, Tuple, Optional, Dict, Pattern, Any
34
35from . import connectors
36from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshCommandRunner, OtbrAdbCommandRunner
37from .connectors import Simulator
38from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError
39from .types import ChildId, Rloc16, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix, \
40    RouterTableEntry, NetifIdentifier
41from .utils import match_line, constant_property
42
43
44class OTCI(object):
45    """
46    This class represents an OpenThread Controller Interface instance that provides versatile interfaces to
47    manipulate an OpenThread device.
48    """
49
    # Default number of retries for a failed command: `execute_command` attempts a command up to 1 + 4 times.
    DEFAULT_EXEC_COMMAND_RETRY = 4  # A command is retried 4 times if failed.

    # Retry count in effect; `set_execute_command_retry` overrides it on the instance.
    __exec_command_retry = DEFAULT_EXEC_COMMAND_RETRY
53
54    def __init__(self, otcmd: OTCommandHandler):
55        """
56        This method initializes an OTCI instance.
57
58        :param otcmd: An OpenThread Command Handler instance to execute OpenThread CLI commands.
59        """
60        self.__otcmd: OTCommandHandler = otcmd
61        self.__logger = logging.getLogger(name=str(self))
62
63    def __repr__(self):
64        """Gets the string representation of the OTCI instance."""
65        return repr(self.__otcmd)
66
67    def wait(self, duration: float, expect_line: Optional[Union[str, Pattern, Collection[Any]]] = None):
68        """Wait for a given duration.
69
70        :param duration: The duration (in seconds) wait for.
71        :param expect_line: The line expected to output if given.
72                            Raise ExpectLineTimeoutError if expect_line is not found within the given duration.
73        """
74        self.log('info', "wait for %.3f seconds", duration)
75        if expect_line is None:
76            self.__otcmd.wait(duration)
77        else:
78            success = False
79
80            while duration > 0:
81                output = self.__otcmd.wait(1)
82                if any(match_line(line, expect_line) for line in output):
83                    success = True
84                    break
85
86                duration -= 1
87
88            if not success:
89                raise ExpectLineTimeoutError(expect_line)
90
91    def close(self):
92        """Close the OTCI instance."""
93        self.__otcmd.close()
94
95    def execute_command(self,
96                        cmd: str,
97                        timeout: float = 10,
98                        silent: bool = False,
99                        already_is_ok: bool = True) -> List[str]:
100        for i in range(self.__exec_command_retry + 1):
101            try:
102                return self.__execute_command(cmd, timeout, silent, already_is_ok=already_is_ok)
103            except Exception:
104                self.wait(2)
105                if i == self.__exec_command_retry:
106                    raise
107        assert False
108
109    def __execute_command(self,
110                          cmd: str,
111                          timeout: float = 10,
112                          silent: bool = False,
113                          already_is_ok: bool = True) -> List[str]:
114        """Execute the OpenThread CLI command.
115
116        :param cmd: The command to execute.
117        :param timeout: The command timeout.
118        :param silent: Whether to run the command silent without logging.
119        :returns: The command output as a list of lines.
120        """
121        if not silent:
122            self.log('info', '> %s', cmd)
123
124        output = self.__otcmd.execute_command(cmd, timeout)
125
126        if not silent:
127            for line in output:
128                self.log('info', '%s', line)
129
130        if cmd in ('reset', 'factoryreset'):
131            return output
132
133        if output[-1] == 'Done' or (already_is_ok and output[-1] == 'Error 24: Already'):
134            output = output[:-1]
135            return output
136        else:
137            raise CommandError(cmd, output)
138
139    def set_execute_command_retry(self, n: int):
140        assert n >= 0
141        self.__exec_command_retry = n
142
143    def shell(self, cmd: str, timeout: float = 10):
144        self.log('info', '# %s', cmd)
145        output = self.__otcmd.shell(cmd, timeout=timeout)
146        for line in output:
147            self.log('info', '%s', line)
148        return output
149
150    def set_logger(self, logger: logging.Logger):
151        """Set the logger for the OTCI instance, or None to disable logging."""
152        self.__logger = logger
153
154    def log(self, level, fmt, *args, **kwargs):
155        if self.__logger is not None:
156            getattr(self.__logger, level)('(%s) ' + fmt, repr(self), *args, **kwargs)
157
158    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
159        """Set the callback that will be called for each line output by the CLI."""
160        self.__otcmd.set_line_read_callback(callback)
161
162    #
163    # Constant properties
164    #
165    @constant_property
166    def version(self):
167        """Returns the firmware version. (e.g. "OPENTHREAD/20191113-01411-gb2d66e424-dirty; SIMULATION; Nov 14 2020 14:24:38")"""
168        return self.__parse_str(self.execute_command('version'))
169
170    @constant_property
171    def thread_version(self):
172        """Get the Thread Version number."""
173        return self.__parse_int(self.execute_command('thread version'))
174
175    @constant_property
176    def api_version(self):
177        """Get API version number."""
178        try:
179            return self.__parse_int(self.execute_command('version api'))
180        except ValueError:
181            # If the device does not have `version api` command, it will print the firmware version, which would lead to ValueError.
182            return 0
183
184    #
185    # Basic device operations
186    #
187    def ifconfig_up(self):
188        """Bring up the IPv6 interface."""
189        self.execute_command('ifconfig up')
190
191    def ifconfig_down(self):
192        """Bring down the IPv6 interface."""
193        self.execute_command('ifconfig down')
194
195    def get_ifconfig_state(self) -> bool:
196        """Get the status of the IPv6 interface."""
197        return self.__parse_values(self.execute_command('ifconfig'), up=True, down=False)
198
199    def thread_start(self):
200        """Enable Thread protocol operation and attach to a Thread network."""
201        self.execute_command('thread start')
202
203    def thread_stop(self):
204        """Disable Thread protocol operation and detach from a Thread network."""
205        self.execute_command('thread stop')
206
207    def reset(self):
208        """Signal a platform reset."""
209        self.execute_command('reset')
210
211    def factory_reset(self):
212        """Delete all stored settings, and signal a platform reset."""
213        self.execute_command('factoryreset')
214
215    #
216    # Network Operations
217    #
    # Matches the statistics line of the `ping` output. The packet counters are always captured;
    # the "Packet loss" and "Round-trip min/avg/max" parts are optional groups.
    _PING_STATISTICS_PATTERN = re.compile(
        r'^(?P<transmitted>\d+) packets transmitted, (?P<received>\d+) packets received.(?: Packet loss = (?P<loss>\d+\.\d+)%.)?(?: Round-trip min/avg/max = (?P<min>\d+)/(?P<avg>\d+\.\d+)/(?P<max>\d+) ms.)?$'
    )
221
222    def ping(self,
223             ip: Union[str, Ip6Addr],
224             size: int = 8,
225             count: int = 1,
226             interval: float = 1,
227             hoplimit: int = 64,
228             timeout: float = 3) -> Dict:
229        """Send an ICMPv6 Echo Request.
230        The default arguments are consistent with https://github.com/openthread/openthread/blob/main/src/core/utils/ping_sender.hpp.
231
232        :param ip: The target IPv6 address to ping.
233        :param size: The number of data bytes in the payload. Default is 8.
234        :param count: The number of ICMPv6 Echo Requests to be sent. Default is 1.
235        :param interval: The interval between two consecutive ICMPv6 Echo Requests in seconds. The value may have fractional form, for example 0.5. Default is 1.
236        :param hoplimit: The hoplimit of ICMPv6 Echo Request to be sent. Default is 64. See OPENTHREAD_CONFIG_IP6_HOP_LIMIT_DEFAULT in src/core/config/ip6.h.
237        :param timeout: The maximum duration in seconds for the ping command to wait after the final echo request is sent. Default is 3.
238        """
239        cmd = f'ping {ip} {size} {count} {interval} {hoplimit} {timeout}'
240
241        timeout_allowance = 3
242        lines = self.execute_command(cmd, timeout=(count - 1) * interval + timeout + timeout_allowance)
243
244        statistics = {}
245        for line in lines:
246            m = OTCI._PING_STATISTICS_PATTERN.match(line)
247            if m is not None:
248                if m.group('transmitted') is not None:
249                    statistics['transmitted_packets'] = int(m.group('transmitted'))
250                    statistics['received_packets'] = int(m.group('received'))
251                if m.group('loss') is not None:
252                    statistics['packet_loss'] = float(m.group('loss')) / 100
253                if m.group('min') is not None:
254                    statistics['round_trip_time'] = {
255                        'min': int(m.group('min')),
256                        'avg': float(m.group('avg')),
257                        'max': int(m.group('max'))
258                    }
259        return statistics
260
261    def ping_stop(self):
262        """Stop sending ICMPv6 Echo Requests."""
263        self.execute_command('ping stop')
264
265    def discover(self, channel: Optional[int] = None) -> List[Dict[str, Any]]:
266        """Perform an MLE Discovery operation."""
267        return self.__scan_networks('discover', channel)
268
269    def scan(self, channel: Optional[int] = None) -> List[Dict[str, Any]]:
270        """Perform an IEEE 802.15.4 Active Scan."""
271        return self.__scan_networks('scan', channel)
272
273    def __scan_networks(self, cmd: str, channel: Optional[int] = None) -> List[Dict[str, Any]]:
274        if channel is not None:
275            cmd += f' {channel}'
276
277        output = self.execute_command(cmd, timeout=10)
278        if len(output) < 2:
279            raise UnexpectedCommandOutput(output)
280
281        networks = []
282        for line in output[2:]:
283            fields = line.strip().split('|')
284
285            try:
286                _, J, netname, extpanid, panid, extaddr, ch, dbm, lqi, _ = fields
287            except Exception:
288                logging.warning('ignored output: %r', line)
289                continue
290
291            networks.append({
292                'joinable': bool(int(J)),
293                'network_name': netname.strip(),
294                'extpanid': extpanid,
295                'panid': int(panid, 16),
296                'extaddr': extaddr,
297                'channel': int(ch),
298                'dbm': int(dbm),
299                'lqi': int(lqi),
300            })
301
302        return networks
303
304    def scan_energy(self, duration: Optional[float] = None, channel: Optional[int] = None) -> Dict[int, int]:
305        """Perform an IEEE 802.15.4 Energy Scan."""
306        cmd = 'scan energy'
307        if duration is not None:
308            cmd += f' {duration * 1000:d}'
309
310        if channel is not None:
311            cmd += f' {channel}'
312
313        output = self.execute_command(cmd, timeout=10)
314        if len(output) < 2:
315            raise UnexpectedCommandOutput(output)
316
317        channels = {}
318        for line in output[2:]:
319            fields = line.strip().split('|')
320
321            _, Ch, RSSI, _ = fields
322            channels[int(Ch)] = int(RSSI)
323
324        return channels
325
326    def mac_send_data_request(self):
327        """Instruct an Rx-Off-When-Idle device to send a Data Request mac frame to its parent."""
328        self.execute_command('mac send datarequest')
329
330    def mac_send_empty_data(self):
331        """Instruct an Rx-Off-When-Idle device to send a Empty Data mac frame to its parent."""
332        self.execute_command('mac send emptydata')
333
334    # TODO: discover
335    # TODO: dns resolve <hostname> [DNS server IP] [DNS server port]
336    # TODO: fake /a/an <dst-ipaddr> <target> <meshLocalIid>
337    # TODO: sntp query
338
339    #
340    # Set or get device/network parameters
341    #
342
343    def get_mode(self) -> str:
344        """Get the Thread Device Mode value.
345
346            -: no flags set (rx-off-when-idle, minimal Thread device, stable network data)
347            r: rx-on-when-idle
348            d: Full Thread Device
349            n: Full Network Data
350        """
351        return self.__parse_str(self.execute_command('mode'))
352
353    def set_mode(self, mode: str):
354        """Set the Thread Device Mode value.
355
356            -: no flags set (rx-off-when-idle, minimal Thread device, stable network data)
357            r: rx-on-when-idle
358            d: Full Thread Device
359            n: Full Network Data
360        """
361        self.execute_command(f'mode {DeviceMode(mode)}')
362
363    def get_extaddr(self) -> str:
364        """Get the IEEE 802.15.4 Extended Address."""
365        return self.__parse_extaddr(self.execute_command('extaddr'))
366
367    def set_extaddr(self, extaddr: str):
368        """Set the IEEE 802.15.4 Extended Address."""
369        self.__validate_hex64b(extaddr)
370        self.execute_command(f'extaddr {extaddr}')
371
372    def get_eui64(self) -> str:
373        """Get the factory-assigned IEEE EUI-64."""
374        return self.__parse_eui64(self.execute_command('eui64'))
375
376    def set_extpanid(self, extpanid: str):
377        """Set the Thread Extended PAN ID value."""
378        self.__validate_extpanid(extpanid)
379        self.execute_command(f'extpanid {extpanid}')
380
381    def get_extpanid(self) -> str:
382        """Get the Thread Extended PAN ID value."""
383        return self.__parse_extpanid(self.execute_command('extpanid'))
384
385    def set_channel(self, ch):
386        """Set the IEEE 802.15.4 Channel value."""
387        self.execute_command('channel %d' % ch)
388
389    def get_channel(self):
390        """Get the IEEE 802.15.4 Channel value."""
391        return self.__parse_int(self.execute_command('channel'))
392
393    def get_preferred_channel_mask(self) -> int:
394        """Get preferred channel mask."""
395        return self.__parse_int(self.execute_command('channel preferred'))
396
397    def get_supported_channel_mask(self):
398        """Get supported channel mask."""
399        return self.__parse_int(self.execute_command('channel supported'))
400
401    def get_panid(self):
402        """Get the IEEE 802.15.4 PAN ID value."""
403        return self.__parse_int(self.execute_command('panid'), 16)
404
405    def set_panid(self, panid):
406        """Get the IEEE 802.15.4 PAN ID value."""
407        self.execute_command('panid %d' % panid)
408
409    def set_network_name(self, name):
410        """Set network name."""
411        self.execute_command('networkname %s' % self.__escape_escapable(name))
412
413    def get_network_name(self):
414        """Get network name."""
415        return self.__parse_str(self.execute_command('networkname'))
416
417    def get_network_key(self) -> str:
418        """Get the network key."""
419        return self.__parse_network_key(self.execute_command(self.__detect_networkkey_cmd()))
420
421    def set_network_key(self, networkkey: str):
422        """Set the network key."""
423        self.__validate_network_key(networkkey)
424        cmd = self.__detect_networkkey_cmd()
425        self.execute_command(f'{cmd} {networkkey}')
426
427    def get_key_sequence_counter(self) -> int:
428        """Get the Thread Key Sequence Counter."""
429        return self.__parse_int(self.execute_command('keysequence counter'))
430
431    def set_key_sequence_counter(self, counter: int):
432        """Set the Thread Key Sequence Counter."""
433        self.execute_command(f'keysequence counter {counter}')
434
435    def get_key_sequence_guard_time(self) -> int:
436        """Get Thread Key Switch Guard Time (in hours)."""
437        return self.__parse_int(self.execute_command('keysequence guardtime'))
438
439    def set_key_sequence_guard_time(self, hours: int):
440        """Set Thread Key Switch Guard Time (in hours) 0 means Thread Key Switch immediately if key index match."""
441        self.execute_command(f'keysequence guardtime {hours}')
442
443    def get_cca_threshold(self) -> int:
444        """Get the CCA threshold in dBm measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4."""
445        output = self.execute_command(f'ccathreshold')
446        val = self.__parse_str(output)
447        if not val.endswith(' dBm'):
448            raise UnexpectedCommandOutput(output)
449
450        return int(val[:-4])
451
452    def set_cca_threshold(self, val: int):
453        """Set the CCA threshold measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4."""
454        self.execute_command(f'ccathreshold {val}')
455
456    def get_promiscuous(self) -> bool:
457        """Get radio promiscuous property."""
458        return self.__parse_Enabled_or_Disabled(self.execute_command('promiscuous'))
459
460    def enable_promiscuous(self):
461        """Enable radio promiscuous operation and print raw packet content."""
462        self.execute_command('promiscuous enable')
463
464    def disable_promiscuous(self):
465        """Disable radio promiscuous operation."""
466        self.execute_command('promiscuous disable')
467
468    def get_txpower(self) -> int:
469        """Get the transmit power in dBm."""
470        line = self.__parse_str(self.execute_command('txpower'))
471        if not line.endswith(' dBm'):
472            raise UnexpectedCommandOutput([line])
473
474        return int(line.split()[0])
475
476    def set_txpower(self, val: int):
477        """Set the transmit power in dBm."""
478        self.execute_command(f'txpower {val}')
479
480    # TODO: fem
481    # TODO: fem lnagain
482    # TODO: fem lnagain <LNA gain>
483    # TODO: mac retries direct
    # TODO: mac retries direct <number>
485    # TODO: mac retries indirect
486    # TODO: mac retries indirect <number>
487
488    #
489    # Basic Node states and properties
490    #
491
492    def get_state(self) -> ThreadState:
493        """Get the current Thread state."""
494        return ThreadState(self.__parse_str(self.execute_command('state')))
495
496    def set_state(self, state: str):
497        """Try to switch to state detached, child, router or leader."""
498        self.execute_command(f'state {state}')
499
500    def get_rloc16(self) -> Rloc16:
501        """Get the Thread RLOC16 value."""
502        return Rloc16(self.__parse_int(self.execute_command('rloc16'), 16))
503
504    def get_router_id(self) -> int:
505        """Get the Thread Router ID value."""
506        return self.get_rloc16() >> 10
507
508    def prefer_router_id(self, routerid: int):
509        """Prefer a Router ID when solicit router id from Leader."""
510        self.execute_command(f'preferrouterid {routerid}')
511
512    def is_singleton(self) -> bool:
513        return self.__parse_values(self.execute_command('singleton'), true=True, false=False)
514
515    #
516    # RCP related utilities
517    #
518
519    def get_rcp_version(self):
520        return self.__parse_str(self.execute_command('rcp version'))
521
522    #
523    # Unsecure port utilities
524    #
525
526    def get_unsecure_ports(self) -> List[int]:
527        """all ports from the allowed unsecured port list."""
528        return self.__parse_int_list(self.execute_command('unsecureport get'))
529
530    def add_unsecure_port(self, port: int):
531        """Add a port to the allowed unsecured port list."""
532        self.execute_command(f'unsecureport add {port}')
533
534    def remove_unsecure_port(self, port: int):
535        """Remove a port from the allowed unsecured port list."""
536        self.execute_command(f'unsecureport remove {port}')
537
538    def clear_unsecure_ports(self):
539        """Remove all ports from the allowed unsecured port list."""
540        self.execute_command('unsecureport remove all')
541
542    #
543    # Leader configurations
544    #
545
546    def get_preferred_partition_id(self) -> PartitionId:
547        """Get the preferred Thread Leader Partition ID."""
548        return PartitionId(self.__parse_int(self.execute_command(self.__get_partition_preferred_cmd())))
549
550    def set_preferred_partition_id(self, parid: int):
551        """Set the preferred Thread Leader Partition ID."""
552        self.execute_command(f'{self.__get_partition_preferred_cmd()} {parid}')
553
554    def __get_partition_preferred_cmd(self) -> str:
555        """"""
556        return 'partitionid preferred' if self.api_version >= 51 else 'leaderpartitionid'
557
558    def get_leader_weight(self) -> int:
559        """Get the Thread Leader Weight."""
560        return self.__parse_int(self.execute_command('leaderweight'))
561
562    def set_leader_weight(self, weight: int):
563        """Set the Thread Leader Weight."""
564        self.execute_command(f'leaderweight {weight}')
565
    # Maps the field names output by the `leaderdata` command to the keys of the dict
    # returned by `get_leader_data`.
    __LEADER_DATA_KEY_MAP = {
        'Partition ID': 'partition_id',
        'Weighting': 'weight',
        'Data Version': 'data_ver',
        'Stable Data Version': 'stable_data_ver',
        'Leader Router ID': 'leader_id',
    }
573
574    def get_leader_data(self) -> Dict[str, int]:
575        """Get the Thread Leader Data."""
576        data = {}
577        output = self.execute_command('leaderdata')
578
579        try:
580            for line in output:
581                k, v = line.split(': ')
582                data[OTCI.__LEADER_DATA_KEY_MAP[k]] = int(v)
583        except KeyError:
584            raise UnexpectedCommandOutput(output)
585
586        return data
587
588    #
589    # Router configurations
590    #
591
592    def get_router_selection_jitter(self):
593        """Get the ROUTER_SELECTION_JITTER value."""
594        return self.__parse_int(self.execute_command('routerselectionjitter'))
595
596    def set_router_selection_jitter(self, jitter):
597        """Set the ROUTER_SELECTION_JITTER value."""
598        self.execute_command(f'routerselectionjitter {jitter}')
599
600    def get_network_id_timeout(self) -> int:
601        """Get the NETWORK_ID_TIMEOUT parameter used in the Router role."""
602        return self.__parse_int(self.execute_command('networkidtimeout'))
603
604    def set_network_id_timeout(self, timeout: int):
605        """Set the NETWORK_ID_TIMEOUT parameter used in the Router role."""
606        self.execute_command(f'networkidtimeout {timeout}')
607
608    def get_parent_priority(self) -> int:
609        """Get the assigned parent priority value, -2 means not assigned."""
610        return self.__parse_int(self.execute_command('parentpriority'))
611
612    def set_parent_priority(self, priority: int):
613        """Set the assigned parent priority value: 1, 0, -1 or -2."""
614        self.execute_command(f'parentpriority {priority}')
615
616    def get_router_upgrade_threshold(self) -> int:
617        """Get the ROUTER_UPGRADE_THRESHOLD value."""
618        return self.__parse_int(self.execute_command('routerupgradethreshold'))
619
620    def set_router_upgrade_threshold(self, threshold: int):
621        """Set the ROUTER_UPGRADE_THRESHOLD value."""
622        self.execute_command(f'routerupgradethreshold {threshold}')
623
624    def get_router_downgrade_threshold(self):
625        """Set the ROUTER_DOWNGRADE_THRESHOLD value."""
626        return self.__parse_int(self.execute_command('routerdowngradethreshold'))
627
628    def set_router_downgrade_threshold(self, threshold: int):
629        """Get the ROUTER_DOWNGRADE_THRESHOLD value."""
630        self.execute_command(f'routerdowngradethreshold {threshold}')
631
632    def get_router_eligible(self) -> bool:
633        """Indicates whether the router role is enabled or disabled."""
634        return self.__parse_Enabled_or_Disabled(self.execute_command('routereligible'))
635
636    def enable_router_eligible(self):
637        """Disable the router role."""
638        self.execute_command('routereligible enable')
639
640    def disable_router_eligible(self):
641        """Disable the router role."""
642        self.execute_command('routereligible disable')
643
644    def get_router_list(self) -> List[RouterId]:
645        """Get allocated Router IDs."""
646        line = self.__parse_str(self.execute_command('router list'))
647        return list(map(RouterId, line.strip().split()))
648
649    def get_router_table(self) -> Dict[RouterId, RouterTableEntry]:
650        """table of routers."""
651        output = self.execute_command('router table')
652        if len(output) < 2:
653            raise UnexpectedCommandOutput(output)
654
655        #
656        # Example output:
657        #
658        # | ID | RLOC16 | Next Hop | Path Cost | LQ In | LQ Out | Age | Extended MAC     |
659        # +----+--------+----------+-----------+-------+--------+-----+------------------+
660        # | 21 | 0x5400 |       21 |         0 |     3 |      3 |   5 | d28d7f875888fccb |
661        # | 56 | 0xe000 |       56 |         0 |     0 |      0 | 182 | f2d92a82c8d8fe43 |
662        # Done
663        #
664
665        headers = self.__split_table_row(output[0])
666
667        table = {}
668        for line in output[2:]:
669            line = line.strip()
670            if not line:
671                continue
672
673            fields = self.__split_table_row(line)
674            if len(fields) != len(headers):
675                raise UnexpectedCommandOutput(output)
676
677            col = lambda colname: self.__get_table_col(colname, headers, fields)
678            id = col('ID')
679
680            table[RouterId(id)] = router = RouterTableEntry({
681                'id': RouterId(id),
682                'rloc16': Rloc16(col('RLOC16'), 16),
683                'next_hop': int(col('Next Hop')),
684                'path_cost': int(col('Path Cost')),
685                'lq_in': int(col('LQ In')),
686                'lq_out': int(col('LQ Out')),
687                'age': int(col('Age')),
688                'extaddr': col('Extended MAC'),
689            })
690
691            if 'Link' in headers:
692                router['link'] = int(col('Link'))
693            else:
694                # support older version of OT which does not output `Link` field
695                router['link'] = self.get_router_info(router['id'], silent=True)['link']
696
697        return table
698
699    def get_router_info(self, id: int, silent: bool = False) -> RouterTableEntry:
700        cmd = f'router {id}'
701        info = {}
702        output = self.execute_command(cmd, silent=silent)
703        items = [line.strip().split(': ') for line in output]
704
705        headers = [h for h, _ in items]
706        fields = [f for _, f in items]
707        col = lambda colname: self.__get_table_col(colname, headers, fields)
708
709        return RouterTableEntry({
710            'id': RouterId(id),
711            'rloc16': Rloc16(col('Rloc'), 16),
712            'alloc': int(col('Alloc')),
713            'next_hop': int(col('Next Hop'), 16) >> 10,  # convert RLOC16 to Router ID
714            'link': int(col('Link')),
715        })
716
717    #
718    # Router utilities: Child management
719    #
720
721    def get_child_table(self) -> Dict[ChildId, Dict[str, Any]]:
722        """Get the table of attached children."""
723        output = self.execute_command('child table')
724        if len(output) < 2:
725            raise UnexpectedCommandOutput(output)
726
727        #
728        # Example output:
729        # | ID  | RLOC16 | Timeout    | Age        | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt| Extended MAC     |
730        # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+------------------+
731        # |   1 | 0xc801 |        240 |         24 |     3 |  131 |1|0|0|  3| 0 |     0 | 4ecede68435358ac |
732        # |   2 | 0xc802 |        240 |          2 |     3 |  131 |0|0|0|  3| 1 |     0 | a672a601d2ce37d8 |
733        # Done
734        #
735
736        headers = self.__split_table_row(output[0])
737
738        table = {}
739        for line in output[2:]:
740            line = line.strip()
741            if not line:
742                continue
743
744            fields = self.__split_table_row(line)
745            col = lambda colname: self.__get_table_col(colname, headers, fields)
746
747            id = int(col("ID"))
748            r, d, n = int(col("R")), int(col("D")), int(col("N"))
749            mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}')
750
751            child = {
752                'id': ChildId(id),
753                'rloc16': Rloc16(col('RLOC16'), 16),
754                'timeout': int(col('Timeout')),
755                'age': int(col('Age')),
756                'lq_in': int(col('LQ In')),
757                'c_vn': int(col('C_VN')),
758                'mode': mode,
759                'extaddr': col('Extended MAC')
760            }
761
762            if 'Ver' in headers:
763                child['ver'] = int(col('Ver'))
764
765            if 'CSL' in headers:
766                child['csl'] = bool(int(col('CSL')))
767
768            if 'QMsgCnt' in headers:
769                child['qmsgcnt'] = int(col('QMsgCnt'))
770
771            table[ChildId(id)] = child
772
773        return table
774
775    #
776    # DNS server & client utilities
777    #
778
    # Matches a `[<address>]:<port>` server value, capturing the address and the port.
    _IPV6_SERVER_PORT_PATTERN = re.compile(r'\[(.*)\]:(\d+)')
780
781    def dns_get_config(self):
782        """Get DNS client query config."""
783        output = self.execute_command('dns config')
784        config = {}
785        for line in output:
786            k, v = line.split(': ')
787            if k == 'Server':
788                matched = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, v)
789                assert matched is not None
790                ip, port = matched.groups()
791                config['server'] = (Ip6Addr(ip), int(port))
792            elif k == 'ResponseTimeout':
793                config['response_timeout'] = int(v[:-3])
794            elif k == 'MaxTxAttempts':
795                config['max_tx_attempts'] = int(v)
796            elif k == 'RecursionDesired':
797                config['recursion_desired'] = (v == 'yes')
798            else:
799                logging.warning("dns config ignored: %s", line)
800
801        return config
802
803    def dns_set_config(self,
804                       server: Tuple[Union[str, ipaddress.IPv6Address], int],
805                       response_timeout: Optional[int] = None,
806                       max_tx_attempts: Optional[int] = None,
807                       recursion_desired: Optional[bool] = None):
808        """Set DNS client query config."""
809        cmd = f'dns config {str(server[0])} {server[1]}'
810        if response_timeout is not None:
811            cmd += f' {response_timeout}'
812
813        assert max_tx_attempts is None or response_timeout is not None, "must specify `response_timeout` if `max_tx_attempts` is specified."
814        if max_tx_attempts is not None:
815            cmd += f' {max_tx_attempts}'
816
817        assert recursion_desired is None or max_tx_attempts is not None, 'must specify `max_tx_attempts` if `recursion_desired` is specified.'
818        if recursion_desired is not None:
819            cmd += f' {1 if recursion_desired else 0}'
820
821        self.execute_command(cmd)
822
823    def dns_get_compression(self) -> bool:
824        """Get DNS compression mode."""
825        return self.__parse_Enabled_or_Disabled(self.execute_command('dns compression'))
826
827    def dns_enable_compression(self):
828        """Enable DNS compression mode."""
829        self.execute_command('dns compression enable')
830
831    def dns_disable_compression(self):
832        """Disable DNS compression mode."""
833        self.execute_command('dns compression disable')
834
835    def dns_browse(self, service: str) -> List[Dict]:
836        """Browse DNS service instances."""
837        cmd = f'dns browse {service}'
838        output = '\n'.join(self.execute_command(cmd, 30.0))
839
840        result = []
841        for ins, port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl in re.findall(
842                r'(.*?)\s+Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s*Host:(\S+)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)',
843                output):
844            result.append({
845                'instance': ins,
846                'service': service,
847                'port': int(port),
848                'priority': int(priority),
849                'weight': int(weight),
850                'host': hostname,
851                'address': Ip6Addr(address),
852                'txt': self.__parse_srp_server_service_txt(txt_data),
853                'srv_ttl': int(srv_ttl),
854                'txt_ttl': int(txt_ttl),
855                'aaaa_ttl': int(aaaa_ttl),
856            })
857
858        return result
859
860    def dns_resolve(self, hostname: str) -> List[Dict]:
861        """Resolve a DNS host name."""
862        cmd = f'dns resolve {hostname}'
863        output = self.execute_command(cmd, 30.0)
864        dns_resp = output[0]
865        addrs = dns_resp.strip().split(' - ')[1].split(' ')
866        ips = [Ip6Addr(item.strip()) for item in addrs[::2]]
867        ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
868
869        return [{
870            'address': ip,
871            'ttl': ttl,
872        } for ip, ttl in zip(ips, ttls)]
873
874    def dns_resolve_service(self, instance: str, service: str) -> Dict:
875        """Resolves aservice instance."""
876        instance = self.__escape_escapable(instance)
877        cmd = f'dns service {instance} {service}'
878        output = self.execute_command(cmd, 30.0)
879
880        m = re.match(
881            r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)',
882            '\t'.join(output))
883        if m:
884            port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups()
885            return {
886                'instance': instance,
887                'service': service,
888                'port': int(port),
889                'priority': int(priority),
890                'weight': int(weight),
891                'host': hostname,
892                'address': Ip6Addr(address),
893                'txt': self.__parse_srp_server_service_txt(txt_data),
894                'srv_ttl': int(srv_ttl),
895                'txt_ttl': int(txt_ttl),
896                'aaaa_ttl': int(aaaa_ttl),
897            }
898        else:
899            raise CommandError(cmd, output)
900
901    #
902    # SRP server & client utilities
903    #
904
905    def srp_server_get_state(self):
906        """Get the SRP server state"""
907        return self.__parse_str(self.execute_command('srp server state'))
908
909    def srp_server_enable(self):
910        """Enable SRP server."""
911        self.execute_command('srp server enable')
912
913    def srp_server_disable(self):
914        """Disable SRP server."""
915        self.execute_command('srp server disable')
916
917    def srp_server_get_domain(self) -> str:
918        """Get the SRP server domain."""
919        return self.__parse_str(self.execute_command('srp server domain'))
920
921    def srp_server_set_domain(self, domain: str):
922        """Set the SRP server domain."""
923        self.execute_command(f'srp server domain {domain}')
924
925    def srp_server_get_hosts(self) -> List[Dict]:
926        """Get SRP server registered hosts."""
927        return self.__parse_srp_server_hosts(self.execute_command('srp server host'))
928
929    def srp_server_get_services(self) -> List[Dict]:
930        """Get SRP server registered services."""
931        output = self.execute_command('srp server service')
932        return self.__parse_srp_server_services(output)
933
934    def __parse_srp_server_hosts(self, output: List[str]) -> List[Dict]:
935        result = []
936        info = None
937        for line in output:
938            if not line.startswith(' '):
939                info = {'host': line}
940                result.append(info)
941            else:
942                assert info is not None
943                k, v = line.strip().split(': ')
944                if k == 'deleted':
945                    if v not in ('true', 'false'):
946                        raise UnexpectedCommandOutput(output)
947
948                    info['deleted'] = (v == 'true')
949
950                elif k == 'addresses':
951                    if not v.startswith('[') or not v.endswith(']'):
952                        raise UnexpectedCommandOutput(output)
953
954                    v = v[1:-1]
955                    info['addresses'] = list(map(Ip6Addr, v.split(', ')))
956                else:
957                    raise UnexpectedCommandOutput(output)
958
959        return result
960
961    def __parse_srp_server_services(self, output: List[str]) -> List[Dict]:
962        result = []
963        info = None
964        for line in output:
965            if not line.startswith(' '):
966                info = {'instance': line}
967                result.append(info)
968            else:
969                assert info is not None
970                k, v = line.strip().split(': ')
971                if k == 'deleted':
972                    if v not in ('true', 'false'):
973                        raise UnexpectedCommandOutput(output)
974
975                    info['deleted'] = (v == 'true')
976
977                elif k == 'addresses':
978                    if not v.startswith('[') or not v.endswith(']'):
979                        raise UnexpectedCommandOutput(output)
980
981                    v = v[1:-1]
982                    info['addresses'] = list(map(Ip6Addr, v.split(', ')))
983                elif k == 'subtypes':
984                    info[k] = list() if v == '(null)' else list(v.split(','))
985                elif k in ('port', 'weight', 'priority', 'ttl', 'lease', 'key-lease'):
986                    info[k] = int(v)
987                elif k in ('host',):
988                    info[k] = v
989                elif k == 'TXT':
990                    info['txt'] = self.__parse_srp_server_service_txt(v)
991                else:
992                    raise UnexpectedCommandOutput(output)
993
994        return result
995
996    def __parse_srp_server_service_txt(self, txt: str) -> Dict[str, Union[bytes, bool]]:
997        # example value: [txt11=76616c3131, txt12=76616c3132]
998        assert txt.startswith('[') and txt.endswith(']')
999        txt_dict = {}
1000        for entry in txt[1:-1].split(', '):
1001            if not entry:
1002                continue
1003
1004            equal_pos = entry.find('=')
1005
1006            if equal_pos != -1:
1007                k, v = entry[:equal_pos], entry[equal_pos + 1:]
1008                txt_dict[k] = bytes(int(v[i:i + 2], 16) for i in range(0, len(v), 2))
1009            else:
1010                txt_dict[entry] = True
1011
1012        return txt_dict
1013
1014    def srp_server_get_lease(self) -> Tuple[int, int, int, int]:
1015        """Get SRP server LEASE & KEY-LEASE range (in seconds)."""
1016        lines = self.execute_command(f'srp server lease')
1017        return tuple([int(line.split(':')[1].strip()) for line in lines])
1018
1019    def srp_server_set_lease(self, min_lease: int, max_lease: int, min_key_lease: int, max_key_lease: int):
1020        """Configure SRP server LEASE & KEY-LEASE range (in seconds)."""
1021        self.execute_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}')
1022
1023    def srp_client_get_state(self) -> bool:
1024        """Get SRP client state."""
1025        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client state'))
1026
1027    def srp_client_start(self, server_ip: Union[str, ipaddress.IPv6Address], server_port: int):
1028        """Start SRP client."""
1029        self.execute_command(f'srp client start {str(server_ip)} {server_port}')
1030
1031    def srp_client_stop(self):
1032        """Stop SRP client."""
1033        self.execute_command('srp client stop')
1034
1035    def srp_client_get_autostart(self) -> bool:
1036        """Get SRP client autostart mode."""
1037        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client autostart'))
1038
1039    def srp_client_enable_autostart(self):
1040        """Enable SRP client autostart mode."""
1041        self.execute_command('srp client autostart enable')
1042
1043    def srp_client_disable_autostart(self):
1044        """Disable SRP client autostart mode."""
1045        self.execute_command('srp client autostart disable')
1046
1047    def srp_client_get_callback(self) -> bool:
1048        """Get SRP client callback mode."""
1049        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client callback'))
1050
1051    def srp_client_enable_callback(self):
1052        """Enable SRP client callback mode."""
1053        self.execute_command('srp client callback enable')
1054
1055    def srp_client_disable_callback(self):
1056        """Disable SRP client callback mode."""
1057        self.execute_command('srp client callback disable')
1058
1059    def srp_client_set_host_name(self, name: str):
1060        """Set SRP client host name."""
1061        self.execute_command(f'srp client host name {name}')
1062
1063    def srp_client_get_host(self) -> Dict:
1064        """Get SRP client host."""
1065        output = self.__parse_str(self.execute_command('srp client host'))
1066        return self.__parse_srp_client_host(output)
1067
    # Matches one `srp client host` line. Groups: 1 = quoted name or "(null)",
    # 2 = name without quotes, 3 = the literal "(null)", 4 = state,
    # 5 = text between the brackets after "addrs:".
    _SRP_CLIENT_HOST_PATTERN = re.compile(r'name:("(.*)"|(\(null\))), state:(\S+), addrs:\[(.*)\]')
1069
1070    def __parse_srp_client_host(self, line: str) -> Dict:
1071        m = re.match(OTCI._SRP_CLIENT_HOST_PATTERN, line)
1072        if not m:
1073            raise UnexpectedCommandOutput([line])
1074
1075        _, host, _, state, addrs = m.groups()
1076        return {
1077            'host': host or '',
1078            'state': state,
1079            'addresses': [Ip6Addr(ip) for ip in addrs.split(', ')] if addrs else [],
1080        }
1081
1082    def srp_client_get_host_name(self) -> str:
1083        """Get SRP client host name."""
1084        name = self.__parse_str(self.execute_command('srp client host name'))
1085        return name if name != '(null)' else ''
1086
1087    def srp_client_get_host_addresses(self) -> List[Ip6Addr]:
1088        """Get SRP client host addresses."""
1089        return self.__parse_ip6addr_list(self.execute_command('srp client host address'))
1090
1091    def srp_client_set_host_addresses(self, *addrs: Union[str, ipaddress.IPv6Address]):
1092        """Set SRP client host addresses."""
1093        self.execute_command(f'srp client host address {" ".join(map(str, addrs))}')
1094
1095    def srp_client_get_host_state(self):
1096        """Get SRP client host state."""
1097        return self.__parse_str(self.execute_command('srp client host state'))
1098
1099    def srp_client_remove_host(self, remove_key_lease=False):
1100        """Remove SRP client host."""
1101        cmd = 'srp client host remove'
1102        if remove_key_lease:
1103            cmd += ' 1'
1104
1105        self.execute_command(cmd)
1106
1107    def srp_client_get_services(self) -> List[Dict]:
1108        """Get SRP client services."""
1109        output = self.execute_command('srp client service')
1110        return [self.__parse_srp_client_service(line) for line in output]
1111
    # Matches one `srp client service` line. Groups, in order: instance name,
    # service name, state, port, priority, weight.
    _SRP_CLIENT_SERVICE_PATTERN = re.compile(
        r'instance:"(.*)", name:"(.*)", state:(\S+), port:(\d+), priority:(\d+), weight:(\d+)')
1114
1115    def __parse_srp_client_service(self, line: str) -> Dict:
1116        # e.g. instance:"ins2", name:"_meshcop._udp", state:ToAdd, port:2000, priority:2, weight:2
1117        m = OTCI._SRP_CLIENT_SERVICE_PATTERN.match(line)
1118        if m is None:
1119            raise UnexpectedCommandOutput([line])
1120
1121        instance, service, state, port, priority, weight = m.groups()
1122        port, priority, weight = int(port), int(priority), int(weight)
1123        return {
1124            'instance': instance,
1125            'service': service,
1126            'state': state,
1127            'port': port,
1128            'priority': priority,
1129            'weight': weight,
1130        }
1131
1132    def srp_client_add_service(self,
1133                               instance: str,
1134                               service: str,
1135                               port: int,
1136                               priority: int = 0,
1137                               weight: int = 0,
1138                               txt: Optional[Dict[str, Union[str, bytes, bool]]] = None):
1139        instance = self.__escape_escapable(instance)
1140        cmd = f'srp client service add {instance} {service} {port} {priority} {weight}'
1141        if txt:
1142            cmd += f' {self.__txt_to_hex(txt)}'
1143        self.execute_command(cmd)
1144
1145    def srp_client_remove_service(self, instance: str, service: str):
1146        """Remove a service from SRP client."""
1147        self.execute_command(f'srp client service remove {instance} {service}')
1148
1149    def srp_client_clear_service(self, instance: str, service: str):
1150        """Remove a service from SRP client without notifying the SRP server."""
1151        self.execute_command(f'srp client service clear {instance} {service}')
1152
1153    def srp_client_get_key_lease_interval(self) -> int:
1154        """Get SRP client key lease interval (in seconds)."""
1155        return self.__parse_int(self.execute_command('srp client keyleaseinterval'))
1156
1157    def srp_client_set_key_lease_interval(self, interval: int):
1158        """Set SRP client key lease interval (in seconds)."""
1159        self.execute_command(f'srp client keyleaseinterval {interval}')
1160
1161    def srp_client_get_lease_interval(self) -> int:
1162        """Get SRP client lease interval (in seconds)."""
1163        return self.__parse_int(self.execute_command('srp client leaseinterval'))
1164
1165    def srp_client_set_lease_interval(self, interval: int):
1166        """Set SRP client lease interval (in seconds)."""
1167        self.execute_command(f'srp client leaseinterval {interval}')
1168
1169    def srp_client_get_server(self) -> Tuple[Ip6Addr, int]:
1170        """Get the SRP server (IP, port)."""
1171        result = self.__parse_str(self.execute_command('srp client server'))
1172        matched = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, result)
1173        assert matched
1174        ip, port = matched.groups()
1175        return Ip6Addr(ip), int(port)
1176
1177    def srp_client_get_service_key(self) -> bool:
1178        """Get SRP client "service key record inclusion" mode."""
1179        return self.__parse_Enabled_or_Disabled(self.execute_command('srp client service key'))
1180
1181    def srp_client_enable_service_key(self):
1182        """Enable SRP client "service key record inclusion" mode."""
1183        self.execute_command('srp client service key enable')
1184
1185    def srp_client_disable_service_key(self):
1186        """Disable SRP client "service key record inclusion" mode."""
1187        self.execute_command('srp client service key disable')
1188
1189    def __split_table_row(self, row: str) -> List[str]:
1190        if not (row.startswith('|') and row.endswith('|')):
1191            raise ValueError(row)
1192
1193        fields = row.split('|')
1194        fields = [x.strip() for x in fields[1:-1]]
1195        return fields
1196
1197    def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str:
1198        return fields[headers.index(colname)]
1199
1200    def get_child_list(self) -> List[ChildId]:
1201        """Get attached Child IDs."""
1202        line = self.__parse_str(self.execute_command(f'child list'))
1203        return [ChildId(id) for id in line.strip().split()]
1204
1205    def get_child_info(self, child: Union[ChildId, Rloc16]) -> Dict[str, Any]:
1206        output = self.execute_command(f'child {child}')
1207
1208        info = {}
1209
1210        for line in output:
1211            k, v = line.split(': ')
1212            if k == 'Child ID':
1213                info['id'] = int(v)
1214            elif k == 'Rloc':
1215                info['rloc16'] = int(v, 16)
1216            elif k == 'Ext Addr':
1217                info['extaddr'] = v
1218            elif k == 'Mode':
1219                info['mode'] = DeviceMode(v)
1220            elif k == 'Net Data':
1221                info['c_vn'] = int(v)
1222            elif k == 'Timeout':
1223                info['timeout'] = int(v)
1224            elif k == 'Age':
1225                info['age'] = int(v)
1226            elif k == 'Link Quality In':
1227                info['lq_in'] = int(v)
1228            elif k == 'RSSI':
1229                info['rssi'] = int(v)
1230            else:
1231                self.log('warning', "Child info %s: %s ignored", k, v)
1232
1233        return info
1234
1235    def get_child_ipaddrs(self) -> Dict[Rloc16, List[Ip6Addr]]:
1236        """Get the list of IP addresses stored for MTD children.
1237
1238        Note: Each MTD child might has multiple IP addresses.
1239        """
1240        output = self.execute_command('childip')
1241
1242        ipaddrs = {}
1243
1244        for line in output:
1245            rloc16, ip = line.split(': ')
1246            rloc16 = Rloc16(rloc16, 16)
1247            ipaddrs.setdefault(rloc16, []).append(Ip6Addr(ip.strip()))
1248
1249        return ipaddrs
1250
1251    #
1252    # Child configurations
1253    #
1254
1255    def get_max_children(self) -> int:
1256        """Get the Thread maximum number of allowed children."""
1257        return self.__parse_int(self.execute_command('childmax'))
1258
1259    def set_max_children(self, val: int):
1260        """Set the Thread maximum number of allowed children."""
1261        self.execute_command(f'childmax {val}')
1262
1263    def get_child_ip_max(self) -> int:
1264        """Get the maximum number of IP addresses that each MTD child may register with this device as parent."""
1265        return self.__parse_int(self.execute_command('childip max'))
1266
1267    def set_child_ip_max(self, val: int):
1268        """Get the maximum number of IP addresses that each MTD child may register with this device as parent."""
1269        self.execute_command(f'childip max {val}')
1270
1271    def get_child_timeout(self):
1272        """Get the Thread Child Timeout value."""
1273        return self.__parse_int(self.execute_command('childtimeout'))
1274
1275    def set_child_timeout(self, timeout):
1276        """Set the Thread Child Timeout value."""
1277        self.execute_command('childtimeout %d' % timeout)
1278
1279    def get_child_supervision_interval(self) -> int:
1280        """Get the Child Supervision Check Timeout value."""
1281        return self.__parse_int(self.execute_command('childsupervision interval'))
1282
1283    def set_child_supervision_interval(self, val: int):
1284        """Set the Child Supervision Interval value.
1285        This command can only be used with FTD devices.
1286        """
1287        self.execute_command(f'childsupervision interval {val}')
1288
1289    def get_child_supervision_check_timeout(self) -> int:
1290        """Get the Child Supervision Check Timeout value."""
1291        return self.__parse_int(self.execute_command('childsupervision checktimeout'))
1292
1293    def set_child_supervision_check_timeout(self, val: int):
1294        """Set the Child Supervision Check Timeout value."""
1295        self.execute_command(f'childsupervision checktimeout {val}')
1296
1297    #
1298    # Neighbor management
1299    #
1300
1301    def get_neighbor_list(self) -> List[Rloc16]:
1302        """Get a list of RLOC16 of neighbors"""
1303        line = self.__parse_str(self.execute_command('neighbor list')).strip()
1304        return [Rloc16(id, 16) for id in line.split()]
1305
1306    def get_neighbor_table(self) -> Dict[Rloc16, Dict[str, Any]]:
1307        output = self.execute_command('neighbor table')
1308        if len(output) < 2:
1309            raise UnexpectedCommandOutput(output)
1310
1311        #
1312        # Example output:
1313        #
1314        # | Role | RLOC16 | Age | Avg RSSI | Last RSSI |R|D|N| Extended MAC     |
1315        # +------+--------+-----+----------+-----------+-+-+-+------------------+
1316        # |   C  | 0xcc01 |  96 |      -46 |       -46 |1|1|1| 1eb9ba8a6522636b |
1317        # |   R  | 0xc800 |   2 |      -29 |       -29 |1|1|1| 9a91556102c39ddb |
1318        # |   R  | 0xf000 |   3 |      -28 |       -28 |1|1|1| 0ad7ed6beaa6016d |
1319        # Done
1320        #
1321
1322        headers = self.__split_table_row(output[0])
1323
1324        table = {}
1325        for line in output[2:]:
1326            line = line.strip()
1327            if not line:
1328                continue
1329
1330            fields = self.__split_table_row(line)
1331            col = lambda colname: self.__get_table_col(colname, headers, fields)
1332
1333            role = col('Role')
1334            is_router = role == 'R'
1335            r, d, n = int(col('R')), int(col('D')), int(col('N'))
1336            mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}')
1337
1338            rloc16 = Rloc16(col('RLOC16'), 16)
1339
1340            table[rloc16] = {
1341                'is_router': is_router,
1342                'rloc16': rloc16,
1343                'age': int(col('Age')),
1344                'avg_rssi': int(col('Avg RSSI')),
1345                'last_rssi': int(col('Last RSSI')),
1346                'mode': mode,
1347                'extaddr': col('Extended MAC'),
1348            }
1349
1350        return table
1351
1352    #
1353    # SED/SSED configuration
1354    #
1355
1356    def get_poll_period(self) -> int:
1357        """Get the customized data poll period of sleepy end device (milliseconds).
1358        Only for Reference Device."""
1359        return self.__parse_int(self.execute_command('pollperiod'))
1360
1361    def set_poll_period(self, poll_period: int):
1362        """Set the customized data poll period (in milliseconds) for sleepy end device.
1363
1364        Only for Reference Device."""
1365        self.execute_command(f'pollperiod {poll_period}')
1366
1367    # TODO: csl
1368    # TODO: csl channel <channel>
1369    # TODO: csl period <period>
1370    # TODO: csl timeout <timeout>
1371
    # A decimal number followed by "us"; group 1 is the number.
    _CSL_PERIOD_PATTERN = re.compile(r'(\d+)us')
    # A decimal number followed by "s"; group 1 is the number.
    _CSL_TIMEOUT_PATTERN = re.compile(r'(\d+)s')
1374
1375    def get_csl_config(self) -> Dict[str, int]:
1376        """Get the CSL configuration."""
1377        output = self.execute_command('csl')
1378
1379        cfg = {}
1380        for line in output:
1381            k, v = line.split(': ')
1382            if k == 'Channel':
1383                cfg['channel'] = int(v)
1384            elif k == 'Timeout':
1385                matched = OTCI._CSL_TIMEOUT_PATTERN.match(v)
1386                assert matched is not None
1387                cfg['timeout'] = int(matched.group(1))
1388            elif k == 'Period':
1389                matched = OTCI._CSL_PERIOD_PATTERN.match(v)
1390                assert matched is not None
1391                cfg['period'] = int(matched.group(1))
1392            else:
1393                logging.warning("Ignore unknown CSL parameter: %s: %s", k, v)
1394
1395        return cfg
1396
1397    def config_csl(self, channel: Optional[int] = None, period: Optional[int] = None, timeout: Optional[int] = None):
1398        """Configure CSL parameters.
1399
1400        :param channel: Set CSL channel.
1401        :param period: Set CSL period in usec. Disable CSL by setting this parameter to 0.
1402        :param timeout: Set the CSL timeout in seconds.
1403        """
1404
1405        if channel is None and period is None and timeout is None:
1406            raise InvalidArgumentsError("Please specify at least 1 parameter to configure.")
1407
1408        if channel is not None:
1409            self.execute_command(f'csl channel {channel}')
1410
1411        if period is not None:
1412            self.execute_command(f'csl period {period}')
1413
1414        if timeout is not None:
1415            self.execute_command(f'csl timeout {timeout}')
1416
1417    #
1418    # Leader utilities
1419    #
1420
1421    def get_context_id_reuse_delay(self) -> int:
1422        """Get the CONTEXT_ID_REUSE_DELAY value."""
1423        return self.__parse_int(self.execute_command('contextreusedelay'))
1424
1425    def set_context_id_reuse_delay(self, val: int):
1426        """Set the CONTEXT_ID_REUSE_DELAY value."""
1427        self.execute_command(f'contextreusedelay {val}')
1428
1429    def release_router_id(self, routerid: int):
1430        """Release a Router ID that has been allocated by the device in the Leader role."""
1431        self.execute_command(f'releaserouterid {routerid}')
1432
1433    # Time Sync utilities
1434    # TODO: networktime
1435    # TODO: networktime <timesyncperiod> <xtalthreshold>
1436    # TODO: delaytimermin
1437    # TODO: delaytimermin <delaytimermin>
1438
1439    #
    # Commissioner operations
1441    #
1442
1443    def commissioner_start(self):
1444        """Start the Commissioner role."""
1445        self.execute_command('commissioner start')
1446
1447    def commissioner_stop(self):
1448        """Stop the Commissioner role."""
1449        self.execute_command('commissioner stop')
1450
1451    def get_commissioiner_state(self) -> str:
1452        """Get current Commissioner state (active or petitioning or disabled)."""
1453        return self.__parse_str(self.execute_command('commissioner state'))
1454
1455    def get_commissioner_session_id(self) -> int:
1456        """Get current commissioner session id."""
1457        return self.__parse_int(self.execute_command('commissioner sessionid'))
1458
1459    def commissioner_add_joiner(self, pskd, eui64=None, discerner=None, timeout=None):
1460        """Add a Joiner entry.
1461
1462        :param pskd: Pre-Shared Key for the Joiner.
1463        :param eui64: The IEEE EUI-64 of the Joiner or '*' to match any Joiner
1464        :param discerner: The Joiner discerner in format number/length.
1465        :param timeout: Joiner timeout in seconds.
1466        """
1467        if (eui64 is not None) == (discerner is not None):
1468            raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.")
1469
1470        if eui64 is not None and eui64 != '*':
1471            self.__validate_extaddr(eui64)
1472
1473        cmd = f'commissioner joiner add {eui64 or discerner} {pskd}'
1474
1475        if timeout is not None:
1476            cmd += f' {timeout}'
1477
1478        self.execute_command(cmd)
1479
1480    def commissioner_remove_jointer(self, eui64=None, discerner=None):
1481        if (eui64 is not None) == (discerner is not None):
1482            raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.")
1483
1484        if eui64 is not None and eui64 != '*':
1485            self.__validate_extaddr(eui64)
1486
1487        self.execute_command(f'commissioner joiner remove {eui64 or discerner}')
1488
1489    def set_commissioner_provisioning_url(self, url: str):
1490        self.execute_command(f'commissioner provisioningurl {url}')
1491
1492    # TODO: commissioner announce
1493    # TODO: commissioner energy
1494    # TODO: commissioner mgmtget
1495    # TODO: commissioner mgmtset
1496    # TODO: commissioner panid
1497
1498    #
1499    # Joiner operations
1500    #
1501    def joiner_start(self, psk: str, provisioning_url: Optional[str] = None):
1502        """Start the Joiner."""
1503        cmd = f'joiner start {psk}'
1504        if provisioning_url is not None:
1505            cmd += f' {provisioning_url}'
1506
1507        self.execute_command(cmd)
1508
1509    def joiner_stop(self):
1510        """Stop the Joiner role."""
1511        self.execute_command('joiner stop')
1512
1513    def get_joiner_id(self) -> str:
1514        """Get the Joiner ID."""
1515        return self.__parse_joiner_id(self.execute_command('joiner id'))
1516
1517    def get_joiner_port(self) -> int:
1518        """Get the Joiner port."""
1519        return self.__parse_int(self.execute_command(f'joinerport'))
1520
1521    def set_joiner_port(self, port: int):
1522        """Set the Joiner port."""
1523        self.execute_command(f'joinerport {port}')
1524
1525    # TODO: joiner discerner
1526
1527    #
1528    # Network Data utilities
1529    #
1530    def get_local_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1531        """Get prefixes from local Network Data."""
1532        output = self.execute_command('prefix')
1533        return self.__parse_prefixes(output)
1534
1535    def __parse_prefixes(self, output: List[str]) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1536        prefixes = []
1537
1538        for line in output:
1539            if line.startswith('- '):
1540                line = line[2:]
1541
1542            prefix, flags, prf, rloc16 = line.split()[:4]
1543            prefixes.append((Ip6Prefix(prefix), flags, prf, Rloc16(rloc16, 16)))
1544
1545        return prefixes
1546
1547    def add_prefix(self, prefix: str, flags='paosr', prf='med'):
1548        """Add a valid prefix to the Network Data."""
1549        self.execute_command(f'prefix add {prefix} {flags} {prf}')
1550
1551    def remove_prefix(self, prefix: str):
1552        """Invalidate a prefix in the Network Data."""
1553        self.execute_command(f'prefix remove {prefix}')
1554
1555    def register_network_data(self):
1556        self.execute_command('netdata register')
1557
1558    def get_network_data(self) -> Dict[str, List]:
1559        output = self.execute_command('netdata show')
1560
1561        netdata = {}
1562        if output.pop(0) != 'Prefixes:':
1563            raise UnexpectedCommandOutput(output)
1564
1565        prefixes_output = []
1566        while True:
1567            line = output.pop(0)
1568            if line == 'Routes:':
1569                break
1570            else:
1571                prefixes_output.append(line)
1572
1573        netdata['prefixes'] = self.__parse_prefixes(prefixes_output)
1574
1575        routes_output = []
1576        while True:
1577            line = output.pop(0)
1578            if line == 'Services:':
1579                break
1580            else:
1581                routes_output.append(line)
1582
1583        netdata['routes'] = self.__parse_routes(routes_output)
1584
1585        services_output = []
1586        while True:
1587            line = output.pop(0)
1588            if line == 'Contexts:':
1589                break
1590            else:
1591                services_output.append(line)
1592
1593        netdata['services'] = self.__parse_services(services_output)
1594
1595        return netdata
1596
1597    def get_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]:
1598        """Get network prefixes from Thread Network Data."""
1599        network_data = self.get_network_data()
1600        return network_data['prefixes']
1601
1602    def get_routes(self) -> List[Tuple[str, bool, str, Rloc16]]:
1603        """Get routes from Thread Network Data."""
1604        network_data = self.get_network_data()
1605        return network_data['routes']
1606
1607    def get_services(self) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]:
1608        """Get services from Thread Network Data"""
1609        network_data = self.get_network_data()
1610        return network_data['services']
1611
1612    def __parse_services(self, output: List[str]) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]:
1613        services = []
1614        for line in output:
1615            line = line.split()
1616
1617            enterprise_number, service_data, server_data = line[:3]
1618            if line[3] == 's':
1619                stable, rloc16 = True, line[4]
1620            else:
1621                stable, rloc16 = False, line[3]
1622
1623            enterprise_number = int(enterprise_number)
1624            service_data = self.__hex_to_bytes(service_data)
1625            server_data = self.__hex_to_bytes(server_data)
1626            rloc16 = Rloc16(rloc16, 16)
1627
1628            services.append((enterprise_number, service_data, server_data, stable, rloc16))
1629
1630        return services
1631
1632    def get_network_data_bytes(self) -> bytes:
1633        """Get the raw Network Data."""
1634        hexstr = self.__parse_str(self.execute_command('netdata show -x'))
1635        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
1636
1637    def get_local_routes(self) -> List[Tuple[str, bool, str, Rloc16]]:
1638        """Get routes from local Network Data."""
1639        return self.__parse_routes(self.execute_command('route'))
1640
1641    def __parse_routes(self, output: List[str]) -> List[Tuple[str, bool, str, Rloc16]]:
1642        routes = []
1643        for line in output:
1644            line = line.split()
1645            if len(line) == 4:
1646                prefix, flags, prf, rloc16 = line
1647                stable = 's' in flags
1648            else:
1649                prefix, prf, rloc16 = line
1650                stable = False
1651
1652            rloc16 = Rloc16(rloc16, 16)
1653            routes.append((prefix, stable, prf, rloc16))
1654
1655        return routes
1656
1657    def add_route(self, prefix: str, stable=True, prf='med'):
1658        """Add a valid external route to the Network Data."""
1659        cmd = f'route add {prefix}'
1660        if stable:
1661            cmd += ' s'
1662
1663        cmd += f' {prf}'
1664        self.execute_command(cmd)
1665
1666    def remove_route(self, prefix: str):
1667        """Invalidate a external route in the Network Data."""
1668        self.execute_command(f'route remove {prefix}')
1669
1670    def add_service(self, enterprise_number: int, service_data: Union[str, bytes], server_data: Union[str, bytes]):
1671        """Add service to the Network Data.
1672
1673        enterpriseNumber: IANA enterprise number
1674        serviceData: hex-encoded binary service data
1675        serverData: hex-encoded binary server data
1676        """
1677        service_data = self.__validate_hex_or_bytes(service_data)
1678        server_data = self.__validate_hex_or_bytes(server_data)
1679        self.execute_command(f'service add {enterprise_number} {service_data} {server_data}')
1680
1681    def remove_service(self, enterprise_number, service_data):
1682        """Remove service from Network Data.
1683
1684        enterpriseNumber: IANA enterprise number
1685        serviceData: hext-encoded binary service data
1686        """
1687        service_data = self.__validate_hex_or_bytes(service_data)
1688        self.execute_command(f'service remove {enterprise_number} {service_data}')
1689
1690    #
1691    # Dataset management
1692    #
1693
1694    def dataset_init_buffer(self, get_active_dataset=False, get_pending_dataset=False):
1695        """Initialize operational dataset buffer."""
1696        if get_active_dataset and get_pending_dataset:
1697            raise InvalidArgumentsError("Can not specify both `get_active_dataset` and `get_pending_dataset`.")
1698
1699        if get_active_dataset:
1700            self.execute_command(f'dataset init active')
1701        elif get_pending_dataset:
1702            self.execute_command(f'dataset init pending')
1703        else:
1704            self.execute_command(f'dataset init new')
1705
1706    def dataset_commit_buffer(self, dataset: str):
1707        if dataset in ('active', 'pending'):
1708            cmd = f'dataset commit {dataset}'
1709        else:
1710            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1711
1712        self.execute_command(cmd)
1713
1714    def dataset_clear_buffer(self):
1715        """Reset operational dataset buffer."""
1716        self.execute_command('dataset clear')
1717
1718    def get_dataset(self, dataset: str = 'buffer'):
1719        if dataset in ('active', 'pending'):
1720            cmd = f'dataset {dataset}'
1721        elif dataset == 'buffer':
1722            cmd = 'dataset'
1723        else:
1724            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1725
1726        output = self.execute_command(cmd)
1727        return self.__parse_dataset(output)
1728
1729    def __parse_dataset(self, output: List[str]) -> Dict[str, Any]:
1730        # Example output:
1731        #
1732        # Active Timestamp: 1
1733        # Channel: 22
1734        # Channel Mask: 0x07fff800
1735        # Ext PAN ID: 5c93ae980ff22d35
1736        # Mesh Local Prefix: fdc7:55fe:6363:bd01::/64
1737        # Network Key: d1a8348d59fb1fac1d6c4f95007d487a
1738        # Network Name: OpenThread-7caa
1739        # PAN ID: 0x7caa
1740        # PSKc: 167d89fd169e439ca0b8266de248090f
1741        # Security Policy: 672 onrc 0
1742
1743        dataset = {}
1744
1745        for line in output:
1746            line = line.split(': ')
1747            key, val = line[0], ': '.join(line[1:])
1748
1749            if key == 'Active Timestamp':
1750                dataset['active_timestamp'] = int(val)
1751            elif key == 'Channel':
1752                dataset['channel'] = int(val)
1753            elif key == 'Channel Mask':
1754                dataset['channel_mask'] = int(val, 16)
1755            elif key == 'Ext PAN ID':
1756                dataset['extpanid'] = val
1757            elif key == 'Mesh Local Prefix':
1758                dataset['mesh_local_prefix'] = val
1759            elif key in ('Network Key', 'Master Key'):
1760                dataset['networkkey'] = val
1761            elif key == 'Network Name':
1762                dataset['network_name'] = val
1763            elif key == 'PAN ID':
1764                dataset['panid'] = int(val, 16)
1765            elif key == 'PSKc':
1766                dataset['pskc'] = val
1767            elif key == 'Security Policy':
1768                rotation_time, flags, version_threshold = val.split(' ')
1769                rotation_time = int(rotation_time)
1770                dataset['security_policy'] = SecurityPolicy(rotation_time, flags)
1771            else:
1772                raise UnexpectedCommandOutput(output)
1773
1774        return dataset
1775
1776    def get_dataset_bytes(self, dataset: str) -> bytes:
1777        if dataset in ('active', 'pending'):
1778            cmd = f'dataset {dataset} -x'
1779        else:
1780            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1781
1782        hexstr = self.__parse_str(self.execute_command(cmd))
1783        return self.__hex_to_bytes(hexstr)
1784
1785    def set_dataset_bytes(self, dataset: str, data: bytes) -> None:
1786        if dataset in ('active', 'pending'):
1787            cmd = f'dataset set {dataset} {self.__bytes_to_hex(data)}'
1788        else:
1789            raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}')
1790
1791        self.execute_command(cmd)
1792
1793    def dataset_set_buffer(self,
1794                           active_timestamp: Optional[int] = None,
1795                           channel: Optional[int] = None,
1796                           channel_mask: Optional[int] = None,
1797                           extpanid: Optional[str] = None,
1798                           mesh_local_prefix: Optional[str] = None,
1799                           network_key: Optional[str] = None,
1800                           network_name: Optional[str] = None,
1801                           panid: Optional[int] = None,
1802                           pskc: Optional[str] = None,
1803                           security_policy: Optional[tuple] = None,
1804                           pending_timestamp: Optional[int] = None):
1805        if active_timestamp is not None:
1806            self.execute_command(f'dataset activetimestamp {active_timestamp}')
1807
1808        if channel is not None:
1809            self.execute_command(f'dataset channel {channel}')
1810
1811        if channel_mask is not None:
1812            self.execute_command(f'dataset channelmask {channel_mask}')
1813
1814        if extpanid is not None:
1815            self.execute_command(f'dataset extpanid {extpanid}')
1816
1817        if mesh_local_prefix is not None:
1818            self.execute_command(f'dataset meshlocalprefix {mesh_local_prefix}')
1819
1820        if network_key is not None:
1821            nwk_cmd = self.__detect_networkkey_cmd()
1822            self.execute_command(f'dataset {nwk_cmd} {network_key}')
1823
1824        if network_name is not None:
1825            self.execute_command(f'dataset networkname {self.__escape_escapable(network_name)}')
1826
1827        if panid is not None:
1828            self.execute_command(f'dataset panid {panid}')
1829
1830        if pskc is not None:
1831            self.execute_command(f'dataset pskc {pskc}')
1832
1833        if security_policy is not None:
1834            rotation_time, flags = security_policy
1835            self.execute_command(f'dataset securitypolicy {rotation_time} {flags}')
1836
1837        if pending_timestamp is not None:
1838            self.execute_command(f'dataset pendingtimestamp {pending_timestamp}')
1839
1840    # TODO: dataset mgmtgetcommand
1841    # TODO: dataset mgmtsetcommand
1842    # TODO: dataset set <active|pending> <dataset>
1843
1844    #
1845    # Allowlist management
1846    #
1847
1848    def enable_allowlist(self):
1849        self.execute_command(f'macfilter addr {self.__detect_allowlist_cmd()}')
1850
1851    def disable_allowlist(self):
1852        self.execute_command('macfilter addr disable')
1853
1854    def add_allowlist(self, addr: str, rssi: Optional[int] = None):
1855        cmd = f'macfilter addr add {addr}'
1856
1857        if rssi is not None:
1858            cmd += f' {rssi}'
1859
1860        self.execute_command(cmd)
1861
1862    def remove_allowlist(self, addr: str):
1863        self.execute_command(f'macfilter addr remove {addr}')
1864
1865    def clear_allowlist(self):
1866        self.execute_command('macfilter addr clear')
1867
1868    def set_allowlist(self, allowlist: Collection[Union[str, Tuple[str, int]]]):
1869        self.clear_allowlist()
1870
1871        if allowlist is None:
1872            self.disable_allowlist()
1873        else:
1874            self.enable_allowlist()
1875            for item in allowlist:
1876                if isinstance(item, str):
1877                    self.add_allowlist(item)
1878                else:
1879                    addr, rssi = item[0], item[1]
1880                    self.add_allowlist(addr, rssi)
1881
1882    # TODO: denylist
1883    # TODO: macfilter rss
1884    # TODO: macfilter rss add <extaddr> <rss>
1885    # TODO: macfilter rss add-lqi <extaddr> <lqi>
1886    # TODO: macfilter rss remove <extaddr>
1887    # TODO: macfilter rss clear
1888
1889    def __detect_allowlist_cmd(self):
1890        if self.api_version >= 28:
1891            return 'allowlist'
1892        else:
1893            return '\x77\x68\x69\x74\x65\x6c\x69\x73\x74'
1894
1895    def __detect_networkkey_cmd(self) -> str:
1896        return 'networkkey' if self.api_version >= 126 else 'masterkey'
1897
1898    #
1899    # Unicast Addresses management
1900    #
1901    def add_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1902        """Add an IPv6 address to the Thread interface."""
1903        self.execute_command(f'ipaddr add {ip}')
1904
1905    def del_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1906        """Delete an IPv6 address from the Thread interface."""
1907        self.execute_command(f'ipaddr del {ip}')
1908
1909    def get_ipaddrs(self) -> Tuple[Ip6Addr]:
1910        """Get all IPv6 addresses assigned to the Thread interface."""
1911        return tuple(map(Ip6Addr, self.execute_command('ipaddr')))
1912
1913    def has_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1914        """Check if a IPv6 address was added to the Thread interface."""
1915        return ip in self.get_ipaddrs()
1916
1917    def get_ipaddr_mleid(self) -> Ip6Addr:
1918        """Get Thread Mesh Local EID address."""
1919        return self.__parse_ip6addr(self.execute_command('ipaddr mleid'))
1920
1921    def get_ipaddr_linklocal(self) -> Ip6Addr:
1922        """Get Thread link-local IPv6 address."""
1923        return self.__parse_ip6addr(self.execute_command('ipaddr linklocal'))
1924
1925    def get_ipaddr_rloc(self) -> Ip6Addr:
1926        """Get Thread Routing Locator (RLOC) address."""
1927        return self.__parse_ip6addr(self.execute_command('ipaddr rloc'))
1928
1929    #
1930    # Multicast Addresses management
1931    #
1932
1933    def add_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1934        """Subscribe the Thread interface to the IPv6 multicast address."""
1935        self.execute_command(f'ipmaddr add {ip}')
1936
1937    def del_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1938        """Unsubscribe the Thread interface to the IPv6 multicast address."""
1939        self.execute_command(f'ipmaddr del {ip}')
1940
1941    def get_ipmaddrs(self) -> Tuple[Ip6Addr]:
1942        """Get all IPv6 multicast addresses subscribed to the Thread interface."""
1943        return tuple(map(Ip6Addr, self.execute_command('ipmaddr')))
1944
1945    def has_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]):
1946        """Check if a IPv6 multicast address was subscribed by the Thread interface."""
1947        return ip in self.get_ipmaddrs()
1948
1949    def get_ipmaddr_promiscuous(self) -> bool:
1950        """Get multicast promiscuous mode."""
1951        return self.__parse_Enabled_or_Disabled(self.execute_command("ipmaddr promiscuous"))
1952
1953    def enable_ipmaddr_promiscuous(self):
1954        """Enable multicast promiscuous mode."""
1955        self.execute_command('ipmaddr promiscuous enable')
1956
1957    def disable_ipmaddr_promiscuous(self):
1958        """Disable multicast promiscuous mode."""
1959        self.execute_command('ipmaddr promiscuous disable')
1960
1961    def get_ipmaddr_llatn(self) -> Ip6Addr:
1962        """Get Link Local All Thread Nodes Multicast Address"""
1963        return self.__parse_ip6addr(self.execute_command('ipmaddr llatn'))
1964
1965    def get_ipmaddr_rlatn(self) -> Ip6Addr:
1966        """Get Realm Local All Thread Nodes Multicast Address"""
1967        return self.__parse_ip6addr(self.execute_command('ipmaddr rlatn'))
1968
1969    #
1970    # Backbone Router Utilities
1971    #
1972
1973    # TODO: bbr mgmt ...
1974
1975    def enable_backbone_router(self):
1976        """Enable Backbone Router Service for Thread 1.2 FTD.
1977
1978        SRV_DATA.ntf would be triggered for attached device if there is no Backbone Router Service in Thread Network Data.
1979        """
1980        self.execute_command('bbr enable')
1981
1982    def disable_backbone_router(self):
1983        """Disable Backbone Router Service for Thread 1.2 FTD.
1984
1985        SRV_DATA.ntf would be triggered if Backbone Router is Primary state.
1986        """
1987        self.execute_command('bbr disable')
1988
1989    def get_backbone_router_state(self) -> str:
1990        """Get local Backbone state (Disabled or Primary or Secondary) for Thread 1.2 FTD."""
1991        return self.__parse_str(self.execute_command('bbr state'))
1992
1993    def get_primary_backbone_router_info(self) -> Optional[dict]:
1994        """Show current Primary Backbone Router information for Thread 1.2 device."""
1995        output = self.execute_command('bbr')
1996
1997        if len(output) < 1:
1998            raise UnexpectedCommandOutput(output)
1999
2000        line = output[0]
2001        if line == 'BBR Primary: None':
2002            return None
2003
2004        if line != 'BBR Primary:':
2005            raise UnexpectedCommandOutput(output)
2006
2007        # Example output:
2008        # BBR Primary:
2009        # server16: 0xE400
2010        # seqno:    10
2011        # delay:    120 secs
2012        # timeout:  300 secs
2013
2014        dataset = {}
2015
2016        for line in output[1:]:
2017            key, val = line.split(':')
2018            key, val = key.strip(), val.strip()
2019            if key == 'server16':
2020                dataset[key] = int(val, 16)
2021            elif key == 'seqno':
2022                dataset[key] = int(val)
2023            elif key == 'delay':
2024                if not val.endswith(' secs'):
2025                    raise UnexpectedCommandOutput(output)
2026                dataset[key] = int(val.split()[0])
2027            elif key == 'timeout':
2028                if not val.endswith(' secs'):
2029                    raise UnexpectedCommandOutput(output)
2030                dataset[key] = int(val.split()[0])
2031            else:
2032                raise UnexpectedCommandOutput(output)
2033
2034        return dataset
2035
2036    def register_backbone_router_dataset(self):
2037        """Register Backbone Router Service for Thread 1.2 FTD.
2038
2039        SRV_DATA.ntf would be triggered for attached device.
2040        """
2041        self.execute_command('bbr register')
2042
2043    def get_backbone_router_config(self) -> dict:
2044        """Show local Backbone Router configuration for Thread 1.2 FTD."""
2045        output = self.execute_command('bbr config')
2046        # Example output:
2047        # seqno:    10
2048        # delay:    120 secs
2049        # timeout:  300 secs
2050
2051        config = {}
2052
2053        for line in output:
2054            key, val = line.split(':')
2055            key, val = key.strip(), val.strip()
2056            if key == 'seqno':
2057                config[key] = int(val)
2058            elif key in ('delay', 'timeout'):
2059                if not line.endswith(' secs'):
2060                    raise UnexpectedCommandOutput(output)
2061                config[key] = int(val.split()[0])
2062            else:
2063                raise UnexpectedCommandOutput(output)
2064
2065        return config
2066
2067    def set_backbone_router_config(self,
2068                                   seqno: Optional[int] = None,
2069                                   delay: Optional[int] = None,
2070                                   timeout: Optional[int] = None):
2071        """Configure local Backbone Router configuration for Thread 1.2 FTD.
2072
2073        Call register_backbone_router_dataset() to explicitly register Backbone Router service to Leader for Secondary Backbone Router.
2074        """
2075        if seqno is None and delay is None and timeout is None:
2076            raise InvalidArgumentsError("Please specify seqno or delay or timeout")
2077
2078        cmd = 'bbr config'
2079        if seqno is not None:
2080            cmd += f' seqno {seqno}'
2081
2082        if delay is not None:
2083            cmd += f' delay {delay}'
2084
2085        if timeout is not None:
2086            cmd += f' timeout {timeout}'
2087
2088        self.execute_command(cmd)
2089
2090    def get_backbone_router_jitter(self) -> int:
2091        """Get jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD."""
2092        return self.__parse_int(self.execute_command('bbr jitter'))
2093
2094    def set_backbone_router_jitter(self, val: int):
2095        """Set jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD."""
2096        self.execute_command(f'bbr jitter {val}')
2097
2098    def backbone_router_get_multicast_listeners(self) -> List[Tuple[Ip6Addr, int]]:
2099        """Get Backbone Router Multicast Listeners."""
2100        listeners = []
2101        for line in self.execute_command('bbr mgmt mlr listener'):
2102            ip, timeout = line.split()
2103            listeners.append((Ip6Addr(ip), int(timeout)))
2104
2105        return listeners
2106
2107    #
2108    # Thread 1.2 and DUA/MLR utilities
2109    #
2110
2111    def get_domain_name(self) -> str:
2112        """Get the Thread Domain Name for Thread 1.2 device."""
2113        return self.__parse_str(self.execute_command('domainname'))
2114
2115    def set_domain_name(self, name: str):
2116        """Set the Thread Domain Name for Thread 1.2 device."""
2117        self.execute_command('domainname %s' % self.__escape_escapable(name))
2118
2119    # TODO: dua iid
2120    # TODO: dua iid <iid>
2121    # TODO: dua iid clear
2122    # TODO: mlr reg <ipaddr> ... [timeout]
2123
2124    #
2125    # Link metrics management
2126    #
2127    # TODO: linkmetrics mgmt <ipaddr> forward <seriesid> [ldraX][pqmr]
2128    # TODO: linkmetrics probe <ipaddr> <seriesid> <length>
2129    # TODO: linkmetrics query <ipaddr> single [pqmr]
2130    # TODO: linkmetrics query <ipaddr> forward <seriesid>
2131    # TODO: linkquality <extaddr>
2132    # TODO: linkquality <extaddr> <linkquality>
2133    #
2134
2135    #
2136    # Logging
2137    #
2138
2139    def get_log_level(self) -> int:
2140        """Get the log level."""
2141        return self.__parse_int(self.execute_command('log level'))
2142
2143    def set_log_level(self, level: int):
2144        """Set the log level."""
2145        self.execute_command(f'log level {level}')
2146
2147    #
2148    # Device performance related information
2149    #
2150
2151    def get_message_buffer_info(self) -> dict:
2152        """Get the current message buffer information."""
2153        output = self.execute_command('bufferinfo')
2154
2155        info = {}
2156
2157        def _parse_val(val):
2158            vals = val.split()
2159            return int(vals[0]) if len(vals) == 1 else tuple(map(int, vals))
2160
2161        for line in output:
2162            key, val = line.split(':')
2163            key, val = key.strip(), val.strip()
2164            info[key.replace(' ', '_')] = _parse_val(val)
2165
2166        return info
2167
2168    @constant_property
2169    def counter_names(self):
2170        """Get the supported counter names."""
2171        return tuple(self.execute_command('counters'))
2172
2173    def get_counter(self, name: str) -> Counter:
2174        """Reset the counter value."""
2175        output = self.execute_command(f'counters {name}')
2176
2177        counter = Counter()
2178        for line in output:
2179            k, v = line.strip().split(': ')
2180            counter[k] = int(v)
2181
2182        return counter
2183
2184    def reset_counter(self, name: str):
2185        """Reset the counter value."""
2186        self.execute_command(f'counters {name} reset')
2187
2188    def get_eidcache(self) -> Dict[Ip6Addr, Rloc16]:
2189        """Get the EID-to-RLOC cache entries."""
2190        output = self.execute_command('eidcache')
2191        cache = {}
2192
2193        for line in output:
2194            ip, rloc16, _ = line.split(" ", 2)
2195
2196            cache[Ip6Addr(ip)] = Rloc16(rloc16, 16)
2197
2198        return cache
2199
2200    #
2201    # UDP utilities
2202    #
2203
2204    def udp_open(self):
2205        """Opens the example socket."""
2206        self.execute_command('udp open')
2207
2208    def udp_close(self):
2209        """Opens the example socket."""
2210        self.execute_command('udp close')
2211
2212    def udp_bind(self, ip: str, port: int, netif: NetifIdentifier = NetifIdentifier.THERAD):
2213        """Assigns a name (i.e. IPv6 address and port) to the example socket.
2214
2215        :param ip: the IPv6 address or the unspecified IPv6 address (::).
2216        :param port: the UDP port
2217        """
2218        bindarg = ''
2219        if netif == NetifIdentifier.UNSPECIFIED:
2220            bindarg += ' -u'
2221        elif netif == NetifIdentifier.BACKBONE:
2222            bindarg += ' -b'
2223
2224        self.execute_command(f'udp bind{bindarg} {ip} {port}')
2225
2226    def udp_connect(self, ip: str, port: int):
2227        """Specifies the peer with which the socket is to be associated.
2228
2229        ip: the peer's IPv6 address.
2230        port: the peer's UDP port.
2231        """
2232        self.execute_command(f'udp connect {ip} {port}')
2233
2234    def udp_send(self,
2235                 ip: Optional[Union[str, Ip6Addr]] = None,
2236                 port: Optional[int] = None,
2237                 text: Optional[str] = None,
2238                 random_bytes: Optional[int] = None,
2239                 hex: Optional[str] = None):
2240        """Send a few bytes over UDP.
2241
2242        ip: the IPv6 destination address.
2243        port: the UDP destination port.
2244        type: the type of the message: _ -t: text payload in the value, same as without specifying the type. _ -s: autogenerated payload with specified length indicated in the value.
2245        * -x: binary data in hexadecimal representation in the value.
2246        """
2247        if (ip is None) != (port is None):
2248            raise InvalidArgumentsError("Please specify both `ip` and `port`.")
2249
2250        if (text is not None) + (random_bytes is not None) + (hex is not None) != 1:
2251            raise InvalidArgumentsError("Please specify `text` or `random_bytes` or `hex`.")
2252
2253        cmd = 'udp send'
2254
2255        if ip is not None:
2256            cmd += f' {ip} {port}'
2257
2258        if text is not None:
2259            cmd += f' -t {text}'
2260        elif random_bytes is not None:
2261            cmd += f' -s {random_bytes}'
2262        elif hex is not None:
2263            self.__validate_hex(hex)
2264            cmd += f' -x {hex}'
2265
2266        self.execute_command(cmd)
2267
2268    def udp_get_link_security(self) -> bool:
2269        """Gets whether the link security is enabled or disabled."""
2270        return self.__parse_Enabled_or_Disabled(self.execute_command('udp linksecurity'))
2271
2272    def udp_enable_link_security(self):
2273        """Enable link security."""
2274        self.execute_command('udp linksecurity enable')
2275
2276    def udp_disable_link_security(self):
2277        """Disable link security."""
2278        self.execute_command('udp linksecurity disable')
2279
2280    def netstat(self) -> List[Tuple[Tuple[Ip6Addr, int], Tuple[Ip6Addr, int]]]:
2281        cmd = 'netstat'
2282        output = self.execute_command(cmd)
2283        if len(output) < 2:
2284            raise UnexpectedCommandOutput(output)
2285
2286        socks = []
2287        for line in output[2:]:
2288            _, sock_addr, peer_addr = line.strip().split('|')[:3]
2289            sock_addr = self.__parse_socket_addr(sock_addr.strip())
2290            peer_addr = self.__parse_socket_addr(peer_addr.strip())
2291            socks.append((sock_addr, peer_addr))
2292
2293        return socks
2294
2295    @staticmethod
2296    def __parse_socket_addr(addr: str) -> Tuple[Ip6Addr, int]:
2297        addr, port = addr.rsplit(':', 1)
2298        if addr.startswith('[') and addr.endswith(']'):
2299            addr = addr[1:-1]
2300
2301        return Ip6Addr(addr), int(port) if port != '*' else 0
2302
2303    #
2304    # CoAP CLI (test) utilities
2305    #
2306    def coap_start(self):
2307        """Starts the application coap service."""
2308        self.execute_command('coap start')
2309
2310    def coap_stop(self):
2311        """Stops the application coap service."""
2312        self.execute_command('coap stop')
2313
2314    def coap_get(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con"):
2315        cmd = f'coap get {addr} {uri_path} {type}'
2316        self.execute_command(cmd)
2317
2318    def coap_put(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
2319        cmd = f'coap put {addr} {uri_path} {type}'
2320
2321        if payload is not None:
2322            cmd += f' {payload}'
2323
2324        self.execute_command(cmd)
2325
2326    def coap_post(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
2327        cmd = f'coap post {addr} {uri_path} {type}'
2328
2329        if payload is not None:
2330            cmd += f' {payload}'
2331
2332        self.execute_command(cmd)
2333
2334    def coap_delete(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None):
2335        cmd = f'coap delete {addr} {uri_path} {type}'
2336
2337        if payload is not None:
2338            cmd += f' {payload}'
2339
2340        self.execute_command(cmd)
2341
2342    def coap_get_test_resource_path(self) -> str:
2343        """Gets the URI path for the test resource."""
2344        return self.__parse_str(self.execute_command('coap resource'))
2345
2346    def coap_set_test_resource_path(self, path: str):
2347        """Sets the URI path for the test resource."""
2348        self.execute_command(f'coap resource {path}')
2349
2350    def coap_test_set_resource_content(self, content: str):
2351        """Sets the content sent by the test resource. If a CoAP client is observing the resource, a notification is sent to that client."""
2352        self.execute_command(f'coap set {content}')
2353
2354    # TODO: coap observe <address> <uri-path> [type]
2355    # TODO: coap cancel
2356    # TODO: coap parameters <type> ["default"|<ack_timeout> <ack_random_factor_numerator> <ack_random_factor_denominator> <max_retransmit>]
2357    # TODO: CoAP Secure utilities
2358
2359    #
2360    # Other TODOs
2361    #
2362    # TODO: netstat
2363    # TODO: networkdiagnostic get <addr> <type> ..
2364    # TODO: networkdiagnostic reset <addr> <type> ..
2365    # TODO: parent
2366    # TODO: pskc [-p] <key>|<passphrase>
2367    #
2368
2369    #
2370    # Private methods
2371    #
2372
2373    def __parse_str(self, output: List[str]) -> str:
2374        if len(output) != 1:
2375            raise UnexpectedCommandOutput(output)
2376
2377        return output[0]
2378
2379    def __parse_int_list(self, output: List[str]) -> List[int]:
2380        line = self.__parse_str(output)
2381        return list(map(int, line.strip().split()))
2382
2383    def __parse_ip6addr(self, output: List[str]) -> Ip6Addr:
2384        return Ip6Addr(self.__parse_str(output))
2385
2386    def __parse_ip6addr_list(self, output: List[str]) -> List[Ip6Addr]:
2387        return [Ip6Addr(line) for line in output]
2388
2389    def __parse_int(self, output: List[str], base=10) -> int:
2390        if len(output) != 1:
2391            raise UnexpectedCommandOutput(output)
2392
2393        return int(output[0], base)
2394
2395    def __parse_network_key(self, output: List[str]) -> str:
2396        networkkey = self.__parse_str(output)
2397
2398        try:
2399            self.__validate_network_key(networkkey)
2400        except ValueError:
2401            raise UnexpectedCommandOutput(output)
2402
2403        return networkkey
2404
2405    def __validate_network_key(self, networkkey: str):
2406        if len(networkkey) != 32:
2407            raise ValueError(networkkey)
2408
2409        int(networkkey, 16)
2410
2411    def __parse_hex64b(self, output: List[str]) -> str:
2412        extaddr = self.__parse_str(output)
2413
2414        try:
2415            self.__validate_hex64b(extaddr)
2416        except ValueError:
2417            raise UnexpectedCommandOutput(output)
2418
2419        return extaddr
2420
    # Aliases of __parse_hex64b: each of these outputs is parsed and validated
    # exactly like a generic 16-hex-digit (64-bit) value.
    __parse_extaddr = __parse_hex64b
    __parse_extpanid = __parse_hex64b
    __parse_eui64 = __parse_hex64b
    __parse_joiner_id = __parse_hex64b
2425
2426    def __validate_hex64b(self, extaddr: str):
2427        if len(extaddr) != 16:
2428            raise ValueError(extaddr)
2429
2430        self.__validate_hex(extaddr)
2431
2432    def __validate_hex(self, hexstr: str):
2433        if len(hexstr) % 2 != 0:
2434            raise ValueError(hexstr)
2435
2436        for i in range(0, len(hexstr), 2):
2437            int(hexstr[i:i + 2], 16)
2438
    # Aliases of __validate_hex64b: both values are validated as 16-hex-digit strings.
    __validate_extaddr = __validate_hex64b
    __validate_extpanid = __validate_hex64b
2441
2442    def __parse_Enabled_or_Disabled(self, output: List[str]) -> bool:
2443        return self.__parse_values(output, Enabled=True, Disabled=False)
2444
2445    def __parse_values(self, output: List[str], **vals) -> Any:
2446        val = self.__parse_str(output)
2447        if val not in vals:
2448            raise UnexpectedCommandOutput(output)
2449
2450        return vals[val]
2451
2452    def __validate_hex_or_bytes(self, data: Union[str, bytes]) -> str:
2453        if isinstance(data, bytes):
2454            return ''.join('%02x' % c for c in data)
2455        else:
2456            self.__validate_hex(data)
2457            return data
2458
2459    def __hex_to_bytes(self, hexstr: str) -> bytes:
2460        self.__validate_hex(hexstr)
2461        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
2462
2463    def __bytes_to_hex(self, data: bytes) -> str:
2464        return ''.join('%02x' % b for b in data)
2465
2466    def __escape_escapable(self, s: str) -> str:
2467        """Escape CLI escapable characters in the given string.
2468        """
2469        escapable_chars = '\\ \t\r\n'
2470        for char in escapable_chars:
2471            s = s.replace(char, '\\%s' % char)
2472        return s
2473
2474    def __txt_to_hex(self, txt: Dict[str, Union[str, bytes, bool]]) -> str:
2475        txt_bin = b''
2476        for k, v in txt.items():
2477            assert '=' not in k, 'TXT key must not contain `=`'
2478
2479            if isinstance(v, str):
2480                entry = f'{k}={v}'.encode('utf8')
2481            elif isinstance(v, bytes):
2482                entry = f'{k}='.encode('utf8') + v
2483            else:
2484                assert v is True, 'TXT val must be str or bytes or True'
2485                entry = k.encode('utf8')
2486
2487            assert len(entry) <= 255, 'TXT entry is too long'
2488
2489            txt_bin += bytes([len(entry)])
2490            txt_bin += entry
2491
2492        return ''.join('%02x' % b for b in txt_bin)
2493
2494
def connect_cli_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI:
    """Create an OTCI on top of a `connectors.OtCliSim` connection.

    Args:
        executable: Passed to `connectors.OtCliSim` as its first argument.
        nodeid: Passed to `connectors.OtCliSim` as its second argument.
        simulator: Optional simulator forwarded to `connectors.OtCliSim`.

    Returns:
        An OTCI whose commands run through an `OtCliCommandRunner`.
    """
    return OTCI(OtCliCommandRunner(connectors.OtCliSim(executable, nodeid, simulator=simulator)))
2499
2500
def connect_cli_serial(dev: str, baudrate=115200) -> OTCI:
    """Create an OTCI on top of a `connectors.OtCliSerial` connection.

    Args:
        dev: Passed to `connectors.OtCliSerial` as its first argument.
        baudrate: Passed to `connectors.OtCliSerial` as its second argument (default 115200).

    Returns:
        An OTCI whose commands run through an `OtCliCommandRunner`.
    """
    return OTCI(OtCliCommandRunner(connectors.OtCliSerial(dev, baudrate)))
2505
2506
def connect_ncp_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI:
    """Create an OTCI on top of a `connectors.OtNcpSim` connection.

    The connection is wrapped in an `OtCliCommandRunner` created with
    `is_spinel_cli=True`.

    Args:
        executable: Passed to `connectors.OtNcpSim` as its first argument.
        nodeid: Passed to `connectors.OtNcpSim` as its second argument.
        simulator: Optional simulator forwarded to `connectors.OtNcpSim`.
    """
    runner = OtCliCommandRunner(
        connectors.OtNcpSim(executable, nodeid, simulator=simulator),
        is_spinel_cli=True,
    )
    return OTCI(runner)
2511
2512
def connect_otbr_ssh(host: str, port: int = 22, username='pi', password='raspberry', sudo=True):
    """Create an OTCI whose commands run through an `OtbrSshCommandRunner`.

    Args:
        host: Forwarded to `OtbrSshCommandRunner`.
        port: Forwarded to `OtbrSshCommandRunner` (default 22).
        username: Forwarded to `OtbrSshCommandRunner` (default 'pi').
        password: Forwarded to `OtbrSshCommandRunner` (default 'raspberry').
        sudo: Forwarded to `OtbrSshCommandRunner` as the `sudo` keyword (default True).
    """
    return OTCI(OtbrSshCommandRunner(host, port, username, password, sudo=sudo))
2516
2517
def connect_otbr_adb(host: str, port: int = 5555):
    """Create an OTCI whose commands run through an `OtbrAdbCommandRunner`.

    Args:
        host: Forwarded to `OtbrAdbCommandRunner`.
        port: Forwarded to `OtbrAdbCommandRunner` (default 5555).
    """
    return OTCI(OtbrAdbCommandRunner(host, port))
2521
2522
def connect_cmd_handler(cmd_handler: OTCommandHandler) -> OTCI:
    """Create an OTCI around an already constructed command handler.

    Args:
        cmd_handler: The command handler handed directly to `OTCI`.
    """
    otci = OTCI(cmd_handler)
    return otci
2525