1#!/usr/bin/env python3 2# 3# Copyright (c) 2020, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29import ipaddress 30import logging 31import re 32from collections import Counter 33from typing import Callable, List, Collection, Union, Tuple, Optional, Dict, Pattern, Any 34 35from . import connectors 36from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshCommandRunner, OtbrAdbCommandRunner 37from .connectors import Simulator 38from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError 39from .types import ChildId, Rloc16, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix, \ 40 RouterTableEntry, NetifIdentifier 41from .utils import match_line, constant_property 42 43 44class OTCI(object): 45 """ 46 This class represents an OpenThread Controller Interface instance that provides versatile interfaces to 47 manipulate an OpenThread device. 48 """ 49 50 DEFAULT_EXEC_COMMAND_RETRY = 4 # A command is retried 4 times if failed. 51 52 __exec_command_retry = DEFAULT_EXEC_COMMAND_RETRY 53 54 def __init__(self, otcmd: OTCommandHandler): 55 """ 56 This method initializes an OTCI instance. 57 58 :param otcmd: An OpenThread Command Handler instance to execute OpenThread CLI commands. 59 """ 60 self.__otcmd: OTCommandHandler = otcmd 61 self.__logger = logging.getLogger(name=str(self)) 62 63 def __repr__(self): 64 """Gets the string representation of the OTCI instance.""" 65 return repr(self.__otcmd) 66 67 def wait(self, duration: float, expect_line: Optional[Union[str, Pattern, Collection[Any]]] = None): 68 """Wait for a given duration. 69 70 :param duration: The duration (in seconds) wait for. 71 :param expect_line: The line expected to output if given. 72 Raise ExpectLineTimeoutError if expect_line is not found within the given duration. 73 """ 74 self.log('info', "wait for %.3f seconds", duration) 75 if expect_line is None: 76 self.__otcmd.wait(duration) 77 else: 78 success = False 79 80 while duration > 0: 81 output = self.__otcmd.wait(1) 82 if any(match_line(line, expect_line) for line in output): 83 success = True 84 break 85 86 duration -= 1 87 88 if not success: 89 raise ExpectLineTimeoutError(expect_line) 90 91 def close(self): 92 """Close the OTCI instance.""" 93 self.__otcmd.close() 94 95 def execute_command(self, 96 cmd: str, 97 timeout: float = 10, 98 silent: bool = False, 99 already_is_ok: bool = True) -> List[str]: 100 for i in range(self.__exec_command_retry + 1): 101 try: 102 return self.__execute_command(cmd, timeout, silent, already_is_ok=already_is_ok) 103 except Exception: 104 self.wait(2) 105 if i == self.__exec_command_retry: 106 raise 107 assert False 108 109 def __execute_command(self, 110 cmd: str, 111 timeout: float = 10, 112 silent: bool = False, 113 already_is_ok: bool = True) -> List[str]: 114 """Execute the OpenThread CLI command. 115 116 :param cmd: The command to execute. 117 :param timeout: The command timeout. 118 :param silent: Whether to run the command silent without logging. 119 :returns: The command output as a list of lines. 120 """ 121 if not silent: 122 self.log('info', '> %s', cmd) 123 124 output = self.__otcmd.execute_command(cmd, timeout) 125 126 if not silent: 127 for line in output: 128 self.log('info', '%s', line) 129 130 if cmd in ('reset', 'factoryreset'): 131 return output 132 133 if output[-1] == 'Done' or (already_is_ok and output[-1] == 'Error 24: Already'): 134 output = output[:-1] 135 return output 136 else: 137 raise CommandError(cmd, output) 138 139 def set_execute_command_retry(self, n: int): 140 assert n >= 0 141 self.__exec_command_retry = n 142 143 def shell(self, cmd: str, timeout: float = 10): 144 self.log('info', '# %s', cmd) 145 output = self.__otcmd.shell(cmd, timeout=timeout) 146 for line in output: 147 self.log('info', '%s', line) 148 return output 149 150 def set_logger(self, logger: logging.Logger): 151 """Set the logger for the OTCI instance, or None to disable logging.""" 152 self.__logger = logger 153 154 def log(self, level, fmt, *args, **kwargs): 155 if self.__logger is not None: 156 getattr(self.__logger, level)('(%s) ' + fmt, repr(self), *args, **kwargs) 157 158 def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]): 159 """Set the callback that will be called for each line output by the CLI.""" 160 self.__otcmd.set_line_read_callback(callback) 161 162 # 163 # Constant properties 164 # 165 @constant_property 166 def version(self): 167 """Returns the firmware version. (e.g. "OPENTHREAD/20191113-01411-gb2d66e424-dirty; SIMULATION; Nov 14 2020 14:24:38")""" 168 return self.__parse_str(self.execute_command('version')) 169 170 @constant_property 171 def thread_version(self): 172 """Get the Thread Version number.""" 173 return self.__parse_int(self.execute_command('thread version')) 174 175 @constant_property 176 def api_version(self): 177 """Get API version number.""" 178 try: 179 return self.__parse_int(self.execute_command('version api')) 180 except ValueError: 181 # If the device does not have `version api` command, it will print the firmware version, which would lead to ValueError. 182 return 0 183 184 # 185 # Basic device operations 186 # 187 def ifconfig_up(self): 188 """Bring up the IPv6 interface.""" 189 self.execute_command('ifconfig up') 190 191 def ifconfig_down(self): 192 """Bring down the IPv6 interface.""" 193 self.execute_command('ifconfig down') 194 195 def get_ifconfig_state(self) -> bool: 196 """Get the status of the IPv6 interface.""" 197 return self.__parse_values(self.execute_command('ifconfig'), up=True, down=False) 198 199 def thread_start(self): 200 """Enable Thread protocol operation and attach to a Thread network.""" 201 self.execute_command('thread start') 202 203 def thread_stop(self): 204 """Disable Thread protocol operation and detach from a Thread network.""" 205 self.execute_command('thread stop') 206 207 def reset(self): 208 """Signal a platform reset.""" 209 self.execute_command('reset') 210 211 def factory_reset(self): 212 """Delete all stored settings, and signal a platform reset.""" 213 self.execute_command('factoryreset') 214 215 # 216 # Network Operations 217 # 218 _PING_STATISTICS_PATTERN = re.compile( 219 r'^(?P<transmitted>\d+) packets transmitted, (?P<received>\d+) packets received.(?: Packet loss = (?P<loss>\d+\.\d+)%.)?(?: Round-trip min/avg/max = (?P<min>\d+)/(?P<avg>\d+\.\d+)/(?P<max>\d+) ms.)?$' 220 ) 221 222 def ping(self, 223 ip: Union[str, Ip6Addr], 224 size: int = 8, 225 count: int = 1, 226 interval: float = 1, 227 hoplimit: int = 64, 228 timeout: float = 3) -> Dict: 229 """Send an ICMPv6 Echo Request. 230 The default arguments are consistent with https://github.com/openthread/openthread/blob/main/src/core/utils/ping_sender.hpp. 231 232 :param ip: The target IPv6 address to ping. 233 :param size: The number of data bytes in the payload. Default is 8. 234 :param count: The number of ICMPv6 Echo Requests to be sent. Default is 1. 235 :param interval: The interval between two consecutive ICMPv6 Echo Requests in seconds. The value may have fractional form, for example 0.5. Default is 1. 236 :param hoplimit: The hoplimit of ICMPv6 Echo Request to be sent. Default is 64. See OPENTHREAD_CONFIG_IP6_HOP_LIMIT_DEFAULT in src/core/config/ip6.h. 237 :param timeout: The maximum duration in seconds for the ping command to wait after the final echo request is sent. Default is 3. 238 """ 239 cmd = f'ping {ip} {size} {count} {interval} {hoplimit} {timeout}' 240 241 timeout_allowance = 3 242 lines = self.execute_command(cmd, timeout=(count - 1) * interval + timeout + timeout_allowance) 243 244 statistics = {} 245 for line in lines: 246 m = OTCI._PING_STATISTICS_PATTERN.match(line) 247 if m is not None: 248 if m.group('transmitted') is not None: 249 statistics['transmitted_packets'] = int(m.group('transmitted')) 250 statistics['received_packets'] = int(m.group('received')) 251 if m.group('loss') is not None: 252 statistics['packet_loss'] = float(m.group('loss')) / 100 253 if m.group('min') is not None: 254 statistics['round_trip_time'] = { 255 'min': int(m.group('min')), 256 'avg': float(m.group('avg')), 257 'max': int(m.group('max')) 258 } 259 return statistics 260 261 def ping_stop(self): 262 """Stop sending ICMPv6 Echo Requests.""" 263 self.execute_command('ping stop') 264 265 def discover(self, channel: Optional[int] = None) -> List[Dict[str, Any]]: 266 """Perform an MLE Discovery operation.""" 267 return self.__scan_networks('discover', channel) 268 269 def scan(self, channel: Optional[int] = None) -> List[Dict[str, Any]]: 270 """Perform an IEEE 802.15.4 Active Scan.""" 271 return self.__scan_networks('scan', channel) 272 273 def __scan_networks(self, cmd: str, channel: Optional[int] = None) -> List[Dict[str, Any]]: 274 if channel is not None: 275 cmd += f' {channel}' 276 277 output = self.execute_command(cmd, timeout=10) 278 if len(output) < 2: 279 raise UnexpectedCommandOutput(output) 280 281 networks = [] 282 for line in output[2:]: 283 fields = line.strip().split('|') 284 285 try: 286 _, J, netname, extpanid, panid, extaddr, ch, dbm, lqi, _ = fields 287 except Exception: 288 logging.warning('ignored output: %r', line) 289 continue 290 291 networks.append({ 292 'joinable': bool(int(J)), 293 'network_name': netname.strip(), 294 'extpanid': extpanid, 295 'panid': int(panid, 16), 296 'extaddr': extaddr, 297 'channel': int(ch), 298 'dbm': int(dbm), 299 'lqi': int(lqi), 300 }) 301 302 return networks 303 304 def scan_energy(self, duration: Optional[float] = None, channel: Optional[int] = None) -> Dict[int, int]: 305 """Perform an IEEE 802.15.4 Energy Scan.""" 306 cmd = 'scan energy' 307 if duration is not None: 308 cmd += f' {duration * 1000:d}' 309 310 if channel is not None: 311 cmd += f' {channel}' 312 313 output = self.execute_command(cmd, timeout=10) 314 if len(output) < 2: 315 raise UnexpectedCommandOutput(output) 316 317 channels = {} 318 for line in output[2:]: 319 fields = line.strip().split('|') 320 321 _, Ch, RSSI, _ = fields 322 channels[int(Ch)] = int(RSSI) 323 324 return channels 325 326 def mac_send_data_request(self): 327 """Instruct an Rx-Off-When-Idle device to send a Data Request mac frame to its parent.""" 328 self.execute_command('mac send datarequest') 329 330 def mac_send_empty_data(self): 331 """Instruct an Rx-Off-When-Idle device to send a Empty Data mac frame to its parent.""" 332 self.execute_command('mac send emptydata') 333 334 # TODO: discover 335 # TODO: dns resolve <hostname> [DNS server IP] [DNS server port] 336 # TODO: fake /a/an <dst-ipaddr> <target> <meshLocalIid> 337 # TODO: sntp query 338 339 # 340 # Set or get device/network parameters 341 # 342 343 def get_mode(self) -> str: 344 """Get the Thread Device Mode value. 345 346 -: no flags set (rx-off-when-idle, minimal Thread device, stable network data) 347 r: rx-on-when-idle 348 d: Full Thread Device 349 n: Full Network Data 350 """ 351 return self.__parse_str(self.execute_command('mode')) 352 353 def set_mode(self, mode: str): 354 """Set the Thread Device Mode value. 355 356 -: no flags set (rx-off-when-idle, minimal Thread device, stable network data) 357 r: rx-on-when-idle 358 d: Full Thread Device 359 n: Full Network Data 360 """ 361 self.execute_command(f'mode {DeviceMode(mode)}') 362 363 def get_extaddr(self) -> str: 364 """Get the IEEE 802.15.4 Extended Address.""" 365 return self.__parse_extaddr(self.execute_command('extaddr')) 366 367 def set_extaddr(self, extaddr: str): 368 """Set the IEEE 802.15.4 Extended Address.""" 369 self.__validate_hex64b(extaddr) 370 self.execute_command(f'extaddr {extaddr}') 371 372 def get_eui64(self) -> str: 373 """Get the factory-assigned IEEE EUI-64.""" 374 return self.__parse_eui64(self.execute_command('eui64')) 375 376 def set_extpanid(self, extpanid: str): 377 """Set the Thread Extended PAN ID value.""" 378 self.__validate_extpanid(extpanid) 379 self.execute_command(f'extpanid {extpanid}') 380 381 def get_extpanid(self) -> str: 382 """Get the Thread Extended PAN ID value.""" 383 return self.__parse_extpanid(self.execute_command('extpanid')) 384 385 def set_channel(self, ch): 386 """Set the IEEE 802.15.4 Channel value.""" 387 self.execute_command('channel %d' % ch) 388 389 def get_channel(self): 390 """Get the IEEE 802.15.4 Channel value.""" 391 return self.__parse_int(self.execute_command('channel')) 392 393 def get_preferred_channel_mask(self) -> int: 394 """Get preferred channel mask.""" 395 return self.__parse_int(self.execute_command('channel preferred')) 396 397 def get_supported_channel_mask(self): 398 """Get supported channel mask.""" 399 return self.__parse_int(self.execute_command('channel supported')) 400 401 def get_panid(self): 402 """Get the IEEE 802.15.4 PAN ID value.""" 403 return self.__parse_int(self.execute_command('panid'), 16) 404 405 def set_panid(self, panid): 406 """Get the IEEE 802.15.4 PAN ID value.""" 407 self.execute_command('panid %d' % panid) 408 409 def set_network_name(self, name): 410 """Set network name.""" 411 self.execute_command('networkname %s' % self.__escape_escapable(name)) 412 413 def get_network_name(self): 414 """Get network name.""" 415 return self.__parse_str(self.execute_command('networkname')) 416 417 def get_network_key(self) -> str: 418 """Get the network key.""" 419 return self.__parse_network_key(self.execute_command(self.__detect_networkkey_cmd())) 420 421 def set_network_key(self, networkkey: str): 422 """Set the network key.""" 423 self.__validate_network_key(networkkey) 424 cmd = self.__detect_networkkey_cmd() 425 self.execute_command(f'{cmd} {networkkey}') 426 427 def get_key_sequence_counter(self) -> int: 428 """Get the Thread Key Sequence Counter.""" 429 return self.__parse_int(self.execute_command('keysequence counter')) 430 431 def set_key_sequence_counter(self, counter: int): 432 """Set the Thread Key Sequence Counter.""" 433 self.execute_command(f'keysequence counter {counter}') 434 435 def get_key_sequence_guard_time(self) -> int: 436 """Get Thread Key Switch Guard Time (in hours).""" 437 return self.__parse_int(self.execute_command('keysequence guardtime')) 438 439 def set_key_sequence_guard_time(self, hours: int): 440 """Set Thread Key Switch Guard Time (in hours) 0 means Thread Key Switch immediately if key index match.""" 441 self.execute_command(f'keysequence guardtime {hours}') 442 443 def get_cca_threshold(self) -> int: 444 """Get the CCA threshold in dBm measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4.""" 445 output = self.execute_command(f'ccathreshold') 446 val = self.__parse_str(output) 447 if not val.endswith(' dBm'): 448 raise UnexpectedCommandOutput(output) 449 450 return int(val[:-4]) 451 452 def set_cca_threshold(self, val: int): 453 """Set the CCA threshold measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4.""" 454 self.execute_command(f'ccathreshold {val}') 455 456 def get_promiscuous(self) -> bool: 457 """Get radio promiscuous property.""" 458 return self.__parse_Enabled_or_Disabled(self.execute_command('promiscuous')) 459 460 def enable_promiscuous(self): 461 """Enable radio promiscuous operation and print raw packet content.""" 462 self.execute_command('promiscuous enable') 463 464 def disable_promiscuous(self): 465 """Disable radio promiscuous operation.""" 466 self.execute_command('promiscuous disable') 467 468 def get_txpower(self) -> int: 469 """Get the transmit power in dBm.""" 470 line = self.__parse_str(self.execute_command('txpower')) 471 if not line.endswith(' dBm'): 472 raise UnexpectedCommandOutput([line]) 473 474 return int(line.split()[0]) 475 476 def set_txpower(self, val: int): 477 """Set the transmit power in dBm.""" 478 self.execute_command(f'txpower {val}') 479 480 # TODO: fem 481 # TODO: fem lnagain 482 # TODO: fem lnagain <LNA gain> 483 # TODO: mac retries direct 484 # TODO: mac retries direct 485 # TODO: mac retries indirect 486 # TODO: mac retries indirect <number> 487 488 # 489 # Basic Node states and properties 490 # 491 492 def get_state(self) -> ThreadState: 493 """Get the current Thread state.""" 494 return ThreadState(self.__parse_str(self.execute_command('state'))) 495 496 def set_state(self, state: str): 497 """Try to switch to state detached, child, router or leader.""" 498 self.execute_command(f'state {state}') 499 500 def get_rloc16(self) -> Rloc16: 501 """Get the Thread RLOC16 value.""" 502 return Rloc16(self.__parse_int(self.execute_command('rloc16'), 16)) 503 504 def get_router_id(self) -> int: 505 """Get the Thread Router ID value.""" 506 return self.get_rloc16() >> 10 507 508 def prefer_router_id(self, routerid: int): 509 """Prefer a Router ID when solicit router id from Leader.""" 510 self.execute_command(f'preferrouterid {routerid}') 511 512 def is_singleton(self) -> bool: 513 return self.__parse_values(self.execute_command('singleton'), true=True, false=False) 514 515 # 516 # RCP related utilities 517 # 518 519 def get_rcp_version(self): 520 return self.__parse_str(self.execute_command('rcp version')) 521 522 # 523 # Unsecure port utilities 524 # 525 526 def get_unsecure_ports(self) -> List[int]: 527 """all ports from the allowed unsecured port list.""" 528 return self.__parse_int_list(self.execute_command('unsecureport get')) 529 530 def add_unsecure_port(self, port: int): 531 """Add a port to the allowed unsecured port list.""" 532 self.execute_command(f'unsecureport add {port}') 533 534 def remove_unsecure_port(self, port: int): 535 """Remove a port from the allowed unsecured port list.""" 536 self.execute_command(f'unsecureport remove {port}') 537 538 def clear_unsecure_ports(self): 539 """Remove all ports from the allowed unsecured port list.""" 540 self.execute_command('unsecureport remove all') 541 542 # 543 # Leader configurations 544 # 545 546 def get_preferred_partition_id(self) -> PartitionId: 547 """Get the preferred Thread Leader Partition ID.""" 548 return PartitionId(self.__parse_int(self.execute_command(self.__get_partition_preferred_cmd()))) 549 550 def set_preferred_partition_id(self, parid: int): 551 """Set the preferred Thread Leader Partition ID.""" 552 self.execute_command(f'{self.__get_partition_preferred_cmd()} {parid}') 553 554 def __get_partition_preferred_cmd(self) -> str: 555 """""" 556 return 'partitionid preferred' if self.api_version >= 51 else 'leaderpartitionid' 557 558 def get_leader_weight(self) -> int: 559 """Get the Thread Leader Weight.""" 560 return self.__parse_int(self.execute_command('leaderweight')) 561 562 def set_leader_weight(self, weight: int): 563 """Set the Thread Leader Weight.""" 564 self.execute_command(f'leaderweight {weight}') 565 566 __LEADER_DATA_KEY_MAP = { 567 'Partition ID': 'partition_id', 568 'Weighting': 'weight', 569 'Data Version': 'data_ver', 570 'Stable Data Version': 'stable_data_ver', 571 'Leader Router ID': 'leader_id', 572 } 573 574 def get_leader_data(self) -> Dict[str, int]: 575 """Get the Thread Leader Data.""" 576 data = {} 577 output = self.execute_command('leaderdata') 578 579 try: 580 for line in output: 581 k, v = line.split(': ') 582 data[OTCI.__LEADER_DATA_KEY_MAP[k]] = int(v) 583 except KeyError: 584 raise UnexpectedCommandOutput(output) 585 586 return data 587 588 # 589 # Router configurations 590 # 591 592 def get_router_selection_jitter(self): 593 """Get the ROUTER_SELECTION_JITTER value.""" 594 return self.__parse_int(self.execute_command('routerselectionjitter')) 595 596 def set_router_selection_jitter(self, jitter): 597 """Set the ROUTER_SELECTION_JITTER value.""" 598 self.execute_command(f'routerselectionjitter {jitter}') 599 600 def get_network_id_timeout(self) -> int: 601 """Get the NETWORK_ID_TIMEOUT parameter used in the Router role.""" 602 return self.__parse_int(self.execute_command('networkidtimeout')) 603 604 def set_network_id_timeout(self, timeout: int): 605 """Set the NETWORK_ID_TIMEOUT parameter used in the Router role.""" 606 self.execute_command(f'networkidtimeout {timeout}') 607 608 def get_parent_priority(self) -> int: 609 """Get the assigned parent priority value, -2 means not assigned.""" 610 return self.__parse_int(self.execute_command('parentpriority')) 611 612 def set_parent_priority(self, priority: int): 613 """Set the assigned parent priority value: 1, 0, -1 or -2.""" 614 self.execute_command(f'parentpriority {priority}') 615 616 def get_router_upgrade_threshold(self) -> int: 617 """Get the ROUTER_UPGRADE_THRESHOLD value.""" 618 return self.__parse_int(self.execute_command('routerupgradethreshold')) 619 620 def set_router_upgrade_threshold(self, threshold: int): 621 """Set the ROUTER_UPGRADE_THRESHOLD value.""" 622 self.execute_command(f'routerupgradethreshold {threshold}') 623 624 def get_router_downgrade_threshold(self): 625 """Set the ROUTER_DOWNGRADE_THRESHOLD value.""" 626 return self.__parse_int(self.execute_command('routerdowngradethreshold')) 627 628 def set_router_downgrade_threshold(self, threshold: int): 629 """Get the ROUTER_DOWNGRADE_THRESHOLD value.""" 630 self.execute_command(f'routerdowngradethreshold {threshold}') 631 632 def get_router_eligible(self) -> bool: 633 """Indicates whether the router role is enabled or disabled.""" 634 return self.__parse_Enabled_or_Disabled(self.execute_command('routereligible')) 635 636 def enable_router_eligible(self): 637 """Disable the router role.""" 638 self.execute_command('routereligible enable') 639 640 def disable_router_eligible(self): 641 """Disable the router role.""" 642 self.execute_command('routereligible disable') 643 644 def get_router_list(self) -> List[RouterId]: 645 """Get allocated Router IDs.""" 646 line = self.__parse_str(self.execute_command('router list')) 647 return list(map(RouterId, line.strip().split())) 648 649 def get_router_table(self) -> Dict[RouterId, RouterTableEntry]: 650 """table of routers.""" 651 output = self.execute_command('router table') 652 if len(output) < 2: 653 raise UnexpectedCommandOutput(output) 654 655 # 656 # Example output: 657 # 658 # | ID | RLOC16 | Next Hop | Path Cost | LQ In | LQ Out | Age | Extended MAC | 659 # +----+--------+----------+-----------+-------+--------+-----+------------------+ 660 # | 21 | 0x5400 | 21 | 0 | 3 | 3 | 5 | d28d7f875888fccb | 661 # | 56 | 0xe000 | 56 | 0 | 0 | 0 | 182 | f2d92a82c8d8fe43 | 662 # Done 663 # 664 665 headers = self.__split_table_row(output[0]) 666 667 table = {} 668 for line in output[2:]: 669 line = line.strip() 670 if not line: 671 continue 672 673 fields = self.__split_table_row(line) 674 if len(fields) != len(headers): 675 raise UnexpectedCommandOutput(output) 676 677 col = lambda colname: self.__get_table_col(colname, headers, fields) 678 id = col('ID') 679 680 table[RouterId(id)] = router = RouterTableEntry({ 681 'id': RouterId(id), 682 'rloc16': Rloc16(col('RLOC16'), 16), 683 'next_hop': int(col('Next Hop')), 684 'path_cost': int(col('Path Cost')), 685 'lq_in': int(col('LQ In')), 686 'lq_out': int(col('LQ Out')), 687 'age': int(col('Age')), 688 'extaddr': col('Extended MAC'), 689 }) 690 691 if 'Link' in headers: 692 router['link'] = int(col('Link')) 693 else: 694 # support older version of OT which does not output `Link` field 695 router['link'] = self.get_router_info(router['id'], silent=True)['link'] 696 697 return table 698 699 def get_router_info(self, id: int, silent: bool = False) -> RouterTableEntry: 700 cmd = f'router {id}' 701 info = {} 702 output = self.execute_command(cmd, silent=silent) 703 items = [line.strip().split(': ') for line in output] 704 705 headers = [h for h, _ in items] 706 fields = [f for _, f in items] 707 col = lambda colname: self.__get_table_col(colname, headers, fields) 708 709 return RouterTableEntry({ 710 'id': RouterId(id), 711 'rloc16': Rloc16(col('Rloc'), 16), 712 'alloc': int(col('Alloc')), 713 'next_hop': int(col('Next Hop'), 16) >> 10, # convert RLOC16 to Router ID 714 'link': int(col('Link')), 715 }) 716 717 # 718 # Router utilities: Child management 719 # 720 721 def get_child_table(self) -> Dict[ChildId, Dict[str, Any]]: 722 """Get the table of attached children.""" 723 output = self.execute_command('child table') 724 if len(output) < 2: 725 raise UnexpectedCommandOutput(output) 726 727 # 728 # Example output: 729 # | ID | RLOC16 | Timeout | Age | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt| Extended MAC | 730 # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+------------------+ 731 # | 1 | 0xc801 | 240 | 24 | 3 | 131 |1|0|0| 3| 0 | 0 | 4ecede68435358ac | 732 # | 2 | 0xc802 | 240 | 2 | 3 | 131 |0|0|0| 3| 1 | 0 | a672a601d2ce37d8 | 733 # Done 734 # 735 736 headers = self.__split_table_row(output[0]) 737 738 table = {} 739 for line in output[2:]: 740 line = line.strip() 741 if not line: 742 continue 743 744 fields = self.__split_table_row(line) 745 col = lambda colname: self.__get_table_col(colname, headers, fields) 746 747 id = int(col("ID")) 748 r, d, n = int(col("R")), int(col("D")), int(col("N")) 749 mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}') 750 751 child = { 752 'id': ChildId(id), 753 'rloc16': Rloc16(col('RLOC16'), 16), 754 'timeout': int(col('Timeout')), 755 'age': int(col('Age')), 756 'lq_in': int(col('LQ In')), 757 'c_vn': int(col('C_VN')), 758 'mode': mode, 759 'extaddr': col('Extended MAC') 760 } 761 762 if 'Ver' in headers: 763 child['ver'] = int(col('Ver')) 764 765 if 'CSL' in headers: 766 child['csl'] = bool(int(col('CSL'))) 767 768 if 'QMsgCnt' in headers: 769 child['qmsgcnt'] = int(col('QMsgCnt')) 770 771 table[ChildId(id)] = child 772 773 return table 774 775 # 776 # DNS server & client utilities 777 # 778 779 _IPV6_SERVER_PORT_PATTERN = re.compile(r'\[(.*)\]:(\d+)') 780 781 def dns_get_config(self): 782 """Get DNS client query config.""" 783 output = self.execute_command('dns config') 784 config = {} 785 for line in output: 786 k, v = line.split(': ') 787 if k == 'Server': 788 matched = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, v) 789 assert matched is not None 790 ip, port = matched.groups() 791 config['server'] = (Ip6Addr(ip), int(port)) 792 elif k == 'ResponseTimeout': 793 config['response_timeout'] = int(v[:-3]) 794 elif k == 'MaxTxAttempts': 795 config['max_tx_attempts'] = int(v) 796 elif k == 'RecursionDesired': 797 config['recursion_desired'] = (v == 'yes') 798 else: 799 logging.warning("dns config ignored: %s", line) 800 801 return config 802 803 def dns_set_config(self, 804 server: Tuple[Union[str, ipaddress.IPv6Address], int], 805 response_timeout: Optional[int] = None, 806 max_tx_attempts: Optional[int] = None, 807 recursion_desired: Optional[bool] = None): 808 """Set DNS client query config.""" 809 cmd = f'dns config {str(server[0])} {server[1]}' 810 if response_timeout is not None: 811 cmd += f' {response_timeout}' 812 813 assert max_tx_attempts is None or response_timeout is not None, "must specify `response_timeout` if `max_tx_attempts` is specified." 814 if max_tx_attempts is not None: 815 cmd += f' {max_tx_attempts}' 816 817 assert recursion_desired is None or max_tx_attempts is not None, 'must specify `max_tx_attempts` if `recursion_desired` is specified.' 818 if recursion_desired is not None: 819 cmd += f' {1 if recursion_desired else 0}' 820 821 self.execute_command(cmd) 822 823 def dns_get_compression(self) -> bool: 824 """Get DNS compression mode.""" 825 return self.__parse_Enabled_or_Disabled(self.execute_command('dns compression')) 826 827 def dns_enable_compression(self): 828 """Enable DNS compression mode.""" 829 self.execute_command('dns compression enable') 830 831 def dns_disable_compression(self): 832 """Disable DNS compression mode.""" 833 self.execute_command('dns compression disable') 834 835 def dns_browse(self, service: str) -> List[Dict]: 836 """Browse DNS service instances.""" 837 cmd = f'dns browse {service}' 838 output = '\n'.join(self.execute_command(cmd, 30.0)) 839 840 result = [] 841 for ins, port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl in re.findall( 842 r'(.*?)\s+Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s*Host:(\S+)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)', 843 output): 844 result.append({ 845 'instance': ins, 846 'service': service, 847 'port': int(port), 848 'priority': int(priority), 849 'weight': int(weight), 850 'host': hostname, 851 'address': Ip6Addr(address), 852 'txt': self.__parse_srp_server_service_txt(txt_data), 853 'srv_ttl': int(srv_ttl), 854 'txt_ttl': int(txt_ttl), 855 'aaaa_ttl': int(aaaa_ttl), 856 }) 857 858 return result 859 860 def dns_resolve(self, hostname: str) -> List[Dict]: 861 """Resolve a DNS host name.""" 862 cmd = f'dns resolve {hostname}' 863 output = self.execute_command(cmd, 30.0) 864 dns_resp = output[0] 865 addrs = dns_resp.strip().split(' - ')[1].split(' ') 866 ips = [Ip6Addr(item.strip()) for item in addrs[::2]] 867 ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]] 868 869 return [{ 870 'address': ip, 871 'ttl': ttl, 872 } for ip, ttl in zip(ips, ttls)] 873 874 def dns_resolve_service(self, instance: str, service: str) -> Dict: 875 """Resolves aservice instance.""" 876 instance = self.__escape_escapable(instance) 877 cmd = f'dns service {instance} {service}' 878 output = self.execute_command(cmd, 30.0) 879 880 m = re.match( 881 r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)', 882 '\t'.join(output)) 883 if m: 884 port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups() 885 return { 886 'instance': instance, 887 'service': service, 888 'port': int(port), 889 'priority': int(priority), 890 'weight': int(weight), 891 'host': hostname, 892 'address': Ip6Addr(address), 893 'txt': self.__parse_srp_server_service_txt(txt_data), 894 'srv_ttl': int(srv_ttl), 895 'txt_ttl': int(txt_ttl), 896 'aaaa_ttl': int(aaaa_ttl), 897 } 898 else: 899 raise CommandError(cmd, output) 900 901 # 902 # SRP server & client utilities 903 # 904 905 def srp_server_get_state(self): 906 """Get the SRP server state""" 907 return self.__parse_str(self.execute_command('srp server state')) 908 909 def srp_server_enable(self): 910 """Enable SRP server.""" 911 self.execute_command('srp server enable') 912 913 def srp_server_disable(self): 914 """Disable SRP server.""" 915 self.execute_command('srp server disable') 916 917 def srp_server_get_domain(self) -> str: 918 """Get the SRP server domain.""" 919 return self.__parse_str(self.execute_command('srp server domain')) 920 921 def srp_server_set_domain(self, domain: str): 922 """Set the SRP server domain.""" 923 self.execute_command(f'srp server domain {domain}') 924 925 def srp_server_get_hosts(self) -> List[Dict]: 926 """Get SRP server registered hosts.""" 927 return self.__parse_srp_server_hosts(self.execute_command('srp server host')) 928 929 def srp_server_get_services(self) -> List[Dict]: 930 """Get SRP server registered services.""" 931 output = self.execute_command('srp server service') 932 return self.__parse_srp_server_services(output) 933 934 def __parse_srp_server_hosts(self, output: List[str]) -> List[Dict]: 935 result = [] 936 info = None 937 for line in output: 938 if not line.startswith(' '): 939 info = {'host': line} 940 result.append(info) 941 else: 942 assert info is not None 943 k, v = line.strip().split(': ') 944 if k == 'deleted': 945 if v not in ('true', 'false'): 946 raise UnexpectedCommandOutput(output) 947 948 info['deleted'] = (v == 'true') 949 950 elif k == 'addresses': 951 if not v.startswith('[') or not v.endswith(']'): 952 raise UnexpectedCommandOutput(output) 953 954 v = v[1:-1] 955 info['addresses'] = list(map(Ip6Addr, v.split(', '))) 956 else: 957 raise UnexpectedCommandOutput(output) 958 959 return result 960 961 def __parse_srp_server_services(self, output: List[str]) -> List[Dict]: 962 result = [] 963 info = None 964 for line in output: 965 if not line.startswith(' '): 966 info = {'instance': line} 967 result.append(info) 968 else: 969 assert info is not None 970 k, v = line.strip().split(': ') 971 if k == 'deleted': 972 if v not in ('true', 'false'): 973 raise UnexpectedCommandOutput(output) 974 975 info['deleted'] = (v == 'true') 976 977 elif k == 'addresses': 978 if not v.startswith('[') or not v.endswith(']'): 979 raise UnexpectedCommandOutput(output) 980 981 v = v[1:-1] 982 info['addresses'] = list(map(Ip6Addr, v.split(', '))) 983 elif k == 'subtypes': 984 info[k] = list() if v == '(null)' else list(v.split(',')) 985 elif k in ('port', 'weight', 'priority', 'ttl', 'lease', 'key-lease'): 986 info[k] = int(v) 987 elif k in ('host',): 988 info[k] = v 989 elif k == 'TXT': 990 info['txt'] = self.__parse_srp_server_service_txt(v) 991 else: 992 raise UnexpectedCommandOutput(output) 993 994 return result 995 996 def __parse_srp_server_service_txt(self, txt: str) -> Dict[str, Union[bytes, bool]]: 997 # example value: [txt11=76616c3131, txt12=76616c3132] 998 assert txt.startswith('[') and txt.endswith(']') 999 txt_dict = {} 1000 for entry in txt[1:-1].split(', '): 1001 if not entry: 1002 continue 1003 1004 equal_pos = entry.find('=') 1005 1006 if equal_pos != -1: 1007 k, v = entry[:equal_pos], entry[equal_pos + 1:] 1008 txt_dict[k] = bytes(int(v[i:i + 2], 16) for i in range(0, len(v), 2)) 1009 else: 1010 txt_dict[entry] = True 1011 1012 return txt_dict 1013 1014 def srp_server_get_lease(self) -> Tuple[int, int, int, int]: 1015 """Get SRP server LEASE & KEY-LEASE range (in seconds).""" 1016 lines = self.execute_command(f'srp server lease') 1017 return tuple([int(line.split(':')[1].strip()) for line in lines]) 1018 1019 def srp_server_set_lease(self, min_lease: int, max_lease: int, min_key_lease: int, max_key_lease: int): 1020 """Configure SRP server LEASE & KEY-LEASE range (in seconds).""" 1021 self.execute_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}') 1022 1023 def srp_client_get_state(self) -> bool: 1024 """Get SRP client state.""" 1025 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client state')) 1026 1027 def srp_client_start(self, server_ip: Union[str, ipaddress.IPv6Address], server_port: int): 1028 """Start SRP client.""" 1029 self.execute_command(f'srp client start {str(server_ip)} {server_port}') 1030 1031 def srp_client_stop(self): 1032 """Stop SRP client.""" 1033 self.execute_command('srp client stop') 1034 1035 def srp_client_get_autostart(self) -> bool: 1036 """Get SRP client autostart mode.""" 1037 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client autostart')) 1038 1039 def srp_client_enable_autostart(self): 1040 """Enable SRP client autostart mode.""" 1041 self.execute_command('srp client autostart enable') 1042 1043 def srp_client_disable_autostart(self): 1044 """Disable SRP client autostart mode.""" 1045 self.execute_command('srp client autostart disable') 1046 1047 def srp_client_get_callback(self) -> bool: 1048 """Get SRP client callback mode.""" 1049 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client callback')) 1050 1051 def srp_client_enable_callback(self): 1052 """Enable SRP client callback mode.""" 1053 self.execute_command('srp client callback enable') 1054 1055 def srp_client_disable_callback(self): 1056 """Disable SRP client callback mode.""" 1057 self.execute_command('srp client callback disable') 1058 1059 def srp_client_set_host_name(self, name: str): 1060 """Set SRP client host name.""" 1061 self.execute_command(f'srp client host name {name}') 1062 1063 def srp_client_get_host(self) -> Dict: 1064 """Get SRP client host.""" 1065 output = self.__parse_str(self.execute_command('srp client host')) 1066 return self.__parse_srp_client_host(output) 1067 1068 _SRP_CLIENT_HOST_PATTERN = re.compile(r'name:("(.*)"|(\(null\))), state:(\S+), addrs:\[(.*)\]') 1069 1070 def __parse_srp_client_host(self, line: str) -> Dict: 1071 m = re.match(OTCI._SRP_CLIENT_HOST_PATTERN, line) 1072 if not m: 1073 raise UnexpectedCommandOutput([line]) 1074 1075 _, host, _, state, addrs = m.groups() 1076 return { 1077 'host': host or '', 1078 'state': state, 1079 'addresses': [Ip6Addr(ip) for ip in addrs.split(', ')] if addrs else [], 1080 } 1081 1082 def srp_client_get_host_name(self) -> str: 1083 """Get SRP client host name.""" 1084 name = self.__parse_str(self.execute_command('srp client host name')) 1085 return name if name != '(null)' else '' 1086 1087 def srp_client_get_host_addresses(self) -> List[Ip6Addr]: 1088 """Get SRP client host addresses.""" 1089 return self.__parse_ip6addr_list(self.execute_command('srp client host address')) 1090 1091 def srp_client_set_host_addresses(self, *addrs: Union[str, ipaddress.IPv6Address]): 1092 """Set SRP client host addresses.""" 1093 self.execute_command(f'srp client host address {" ".join(map(str, addrs))}') 1094 1095 def srp_client_get_host_state(self): 1096 """Get SRP client host state.""" 1097 return self.__parse_str(self.execute_command('srp client host state')) 1098 1099 def srp_client_remove_host(self, remove_key_lease=False): 1100 """Remove SRP client host.""" 1101 cmd = 'srp client host remove' 1102 if remove_key_lease: 1103 cmd += ' 1' 1104 1105 self.execute_command(cmd) 1106 1107 def srp_client_get_services(self) -> List[Dict]: 1108 """Get SRP client services.""" 1109 output = self.execute_command('srp client service') 1110 return [self.__parse_srp_client_service(line) for line in output] 1111 1112 _SRP_CLIENT_SERVICE_PATTERN = re.compile( 1113 r'instance:"(.*)", name:"(.*)", state:(\S+), port:(\d+), priority:(\d+), weight:(\d+)') 1114 1115 def __parse_srp_client_service(self, line: str) -> Dict: 1116 # e.g. instance:"ins2", name:"_meshcop._udp", state:ToAdd, port:2000, priority:2, weight:2 1117 m = OTCI._SRP_CLIENT_SERVICE_PATTERN.match(line) 1118 if m is None: 1119 raise UnexpectedCommandOutput([line]) 1120 1121 instance, service, state, port, priority, weight = m.groups() 1122 port, priority, weight = int(port), int(priority), int(weight) 1123 return { 1124 'instance': instance, 1125 'service': service, 1126 'state': state, 1127 'port': port, 1128 'priority': priority, 1129 'weight': weight, 1130 } 1131 1132 def srp_client_add_service(self, 1133 instance: str, 1134 service: str, 1135 port: int, 1136 priority: int = 0, 1137 weight: int = 0, 1138 txt: Optional[Dict[str, Union[str, bytes, bool]]] = None): 1139 instance = self.__escape_escapable(instance) 1140 cmd = f'srp client service add {instance} {service} {port} {priority} {weight}' 1141 if txt: 1142 cmd += f' {self.__txt_to_hex(txt)}' 1143 self.execute_command(cmd) 1144 1145 def srp_client_remove_service(self, instance: str, service: str): 1146 """Remove a service from SRP client.""" 1147 self.execute_command(f'srp client service remove {instance} {service}') 1148 1149 def srp_client_clear_service(self, instance: str, service: str): 1150 """Remove a service from SRP client without notifying the SRP server.""" 1151 self.execute_command(f'srp client service clear {instance} {service}') 1152 1153 def srp_client_get_key_lease_interval(self) -> int: 1154 """Get SRP client key lease interval (in seconds).""" 1155 return self.__parse_int(self.execute_command('srp client keyleaseinterval')) 1156 1157 def srp_client_set_key_lease_interval(self, interval: int): 1158 """Set SRP client key lease interval (in seconds).""" 1159 self.execute_command(f'srp client keyleaseinterval {interval}') 1160 1161 def srp_client_get_lease_interval(self) -> int: 1162 """Get SRP client lease interval (in seconds).""" 1163 return self.__parse_int(self.execute_command('srp client leaseinterval')) 1164 1165 def srp_client_set_lease_interval(self, interval: int): 1166 """Set SRP client lease interval (in seconds).""" 1167 self.execute_command(f'srp client leaseinterval {interval}') 1168 1169 def srp_client_get_server(self) -> Tuple[Ip6Addr, int]: 1170 """Get the SRP server (IP, port).""" 1171 result = self.__parse_str(self.execute_command('srp client server')) 1172 matched = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, result) 1173 assert matched 1174 ip, port = matched.groups() 1175 return Ip6Addr(ip), int(port) 1176 1177 def srp_client_get_service_key(self) -> bool: 1178 """Get SRP client "service key record inclusion" mode.""" 1179 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client service key')) 1180 1181 def srp_client_enable_service_key(self): 1182 """Enable SRP client "service key record inclusion" mode.""" 1183 self.execute_command('srp client service key enable') 1184 1185 def srp_client_disable_service_key(self): 1186 """Disable SRP client "service key record inclusion" mode.""" 1187 self.execute_command('srp client service key disable') 1188 1189 def __split_table_row(self, row: str) -> List[str]: 1190 if not (row.startswith('|') and row.endswith('|')): 1191 raise ValueError(row) 1192 1193 fields = row.split('|') 1194 fields = [x.strip() for x in fields[1:-1]] 1195 return fields 1196 1197 def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str: 1198 return fields[headers.index(colname)] 1199 1200 def get_child_list(self) -> List[ChildId]: 1201 """Get attached Child IDs.""" 1202 line = self.__parse_str(self.execute_command(f'child list')) 1203 return [ChildId(id) for id in line.strip().split()] 1204 1205 def get_child_info(self, child: Union[ChildId, Rloc16]) -> Dict[str, Any]: 1206 output = self.execute_command(f'child {child}') 1207 1208 info = {} 1209 1210 for line in output: 1211 k, v = line.split(': ') 1212 if k == 'Child ID': 1213 info['id'] = int(v) 1214 elif k == 'Rloc': 1215 info['rloc16'] = int(v, 16) 1216 elif k == 'Ext Addr': 1217 info['extaddr'] = v 1218 elif k == 'Mode': 1219 info['mode'] = DeviceMode(v) 1220 elif k == 'Net Data': 1221 info['c_vn'] = int(v) 1222 elif k == 'Timeout': 1223 info['timeout'] = int(v) 1224 elif k == 'Age': 1225 info['age'] = int(v) 1226 elif k == 'Link Quality In': 1227 info['lq_in'] = int(v) 1228 elif k == 'RSSI': 1229 info['rssi'] = int(v) 1230 else: 1231 self.log('warning', "Child info %s: %s ignored", k, v) 1232 1233 return info 1234 1235 def get_child_ipaddrs(self) -> Dict[Rloc16, List[Ip6Addr]]: 1236 """Get the list of IP addresses stored for MTD children. 1237 1238 Note: Each MTD child might has multiple IP addresses. 1239 """ 1240 output = self.execute_command('childip') 1241 1242 ipaddrs = {} 1243 1244 for line in output: 1245 rloc16, ip = line.split(': ') 1246 rloc16 = Rloc16(rloc16, 16) 1247 ipaddrs.setdefault(rloc16, []).append(Ip6Addr(ip.strip())) 1248 1249 return ipaddrs 1250 1251 # 1252 # Child configurations 1253 # 1254 1255 def get_max_children(self) -> int: 1256 """Get the Thread maximum number of allowed children.""" 1257 return self.__parse_int(self.execute_command('childmax')) 1258 1259 def set_max_children(self, val: int): 1260 """Set the Thread maximum number of allowed children.""" 1261 self.execute_command(f'childmax {val}') 1262 1263 def get_child_ip_max(self) -> int: 1264 """Get the maximum number of IP addresses that each MTD child may register with this device as parent.""" 1265 return self.__parse_int(self.execute_command('childip max')) 1266 1267 def set_child_ip_max(self, val: int): 1268 """Get the maximum number of IP addresses that each MTD child may register with this device as parent.""" 1269 self.execute_command(f'childip max {val}') 1270 1271 def get_child_timeout(self): 1272 """Get the Thread Child Timeout value.""" 1273 return self.__parse_int(self.execute_command('childtimeout')) 1274 1275 def set_child_timeout(self, timeout): 1276 """Set the Thread Child Timeout value.""" 1277 self.execute_command('childtimeout %d' % timeout) 1278 1279 def get_child_supervision_interval(self) -> int: 1280 """Get the Child Supervision Check Timeout value.""" 1281 return self.__parse_int(self.execute_command('childsupervision interval')) 1282 1283 def set_child_supervision_interval(self, val: int): 1284 """Set the Child Supervision Interval value. 1285 This command can only be used with FTD devices. 1286 """ 1287 self.execute_command(f'childsupervision interval {val}') 1288 1289 def get_child_supervision_check_timeout(self) -> int: 1290 """Get the Child Supervision Check Timeout value.""" 1291 return self.__parse_int(self.execute_command('childsupervision checktimeout')) 1292 1293 def set_child_supervision_check_timeout(self, val: int): 1294 """Set the Child Supervision Check Timeout value.""" 1295 self.execute_command(f'childsupervision checktimeout {val}') 1296 1297 # 1298 # Neighbor management 1299 # 1300 1301 def get_neighbor_list(self) -> List[Rloc16]: 1302 """Get a list of RLOC16 of neighbors""" 1303 line = self.__parse_str(self.execute_command('neighbor list')).strip() 1304 return [Rloc16(id, 16) for id in line.split()] 1305 1306 def get_neighbor_table(self) -> Dict[Rloc16, Dict[str, Any]]: 1307 output = self.execute_command('neighbor table') 1308 if len(output) < 2: 1309 raise UnexpectedCommandOutput(output) 1310 1311 # 1312 # Example output: 1313 # 1314 # | Role | RLOC16 | Age | Avg RSSI | Last RSSI |R|D|N| Extended MAC | 1315 # +------+--------+-----+----------+-----------+-+-+-+------------------+ 1316 # | C | 0xcc01 | 96 | -46 | -46 |1|1|1| 1eb9ba8a6522636b | 1317 # | R | 0xc800 | 2 | -29 | -29 |1|1|1| 9a91556102c39ddb | 1318 # | R | 0xf000 | 3 | -28 | -28 |1|1|1| 0ad7ed6beaa6016d | 1319 # Done 1320 # 1321 1322 headers = self.__split_table_row(output[0]) 1323 1324 table = {} 1325 for line in output[2:]: 1326 line = line.strip() 1327 if not line: 1328 continue 1329 1330 fields = self.__split_table_row(line) 1331 col = lambda colname: self.__get_table_col(colname, headers, fields) 1332 1333 role = col('Role') 1334 is_router = role == 'R' 1335 r, d, n = int(col('R')), int(col('D')), int(col('N')) 1336 mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}') 1337 1338 rloc16 = Rloc16(col('RLOC16'), 16) 1339 1340 table[rloc16] = { 1341 'is_router': is_router, 1342 'rloc16': rloc16, 1343 'age': int(col('Age')), 1344 'avg_rssi': int(col('Avg RSSI')), 1345 'last_rssi': int(col('Last RSSI')), 1346 'mode': mode, 1347 'extaddr': col('Extended MAC'), 1348 } 1349 1350 return table 1351 1352 # 1353 # SED/SSED configuration 1354 # 1355 1356 def get_poll_period(self) -> int: 1357 """Get the customized data poll period of sleepy end device (milliseconds). 1358 Only for Reference Device.""" 1359 return self.__parse_int(self.execute_command('pollperiod')) 1360 1361 def set_poll_period(self, poll_period: int): 1362 """Set the customized data poll period (in milliseconds) for sleepy end device. 1363 1364 Only for Reference Device.""" 1365 self.execute_command(f'pollperiod {poll_period}') 1366 1367 # TODO: csl 1368 # TODO: csl channel <channel> 1369 # TODO: csl period <period> 1370 # TODO: csl timeout <timeout> 1371 1372 _CSL_PERIOD_PATTERN = re.compile(r'(\d+)us') 1373 _CSL_TIMEOUT_PATTERN = re.compile(r'(\d+)s') 1374 1375 def get_csl_config(self) -> Dict[str, int]: 1376 """Get the CSL configuration.""" 1377 output = self.execute_command('csl') 1378 1379 cfg = {} 1380 for line in output: 1381 k, v = line.split(': ') 1382 if k == 'Channel': 1383 cfg['channel'] = int(v) 1384 elif k == 'Timeout': 1385 matched = OTCI._CSL_TIMEOUT_PATTERN.match(v) 1386 assert matched is not None 1387 cfg['timeout'] = int(matched.group(1)) 1388 elif k == 'Period': 1389 matched = OTCI._CSL_PERIOD_PATTERN.match(v) 1390 assert matched is not None 1391 cfg['period'] = int(matched.group(1)) 1392 else: 1393 logging.warning("Ignore unknown CSL parameter: %s: %s", k, v) 1394 1395 return cfg 1396 1397 def config_csl(self, channel: Optional[int] = None, period: Optional[int] = None, timeout: Optional[int] = None): 1398 """Configure CSL parameters. 1399 1400 :param channel: Set CSL channel. 1401 :param period: Set CSL period in usec. Disable CSL by setting this parameter to 0. 1402 :param timeout: Set the CSL timeout in seconds. 1403 """ 1404 1405 if channel is None and period is None and timeout is None: 1406 raise InvalidArgumentsError("Please specify at least 1 parameter to configure.") 1407 1408 if channel is not None: 1409 self.execute_command(f'csl channel {channel}') 1410 1411 if period is not None: 1412 self.execute_command(f'csl period {period}') 1413 1414 if timeout is not None: 1415 self.execute_command(f'csl timeout {timeout}') 1416 1417 # 1418 # Leader utilities 1419 # 1420 1421 def get_context_id_reuse_delay(self) -> int: 1422 """Get the CONTEXT_ID_REUSE_DELAY value.""" 1423 return self.__parse_int(self.execute_command('contextreusedelay')) 1424 1425 def set_context_id_reuse_delay(self, val: int): 1426 """Set the CONTEXT_ID_REUSE_DELAY value.""" 1427 self.execute_command(f'contextreusedelay {val}') 1428 1429 def release_router_id(self, routerid: int): 1430 """Release a Router ID that has been allocated by the device in the Leader role.""" 1431 self.execute_command(f'releaserouterid {routerid}') 1432 1433 # Time Sync utilities 1434 # TODO: networktime 1435 # TODO: networktime <timesyncperiod> <xtalthreshold> 1436 # TODO: delaytimermin 1437 # TODO: delaytimermin <delaytimermin> 1438 1439 # 1440 # Commissioniner operations 1441 # 1442 1443 def commissioner_start(self): 1444 """Start the Commissioner role.""" 1445 self.execute_command('commissioner start') 1446 1447 def commissioner_stop(self): 1448 """Stop the Commissioner role.""" 1449 self.execute_command('commissioner stop') 1450 1451 def get_commissioiner_state(self) -> str: 1452 """Get current Commissioner state (active or petitioning or disabled).""" 1453 return self.__parse_str(self.execute_command('commissioner state')) 1454 1455 def get_commissioner_session_id(self) -> int: 1456 """Get current commissioner session id.""" 1457 return self.__parse_int(self.execute_command('commissioner sessionid')) 1458 1459 def commissioner_add_joiner(self, pskd, eui64=None, discerner=None, timeout=None): 1460 """Add a Joiner entry. 1461 1462 :param pskd: Pre-Shared Key for the Joiner. 1463 :param eui64: The IEEE EUI-64 of the Joiner or '*' to match any Joiner 1464 :param discerner: The Joiner discerner in format number/length. 1465 :param timeout: Joiner timeout in seconds. 1466 """ 1467 if (eui64 is not None) == (discerner is not None): 1468 raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.") 1469 1470 if eui64 is not None and eui64 != '*': 1471 self.__validate_extaddr(eui64) 1472 1473 cmd = f'commissioner joiner add {eui64 or discerner} {pskd}' 1474 1475 if timeout is not None: 1476 cmd += f' {timeout}' 1477 1478 self.execute_command(cmd) 1479 1480 def commissioner_remove_jointer(self, eui64=None, discerner=None): 1481 if (eui64 is not None) == (discerner is not None): 1482 raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.") 1483 1484 if eui64 is not None and eui64 != '*': 1485 self.__validate_extaddr(eui64) 1486 1487 self.execute_command(f'commissioner joiner remove {eui64 or discerner}') 1488 1489 def set_commissioner_provisioning_url(self, url: str): 1490 self.execute_command(f'commissioner provisioningurl {url}') 1491 1492 # TODO: commissioner announce 1493 # TODO: commissioner energy 1494 # TODO: commissioner mgmtget 1495 # TODO: commissioner mgmtset 1496 # TODO: commissioner panid 1497 1498 # 1499 # Joiner operations 1500 # 1501 def joiner_start(self, psk: str, provisioning_url: Optional[str] = None): 1502 """Start the Joiner.""" 1503 cmd = f'joiner start {psk}' 1504 if provisioning_url is not None: 1505 cmd += f' {provisioning_url}' 1506 1507 self.execute_command(cmd) 1508 1509 def joiner_stop(self): 1510 """Stop the Joiner role.""" 1511 self.execute_command('joiner stop') 1512 1513 def get_joiner_id(self) -> str: 1514 """Get the Joiner ID.""" 1515 return self.__parse_joiner_id(self.execute_command('joiner id')) 1516 1517 def get_joiner_port(self) -> int: 1518 """Get the Joiner port.""" 1519 return self.__parse_int(self.execute_command(f'joinerport')) 1520 1521 def set_joiner_port(self, port: int): 1522 """Set the Joiner port.""" 1523 self.execute_command(f'joinerport {port}') 1524 1525 # TODO: joiner discerner 1526 1527 # 1528 # Network Data utilities 1529 # 1530 def get_local_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1531 """Get prefixes from local Network Data.""" 1532 output = self.execute_command('prefix') 1533 return self.__parse_prefixes(output) 1534 1535 def __parse_prefixes(self, output: List[str]) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1536 prefixes = [] 1537 1538 for line in output: 1539 if line.startswith('- '): 1540 line = line[2:] 1541 1542 prefix, flags, prf, rloc16 = line.split()[:4] 1543 prefixes.append((Ip6Prefix(prefix), flags, prf, Rloc16(rloc16, 16))) 1544 1545 return prefixes 1546 1547 def add_prefix(self, prefix: str, flags='paosr', prf='med'): 1548 """Add a valid prefix to the Network Data.""" 1549 self.execute_command(f'prefix add {prefix} {flags} {prf}') 1550 1551 def remove_prefix(self, prefix: str): 1552 """Invalidate a prefix in the Network Data.""" 1553 self.execute_command(f'prefix remove {prefix}') 1554 1555 def register_network_data(self): 1556 self.execute_command('netdata register') 1557 1558 def get_network_data(self) -> Dict[str, List]: 1559 output = self.execute_command('netdata show') 1560 1561 netdata = {} 1562 if output.pop(0) != 'Prefixes:': 1563 raise UnexpectedCommandOutput(output) 1564 1565 prefixes_output = [] 1566 while True: 1567 line = output.pop(0) 1568 if line == 'Routes:': 1569 break 1570 else: 1571 prefixes_output.append(line) 1572 1573 netdata['prefixes'] = self.__parse_prefixes(prefixes_output) 1574 1575 routes_output = [] 1576 while True: 1577 line = output.pop(0) 1578 if line == 'Services:': 1579 break 1580 else: 1581 routes_output.append(line) 1582 1583 netdata['routes'] = self.__parse_routes(routes_output) 1584 1585 services_output = [] 1586 while True: 1587 line = output.pop(0) 1588 if line == 'Contexts:': 1589 break 1590 else: 1591 services_output.append(line) 1592 1593 netdata['services'] = self.__parse_services(services_output) 1594 1595 return netdata 1596 1597 def get_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1598 """Get network prefixes from Thread Network Data.""" 1599 network_data = self.get_network_data() 1600 return network_data['prefixes'] 1601 1602 def get_routes(self) -> List[Tuple[str, bool, str, Rloc16]]: 1603 """Get routes from Thread Network Data.""" 1604 network_data = self.get_network_data() 1605 return network_data['routes'] 1606 1607 def get_services(self) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]: 1608 """Get services from Thread Network Data""" 1609 network_data = self.get_network_data() 1610 return network_data['services'] 1611 1612 def __parse_services(self, output: List[str]) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]: 1613 services = [] 1614 for line in output: 1615 line = line.split() 1616 1617 enterprise_number, service_data, server_data = line[:3] 1618 if line[3] == 's': 1619 stable, rloc16 = True, line[4] 1620 else: 1621 stable, rloc16 = False, line[3] 1622 1623 enterprise_number = int(enterprise_number) 1624 service_data = self.__hex_to_bytes(service_data) 1625 server_data = self.__hex_to_bytes(server_data) 1626 rloc16 = Rloc16(rloc16, 16) 1627 1628 services.append((enterprise_number, service_data, server_data, stable, rloc16)) 1629 1630 return services 1631 1632 def get_network_data_bytes(self) -> bytes: 1633 """Get the raw Network Data.""" 1634 hexstr = self.__parse_str(self.execute_command('netdata show -x')) 1635 return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2)) 1636 1637 def get_local_routes(self) -> List[Tuple[str, bool, str, Rloc16]]: 1638 """Get routes from local Network Data.""" 1639 return self.__parse_routes(self.execute_command('route')) 1640 1641 def __parse_routes(self, output: List[str]) -> List[Tuple[str, bool, str, Rloc16]]: 1642 routes = [] 1643 for line in output: 1644 line = line.split() 1645 if len(line) == 4: 1646 prefix, flags, prf, rloc16 = line 1647 stable = 's' in flags 1648 else: 1649 prefix, prf, rloc16 = line 1650 stable = False 1651 1652 rloc16 = Rloc16(rloc16, 16) 1653 routes.append((prefix, stable, prf, rloc16)) 1654 1655 return routes 1656 1657 def add_route(self, prefix: str, stable=True, prf='med'): 1658 """Add a valid external route to the Network Data.""" 1659 cmd = f'route add {prefix}' 1660 if stable: 1661 cmd += ' s' 1662 1663 cmd += f' {prf}' 1664 self.execute_command(cmd) 1665 1666 def remove_route(self, prefix: str): 1667 """Invalidate a external route in the Network Data.""" 1668 self.execute_command(f'route remove {prefix}') 1669 1670 def add_service(self, enterprise_number: int, service_data: Union[str, bytes], server_data: Union[str, bytes]): 1671 """Add service to the Network Data. 1672 1673 enterpriseNumber: IANA enterprise number 1674 serviceData: hex-encoded binary service data 1675 serverData: hex-encoded binary server data 1676 """ 1677 service_data = self.__validate_hex_or_bytes(service_data) 1678 server_data = self.__validate_hex_or_bytes(server_data) 1679 self.execute_command(f'service add {enterprise_number} {service_data} {server_data}') 1680 1681 def remove_service(self, enterprise_number, service_data): 1682 """Remove service from Network Data. 1683 1684 enterpriseNumber: IANA enterprise number 1685 serviceData: hext-encoded binary service data 1686 """ 1687 service_data = self.__validate_hex_or_bytes(service_data) 1688 self.execute_command(f'service remove {enterprise_number} {service_data}') 1689 1690 # 1691 # Dataset management 1692 # 1693 1694 def dataset_init_buffer(self, get_active_dataset=False, get_pending_dataset=False): 1695 """Initialize operational dataset buffer.""" 1696 if get_active_dataset and get_pending_dataset: 1697 raise InvalidArgumentsError("Can not specify both `get_active_dataset` and `get_pending_dataset`.") 1698 1699 if get_active_dataset: 1700 self.execute_command(f'dataset init active') 1701 elif get_pending_dataset: 1702 self.execute_command(f'dataset init pending') 1703 else: 1704 self.execute_command(f'dataset init new') 1705 1706 def dataset_commit_buffer(self, dataset: str): 1707 if dataset in ('active', 'pending'): 1708 cmd = f'dataset commit {dataset}' 1709 else: 1710 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1711 1712 self.execute_command(cmd) 1713 1714 def dataset_clear_buffer(self): 1715 """Reset operational dataset buffer.""" 1716 self.execute_command('dataset clear') 1717 1718 def get_dataset(self, dataset: str = 'buffer'): 1719 if dataset in ('active', 'pending'): 1720 cmd = f'dataset {dataset}' 1721 elif dataset == 'buffer': 1722 cmd = 'dataset' 1723 else: 1724 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1725 1726 output = self.execute_command(cmd) 1727 return self.__parse_dataset(output) 1728 1729 def __parse_dataset(self, output: List[str]) -> Dict[str, Any]: 1730 # Example output: 1731 # 1732 # Active Timestamp: 1 1733 # Channel: 22 1734 # Channel Mask: 0x07fff800 1735 # Ext PAN ID: 5c93ae980ff22d35 1736 # Mesh Local Prefix: fdc7:55fe:6363:bd01::/64 1737 # Network Key: d1a8348d59fb1fac1d6c4f95007d487a 1738 # Network Name: OpenThread-7caa 1739 # PAN ID: 0x7caa 1740 # PSKc: 167d89fd169e439ca0b8266de248090f 1741 # Security Policy: 672 onrc 0 1742 1743 dataset = {} 1744 1745 for line in output: 1746 line = line.split(': ') 1747 key, val = line[0], ': '.join(line[1:]) 1748 1749 if key == 'Active Timestamp': 1750 dataset['active_timestamp'] = int(val) 1751 elif key == 'Channel': 1752 dataset['channel'] = int(val) 1753 elif key == 'Channel Mask': 1754 dataset['channel_mask'] = int(val, 16) 1755 elif key == 'Ext PAN ID': 1756 dataset['extpanid'] = val 1757 elif key == 'Mesh Local Prefix': 1758 dataset['mesh_local_prefix'] = val 1759 elif key in ('Network Key', 'Master Key'): 1760 dataset['networkkey'] = val 1761 elif key == 'Network Name': 1762 dataset['network_name'] = val 1763 elif key == 'PAN ID': 1764 dataset['panid'] = int(val, 16) 1765 elif key == 'PSKc': 1766 dataset['pskc'] = val 1767 elif key == 'Security Policy': 1768 rotation_time, flags, version_threshold = val.split(' ') 1769 rotation_time = int(rotation_time) 1770 dataset['security_policy'] = SecurityPolicy(rotation_time, flags) 1771 else: 1772 raise UnexpectedCommandOutput(output) 1773 1774 return dataset 1775 1776 def get_dataset_bytes(self, dataset: str) -> bytes: 1777 if dataset in ('active', 'pending'): 1778 cmd = f'dataset {dataset} -x' 1779 else: 1780 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1781 1782 hexstr = self.__parse_str(self.execute_command(cmd)) 1783 return self.__hex_to_bytes(hexstr) 1784 1785 def set_dataset_bytes(self, dataset: str, data: bytes) -> None: 1786 if dataset in ('active', 'pending'): 1787 cmd = f'dataset set {dataset} {self.__bytes_to_hex(data)}' 1788 else: 1789 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1790 1791 self.execute_command(cmd) 1792 1793 def dataset_set_buffer(self, 1794 active_timestamp: Optional[int] = None, 1795 channel: Optional[int] = None, 1796 channel_mask: Optional[int] = None, 1797 extpanid: Optional[str] = None, 1798 mesh_local_prefix: Optional[str] = None, 1799 network_key: Optional[str] = None, 1800 network_name: Optional[str] = None, 1801 panid: Optional[int] = None, 1802 pskc: Optional[str] = None, 1803 security_policy: Optional[tuple] = None, 1804 pending_timestamp: Optional[int] = None): 1805 if active_timestamp is not None: 1806 self.execute_command(f'dataset activetimestamp {active_timestamp}') 1807 1808 if channel is not None: 1809 self.execute_command(f'dataset channel {channel}') 1810 1811 if channel_mask is not None: 1812 self.execute_command(f'dataset channelmask {channel_mask}') 1813 1814 if extpanid is not None: 1815 self.execute_command(f'dataset extpanid {extpanid}') 1816 1817 if mesh_local_prefix is not None: 1818 self.execute_command(f'dataset meshlocalprefix {mesh_local_prefix}') 1819 1820 if network_key is not None: 1821 nwk_cmd = self.__detect_networkkey_cmd() 1822 self.execute_command(f'dataset {nwk_cmd} {network_key}') 1823 1824 if network_name is not None: 1825 self.execute_command(f'dataset networkname {self.__escape_escapable(network_name)}') 1826 1827 if panid is not None: 1828 self.execute_command(f'dataset panid {panid}') 1829 1830 if pskc is not None: 1831 self.execute_command(f'dataset pskc {pskc}') 1832 1833 if security_policy is not None: 1834 rotation_time, flags = security_policy 1835 self.execute_command(f'dataset securitypolicy {rotation_time} {flags}') 1836 1837 if pending_timestamp is not None: 1838 self.execute_command(f'dataset pendingtimestamp {pending_timestamp}') 1839 1840 # TODO: dataset mgmtgetcommand 1841 # TODO: dataset mgmtsetcommand 1842 # TODO: dataset set <active|pending> <dataset> 1843 1844 # 1845 # Allowlist management 1846 # 1847 1848 def enable_allowlist(self): 1849 self.execute_command(f'macfilter addr {self.__detect_allowlist_cmd()}') 1850 1851 def disable_allowlist(self): 1852 self.execute_command('macfilter addr disable') 1853 1854 def add_allowlist(self, addr: str, rssi: Optional[int] = None): 1855 cmd = f'macfilter addr add {addr}' 1856 1857 if rssi is not None: 1858 cmd += f' {rssi}' 1859 1860 self.execute_command(cmd) 1861 1862 def remove_allowlist(self, addr: str): 1863 self.execute_command(f'macfilter addr remove {addr}') 1864 1865 def clear_allowlist(self): 1866 self.execute_command('macfilter addr clear') 1867 1868 def set_allowlist(self, allowlist: Collection[Union[str, Tuple[str, int]]]): 1869 self.clear_allowlist() 1870 1871 if allowlist is None: 1872 self.disable_allowlist() 1873 else: 1874 self.enable_allowlist() 1875 for item in allowlist: 1876 if isinstance(item, str): 1877 self.add_allowlist(item) 1878 else: 1879 addr, rssi = item[0], item[1] 1880 self.add_allowlist(addr, rssi) 1881 1882 # TODO: denylist 1883 # TODO: macfilter rss 1884 # TODO: macfilter rss add <extaddr> <rss> 1885 # TODO: macfilter rss add-lqi <extaddr> <lqi> 1886 # TODO: macfilter rss remove <extaddr> 1887 # TODO: macfilter rss clear 1888 1889 def __detect_allowlist_cmd(self): 1890 if self.api_version >= 28: 1891 return 'allowlist' 1892 else: 1893 return '\x77\x68\x69\x74\x65\x6c\x69\x73\x74' 1894 1895 def __detect_networkkey_cmd(self) -> str: 1896 return 'networkkey' if self.api_version >= 126 else 'masterkey' 1897 1898 # 1899 # Unicast Addresses management 1900 # 1901 def add_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1902 """Add an IPv6 address to the Thread interface.""" 1903 self.execute_command(f'ipaddr add {ip}') 1904 1905 def del_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1906 """Delete an IPv6 address from the Thread interface.""" 1907 self.execute_command(f'ipaddr del {ip}') 1908 1909 def get_ipaddrs(self) -> Tuple[Ip6Addr]: 1910 """Get all IPv6 addresses assigned to the Thread interface.""" 1911 return tuple(map(Ip6Addr, self.execute_command('ipaddr'))) 1912 1913 def has_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1914 """Check if a IPv6 address was added to the Thread interface.""" 1915 return ip in self.get_ipaddrs() 1916 1917 def get_ipaddr_mleid(self) -> Ip6Addr: 1918 """Get Thread Mesh Local EID address.""" 1919 return self.__parse_ip6addr(self.execute_command('ipaddr mleid')) 1920 1921 def get_ipaddr_linklocal(self) -> Ip6Addr: 1922 """Get Thread link-local IPv6 address.""" 1923 return self.__parse_ip6addr(self.execute_command('ipaddr linklocal')) 1924 1925 def get_ipaddr_rloc(self) -> Ip6Addr: 1926 """Get Thread Routing Locator (RLOC) address.""" 1927 return self.__parse_ip6addr(self.execute_command('ipaddr rloc')) 1928 1929 # 1930 # Multicast Addresses management 1931 # 1932 1933 def add_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1934 """Subscribe the Thread interface to the IPv6 multicast address.""" 1935 self.execute_command(f'ipmaddr add {ip}') 1936 1937 def del_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1938 """Unsubscribe the Thread interface to the IPv6 multicast address.""" 1939 self.execute_command(f'ipmaddr del {ip}') 1940 1941 def get_ipmaddrs(self) -> Tuple[Ip6Addr]: 1942 """Get all IPv6 multicast addresses subscribed to the Thread interface.""" 1943 return tuple(map(Ip6Addr, self.execute_command('ipmaddr'))) 1944 1945 def has_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1946 """Check if a IPv6 multicast address was subscribed by the Thread interface.""" 1947 return ip in self.get_ipmaddrs() 1948 1949 def get_ipmaddr_promiscuous(self) -> bool: 1950 """Get multicast promiscuous mode.""" 1951 return self.__parse_Enabled_or_Disabled(self.execute_command("ipmaddr promiscuous")) 1952 1953 def enable_ipmaddr_promiscuous(self): 1954 """Enable multicast promiscuous mode.""" 1955 self.execute_command('ipmaddr promiscuous enable') 1956 1957 def disable_ipmaddr_promiscuous(self): 1958 """Disable multicast promiscuous mode.""" 1959 self.execute_command('ipmaddr promiscuous disable') 1960 1961 def get_ipmaddr_llatn(self) -> Ip6Addr: 1962 """Get Link Local All Thread Nodes Multicast Address""" 1963 return self.__parse_ip6addr(self.execute_command('ipmaddr llatn')) 1964 1965 def get_ipmaddr_rlatn(self) -> Ip6Addr: 1966 """Get Realm Local All Thread Nodes Multicast Address""" 1967 return self.__parse_ip6addr(self.execute_command('ipmaddr rlatn')) 1968 1969 # 1970 # Backbone Router Utilities 1971 # 1972 1973 # TODO: bbr mgmt ... 1974 1975 def enable_backbone_router(self): 1976 """Enable Backbone Router Service for Thread 1.2 FTD. 1977 1978 SRV_DATA.ntf would be triggered for attached device if there is no Backbone Router Service in Thread Network Data. 1979 """ 1980 self.execute_command('bbr enable') 1981 1982 def disable_backbone_router(self): 1983 """Disable Backbone Router Service for Thread 1.2 FTD. 1984 1985 SRV_DATA.ntf would be triggered if Backbone Router is Primary state. 1986 """ 1987 self.execute_command('bbr disable') 1988 1989 def get_backbone_router_state(self) -> str: 1990 """Get local Backbone state (Disabled or Primary or Secondary) for Thread 1.2 FTD.""" 1991 return self.__parse_str(self.execute_command('bbr state')) 1992 1993 def get_primary_backbone_router_info(self) -> Optional[dict]: 1994 """Show current Primary Backbone Router information for Thread 1.2 device.""" 1995 output = self.execute_command('bbr') 1996 1997 if len(output) < 1: 1998 raise UnexpectedCommandOutput(output) 1999 2000 line = output[0] 2001 if line == 'BBR Primary: None': 2002 return None 2003 2004 if line != 'BBR Primary:': 2005 raise UnexpectedCommandOutput(output) 2006 2007 # Example output: 2008 # BBR Primary: 2009 # server16: 0xE400 2010 # seqno: 10 2011 # delay: 120 secs 2012 # timeout: 300 secs 2013 2014 dataset = {} 2015 2016 for line in output[1:]: 2017 key, val = line.split(':') 2018 key, val = key.strip(), val.strip() 2019 if key == 'server16': 2020 dataset[key] = int(val, 16) 2021 elif key == 'seqno': 2022 dataset[key] = int(val) 2023 elif key == 'delay': 2024 if not val.endswith(' secs'): 2025 raise UnexpectedCommandOutput(output) 2026 dataset[key] = int(val.split()[0]) 2027 elif key == 'timeout': 2028 if not val.endswith(' secs'): 2029 raise UnexpectedCommandOutput(output) 2030 dataset[key] = int(val.split()[0]) 2031 else: 2032 raise UnexpectedCommandOutput(output) 2033 2034 return dataset 2035 2036 def register_backbone_router_dataset(self): 2037 """Register Backbone Router Service for Thread 1.2 FTD. 2038 2039 SRV_DATA.ntf would be triggered for attached device. 2040 """ 2041 self.execute_command('bbr register') 2042 2043 def get_backbone_router_config(self) -> dict: 2044 """Show local Backbone Router configuration for Thread 1.2 FTD.""" 2045 output = self.execute_command('bbr config') 2046 # Example output: 2047 # seqno: 10 2048 # delay: 120 secs 2049 # timeout: 300 secs 2050 2051 config = {} 2052 2053 for line in output: 2054 key, val = line.split(':') 2055 key, val = key.strip(), val.strip() 2056 if key == 'seqno': 2057 config[key] = int(val) 2058 elif key in ('delay', 'timeout'): 2059 if not line.endswith(' secs'): 2060 raise UnexpectedCommandOutput(output) 2061 config[key] = int(val.split()[0]) 2062 else: 2063 raise UnexpectedCommandOutput(output) 2064 2065 return config 2066 2067 def set_backbone_router_config(self, 2068 seqno: Optional[int] = None, 2069 delay: Optional[int] = None, 2070 timeout: Optional[int] = None): 2071 """Configure local Backbone Router configuration for Thread 1.2 FTD. 2072 2073 Call register_backbone_router_dataset() to explicitly register Backbone Router service to Leader for Secondary Backbone Router. 2074 """ 2075 if seqno is None and delay is None and timeout is None: 2076 raise InvalidArgumentsError("Please specify seqno or delay or timeout") 2077 2078 cmd = 'bbr config' 2079 if seqno is not None: 2080 cmd += f' seqno {seqno}' 2081 2082 if delay is not None: 2083 cmd += f' delay {delay}' 2084 2085 if timeout is not None: 2086 cmd += f' timeout {timeout}' 2087 2088 self.execute_command(cmd) 2089 2090 def get_backbone_router_jitter(self) -> int: 2091 """Get jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD.""" 2092 return self.__parse_int(self.execute_command('bbr jitter')) 2093 2094 def set_backbone_router_jitter(self, val: int): 2095 """Set jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD.""" 2096 self.execute_command(f'bbr jitter {val}') 2097 2098 def backbone_router_get_multicast_listeners(self) -> List[Tuple[Ip6Addr, int]]: 2099 """Get Backbone Router Multicast Listeners.""" 2100 listeners = [] 2101 for line in self.execute_command('bbr mgmt mlr listener'): 2102 ip, timeout = line.split() 2103 listeners.append((Ip6Addr(ip), int(timeout))) 2104 2105 return listeners 2106 2107 # 2108 # Thread 1.2 and DUA/MLR utilities 2109 # 2110 2111 def get_domain_name(self) -> str: 2112 """Get the Thread Domain Name for Thread 1.2 device.""" 2113 return self.__parse_str(self.execute_command('domainname')) 2114 2115 def set_domain_name(self, name: str): 2116 """Set the Thread Domain Name for Thread 1.2 device.""" 2117 self.execute_command('domainname %s' % self.__escape_escapable(name)) 2118 2119 # TODO: dua iid 2120 # TODO: dua iid <iid> 2121 # TODO: dua iid clear 2122 # TODO: mlr reg <ipaddr> ... [timeout] 2123 2124 # 2125 # Link metrics management 2126 # 2127 # TODO: linkmetrics mgmt <ipaddr> forward <seriesid> [ldraX][pqmr] 2128 # TODO: linkmetrics probe <ipaddr> <seriesid> <length> 2129 # TODO: linkmetrics query <ipaddr> single [pqmr] 2130 # TODO: linkmetrics query <ipaddr> forward <seriesid> 2131 # TODO: linkquality <extaddr> 2132 # TODO: linkquality <extaddr> <linkquality> 2133 # 2134 2135 # 2136 # Logging 2137 # 2138 2139 def get_log_level(self) -> int: 2140 """Get the log level.""" 2141 return self.__parse_int(self.execute_command('log level')) 2142 2143 def set_log_level(self, level: int): 2144 """Set the log level.""" 2145 self.execute_command(f'log level {level}') 2146 2147 # 2148 # Device performance related information 2149 # 2150 2151 def get_message_buffer_info(self) -> dict: 2152 """Get the current message buffer information.""" 2153 output = self.execute_command('bufferinfo') 2154 2155 info = {} 2156 2157 def _parse_val(val): 2158 vals = val.split() 2159 return int(vals[0]) if len(vals) == 1 else tuple(map(int, vals)) 2160 2161 for line in output: 2162 key, val = line.split(':') 2163 key, val = key.strip(), val.strip() 2164 info[key.replace(' ', '_')] = _parse_val(val) 2165 2166 return info 2167 2168 @constant_property 2169 def counter_names(self): 2170 """Get the supported counter names.""" 2171 return tuple(self.execute_command('counters')) 2172 2173 def get_counter(self, name: str) -> Counter: 2174 """Reset the counter value.""" 2175 output = self.execute_command(f'counters {name}') 2176 2177 counter = Counter() 2178 for line in output: 2179 k, v = line.strip().split(': ') 2180 counter[k] = int(v) 2181 2182 return counter 2183 2184 def reset_counter(self, name: str): 2185 """Reset the counter value.""" 2186 self.execute_command(f'counters {name} reset') 2187 2188 def get_eidcache(self) -> Dict[Ip6Addr, Rloc16]: 2189 """Get the EID-to-RLOC cache entries.""" 2190 output = self.execute_command('eidcache') 2191 cache = {} 2192 2193 for line in output: 2194 ip, rloc16, _ = line.split(" ", 2) 2195 2196 cache[Ip6Addr(ip)] = Rloc16(rloc16, 16) 2197 2198 return cache 2199 2200 # 2201 # UDP utilities 2202 # 2203 2204 def udp_open(self): 2205 """Opens the example socket.""" 2206 self.execute_command('udp open') 2207 2208 def udp_close(self): 2209 """Opens the example socket.""" 2210 self.execute_command('udp close') 2211 2212 def udp_bind(self, ip: str, port: int, netif: NetifIdentifier = NetifIdentifier.THERAD): 2213 """Assigns a name (i.e. IPv6 address and port) to the example socket. 2214 2215 :param ip: the IPv6 address or the unspecified IPv6 address (::). 2216 :param port: the UDP port 2217 """ 2218 bindarg = '' 2219 if netif == NetifIdentifier.UNSPECIFIED: 2220 bindarg += ' -u' 2221 elif netif == NetifIdentifier.BACKBONE: 2222 bindarg += ' -b' 2223 2224 self.execute_command(f'udp bind{bindarg} {ip} {port}') 2225 2226 def udp_connect(self, ip: str, port: int): 2227 """Specifies the peer with which the socket is to be associated. 2228 2229 ip: the peer's IPv6 address. 2230 port: the peer's UDP port. 2231 """ 2232 self.execute_command(f'udp connect {ip} {port}') 2233 2234 def udp_send(self, 2235 ip: Optional[Union[str, Ip6Addr]] = None, 2236 port: Optional[int] = None, 2237 text: Optional[str] = None, 2238 random_bytes: Optional[int] = None, 2239 hex: Optional[str] = None): 2240 """Send a few bytes over UDP. 2241 2242 ip: the IPv6 destination address. 2243 port: the UDP destination port. 2244 type: the type of the message: _ -t: text payload in the value, same as without specifying the type. _ -s: autogenerated payload with specified length indicated in the value. 2245 * -x: binary data in hexadecimal representation in the value. 2246 """ 2247 if (ip is None) != (port is None): 2248 raise InvalidArgumentsError("Please specify both `ip` and `port`.") 2249 2250 if (text is not None) + (random_bytes is not None) + (hex is not None) != 1: 2251 raise InvalidArgumentsError("Please specify `text` or `random_bytes` or `hex`.") 2252 2253 cmd = 'udp send' 2254 2255 if ip is not None: 2256 cmd += f' {ip} {port}' 2257 2258 if text is not None: 2259 cmd += f' -t {text}' 2260 elif random_bytes is not None: 2261 cmd += f' -s {random_bytes}' 2262 elif hex is not None: 2263 self.__validate_hex(hex) 2264 cmd += f' -x {hex}' 2265 2266 self.execute_command(cmd) 2267 2268 def udp_get_link_security(self) -> bool: 2269 """Gets whether the link security is enabled or disabled.""" 2270 return self.__parse_Enabled_or_Disabled(self.execute_command('udp linksecurity')) 2271 2272 def udp_enable_link_security(self): 2273 """Enable link security.""" 2274 self.execute_command('udp linksecurity enable') 2275 2276 def udp_disable_link_security(self): 2277 """Disable link security.""" 2278 self.execute_command('udp linksecurity disable') 2279 2280 def netstat(self) -> List[Tuple[Tuple[Ip6Addr, int], Tuple[Ip6Addr, int]]]: 2281 cmd = 'netstat' 2282 output = self.execute_command(cmd) 2283 if len(output) < 2: 2284 raise UnexpectedCommandOutput(output) 2285 2286 socks = [] 2287 for line in output[2:]: 2288 _, sock_addr, peer_addr = line.strip().split('|')[:3] 2289 sock_addr = self.__parse_socket_addr(sock_addr.strip()) 2290 peer_addr = self.__parse_socket_addr(peer_addr.strip()) 2291 socks.append((sock_addr, peer_addr)) 2292 2293 return socks 2294 2295 @staticmethod 2296 def __parse_socket_addr(addr: str) -> Tuple[Ip6Addr, int]: 2297 addr, port = addr.rsplit(':', 1) 2298 if addr.startswith('[') and addr.endswith(']'): 2299 addr = addr[1:-1] 2300 2301 return Ip6Addr(addr), int(port) if port != '*' else 0 2302 2303 # 2304 # CoAP CLI (test) utilities 2305 # 2306 def coap_start(self): 2307 """Starts the application coap service.""" 2308 self.execute_command('coap start') 2309 2310 def coap_stop(self): 2311 """Stops the application coap service.""" 2312 self.execute_command('coap stop') 2313 2314 def coap_get(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con"): 2315 cmd = f'coap get {addr} {uri_path} {type}' 2316 self.execute_command(cmd) 2317 2318 def coap_put(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None): 2319 cmd = f'coap put {addr} {uri_path} {type}' 2320 2321 if payload is not None: 2322 cmd += f' {payload}' 2323 2324 self.execute_command(cmd) 2325 2326 def coap_post(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None): 2327 cmd = f'coap post {addr} {uri_path} {type}' 2328 2329 if payload is not None: 2330 cmd += f' {payload}' 2331 2332 self.execute_command(cmd) 2333 2334 def coap_delete(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None): 2335 cmd = f'coap delete {addr} {uri_path} {type}' 2336 2337 if payload is not None: 2338 cmd += f' {payload}' 2339 2340 self.execute_command(cmd) 2341 2342 def coap_get_test_resource_path(self) -> str: 2343 """Gets the URI path for the test resource.""" 2344 return self.__parse_str(self.execute_command('coap resource')) 2345 2346 def coap_set_test_resource_path(self, path: str): 2347 """Sets the URI path for the test resource.""" 2348 self.execute_command(f'coap resource {path}') 2349 2350 def coap_test_set_resource_content(self, content: str): 2351 """Sets the content sent by the test resource. If a CoAP client is observing the resource, a notification is sent to that client.""" 2352 self.execute_command(f'coap set {content}') 2353 2354 # TODO: coap observe <address> <uri-path> [type] 2355 # TODO: coap cancel 2356 # TODO: coap parameters <type> ["default"|<ack_timeout> <ack_random_factor_numerator> <ack_random_factor_denominator> <max_retransmit>] 2357 # TODO: CoAP Secure utilities 2358 2359 # 2360 # Other TODOs 2361 # 2362 # TODO: netstat 2363 # TODO: networkdiagnostic get <addr> <type> .. 2364 # TODO: networkdiagnostic reset <addr> <type> .. 2365 # TODO: parent 2366 # TODO: pskc [-p] <key>|<passphrase> 2367 # 2368 2369 # 2370 # Private methods 2371 # 2372 2373 def __parse_str(self, output: List[str]) -> str: 2374 if len(output) != 1: 2375 raise UnexpectedCommandOutput(output) 2376 2377 return output[0] 2378 2379 def __parse_int_list(self, output: List[str]) -> List[int]: 2380 line = self.__parse_str(output) 2381 return list(map(int, line.strip().split())) 2382 2383 def __parse_ip6addr(self, output: List[str]) -> Ip6Addr: 2384 return Ip6Addr(self.__parse_str(output)) 2385 2386 def __parse_ip6addr_list(self, output: List[str]) -> List[Ip6Addr]: 2387 return [Ip6Addr(line) for line in output] 2388 2389 def __parse_int(self, output: List[str], base=10) -> int: 2390 if len(output) != 1: 2391 raise UnexpectedCommandOutput(output) 2392 2393 return int(output[0], base) 2394 2395 def __parse_network_key(self, output: List[str]) -> str: 2396 networkkey = self.__parse_str(output) 2397 2398 try: 2399 self.__validate_network_key(networkkey) 2400 except ValueError: 2401 raise UnexpectedCommandOutput(output) 2402 2403 return networkkey 2404 2405 def __validate_network_key(self, networkkey: str): 2406 if len(networkkey) != 32: 2407 raise ValueError(networkkey) 2408 2409 int(networkkey, 16) 2410 2411 def __parse_hex64b(self, output: List[str]) -> str: 2412 extaddr = self.__parse_str(output) 2413 2414 try: 2415 self.__validate_hex64b(extaddr) 2416 except ValueError: 2417 raise UnexpectedCommandOutput(output) 2418 2419 return extaddr 2420 2421 __parse_extaddr = __parse_hex64b 2422 __parse_extpanid = __parse_hex64b 2423 __parse_eui64 = __parse_hex64b 2424 __parse_joiner_id = __parse_hex64b 2425 2426 def __validate_hex64b(self, extaddr: str): 2427 if len(extaddr) != 16: 2428 raise ValueError(extaddr) 2429 2430 self.__validate_hex(extaddr) 2431 2432 def __validate_hex(self, hexstr: str): 2433 if len(hexstr) % 2 != 0: 2434 raise ValueError(hexstr) 2435 2436 for i in range(0, len(hexstr), 2): 2437 int(hexstr[i:i + 2], 16) 2438 2439 __validate_extaddr = __validate_hex64b 2440 __validate_extpanid = __validate_hex64b 2441 2442 def __parse_Enabled_or_Disabled(self, output: List[str]) -> bool: 2443 return self.__parse_values(output, Enabled=True, Disabled=False) 2444 2445 def __parse_values(self, output: List[str], **vals) -> Any: 2446 val = self.__parse_str(output) 2447 if val not in vals: 2448 raise UnexpectedCommandOutput(output) 2449 2450 return vals[val] 2451 2452 def __validate_hex_or_bytes(self, data: Union[str, bytes]) -> str: 2453 if isinstance(data, bytes): 2454 return ''.join('%02x' % c for c in data) 2455 else: 2456 self.__validate_hex(data) 2457 return data 2458 2459 def __hex_to_bytes(self, hexstr: str) -> bytes: 2460 self.__validate_hex(hexstr) 2461 return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2)) 2462 2463 def __bytes_to_hex(self, data: bytes) -> str: 2464 return ''.join('%02x' % b for b in data) 2465 2466 def __escape_escapable(self, s: str) -> str: 2467 """Escape CLI escapable characters in the given string. 2468 """ 2469 escapable_chars = '\\ \t\r\n' 2470 for char in escapable_chars: 2471 s = s.replace(char, '\\%s' % char) 2472 return s 2473 2474 def __txt_to_hex(self, txt: Dict[str, Union[str, bytes, bool]]) -> str: 2475 txt_bin = b'' 2476 for k, v in txt.items(): 2477 assert '=' not in k, 'TXT key must not contain `=`' 2478 2479 if isinstance(v, str): 2480 entry = f'{k}={v}'.encode('utf8') 2481 elif isinstance(v, bytes): 2482 entry = f'{k}='.encode('utf8') + v 2483 else: 2484 assert v is True, 'TXT val must be str or bytes or True' 2485 entry = k.encode('utf8') 2486 2487 assert len(entry) <= 255, 'TXT entry is too long' 2488 2489 txt_bin += bytes([len(entry)]) 2490 txt_bin += entry 2491 2492 return ''.join('%02x' % b for b in txt_bin) 2493 2494 2495def connect_cli_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI: 2496 cli_handler = connectors.OtCliSim(executable, nodeid, simulator=simulator) 2497 cmd_handler = OtCliCommandRunner(cli_handler) 2498 return OTCI(cmd_handler) 2499 2500 2501def connect_cli_serial(dev: str, baudrate=115200) -> OTCI: 2502 cli_handler = connectors.OtCliSerial(dev, baudrate) 2503 cmd_handler = OtCliCommandRunner(cli_handler) 2504 return OTCI(cmd_handler) 2505 2506 2507def connect_ncp_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI: 2508 ncp_handler = connectors.OtNcpSim(executable, nodeid, simulator=simulator) 2509 cmd_handler = OtCliCommandRunner(ncp_handler, is_spinel_cli=True) 2510 return OTCI(cmd_handler) 2511 2512 2513def connect_otbr_ssh(host: str, port: int = 22, username='pi', password='raspberry', sudo=True): 2514 cmd_handler = OtbrSshCommandRunner(host, port, username, password, sudo=sudo) 2515 return OTCI(cmd_handler) 2516 2517 2518def connect_otbr_adb(host: str, port: int = 5555): 2519 cmd_handler = OtbrAdbCommandRunner(host, port) 2520 return OTCI(cmd_handler) 2521 2522 2523def connect_cmd_handler(cmd_handler: OTCommandHandler) -> OTCI: 2524 return OTCI(cmd_handler) 2525