1#!/usr/bin/env python3 2# 3# Copyright (c) 2020, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29import ipaddress 30import logging 31import re 32from collections import Counter 33from typing import Callable, List, Collection, Union, Tuple, Optional, Dict, Pattern, Any 34 35from . import connectors 36from .command_handlers import OTCommandHandler, OtCliCommandRunner, OtbrSshCommandRunner, OtbrAdbTcpCommandRunner, OtbrAdbUsbCommandRunner 37from .connectors import Simulator 38from .errors import UnexpectedCommandOutput, ExpectLineTimeoutError, CommandError, InvalidArgumentsError 39from .types import ChildId, Rloc16, Ip6Addr, ThreadState, PartitionId, DeviceMode, RouterId, SecurityPolicy, Ip6Prefix, \ 40 RouterTableEntry, NetifIdentifier 41from .utils import match_line, constant_property 42 43 44class OTCI(object): 45 """ 46 This class represents an OpenThread Controller Interface instance that provides versatile interfaces to 47 manipulate an OpenThread device. 48 """ 49 50 DEFAULT_EXEC_COMMAND_RETRY = 4 # A command is retried 4 times if failed. 51 52 __exec_command_retry = DEFAULT_EXEC_COMMAND_RETRY 53 54 def __init__(self, otcmd: OTCommandHandler): 55 """ 56 This method initializes an OTCI instance. 57 58 :param otcmd: An OpenThread Command Handler instance to execute OpenThread CLI commands. 59 """ 60 self.__otcmd: OTCommandHandler = otcmd 61 self.__logger = logging.getLogger(name=str(self)) 62 63 def __repr__(self): 64 """Gets the string representation of the OTCI instance.""" 65 return repr(self.__otcmd) 66 67 def wait(self, duration: float, expect_line: Optional[Union[str, Pattern, Collection[Any]]] = None): 68 """Wait for a given duration. 69 70 :param duration: The duration (in seconds) wait for. 71 :param expect_line: The line expected to output if given. 72 Raise ExpectLineTimeoutError if expect_line is not found within the given duration. 73 """ 74 self.log('info', "wait for %.3f seconds", duration) 75 if expect_line is None: 76 self.__otcmd.wait(duration) 77 else: 78 success = False 79 80 while duration > 0: 81 output = self.__otcmd.wait(1) 82 if any(match_line(line, expect_line) for line in output): 83 success = True 84 break 85 86 duration -= 1 87 88 if not success: 89 raise ExpectLineTimeoutError(expect_line) 90 91 def close(self): 92 """Close the OTCI instance.""" 93 self.__otcmd.close() 94 95 def execute_command(self, 96 cmd: str, 97 timeout: float = 10, 98 silent: bool = False, 99 already_is_ok: bool = True) -> List[str]: 100 for i in range(self.__exec_command_retry + 1): 101 try: 102 return self.__execute_command(cmd, timeout, silent, already_is_ok=already_is_ok) 103 except Exception: 104 self.wait(2) 105 if i == self.__exec_command_retry: 106 raise 107 assert False 108 109 def __execute_command(self, 110 cmd: str, 111 timeout: float = 10, 112 silent: bool = False, 113 already_is_ok: bool = True) -> List[str]: 114 """Execute the OpenThread CLI command. 115 116 :param cmd: The command to execute. 117 :param timeout: The command timeout. 118 :param silent: Whether to run the command silent without logging. 119 :returns: The command output as a list of lines. 120 """ 121 if not silent: 122 self.log('info', '> %s', cmd) 123 124 output = self.__otcmd.execute_command(cmd, timeout) 125 126 if not silent: 127 for line in output: 128 self.log('info', '%s', line) 129 130 if cmd in ('reset', 'factoryreset'): 131 return output 132 133 if output[-1] == 'Done' or (already_is_ok and output[-1] == 'Error 24: Already'): 134 output = output[:-1] 135 return output 136 else: 137 raise CommandError(cmd, output) 138 139 def execute_platform_command(self, cmd: str, timeout: float = 10, silent: bool = False) -> List[str]: 140 """Execute the platform command. 141 142 :param cmd: The command to execute. 143 :param timeout: The command timeout. 144 :param silent: Whether to run the command silent without logging. 145 :returns: The command output as a list of lines. 146 """ 147 if not silent: 148 self.log('info', '> %s', cmd) 149 150 output = self.__otcmd.execute_platform_command(cmd, timeout) 151 152 if not silent: 153 for line in output: 154 self.log('info', '%s', line) 155 156 return output 157 158 def set_execute_command_retry(self, n: int): 159 assert n >= 0 160 self.__exec_command_retry = n 161 162 def shell(self, cmd: str, timeout: float = 10): 163 self.log('info', '# %s', cmd) 164 output = self.__otcmd.shell(cmd, timeout=timeout) 165 for line in output: 166 self.log('info', '%s', line) 167 return output 168 169 def set_logger(self, logger: logging.Logger): 170 """Set the logger for the OTCI instance, or None to disable logging.""" 171 self.__logger = logger 172 173 def log(self, level, fmt, *args, **kwargs): 174 if self.__logger is not None: 175 getattr(self.__logger, level)('(%s) ' + fmt, repr(self), *args, **kwargs) 176 177 def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]): 178 """Set the callback that will be called for each line output by the CLI.""" 179 self.__otcmd.set_line_read_callback(callback) 180 181 # 182 # Constant properties 183 # 184 @constant_property 185 def version(self): 186 """Returns the firmware version. (e.g. "OPENTHREAD/20191113-01411-gb2d66e424-dirty; SIMULATION; Nov 14 2020 14:24:38")""" 187 return self.__parse_str(self.execute_command('version')) 188 189 @constant_property 190 def thread_version(self): 191 """Get the Thread Version number.""" 192 return self.__parse_int(self.execute_command('thread version')) 193 194 @constant_property 195 def api_version(self): 196 """Get API version number.""" 197 try: 198 return self.__parse_int(self.execute_command('version api')) 199 except ValueError: 200 # If the device does not have `version api` command, it will print the firmware version, which would lead to ValueError. 201 return 0 202 203 # 204 # Basic device operations 205 # 206 def ifconfig_up(self): 207 """Bring up the IPv6 interface.""" 208 self.execute_command('ifconfig up') 209 210 def ifconfig_down(self): 211 """Bring down the IPv6 interface.""" 212 self.execute_command('ifconfig down') 213 214 def get_ifconfig_state(self) -> bool: 215 """Get the status of the IPv6 interface.""" 216 return self.__parse_values(self.execute_command('ifconfig'), up=True, down=False) 217 218 def thread_start(self): 219 """Enable Thread protocol operation and attach to a Thread network.""" 220 self.execute_command('thread start') 221 222 def thread_stop(self): 223 """Disable Thread protocol operation and detach from a Thread network.""" 224 self.execute_command('thread stop') 225 226 def reset(self): 227 """Signal a platform reset.""" 228 self.execute_command('reset') 229 230 def factory_reset(self): 231 """Delete all stored settings, and signal a platform reset.""" 232 self.execute_command('factoryreset') 233 234 # 235 # Network Operations 236 # 237 _PING_STATISTICS_PATTERN = re.compile( 238 r'^(?P<transmitted>\d+) packets transmitted, (?P<received>\d+) packets received.(?: Packet loss = (?P<loss>\d+\.\d+)%.)?(?: Round-trip min/avg/max = (?P<min>\d+)/(?P<avg>\d+\.\d+)/(?P<max>\d+) ms.)?$' 239 ) 240 241 def ping(self, 242 ip: Union[str, Ip6Addr], 243 size: int = 8, 244 count: int = 1, 245 interval: float = 1, 246 hoplimit: int = 64, 247 timeout: float = 3) -> Dict: 248 """Send an ICMPv6 Echo Request. 249 The default arguments are consistent with https://github.com/openthread/openthread/blob/main/src/core/utils/ping_sender.hpp. 250 251 :param ip: The target IPv6 address to ping. 252 :param size: The number of data bytes in the payload. Default is 8. 253 :param count: The number of ICMPv6 Echo Requests to be sent. Default is 1. 254 :param interval: The interval between two consecutive ICMPv6 Echo Requests in seconds. The value may have fractional form, for example 0.5. Default is 1. 255 :param hoplimit: The hoplimit of ICMPv6 Echo Request to be sent. Default is 64. See OPENTHREAD_CONFIG_IP6_HOP_LIMIT_DEFAULT in src/core/config/ip6.h. 256 :param timeout: The maximum duration in seconds for the ping command to wait after the final echo request is sent. Default is 3. 257 """ 258 cmd = f'ping {ip} {size} {count} {interval} {hoplimit} {timeout}' 259 260 timeout_allowance = 3 261 lines = self.execute_command(cmd, timeout=(count - 1) * interval + timeout + timeout_allowance) 262 263 statistics = {} 264 for line in lines: 265 m = OTCI._PING_STATISTICS_PATTERN.match(line) 266 if m is not None: 267 if m.group('transmitted') is not None: 268 statistics['transmitted_packets'] = int(m.group('transmitted')) 269 statistics['received_packets'] = int(m.group('received')) 270 if m.group('loss') is not None: 271 statistics['packet_loss'] = float(m.group('loss')) / 100 272 if m.group('min') is not None: 273 statistics['round_trip_time'] = { 274 'min': int(m.group('min')), 275 'avg': float(m.group('avg')), 276 'max': int(m.group('max')) 277 } 278 return statistics 279 280 def ping_stop(self): 281 """Stop sending ICMPv6 Echo Requests.""" 282 self.execute_command('ping stop') 283 284 def discover(self, channel: Optional[int] = None) -> List[Dict[str, Any]]: 285 """Perform an MLE Discovery operation.""" 286 return self.__scan_networks('discover', channel) 287 288 def scan(self, channel: Optional[int] = None) -> List[Dict[str, Any]]: 289 """Perform an IEEE 802.15.4 Active Scan.""" 290 return self.__scan_networks('scan', channel) 291 292 def __scan_networks(self, cmd: str, channel: Optional[int] = None) -> List[Dict[str, Any]]: 293 if channel is not None: 294 cmd += f' {channel}' 295 296 output = self.execute_command(cmd, timeout=10) 297 if len(output) < 2: 298 raise UnexpectedCommandOutput(output) 299 300 networks = [] 301 for line in output[2:]: 302 fields = line.strip().split('|') 303 304 try: 305 _, J, netname, extpanid, panid, extaddr, ch, dbm, lqi, _ = fields 306 except Exception: 307 logging.warning('ignored output: %r', line) 308 continue 309 310 networks.append({ 311 'joinable': bool(int(J)), 312 'network_name': netname.strip(), 313 'extpanid': extpanid, 314 'panid': int(panid, 16), 315 'extaddr': extaddr, 316 'channel': int(ch), 317 'dbm': int(dbm), 318 'lqi': int(lqi), 319 }) 320 321 return networks 322 323 def scan_energy(self, duration: Optional[float] = None, channel: Optional[int] = None) -> Dict[int, int]: 324 """Perform an IEEE 802.15.4 Energy Scan.""" 325 cmd = 'scan energy' 326 if duration is not None: 327 cmd += f' {duration * 1000:d}' 328 329 if channel is not None: 330 cmd += f' {channel}' 331 332 output = self.execute_command(cmd, timeout=10) 333 if len(output) < 2: 334 raise UnexpectedCommandOutput(output) 335 336 channels = {} 337 for line in output[2:]: 338 fields = line.strip().split('|') 339 340 _, Ch, RSSI, _ = fields 341 channels[int(Ch)] = int(RSSI) 342 343 return channels 344 345 def mac_send_data_request(self): 346 """Instruct an Rx-Off-When-Idle device to send a Data Request mac frame to its parent.""" 347 self.execute_command('mac send datarequest') 348 349 def mac_send_empty_data(self): 350 """Instruct an Rx-Off-When-Idle device to send a Empty Data mac frame to its parent.""" 351 self.execute_command('mac send emptydata') 352 353 # TODO: discover 354 # TODO: dns resolve <hostname> [DNS server IP] [DNS server port] 355 # TODO: fake /a/an <dst-ipaddr> <target> <meshLocalIid> 356 # TODO: sntp query 357 358 # 359 # Set or get device/network parameters 360 # 361 362 def get_mode(self) -> str: 363 """Get the Thread Device Mode value. 364 365 -: no flags set (rx-off-when-idle, minimal Thread device, stable network data) 366 r: rx-on-when-idle 367 d: Full Thread Device 368 n: Full Network Data 369 """ 370 return self.__parse_str(self.execute_command('mode')) 371 372 def set_mode(self, mode: str): 373 """Set the Thread Device Mode value. 374 375 -: no flags set (rx-off-when-idle, minimal Thread device, stable network data) 376 r: rx-on-when-idle 377 d: Full Thread Device 378 n: Full Network Data 379 """ 380 self.execute_command(f'mode {DeviceMode(mode)}') 381 382 def get_extaddr(self) -> str: 383 """Get the IEEE 802.15.4 Extended Address.""" 384 return self.__parse_extaddr(self.execute_command('extaddr')) 385 386 def set_extaddr(self, extaddr: str): 387 """Set the IEEE 802.15.4 Extended Address.""" 388 self.__validate_hex64b(extaddr) 389 self.execute_command(f'extaddr {extaddr}') 390 391 def get_eui64(self) -> str: 392 """Get the factory-assigned IEEE EUI-64.""" 393 return self.__parse_eui64(self.execute_command('eui64')) 394 395 def set_extpanid(self, extpanid: str): 396 """Set the Thread Extended PAN ID value.""" 397 self.__validate_extpanid(extpanid) 398 self.execute_command(f'extpanid {extpanid}') 399 400 def get_extpanid(self) -> str: 401 """Get the Thread Extended PAN ID value.""" 402 return self.__parse_extpanid(self.execute_command('extpanid')) 403 404 def set_channel(self, ch): 405 """Set the IEEE 802.15.4 Channel value.""" 406 self.execute_command('channel %d' % ch) 407 408 def get_channel(self): 409 """Get the IEEE 802.15.4 Channel value.""" 410 return self.__parse_int(self.execute_command('channel')) 411 412 def get_preferred_channel_mask(self) -> int: 413 """Get preferred channel mask.""" 414 return self.__parse_int(self.execute_command('channel preferred')) 415 416 def get_supported_channel_mask(self): 417 """Get supported channel mask.""" 418 return self.__parse_int(self.execute_command('channel supported')) 419 420 def get_panid(self): 421 """Get the IEEE 802.15.4 PAN ID value.""" 422 return self.__parse_int(self.execute_command('panid'), 16) 423 424 def set_panid(self, panid): 425 """Get the IEEE 802.15.4 PAN ID value.""" 426 self.execute_command('panid %d' % panid) 427 428 def set_network_name(self, name): 429 """Set network name.""" 430 self.execute_command('networkname %s' % self.__escape_escapable(name)) 431 432 def get_network_name(self): 433 """Get network name.""" 434 return self.__parse_str(self.execute_command('networkname')) 435 436 def get_network_key(self) -> str: 437 """Get the network key.""" 438 return self.__parse_network_key(self.execute_command(self.__detect_networkkey_cmd())) 439 440 def set_network_key(self, networkkey: str): 441 """Set the network key.""" 442 self.__validate_network_key(networkkey) 443 cmd = self.__detect_networkkey_cmd() 444 self.execute_command(f'{cmd} {networkkey}') 445 446 def get_key_sequence_counter(self) -> int: 447 """Get the Thread Key Sequence Counter.""" 448 return self.__parse_int(self.execute_command('keysequence counter')) 449 450 def set_key_sequence_counter(self, counter: int): 451 """Set the Thread Key Sequence Counter.""" 452 self.execute_command(f'keysequence counter {counter}') 453 454 def get_key_sequence_guard_time(self) -> int: 455 """Get Thread Key Switch Guard Time (in hours).""" 456 return self.__parse_int(self.execute_command('keysequence guardtime')) 457 458 def set_key_sequence_guard_time(self, hours: int): 459 """Set Thread Key Switch Guard Time (in hours) 0 means Thread Key Switch immediately if key index match.""" 460 self.execute_command(f'keysequence guardtime {hours}') 461 462 def get_cca_threshold(self) -> int: 463 """Get the CCA threshold in dBm measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4.""" 464 output = self.execute_command(f'ccathreshold') 465 val = self.__parse_str(output) 466 if not val.endswith(' dBm'): 467 raise UnexpectedCommandOutput(output) 468 469 return int(val[:-4]) 470 471 def set_cca_threshold(self, val: int): 472 """Set the CCA threshold measured at antenna connector per IEEE 802.15.4 - 2015 section 10.1.4.""" 473 self.execute_command(f'ccathreshold {val}') 474 475 def get_promiscuous(self) -> bool: 476 """Get radio promiscuous property.""" 477 return self.__parse_Enabled_or_Disabled(self.execute_command('promiscuous')) 478 479 def enable_promiscuous(self): 480 """Enable radio promiscuous operation and print raw packet content.""" 481 self.execute_command('promiscuous enable') 482 483 def disable_promiscuous(self): 484 """Disable radio promiscuous operation.""" 485 self.execute_command('promiscuous disable') 486 487 def get_txpower(self) -> int: 488 """Get the transmit power in dBm.""" 489 line = self.__parse_str(self.execute_command('txpower')) 490 if not line.endswith(' dBm'): 491 raise UnexpectedCommandOutput([line]) 492 493 return int(line.split()[0]) 494 495 def set_txpower(self, val: int): 496 """Set the transmit power in dBm.""" 497 self.execute_command(f'txpower {val}') 498 499 # TODO: fem 500 # TODO: fem lnagain 501 # TODO: fem lnagain <LNA gain> 502 # TODO: mac retries direct 503 # TODO: mac retries direct 504 # TODO: mac retries indirect 505 # TODO: mac retries indirect <number> 506 507 # 508 # Basic Node states and properties 509 # 510 511 def get_state(self) -> ThreadState: 512 """Get the current Thread state.""" 513 return ThreadState(self.__parse_str(self.execute_command('state'))) 514 515 def set_state(self, state: str): 516 """Try to switch to state detached, child, router or leader.""" 517 self.execute_command(f'state {state}') 518 519 def get_rloc16(self) -> Rloc16: 520 """Get the Thread RLOC16 value.""" 521 return Rloc16(self.__parse_int(self.execute_command('rloc16'), 16)) 522 523 def get_router_id(self) -> int: 524 """Get the Thread Router ID value.""" 525 return self.get_rloc16() >> 10 526 527 def prefer_router_id(self, routerid: int): 528 """Prefer a Router ID when solicit router id from Leader.""" 529 self.execute_command(f'preferrouterid {routerid}') 530 531 def is_singleton(self) -> bool: 532 return self.__parse_values(self.execute_command('singleton'), true=True, false=False) 533 534 # 535 # RCP related utilities 536 # 537 538 def get_rcp_version(self): 539 return self.__parse_str(self.execute_command('rcp version')) 540 541 # 542 # Unsecure port utilities 543 # 544 545 def get_unsecure_ports(self) -> List[int]: 546 """all ports from the allowed unsecured port list.""" 547 return self.__parse_int_list(self.execute_command('unsecureport get')) 548 549 def add_unsecure_port(self, port: int): 550 """Add a port to the allowed unsecured port list.""" 551 self.execute_command(f'unsecureport add {port}') 552 553 def remove_unsecure_port(self, port: int): 554 """Remove a port from the allowed unsecured port list.""" 555 self.execute_command(f'unsecureport remove {port}') 556 557 def clear_unsecure_ports(self): 558 """Remove all ports from the allowed unsecured port list.""" 559 self.execute_command('unsecureport remove all') 560 561 # 562 # Leader configurations 563 # 564 565 def get_preferred_partition_id(self) -> PartitionId: 566 """Get the preferred Thread Leader Partition ID.""" 567 return PartitionId(self.__parse_int(self.execute_command(self.__get_partition_preferred_cmd()))) 568 569 def set_preferred_partition_id(self, parid: int): 570 """Set the preferred Thread Leader Partition ID.""" 571 self.execute_command(f'{self.__get_partition_preferred_cmd()} {parid}') 572 573 def __get_partition_preferred_cmd(self) -> str: 574 """""" 575 return 'partitionid preferred' if self.api_version >= 51 else 'leaderpartitionid' 576 577 def get_leader_weight(self) -> int: 578 """Get the Thread Leader Weight.""" 579 return self.__parse_int(self.execute_command('leaderweight')) 580 581 def set_leader_weight(self, weight: int): 582 """Set the Thread Leader Weight.""" 583 self.execute_command(f'leaderweight {weight}') 584 585 __LEADER_DATA_KEY_MAP = { 586 'Partition ID': 'partition_id', 587 'Weighting': 'weight', 588 'Data Version': 'data_ver', 589 'Stable Data Version': 'stable_data_ver', 590 'Leader Router ID': 'leader_id', 591 } 592 593 def get_leader_data(self) -> Dict[str, int]: 594 """Get the Thread Leader Data.""" 595 data = {} 596 output = self.execute_command('leaderdata') 597 598 try: 599 for line in output: 600 k, v = line.split(': ') 601 data[OTCI.__LEADER_DATA_KEY_MAP[k]] = int(v) 602 except KeyError: 603 raise UnexpectedCommandOutput(output) 604 605 return data 606 607 # 608 # Router configurations 609 # 610 611 def get_router_selection_jitter(self): 612 """Get the ROUTER_SELECTION_JITTER value.""" 613 return self.__parse_int(self.execute_command('routerselectionjitter')) 614 615 def set_router_selection_jitter(self, jitter): 616 """Set the ROUTER_SELECTION_JITTER value.""" 617 self.execute_command(f'routerselectionjitter {jitter}') 618 619 def get_network_id_timeout(self) -> int: 620 """Get the NETWORK_ID_TIMEOUT parameter used in the Router role.""" 621 return self.__parse_int(self.execute_command('networkidtimeout')) 622 623 def set_network_id_timeout(self, timeout: int): 624 """Set the NETWORK_ID_TIMEOUT parameter used in the Router role.""" 625 self.execute_command(f'networkidtimeout {timeout}') 626 627 def get_parent_priority(self) -> int: 628 """Get the assigned parent priority value, -2 means not assigned.""" 629 return self.__parse_int(self.execute_command('parentpriority')) 630 631 def set_parent_priority(self, priority: int): 632 """Set the assigned parent priority value: 1, 0, -1 or -2.""" 633 self.execute_command(f'parentpriority {priority}') 634 635 def get_router_upgrade_threshold(self) -> int: 636 """Get the ROUTER_UPGRADE_THRESHOLD value.""" 637 return self.__parse_int(self.execute_command('routerupgradethreshold')) 638 639 def set_router_upgrade_threshold(self, threshold: int): 640 """Set the ROUTER_UPGRADE_THRESHOLD value.""" 641 self.execute_command(f'routerupgradethreshold {threshold}') 642 643 def get_router_downgrade_threshold(self): 644 """Set the ROUTER_DOWNGRADE_THRESHOLD value.""" 645 return self.__parse_int(self.execute_command('routerdowngradethreshold')) 646 647 def set_router_downgrade_threshold(self, threshold: int): 648 """Get the ROUTER_DOWNGRADE_THRESHOLD value.""" 649 self.execute_command(f'routerdowngradethreshold {threshold}') 650 651 def get_router_eligible(self) -> bool: 652 """Indicates whether the router role is enabled or disabled.""" 653 return self.__parse_Enabled_or_Disabled(self.execute_command('routereligible')) 654 655 def enable_router_eligible(self): 656 """Disable the router role.""" 657 self.execute_command('routereligible enable') 658 659 def disable_router_eligible(self): 660 """Disable the router role.""" 661 self.execute_command('routereligible disable') 662 663 def get_router_list(self) -> List[RouterId]: 664 """Get allocated Router IDs.""" 665 line = self.__parse_str(self.execute_command('router list')) 666 return list(map(RouterId, line.strip().split())) 667 668 def get_router_table(self) -> Dict[RouterId, RouterTableEntry]: 669 """table of routers.""" 670 output = self.execute_command('router table') 671 if len(output) < 2: 672 raise UnexpectedCommandOutput(output) 673 674 # 675 # Example output: 676 # 677 # | ID | RLOC16 | Next Hop | Path Cost | LQ In | LQ Out | Age | Extended MAC | 678 # +----+--------+----------+-----------+-------+--------+-----+------------------+ 679 # | 21 | 0x5400 | 21 | 0 | 3 | 3 | 5 | d28d7f875888fccb | 680 # | 56 | 0xe000 | 56 | 0 | 0 | 0 | 182 | f2d92a82c8d8fe43 | 681 # Done 682 # 683 684 headers = self.__split_table_row(output[0]) 685 686 table = {} 687 for line in output[2:]: 688 line = line.strip() 689 if not line: 690 continue 691 692 fields = self.__split_table_row(line) 693 if len(fields) != len(headers): 694 raise UnexpectedCommandOutput(output) 695 696 col = lambda colname: self.__get_table_col(colname, headers, fields) 697 id = col('ID') 698 699 table[RouterId(id)] = router = RouterTableEntry({ 700 'id': RouterId(id), 701 'rloc16': Rloc16(col('RLOC16'), 16), 702 'next_hop': int(col('Next Hop')), 703 'path_cost': int(col('Path Cost')), 704 'lq_in': int(col('LQ In')), 705 'lq_out': int(col('LQ Out')), 706 'age': int(col('Age')), 707 'extaddr': col('Extended MAC'), 708 }) 709 710 if 'Link' in headers: 711 router['link'] = int(col('Link')) 712 else: 713 # support older version of OT which does not output `Link` field 714 router['link'] = self.get_router_info(router['id'], silent=True)['link'] 715 716 return table 717 718 def get_router_info(self, id: int, silent: bool = False) -> RouterTableEntry: 719 cmd = f'router {id}' 720 info = {} 721 output = self.execute_command(cmd, silent=silent) 722 items = [line.strip().split(': ') for line in output] 723 724 headers = [h for h, _ in items] 725 fields = [f for _, f in items] 726 col = lambda colname: self.__get_table_col(colname, headers, fields) 727 728 return RouterTableEntry({ 729 'id': RouterId(id), 730 'rloc16': Rloc16(col('Rloc'), 16), 731 'alloc': int(col('Alloc')), 732 'next_hop': int(col('Next Hop'), 16) >> 10, # convert RLOC16 to Router ID 733 'link': int(col('Link')), 734 }) 735 736 # 737 # Router utilities: Child management 738 # 739 740 def get_child_table(self) -> Dict[ChildId, Dict[str, Any]]: 741 """Get the table of attached children.""" 742 output = self.execute_command('child table') 743 if len(output) < 2: 744 raise UnexpectedCommandOutput(output) 745 746 # 747 # Example output: 748 # | ID | RLOC16 | Timeout | Age | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt| Extended MAC | 749 # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+------------------+ 750 # | 1 | 0xc801 | 240 | 24 | 3 | 131 |1|0|0| 3| 0 | 0 | 4ecede68435358ac | 751 # | 2 | 0xc802 | 240 | 2 | 3 | 131 |0|0|0| 3| 1 | 0 | a672a601d2ce37d8 | 752 # Done 753 # 754 755 headers = self.__split_table_row(output[0]) 756 757 table = {} 758 for line in output[2:]: 759 line = line.strip() 760 if not line: 761 continue 762 763 fields = self.__split_table_row(line) 764 col = lambda colname: self.__get_table_col(colname, headers, fields) 765 766 id = int(col("ID")) 767 r, d, n = int(col("R")), int(col("D")), int(col("N")) 768 769 # 770 # Device mode flags: 771 # 772 # r: rx-on-when-idle 773 # d: Full Thread Device 774 # n: Full Network Data 775 # -: no flags set (rx-off-when-idle, minimal Thread device, stable network data) 776 mode = DeviceMode( 777 f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}{"-" if r == d == n == 0 else ""}') 778 779 child = { 780 'id': ChildId(id), 781 'rloc16': Rloc16(col('RLOC16'), 16), 782 'timeout': int(col('Timeout')), 783 'age': int(col('Age')), 784 'lq_in': int(col('LQ In')), 785 'c_vn': int(col('C_VN')), 786 'mode': mode, 787 'extaddr': col('Extended MAC') 788 } 789 790 if 'Ver' in headers: 791 child['ver'] = int(col('Ver')) 792 793 if 'CSL' in headers: 794 child['csl'] = bool(int(col('CSL'))) 795 796 if 'QMsgCnt' in headers: 797 child['qmsgcnt'] = int(col('QMsgCnt')) 798 799 if 'Suprvsn' in headers: 800 child['suprvsn'] = int(col('Suprvsn')) 801 802 table[ChildId(id)] = child 803 804 return table 805 806 # 807 # DNS server & client utilities 808 # 809 810 _IPV6_SERVER_PORT_PATTERN = re.compile(r'\[(.*)\]:(\d+)') 811 812 def dns_get_config(self): 813 """Get DNS client query config.""" 814 output = self.execute_command('dns config') 815 config = {} 816 for line in output: 817 k, v = line.split(': ') 818 if k == 'Server': 819 matched = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, v) 820 assert matched is not None 821 ip, port = matched.groups() 822 config['server'] = (Ip6Addr(ip), int(port)) 823 elif k == 'ResponseTimeout': 824 config['response_timeout'] = int(v[:-3]) 825 elif k == 'MaxTxAttempts': 826 config['max_tx_attempts'] = int(v) 827 elif k == 'RecursionDesired': 828 config['recursion_desired'] = (v == 'yes') 829 else: 830 logging.warning("dns config ignored: %s", line) 831 832 return config 833 834 def dns_set_config(self, 835 server: Tuple[Union[str, ipaddress.IPv6Address], int], 836 response_timeout: Optional[int] = None, 837 max_tx_attempts: Optional[int] = None, 838 recursion_desired: Optional[bool] = None): 839 """Set DNS client query config.""" 840 cmd = f'dns config {str(server[0])} {server[1]}' 841 if response_timeout is not None: 842 cmd += f' {response_timeout}' 843 844 assert max_tx_attempts is None or response_timeout is not None, "must specify `response_timeout` if `max_tx_attempts` is specified." 845 if max_tx_attempts is not None: 846 cmd += f' {max_tx_attempts}' 847 848 assert recursion_desired is None or max_tx_attempts is not None, 'must specify `max_tx_attempts` if `recursion_desired` is specified.' 849 if recursion_desired is not None: 850 cmd += f' {1 if recursion_desired else 0}' 851 852 self.execute_command(cmd) 853 854 def dns_get_compression(self) -> bool: 855 """Get DNS compression mode.""" 856 return self.__parse_Enabled_or_Disabled(self.execute_command('dns compression')) 857 858 def dns_enable_compression(self): 859 """Enable DNS compression mode.""" 860 self.execute_command('dns compression enable') 861 862 def dns_disable_compression(self): 863 """Disable DNS compression mode.""" 864 self.execute_command('dns compression disable') 865 866 def dns_browse(self, service: str) -> List[Dict]: 867 """Browse DNS service instances.""" 868 cmd = f'dns browse {service}' 869 output = '\n'.join(self.execute_command(cmd, 30.0)) 870 871 result = [] 872 for ins, port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl in re.findall( 873 r'(.*?)\s+Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s*Host:(\S+)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)', 874 output): 875 result.append({ 876 'instance': ins, 877 'service': service, 878 'port': int(port), 879 'priority': int(priority), 880 'weight': int(weight), 881 'host': hostname, 882 'address': Ip6Addr(address), 883 'txt': self.__parse_srp_server_service_txt(txt_data), 884 'srv_ttl': int(srv_ttl), 885 'txt_ttl': int(txt_ttl), 886 'aaaa_ttl': int(aaaa_ttl), 887 }) 888 889 return result 890 891 def dns_resolve(self, hostname: str) -> List[Dict]: 892 """Resolve a DNS host name.""" 893 cmd = f'dns resolve {hostname}' 894 output = self.execute_command(cmd, 30.0) 895 dns_resp = output[0] 896 addrs = dns_resp.strip().split(' - ')[1].split(' ') 897 ips = [Ip6Addr(item.strip()) for item in addrs[::2]] 898 ttls = [int(item.split('TTL:')[1]) for item in addrs[1::2]] 899 900 return [{ 901 'address': ip, 902 'ttl': ttl, 903 } for ip, ttl in zip(ips, ttls)] 904 905 def dns_resolve_service(self, instance: str, service: str) -> Dict: 906 """Resolves aservice instance.""" 907 instance = self.__escape_escapable(instance) 908 cmd = f'dns service {instance} {service}' 909 output = self.execute_command(cmd, 30.0) 910 911 m = re.match( 912 r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:(\[.*?\]) TTL:(\d+)', 913 '\t'.join(output)) 914 if m: 915 port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups() 916 return { 917 'instance': instance, 918 'service': service, 919 'port': int(port), 920 'priority': int(priority), 921 'weight': int(weight), 922 'host': hostname, 923 'address': Ip6Addr(address), 924 'txt': self.__parse_srp_server_service_txt(txt_data), 925 'srv_ttl': int(srv_ttl), 926 'txt_ttl': int(txt_ttl), 927 'aaaa_ttl': int(aaaa_ttl), 928 } 929 else: 930 raise CommandError(cmd, output) 931 932 # 933 # SRP server & client utilities 934 # 935 936 def srp_server_get_state(self): 937 """Get the SRP server state""" 938 return self.__parse_str(self.execute_command('srp server state')) 939 940 def srp_server_enable(self): 941 """Enable SRP server.""" 942 self.execute_command('srp server enable') 943 944 def srp_server_disable(self): 945 """Disable SRP server.""" 946 self.execute_command('srp server disable') 947 948 def srp_server_get_domain(self) -> str: 949 """Get the SRP server domain.""" 950 return self.__parse_str(self.execute_command('srp server domain')) 951 952 def srp_server_set_domain(self, domain: str): 953 """Set the SRP server domain.""" 954 self.execute_command(f'srp server domain {domain}') 955 956 def srp_server_get_hosts(self) -> List[Dict]: 957 """Get SRP server registered hosts.""" 958 return self.__parse_srp_server_hosts(self.execute_command('srp server host')) 959 960 def srp_server_get_services(self) -> List[Dict]: 961 """Get SRP server registered services.""" 962 output = self.execute_command('srp server service') 963 return self.__parse_srp_server_services(output) 964 965 def __parse_srp_server_hosts(self, output: List[str]) -> List[Dict]: 966 result = [] 967 info = None 968 for line in output: 969 if not line.startswith(' '): 970 info = {'host': line} 971 result.append(info) 972 else: 973 assert info is not None 974 k, v = line.strip().split(': ') 975 if k == 'deleted': 976 if v not in ('true', 'false'): 977 raise UnexpectedCommandOutput(output) 978 979 info['deleted'] = (v == 'true') 980 981 elif k == 'addresses': 982 if not v.startswith('[') or not v.endswith(']'): 983 raise UnexpectedCommandOutput(output) 984 985 v = v[1:-1] 986 info['addresses'] = list(map(Ip6Addr, v.split(', '))) 987 else: 988 raise UnexpectedCommandOutput(output) 989 990 return result 991 992 def __parse_srp_server_services(self, output: List[str]) -> List[Dict]: 993 result = [] 994 info = None 995 for line in output: 996 if not line.startswith(' '): 997 info = {'instance': line} 998 result.append(info) 999 else: 1000 assert info is not None 1001 k, v = line.strip().split(': ') 1002 if k == 'deleted': 1003 if v not in ('true', 'false'): 1004 raise UnexpectedCommandOutput(output) 1005 1006 info['deleted'] = (v == 'true') 1007 1008 elif k == 'addresses': 1009 if not v.startswith('[') or not v.endswith(']'): 1010 raise UnexpectedCommandOutput(output) 1011 1012 v = v[1:-1] 1013 info['addresses'] = list(map(Ip6Addr, v.split(', '))) 1014 elif k == 'subtypes': 1015 info[k] = list() if v == '(null)' else list(v.split(',')) 1016 elif k in ('port', 'weight', 'priority', 'ttl', 'lease', 'key-lease'): 1017 info[k] = int(v) 1018 elif k in ('host',): 1019 info[k] = v 1020 elif k == 'TXT': 1021 info['txt'] = self.__parse_srp_server_service_txt(v) 1022 else: 1023 raise UnexpectedCommandOutput(output) 1024 1025 return result 1026 1027 def __parse_srp_server_service_txt(self, txt: str) -> Dict[str, Union[bytes, bool]]: 1028 # example value: [txt11=76616c3131, txt12=76616c3132] 1029 assert txt.startswith('[') and txt.endswith(']') 1030 txt_dict = {} 1031 for entry in txt[1:-1].split(', '): 1032 if not entry: 1033 continue 1034 1035 equal_pos = entry.find('=') 1036 1037 if equal_pos != -1: 1038 k, v = entry[:equal_pos], entry[equal_pos + 1:] 1039 txt_dict[k] = bytes(int(v[i:i + 2], 16) for i in range(0, len(v), 2)) 1040 else: 1041 txt_dict[entry] = True 1042 1043 return txt_dict 1044 1045 def srp_server_get_lease(self) -> Tuple[int, int, int, int]: 1046 """Get SRP server LEASE & KEY-LEASE range (in seconds).""" 1047 lines = self.execute_command(f'srp server lease') 1048 return tuple([int(line.split(':')[1].strip()) for line in lines]) 1049 1050 def srp_server_set_lease(self, min_lease: int, max_lease: int, min_key_lease: int, max_key_lease: int): 1051 """Configure SRP server LEASE & KEY-LEASE range (in seconds).""" 1052 self.execute_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}') 1053 1054 def srp_client_get_state(self) -> bool: 1055 """Get SRP client state.""" 1056 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client state')) 1057 1058 def srp_client_start(self, server_ip: Union[str, ipaddress.IPv6Address], server_port: int): 1059 """Start SRP client.""" 1060 self.execute_command(f'srp client start {str(server_ip)} {server_port}') 1061 1062 def srp_client_stop(self): 1063 """Stop SRP client.""" 1064 self.execute_command('srp client stop') 1065 1066 def srp_client_get_autostart(self) -> bool: 1067 """Get SRP client autostart mode.""" 1068 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client autostart')) 1069 1070 def srp_client_enable_autostart(self): 1071 """Enable SRP client autostart mode.""" 1072 self.execute_command('srp client autostart enable') 1073 1074 def srp_client_disable_autostart(self): 1075 """Disable SRP client autostart mode.""" 1076 self.execute_command('srp client autostart disable') 1077 1078 def srp_client_get_callback(self) -> bool: 1079 """Get SRP client callback mode.""" 1080 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client callback')) 1081 1082 def srp_client_enable_callback(self): 1083 """Enable SRP client callback mode.""" 1084 self.execute_command('srp client callback enable') 1085 1086 def srp_client_disable_callback(self): 1087 """Disable SRP client callback mode.""" 1088 self.execute_command('srp client callback disable') 1089 1090 def srp_client_set_host_name(self, name: str): 1091 """Set SRP client host name.""" 1092 self.execute_command(f'srp client host name {name}') 1093 1094 def srp_client_get_host(self) -> Dict: 1095 """Get SRP client host.""" 1096 output = self.__parse_str(self.execute_command('srp client host')) 1097 return self.__parse_srp_client_host(output) 1098 1099 _SRP_CLIENT_HOST_PATTERN = re.compile(r'name:("(.*)"|(\(null\))), state:(\S+), addrs:\[(.*)\]') 1100 1101 def __parse_srp_client_host(self, line: str) -> Dict: 1102 m = re.match(OTCI._SRP_CLIENT_HOST_PATTERN, line) 1103 if not m: 1104 raise UnexpectedCommandOutput([line]) 1105 1106 _, host, _, state, addrs = m.groups() 1107 return { 1108 'host': host or '', 1109 'state': state, 1110 'addresses': [Ip6Addr(ip) for ip in addrs.split(', ')] if addrs else [], 1111 } 1112 1113 def srp_client_get_host_name(self) -> str: 1114 """Get SRP client host name.""" 1115 name = self.__parse_str(self.execute_command('srp client host name')) 1116 return name if name != '(null)' else '' 1117 1118 def srp_client_get_host_addresses(self) -> List[Ip6Addr]: 1119 """Get SRP client host addresses.""" 1120 return self.__parse_ip6addr_list(self.execute_command('srp client host address')) 1121 1122 def srp_client_set_host_addresses(self, *addrs: Union[str, ipaddress.IPv6Address]): 1123 """Set SRP client host addresses.""" 1124 self.execute_command(f'srp client host address {" ".join(map(str, addrs))}') 1125 1126 def srp_client_get_host_state(self): 1127 """Get SRP client host state.""" 1128 return self.__parse_str(self.execute_command('srp client host state')) 1129 1130 def srp_client_remove_host(self, remove_key_lease=False): 1131 """Remove SRP client host.""" 1132 cmd = 'srp client host remove' 1133 if remove_key_lease: 1134 cmd += ' 1' 1135 1136 self.execute_command(cmd) 1137 1138 def srp_client_get_services(self) -> List[Dict]: 1139 """Get SRP client services.""" 1140 output = self.execute_command('srp client service') 1141 return [self.__parse_srp_client_service(line) for line in output] 1142 1143 _SRP_CLIENT_SERVICE_PATTERN = re.compile( 1144 r'instance:"(.*)", name:"(.*)", state:(\S+), port:(\d+), priority:(\d+), weight:(\d+)') 1145 1146 def __parse_srp_client_service(self, line: str) -> Dict: 1147 # e.g. instance:"ins2", name:"_meshcop._udp", state:ToAdd, port:2000, priority:2, weight:2 1148 m = OTCI._SRP_CLIENT_SERVICE_PATTERN.match(line) 1149 if m is None: 1150 raise UnexpectedCommandOutput([line]) 1151 1152 instance, service, state, port, priority, weight = m.groups() 1153 port, priority, weight = int(port), int(priority), int(weight) 1154 return { 1155 'instance': instance, 1156 'service': service, 1157 'state': state, 1158 'port': port, 1159 'priority': priority, 1160 'weight': weight, 1161 } 1162 1163 def srp_client_add_service(self, 1164 instance: str, 1165 service: str, 1166 port: int, 1167 priority: int = 0, 1168 weight: int = 0, 1169 txt: Optional[Dict[str, Union[str, bytes, bool]]] = None): 1170 instance = self.__escape_escapable(instance) 1171 cmd = f'srp client service add {instance} {service} {port} {priority} {weight}' 1172 if txt: 1173 cmd += f' {self.__txt_to_hex(txt)}' 1174 self.execute_command(cmd) 1175 1176 def srp_client_remove_service(self, instance: str, service: str): 1177 """Remove a service from SRP client.""" 1178 self.execute_command(f'srp client service remove {instance} {service}') 1179 1180 def srp_client_clear_service(self, instance: str, service: str): 1181 """Remove a service from SRP client without notifying the SRP server.""" 1182 self.execute_command(f'srp client service clear {instance} {service}') 1183 1184 def srp_client_get_key_lease_interval(self) -> int: 1185 """Get SRP client key lease interval (in seconds).""" 1186 return self.__parse_int(self.execute_command('srp client keyleaseinterval')) 1187 1188 def srp_client_set_key_lease_interval(self, interval: int): 1189 """Set SRP client key lease interval (in seconds).""" 1190 self.execute_command(f'srp client keyleaseinterval {interval}') 1191 1192 def srp_client_get_lease_interval(self) -> int: 1193 """Get SRP client lease interval (in seconds).""" 1194 return self.__parse_int(self.execute_command('srp client leaseinterval')) 1195 1196 def srp_client_set_lease_interval(self, interval: int): 1197 """Set SRP client lease interval (in seconds).""" 1198 self.execute_command(f'srp client leaseinterval {interval}') 1199 1200 def srp_client_get_server(self) -> Tuple[Ip6Addr, int]: 1201 """Get the SRP server (IP, port).""" 1202 result = self.__parse_str(self.execute_command('srp client server')) 1203 matched = re.match(OTCI._IPV6_SERVER_PORT_PATTERN, result) 1204 assert matched 1205 ip, port = matched.groups() 1206 return Ip6Addr(ip), int(port) 1207 1208 def srp_client_get_service_key(self) -> bool: 1209 """Get SRP client "service key record inclusion" mode.""" 1210 return self.__parse_Enabled_or_Disabled(self.execute_command('srp client service key')) 1211 1212 def srp_client_enable_service_key(self): 1213 """Enable SRP client "service key record inclusion" mode.""" 1214 self.execute_command('srp client service key enable') 1215 1216 def srp_client_disable_service_key(self): 1217 """Disable SRP client "service key record inclusion" mode.""" 1218 self.execute_command('srp client service key disable') 1219 1220 def __split_table_row(self, row: str) -> List[str]: 1221 if not (row.startswith('|') and row.endswith('|')): 1222 raise ValueError(row) 1223 1224 fields = row.split('|') 1225 fields = [x.strip() for x in fields[1:-1]] 1226 return fields 1227 1228 def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str: 1229 return fields[headers.index(colname)] 1230 1231 def get_child_list(self) -> List[ChildId]: 1232 """Get attached Child IDs.""" 1233 line = self.__parse_str(self.execute_command(f'child list')) 1234 return [ChildId(id) for id in line.strip().split()] 1235 1236 def get_child_info(self, child: Union[ChildId, Rloc16]) -> Dict[str, Any]: 1237 output = self.execute_command(f'child {child}') 1238 1239 info = {} 1240 1241 for line in output: 1242 k, v = line.split(': ') 1243 if k == 'Child ID': 1244 info['id'] = int(v) 1245 elif k == 'Rloc': 1246 info['rloc16'] = int(v, 16) 1247 elif k == 'Ext Addr': 1248 info['extaddr'] = v 1249 elif k == 'Mode': 1250 info['mode'] = DeviceMode(v) 1251 elif k == 'Net Data': 1252 info['c_vn'] = int(v) 1253 elif k == 'Timeout': 1254 info['timeout'] = int(v) 1255 elif k == 'Age': 1256 info['age'] = int(v) 1257 elif k == 'Link Quality In': 1258 info['lq_in'] = int(v) 1259 elif k == 'RSSI': 1260 info['rssi'] = int(v) 1261 else: 1262 self.log('warning', "Child info %s: %s ignored", k, v) 1263 1264 return info 1265 1266 def get_child_ipaddrs(self) -> Dict[Rloc16, List[Ip6Addr]]: 1267 """Get the list of IP addresses stored for MTD children. 1268 1269 Note: Each MTD child might has multiple IP addresses. 1270 """ 1271 output = self.execute_command('childip') 1272 1273 ipaddrs = {} 1274 1275 for line in output: 1276 rloc16, ip = line.split(': ') 1277 rloc16 = Rloc16(rloc16, 16) 1278 ipaddrs.setdefault(rloc16, []).append(Ip6Addr(ip.strip())) 1279 1280 return ipaddrs 1281 1282 # 1283 # Child configurations 1284 # 1285 1286 def get_max_children(self) -> int: 1287 """Get the Thread maximum number of allowed children.""" 1288 return self.__parse_int(self.execute_command('childmax')) 1289 1290 def set_max_children(self, val: int): 1291 """Set the Thread maximum number of allowed children.""" 1292 self.execute_command(f'childmax {val}') 1293 1294 def get_child_ip_max(self) -> int: 1295 """Get the maximum number of IP addresses that each MTD child may register with this device as parent.""" 1296 return self.__parse_int(self.execute_command('childip max')) 1297 1298 def set_child_ip_max(self, val: int): 1299 """Get the maximum number of IP addresses that each MTD child may register with this device as parent.""" 1300 self.execute_command(f'childip max {val}') 1301 1302 def get_child_timeout(self): 1303 """Get the Thread Child Timeout value.""" 1304 return self.__parse_int(self.execute_command('childtimeout')) 1305 1306 def set_child_timeout(self, timeout): 1307 """Set the Thread Child Timeout value.""" 1308 self.execute_command('childtimeout %d' % timeout) 1309 1310 def get_child_supervision_interval(self) -> int: 1311 """Get the Child Supervision Check Timeout value.""" 1312 return self.__parse_int(self.execute_command('childsupervision interval')) 1313 1314 def set_child_supervision_interval(self, val: int): 1315 """Set the Child Supervision Interval value. 1316 This command can only be used with FTD devices. 1317 """ 1318 self.execute_command(f'childsupervision interval {val}') 1319 1320 def get_child_supervision_check_timeout(self) -> int: 1321 """Get the Child Supervision Check Timeout value.""" 1322 return self.__parse_int(self.execute_command('childsupervision checktimeout')) 1323 1324 def set_child_supervision_check_timeout(self, val: int): 1325 """Set the Child Supervision Check Timeout value.""" 1326 self.execute_command(f'childsupervision checktimeout {val}') 1327 1328 # 1329 # Neighbor management 1330 # 1331 1332 def get_neighbor_list(self) -> List[Rloc16]: 1333 """Get a list of RLOC16 of neighbors""" 1334 line = self.__parse_str(self.execute_command('neighbor list')).strip() 1335 return [Rloc16(id, 16) for id in line.split()] 1336 1337 def get_neighbor_table(self) -> Dict[Rloc16, Dict[str, Any]]: 1338 output = self.execute_command('neighbor table') 1339 if len(output) < 2: 1340 raise UnexpectedCommandOutput(output) 1341 1342 # 1343 # Example output: 1344 # 1345 # | Role | RLOC16 | Age | Avg RSSI | Last RSSI |R|D|N| Extended MAC | 1346 # +------+--------+-----+----------+-----------+-+-+-+------------------+ 1347 # | C | 0xcc01 | 96 | -46 | -46 |1|1|1| 1eb9ba8a6522636b | 1348 # | R | 0xc800 | 2 | -29 | -29 |1|1|1| 9a91556102c39ddb | 1349 # | R | 0xf000 | 3 | -28 | -28 |1|1|1| 0ad7ed6beaa6016d | 1350 # Done 1351 # 1352 1353 headers = self.__split_table_row(output[0]) 1354 1355 table = {} 1356 for line in output[2:]: 1357 line = line.strip() 1358 if not line: 1359 continue 1360 1361 fields = self.__split_table_row(line) 1362 col = lambda colname: self.__get_table_col(colname, headers, fields) 1363 1364 role = col('Role') 1365 is_router = role == 'R' 1366 r, d, n = int(col('R')), int(col('D')), int(col('N')) 1367 mode = DeviceMode(f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}') 1368 1369 rloc16 = Rloc16(col('RLOC16'), 16) 1370 1371 table[rloc16] = { 1372 'is_router': is_router, 1373 'rloc16': rloc16, 1374 'age': int(col('Age')), 1375 'avg_rssi': int(col('Avg RSSI')), 1376 'last_rssi': int(col('Last RSSI')), 1377 'mode': mode, 1378 'extaddr': col('Extended MAC'), 1379 } 1380 1381 return table 1382 1383 # 1384 # SED/SSED configuration 1385 # 1386 1387 def get_poll_period(self) -> int: 1388 """Get the customized data poll period of sleepy end device (milliseconds). 1389 Only for Reference Device.""" 1390 return self.__parse_int(self.execute_command('pollperiod')) 1391 1392 def set_poll_period(self, poll_period: int): 1393 """Set the customized data poll period (in milliseconds) for sleepy end device. 1394 1395 Only for Reference Device.""" 1396 self.execute_command(f'pollperiod {poll_period}') 1397 1398 # TODO: csl 1399 # TODO: csl channel <channel> 1400 # TODO: csl period <period> 1401 # TODO: csl timeout <timeout> 1402 1403 _CSL_PERIOD_PATTERN = re.compile(r'(\d+)us') 1404 _CSL_TIMEOUT_PATTERN = re.compile(r'(\d+)s') 1405 1406 def get_csl_config(self) -> Dict[str, int]: 1407 """Get the CSL configuration.""" 1408 output = self.execute_command('csl') 1409 1410 cfg = {} 1411 for line in output: 1412 k, v = line.split(': ') 1413 if k == 'Channel': 1414 cfg['channel'] = int(v) 1415 elif k == 'Timeout': 1416 matched = OTCI._CSL_TIMEOUT_PATTERN.match(v) 1417 assert matched is not None 1418 cfg['timeout'] = int(matched.group(1)) 1419 elif k == 'Period': 1420 matched = OTCI._CSL_PERIOD_PATTERN.match(v) 1421 assert matched is not None 1422 cfg['period'] = int(matched.group(1)) 1423 else: 1424 logging.warning("Ignore unknown CSL parameter: %s: %s", k, v) 1425 1426 return cfg 1427 1428 def config_csl(self, channel: Optional[int] = None, period: Optional[int] = None, timeout: Optional[int] = None): 1429 """Configure CSL parameters. 1430 1431 :param channel: Set CSL channel. 1432 :param period: Set CSL period in usec. Disable CSL by setting this parameter to 0. 1433 :param timeout: Set the CSL timeout in seconds. 1434 """ 1435 1436 if channel is None and period is None and timeout is None: 1437 raise InvalidArgumentsError("Please specify at least 1 parameter to configure.") 1438 1439 if channel is not None: 1440 self.execute_command(f'csl channel {channel}') 1441 1442 if period is not None: 1443 self.execute_command(f'csl period {period}') 1444 1445 if timeout is not None: 1446 self.execute_command(f'csl timeout {timeout}') 1447 1448 # 1449 # Leader utilities 1450 # 1451 1452 def get_context_id_reuse_delay(self) -> int: 1453 """Get the CONTEXT_ID_REUSE_DELAY value.""" 1454 return self.__parse_int(self.execute_command('contextreusedelay')) 1455 1456 def set_context_id_reuse_delay(self, val: int): 1457 """Set the CONTEXT_ID_REUSE_DELAY value.""" 1458 self.execute_command(f'contextreusedelay {val}') 1459 1460 def release_router_id(self, routerid: int): 1461 """Release a Router ID that has been allocated by the device in the Leader role.""" 1462 self.execute_command(f'releaserouterid {routerid}') 1463 1464 # Time Sync utilities 1465 # TODO: networktime 1466 # TODO: networktime <timesyncperiod> <xtalthreshold> 1467 # TODO: delaytimermin 1468 # TODO: delaytimermin <delaytimermin> 1469 1470 # 1471 # Commissioniner operations 1472 # 1473 1474 def commissioner_start(self): 1475 """Start the Commissioner role.""" 1476 self.execute_command('commissioner start') 1477 1478 def commissioner_stop(self): 1479 """Stop the Commissioner role.""" 1480 self.execute_command('commissioner stop') 1481 1482 def get_commissioiner_state(self) -> str: 1483 """Get current Commissioner state (active or petitioning or disabled).""" 1484 return self.__parse_str(self.execute_command('commissioner state')) 1485 1486 def get_commissioner_session_id(self) -> int: 1487 """Get current commissioner session id.""" 1488 return self.__parse_int(self.execute_command('commissioner sessionid')) 1489 1490 def commissioner_add_joiner(self, pskd, eui64=None, discerner=None, timeout=None): 1491 """Add a Joiner entry. 1492 1493 :param pskd: Pre-Shared Key for the Joiner. 1494 :param eui64: The IEEE EUI-64 of the Joiner or '*' to match any Joiner 1495 :param discerner: The Joiner discerner in format number/length. 1496 :param timeout: Joiner timeout in seconds. 1497 """ 1498 if (eui64 is not None) == (discerner is not None): 1499 raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.") 1500 1501 if eui64 is not None and eui64 != '*': 1502 self.__validate_extaddr(eui64) 1503 1504 cmd = f'commissioner joiner add {eui64 or discerner} {pskd}' 1505 1506 if timeout is not None: 1507 cmd += f' {timeout}' 1508 1509 self.execute_command(cmd) 1510 1511 def commissioner_remove_jointer(self, eui64=None, discerner=None): 1512 if (eui64 is not None) == (discerner is not None): 1513 raise InvalidArgumentsError("Please specify eui64 or discerner, but not both.") 1514 1515 if eui64 is not None and eui64 != '*': 1516 self.__validate_extaddr(eui64) 1517 1518 self.execute_command(f'commissioner joiner remove {eui64 or discerner}') 1519 1520 def set_commissioner_provisioning_url(self, url: str): 1521 self.execute_command(f'commissioner provisioningurl {url}') 1522 1523 # TODO: commissioner announce 1524 # TODO: commissioner energy 1525 # TODO: commissioner mgmtget 1526 # TODO: commissioner mgmtset 1527 # TODO: commissioner panid 1528 1529 # 1530 # Joiner operations 1531 # 1532 def joiner_start(self, psk: str, provisioning_url: Optional[str] = None): 1533 """Start the Joiner.""" 1534 cmd = f'joiner start {psk}' 1535 if provisioning_url is not None: 1536 cmd += f' {provisioning_url}' 1537 1538 self.execute_command(cmd) 1539 1540 def joiner_stop(self): 1541 """Stop the Joiner role.""" 1542 self.execute_command('joiner stop') 1543 1544 def get_joiner_id(self) -> str: 1545 """Get the Joiner ID.""" 1546 return self.__parse_joiner_id(self.execute_command('joiner id')) 1547 1548 def get_joiner_port(self) -> int: 1549 """Get the Joiner port.""" 1550 return self.__parse_int(self.execute_command(f'joinerport')) 1551 1552 def set_joiner_port(self, port: int): 1553 """Set the Joiner port.""" 1554 self.execute_command(f'joinerport {port}') 1555 1556 # TODO: joiner discerner 1557 1558 # 1559 # Network Data utilities 1560 # 1561 def get_local_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1562 """Get prefixes from local Network Data.""" 1563 output = self.execute_command('prefix') 1564 return self.__parse_prefixes(output) 1565 1566 def __parse_prefixes(self, output: List[str]) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1567 prefixes = [] 1568 1569 for line in output: 1570 if line.startswith('- '): 1571 line = line[2:] 1572 1573 prefix, flags, prf, rloc16 = line.split()[:4] 1574 prefixes.append((Ip6Prefix(prefix), flags, prf, Rloc16(rloc16, 16))) 1575 1576 return prefixes 1577 1578 def add_prefix(self, prefix: str, flags='paosr', prf='med'): 1579 """Add a valid prefix to the Network Data.""" 1580 self.execute_command(f'prefix add {prefix} {flags} {prf}') 1581 1582 def remove_prefix(self, prefix: str): 1583 """Invalidate a prefix in the Network Data.""" 1584 self.execute_command(f'prefix remove {prefix}') 1585 1586 def register_network_data(self): 1587 self.execute_command('netdata register') 1588 1589 def get_network_data(self) -> Dict[str, List]: 1590 output = self.execute_command('netdata show') 1591 1592 netdata = {} 1593 if output.pop(0) != 'Prefixes:': 1594 raise UnexpectedCommandOutput(output) 1595 1596 prefixes_output = [] 1597 while True: 1598 line = output.pop(0) 1599 if line == 'Routes:': 1600 break 1601 else: 1602 prefixes_output.append(line) 1603 1604 netdata['prefixes'] = self.__parse_prefixes(prefixes_output) 1605 1606 routes_output = [] 1607 while True: 1608 line = output.pop(0) 1609 if line == 'Services:': 1610 break 1611 else: 1612 routes_output.append(line) 1613 1614 netdata['routes'] = self.__parse_routes(routes_output) 1615 1616 services_output = [] 1617 while True: 1618 line = output.pop(0) 1619 if line == 'Contexts:': 1620 break 1621 else: 1622 services_output.append(line) 1623 1624 netdata['services'] = self.__parse_services(services_output) 1625 1626 return netdata 1627 1628 def get_prefixes(self) -> List[Tuple[Ip6Prefix, str, str, Rloc16]]: 1629 """Get network prefixes from Thread Network Data.""" 1630 network_data = self.get_network_data() 1631 return network_data['prefixes'] 1632 1633 def get_routes(self) -> List[Tuple[str, bool, str, Rloc16]]: 1634 """Get routes from Thread Network Data.""" 1635 network_data = self.get_network_data() 1636 return network_data['routes'] 1637 1638 def get_services(self) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]: 1639 """Get services from Thread Network Data""" 1640 network_data = self.get_network_data() 1641 return network_data['services'] 1642 1643 def __parse_services(self, output: List[str]) -> List[Tuple[int, bytes, bytes, bool, Rloc16]]: 1644 services = [] 1645 for line in output: 1646 line = line.split() 1647 1648 enterprise_number, service_data, server_data = line[:3] 1649 if line[3] == 's': 1650 stable, rloc16 = True, line[4] 1651 else: 1652 stable, rloc16 = False, line[3] 1653 1654 enterprise_number = int(enterprise_number) 1655 service_data = self.__hex_to_bytes(service_data) 1656 server_data = self.__hex_to_bytes(server_data) 1657 rloc16 = Rloc16(rloc16, 16) 1658 1659 services.append((enterprise_number, service_data, server_data, stable, rloc16)) 1660 1661 return services 1662 1663 def get_network_data_bytes(self) -> bytes: 1664 """Get the raw Network Data.""" 1665 hexstr = self.__parse_str(self.execute_command('netdata show -x')) 1666 return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2)) 1667 1668 def get_local_routes(self) -> List[Tuple[str, bool, str, Rloc16]]: 1669 """Get routes from local Network Data.""" 1670 return self.__parse_routes(self.execute_command('route')) 1671 1672 def __parse_routes(self, output: List[str]) -> List[Tuple[str, bool, str, Rloc16]]: 1673 routes = [] 1674 for line in output: 1675 line = line.split() 1676 if len(line) == 4: 1677 prefix, flags, prf, rloc16 = line 1678 stable = 's' in flags 1679 else: 1680 prefix, prf, rloc16 = line 1681 stable = False 1682 1683 rloc16 = Rloc16(rloc16, 16) 1684 routes.append((prefix, stable, prf, rloc16)) 1685 1686 return routes 1687 1688 def add_route(self, prefix: str, stable=True, prf='med'): 1689 """Add a valid external route to the Network Data.""" 1690 cmd = f'route add {prefix}' 1691 if stable: 1692 cmd += ' s' 1693 1694 cmd += f' {prf}' 1695 self.execute_command(cmd) 1696 1697 def remove_route(self, prefix: str): 1698 """Invalidate a external route in the Network Data.""" 1699 self.execute_command(f'route remove {prefix}') 1700 1701 def add_service(self, enterprise_number: int, service_data: Union[str, bytes], server_data: Union[str, bytes]): 1702 """Add service to the Network Data. 1703 1704 enterpriseNumber: IANA enterprise number 1705 serviceData: hex-encoded binary service data 1706 serverData: hex-encoded binary server data 1707 """ 1708 service_data = self.__validate_hex_or_bytes(service_data) 1709 server_data = self.__validate_hex_or_bytes(server_data) 1710 self.execute_command(f'service add {enterprise_number} {service_data} {server_data}') 1711 1712 def remove_service(self, enterprise_number, service_data): 1713 """Remove service from Network Data. 1714 1715 enterpriseNumber: IANA enterprise number 1716 serviceData: hext-encoded binary service data 1717 """ 1718 service_data = self.__validate_hex_or_bytes(service_data) 1719 self.execute_command(f'service remove {enterprise_number} {service_data}') 1720 1721 # 1722 # Dataset management 1723 # 1724 1725 def dataset_init_buffer(self, get_active_dataset=False, get_pending_dataset=False): 1726 """Initialize operational dataset buffer.""" 1727 if get_active_dataset and get_pending_dataset: 1728 raise InvalidArgumentsError("Can not specify both `get_active_dataset` and `get_pending_dataset`.") 1729 1730 if get_active_dataset: 1731 self.execute_command(f'dataset init active') 1732 elif get_pending_dataset: 1733 self.execute_command(f'dataset init pending') 1734 else: 1735 self.execute_command(f'dataset init new') 1736 1737 def dataset_commit_buffer(self, dataset: str): 1738 if dataset in ('active', 'pending'): 1739 cmd = f'dataset commit {dataset}' 1740 else: 1741 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1742 1743 self.execute_command(cmd) 1744 1745 def dataset_clear_buffer(self): 1746 """Reset operational dataset buffer.""" 1747 self.execute_command('dataset clear') 1748 1749 def get_dataset(self, dataset: str = 'buffer'): 1750 if dataset in ('active', 'pending'): 1751 cmd = f'dataset {dataset}' 1752 elif dataset == 'buffer': 1753 cmd = 'dataset' 1754 else: 1755 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1756 1757 output = self.execute_command(cmd) 1758 return self.__parse_dataset(output) 1759 1760 def __parse_dataset(self, output: List[str]) -> Dict[str, Any]: 1761 # Example output: 1762 # 1763 # Active Timestamp: 1 1764 # Channel: 22 1765 # Channel Mask: 0x07fff800 1766 # Ext PAN ID: 5c93ae980ff22d35 1767 # Mesh Local Prefix: fdc7:55fe:6363:bd01::/64 1768 # Network Key: d1a8348d59fb1fac1d6c4f95007d487a 1769 # Network Name: OpenThread-7caa 1770 # PAN ID: 0x7caa 1771 # PSKc: 167d89fd169e439ca0b8266de248090f 1772 # Security Policy: 672 onrc 0 1773 1774 dataset = {} 1775 1776 for line in output: 1777 line = line.split(': ') 1778 key, val = line[0], ': '.join(line[1:]) 1779 1780 if key == 'Active Timestamp': 1781 dataset['active_timestamp'] = int(val) 1782 elif key == 'Channel': 1783 dataset['channel'] = int(val) 1784 elif key == 'Channel Mask': 1785 dataset['channel_mask'] = int(val, 16) 1786 elif key == 'Ext PAN ID': 1787 dataset['extpanid'] = val 1788 elif key == 'Mesh Local Prefix': 1789 dataset['mesh_local_prefix'] = val 1790 elif key in ('Network Key', 'Master Key'): 1791 dataset['networkkey'] = val 1792 elif key == 'Network Name': 1793 dataset['network_name'] = val 1794 elif key == 'PAN ID': 1795 dataset['panid'] = int(val, 16) 1796 elif key == 'PSKc': 1797 dataset['pskc'] = val 1798 elif key == 'Security Policy': 1799 rotation_time, flags, version_threshold = val.split(' ') 1800 rotation_time = int(rotation_time) 1801 dataset['security_policy'] = SecurityPolicy(rotation_time, flags) 1802 else: 1803 raise UnexpectedCommandOutput(output) 1804 1805 return dataset 1806 1807 def get_dataset_bytes(self, dataset: str) -> bytes: 1808 if dataset in ('active', 'pending'): 1809 cmd = f'dataset {dataset} -x' 1810 else: 1811 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1812 1813 hexstr = self.__parse_str(self.execute_command(cmd)) 1814 return self.__hex_to_bytes(hexstr) 1815 1816 def set_dataset_bytes(self, dataset: str, data: bytes) -> None: 1817 if dataset in ('active', 'pending'): 1818 cmd = f'dataset set {dataset} {self.__bytes_to_hex(data)}' 1819 else: 1820 raise InvalidArgumentsError(f'Unkonwn dataset: {dataset}') 1821 1822 self.execute_command(cmd) 1823 1824 def get_dataset_tlvs_bytes(self) -> bytes: 1825 """Gets bytes of the Operational Dataset TLVs""" 1826 hexstr = self.__parse_str(self.execute_command('dataset tlvs')) 1827 return self.__hex_to_bytes(hexstr) 1828 1829 def dataset_set_buffer(self, 1830 active_timestamp: Optional[int] = None, 1831 channel: Optional[int] = None, 1832 channel_mask: Optional[int] = None, 1833 extpanid: Optional[str] = None, 1834 mesh_local_prefix: Optional[str] = None, 1835 network_key: Optional[str] = None, 1836 network_name: Optional[str] = None, 1837 panid: Optional[int] = None, 1838 pskc: Optional[str] = None, 1839 security_policy: Optional[tuple] = None, 1840 pending_timestamp: Optional[int] = None): 1841 if active_timestamp is not None: 1842 self.execute_command(f'dataset activetimestamp {active_timestamp}') 1843 1844 if channel is not None: 1845 self.execute_command(f'dataset channel {channel}') 1846 1847 if channel_mask is not None: 1848 self.execute_command(f'dataset channelmask {channel_mask}') 1849 1850 if extpanid is not None: 1851 self.execute_command(f'dataset extpanid {extpanid}') 1852 1853 if mesh_local_prefix is not None: 1854 self.execute_command(f'dataset meshlocalprefix {mesh_local_prefix}') 1855 1856 if network_key is not None: 1857 nwk_cmd = self.__detect_networkkey_cmd() 1858 self.execute_command(f'dataset {nwk_cmd} {network_key}') 1859 1860 if network_name is not None: 1861 self.execute_command(f'dataset networkname {self.__escape_escapable(network_name)}') 1862 1863 if panid is not None: 1864 self.execute_command(f'dataset panid {panid}') 1865 1866 if pskc is not None: 1867 self.execute_command(f'dataset pskc {pskc}') 1868 1869 if security_policy is not None: 1870 rotation_time, flags = security_policy 1871 self.execute_command(f'dataset securitypolicy {rotation_time} {flags}') 1872 1873 if pending_timestamp is not None: 1874 self.execute_command(f'dataset pendingtimestamp {pending_timestamp}') 1875 1876 # TODO: dataset mgmtgetcommand 1877 # TODO: dataset mgmtsetcommand 1878 # TODO: dataset set <active|pending> <dataset> 1879 1880 # 1881 # Allowlist management 1882 # 1883 1884 def enable_allowlist(self): 1885 self.execute_command(f'macfilter addr {self.__detect_allowlist_cmd()}') 1886 1887 def disable_allowlist(self): 1888 self.execute_command('macfilter addr disable') 1889 1890 def add_allowlist(self, addr: str, rssi: Optional[int] = None): 1891 cmd = f'macfilter addr add {addr}' 1892 1893 if rssi is not None: 1894 cmd += f' {rssi}' 1895 1896 self.execute_command(cmd) 1897 1898 def remove_allowlist(self, addr: str): 1899 self.execute_command(f'macfilter addr remove {addr}') 1900 1901 def clear_allowlist(self): 1902 self.execute_command('macfilter addr clear') 1903 1904 def set_allowlist(self, allowlist: Collection[Union[str, Tuple[str, int]]]): 1905 self.clear_allowlist() 1906 1907 if allowlist is None: 1908 self.disable_allowlist() 1909 else: 1910 self.enable_allowlist() 1911 for item in allowlist: 1912 if isinstance(item, str): 1913 self.add_allowlist(item) 1914 else: 1915 addr, rssi = item[0], item[1] 1916 self.add_allowlist(addr, rssi) 1917 1918 # TODO: denylist 1919 # TODO: macfilter rss 1920 # TODO: macfilter rss add <extaddr> <rss> 1921 # TODO: macfilter rss add-lqi <extaddr> <lqi> 1922 # TODO: macfilter rss remove <extaddr> 1923 # TODO: macfilter rss clear 1924 1925 def __detect_allowlist_cmd(self): 1926 if self.api_version >= 28: 1927 return 'allowlist' 1928 else: 1929 return '\x77\x68\x69\x74\x65\x6c\x69\x73\x74' 1930 1931 def __detect_networkkey_cmd(self) -> str: 1932 return 'networkkey' if self.api_version >= 126 else 'masterkey' 1933 1934 # 1935 # Unicast Addresses management 1936 # 1937 def add_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1938 """Add an IPv6 address to the Thread interface.""" 1939 self.execute_command(f'ipaddr add {ip}') 1940 1941 def del_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1942 """Delete an IPv6 address from the Thread interface.""" 1943 self.execute_command(f'ipaddr del {ip}') 1944 1945 def get_ipaddrs(self) -> Tuple[Ip6Addr]: 1946 """Get all IPv6 addresses assigned to the Thread interface.""" 1947 return tuple(map(Ip6Addr, self.execute_command('ipaddr'))) 1948 1949 def has_ipaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1950 """Check if a IPv6 address was added to the Thread interface.""" 1951 return ip in self.get_ipaddrs() 1952 1953 def get_ipaddr_mleid(self) -> Ip6Addr: 1954 """Get Thread Mesh Local EID address.""" 1955 return self.__parse_ip6addr(self.execute_command('ipaddr mleid')) 1956 1957 def get_ipaddr_linklocal(self) -> Ip6Addr: 1958 """Get Thread link-local IPv6 address.""" 1959 return self.__parse_ip6addr(self.execute_command('ipaddr linklocal')) 1960 1961 def get_ipaddr_rloc(self) -> Ip6Addr: 1962 """Get Thread Routing Locator (RLOC) address.""" 1963 return self.__parse_ip6addr(self.execute_command('ipaddr rloc')) 1964 1965 # 1966 # Multicast Addresses management 1967 # 1968 1969 def add_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1970 """Subscribe the Thread interface to the IPv6 multicast address.""" 1971 self.execute_command(f'ipmaddr add {ip}') 1972 1973 def del_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1974 """Unsubscribe the Thread interface to the IPv6 multicast address.""" 1975 self.execute_command(f'ipmaddr del {ip}') 1976 1977 def get_ipmaddrs(self) -> Tuple[Ip6Addr]: 1978 """Get all IPv6 multicast addresses subscribed to the Thread interface.""" 1979 return tuple(map(Ip6Addr, self.execute_command('ipmaddr'))) 1980 1981 def has_ipmaddr(self, ip: Union[str, ipaddress.IPv6Address]): 1982 """Check if a IPv6 multicast address was subscribed by the Thread interface.""" 1983 return ip in self.get_ipmaddrs() 1984 1985 def get_ipmaddr_llatn(self) -> Ip6Addr: 1986 """Get Link Local All Thread Nodes Multicast Address""" 1987 return self.__parse_ip6addr(self.execute_command('ipmaddr llatn')) 1988 1989 def get_ipmaddr_rlatn(self) -> Ip6Addr: 1990 """Get Realm Local All Thread Nodes Multicast Address""" 1991 return self.__parse_ip6addr(self.execute_command('ipmaddr rlatn')) 1992 1993 # 1994 # Backbone Router Utilities 1995 # 1996 1997 # TODO: bbr mgmt ... 1998 1999 def enable_backbone_router(self): 2000 """Enable Backbone Router Service for Thread 1.2 FTD. 2001 2002 SRV_DATA.ntf would be triggered for attached device if there is no Backbone Router Service in Thread Network Data. 2003 """ 2004 self.execute_command('bbr enable') 2005 2006 def disable_backbone_router(self): 2007 """Disable Backbone Router Service for Thread 1.2 FTD. 2008 2009 SRV_DATA.ntf would be triggered if Backbone Router is Primary state. 2010 """ 2011 self.execute_command('bbr disable') 2012 2013 def get_backbone_router_state(self) -> str: 2014 """Get local Backbone state (Disabled or Primary or Secondary) for Thread 1.2 FTD.""" 2015 return self.__parse_str(self.execute_command('bbr state')) 2016 2017 def get_primary_backbone_router_info(self) -> Optional[dict]: 2018 """Show current Primary Backbone Router information for Thread 1.2 device.""" 2019 output = self.execute_command('bbr') 2020 2021 if len(output) < 1: 2022 raise UnexpectedCommandOutput(output) 2023 2024 line = output[0] 2025 if line == 'BBR Primary: None': 2026 return None 2027 2028 if line != 'BBR Primary:': 2029 raise UnexpectedCommandOutput(output) 2030 2031 # Example output: 2032 # BBR Primary: 2033 # server16: 0xE400 2034 # seqno: 10 2035 # delay: 120 secs 2036 # timeout: 300 secs 2037 2038 dataset = {} 2039 2040 for line in output[1:]: 2041 key, val = line.split(':') 2042 key, val = key.strip(), val.strip() 2043 if key == 'server16': 2044 dataset[key] = int(val, 16) 2045 elif key == 'seqno': 2046 dataset[key] = int(val) 2047 elif key == 'delay': 2048 if not val.endswith(' secs'): 2049 raise UnexpectedCommandOutput(output) 2050 dataset[key] = int(val.split()[0]) 2051 elif key == 'timeout': 2052 if not val.endswith(' secs'): 2053 raise UnexpectedCommandOutput(output) 2054 dataset[key] = int(val.split()[0]) 2055 else: 2056 raise UnexpectedCommandOutput(output) 2057 2058 return dataset 2059 2060 def register_backbone_router_dataset(self): 2061 """Register Backbone Router Service for Thread 1.2 FTD. 2062 2063 SRV_DATA.ntf would be triggered for attached device. 2064 """ 2065 self.execute_command('bbr register') 2066 2067 def get_backbone_router_config(self) -> dict: 2068 """Show local Backbone Router configuration for Thread 1.2 FTD.""" 2069 output = self.execute_command('bbr config') 2070 # Example output: 2071 # seqno: 10 2072 # delay: 120 secs 2073 # timeout: 300 secs 2074 2075 config = {} 2076 2077 for line in output: 2078 key, val = line.split(':') 2079 key, val = key.strip(), val.strip() 2080 if key == 'seqno': 2081 config[key] = int(val) 2082 elif key in ('delay', 'timeout'): 2083 if not line.endswith(' secs'): 2084 raise UnexpectedCommandOutput(output) 2085 config[key] = int(val.split()[0]) 2086 else: 2087 raise UnexpectedCommandOutput(output) 2088 2089 return config 2090 2091 def set_backbone_router_config(self, 2092 seqno: Optional[int] = None, 2093 delay: Optional[int] = None, 2094 timeout: Optional[int] = None): 2095 """Configure local Backbone Router configuration for Thread 1.2 FTD. 2096 2097 Call register_backbone_router_dataset() to explicitly register Backbone Router service to Leader for Secondary Backbone Router. 2098 """ 2099 if seqno is None and delay is None and timeout is None: 2100 raise InvalidArgumentsError("Please specify seqno or delay or timeout") 2101 2102 cmd = 'bbr config' 2103 if seqno is not None: 2104 cmd += f' seqno {seqno}' 2105 2106 if delay is not None: 2107 cmd += f' delay {delay}' 2108 2109 if timeout is not None: 2110 cmd += f' timeout {timeout}' 2111 2112 self.execute_command(cmd) 2113 2114 def get_backbone_router_jitter(self) -> int: 2115 """Get jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD.""" 2116 return self.__parse_int(self.execute_command('bbr jitter')) 2117 2118 def set_backbone_router_jitter(self, val: int): 2119 """Set jitter (in seconds) for Backbone Router registration for Thread 1.2 FTD.""" 2120 self.execute_command(f'bbr jitter {val}') 2121 2122 def backbone_router_get_multicast_listeners(self) -> List[Tuple[Ip6Addr, int]]: 2123 """Get Backbone Router Multicast Listeners.""" 2124 listeners = [] 2125 for line in self.execute_command('bbr mgmt mlr listener'): 2126 ip, timeout = line.split() 2127 listeners.append((Ip6Addr(ip), int(timeout))) 2128 2129 return listeners 2130 2131 # 2132 # Thread 1.2 and DUA/MLR utilities 2133 # 2134 2135 def get_domain_name(self) -> str: 2136 """Get the Thread Domain Name for Thread 1.2 device.""" 2137 return self.__parse_str(self.execute_command('domainname')) 2138 2139 def set_domain_name(self, name: str): 2140 """Set the Thread Domain Name for Thread 1.2 device.""" 2141 self.execute_command('domainname %s' % self.__escape_escapable(name)) 2142 2143 # TODO: dua iid 2144 # TODO: dua iid <iid> 2145 # TODO: dua iid clear 2146 # TODO: mlr reg <ipaddr> ... [timeout] 2147 2148 # 2149 # Link metrics management 2150 # 2151 2152 def linkmetrics_config_enhanced_ack_clear(self, peer_addr: Union[str, Ip6Addr]) -> bool: 2153 output = self.execute_command(f'linkmetrics config {peer_addr} enhanced-ack clear') 2154 return self.__parse_linkmetrics_mgmt_response(peer_addr, output) 2155 2156 def linkmetrics_config_enhanced_ack_register(self, 2157 peer_addr: Union[str, Ip6Addr], 2158 link_metrics_flags: str, 2159 reference: bool = False) -> bool: 2160 if self.__valid_flags(link_metrics_flags, 'qmr') is False: 2161 raise ValueError(link_metrics_flags) 2162 2163 output = self.execute_command( 2164 f'linkmetrics config {peer_addr} enhanced-ack register {link_metrics_flags} {"r" if reference else ""}') 2165 return self.__parse_linkmetrics_mgmt_response(peer_addr, output) 2166 2167 def linkmetrics_config_forward(self, peer_addr: Union[str, Ip6Addr], seriesid: int, series_flags: str, 2168 link_metrics_flags: str) -> bool: 2169 if self.__valid_flags(series_flags, 'ldraX') is False: 2170 raise ValueError(series_flags) 2171 2172 if self.__valid_flags(link_metrics_flags, 'pqmr') is False: 2173 raise ValueError(link_metrics_flags) 2174 2175 output = self.execute_command( 2176 f'linkmetrics config {peer_addr} forward {seriesid} {series_flags} {link_metrics_flags}') 2177 return self.__parse_linkmetrics_mgmt_response(peer_addr, output) 2178 2179 def linkmetrics_probe(self, peer_addr: Union[str, Ip6Addr], seriesid: int, length: int): 2180 if length < 0 or length > 64: 2181 raise ValueError(length) 2182 2183 self.execute_command(f'linkmetrics probe {peer_addr} {seriesid} {length}') 2184 2185 def linkmetrics_request_single(self, peer_addr: Union[str, Ip6Addr], link_metrics_flags: str) -> Dict[str, int]: 2186 if self.__valid_flags(link_metrics_flags, 'pqmr') is False: 2187 raise ValueError(link_metrics_flags) 2188 2189 output = self.execute_command(f'linkmetrics request {peer_addr} single {link_metrics_flags}') 2190 return self.__parse_linkmetrics_report(peer_addr, output) 2191 2192 def linkmetrics_request_forward(self, peer_addr: Union[str, Ip6Addr], seriesid: int) -> Dict[str, int]: 2193 output = self.execute_command(f'linkmetrics request {peer_addr} forward {seriesid}') 2194 return self.__parse_linkmetrics_report(peer_addr, output) 2195 2196 def __parse_linkmetrics_mgmt_response(self, peer_addr: Union[str, Ip6Addr], output: List[str]) -> bool: 2197 # 2198 # Example output: 2199 # 2200 # Received Link Metrics Management Response from: fe80:0:0:0:3092:f334:1455:1ad2 2201 # Status: Success 2202 # Done 2203 # 2204 2205 status = '' 2206 report_received = False 2207 ret = False 2208 2209 for line in output: 2210 if 'Received Link Metrics Management Response from' in line: 2211 address = line.split(': ')[1].strip() 2212 report_received = address == peer_addr 2213 elif 'Status' in line: 2214 status = line.split(':')[1].strip() 2215 2216 return report_received and status == 'Success' 2217 2218 def __parse_linkmetrics_report(self, peer_addr: Union[str, Ip6Addr], output: List[str]) -> Dict[str, int]: 2219 # 2220 # Example output: 2221 # 2222 # Received Link Metrics Report from: fe80:0:0:0:3092:f334:1455:1ad2 2223 # 2224 # - PDU Counter: 2 (Count/Summation) 2225 # - LQI: 76 (Exponential Moving Average) 2226 # - Margin: 82 (dB) (Exponential Moving Average) 2227 # - RSSI: -18 (dBm) (Exponential Moving Average) 2228 # Done 2229 # 2230 2231 results = {} 2232 report_received = False 2233 2234 for line in output: 2235 if 'Received Link Metrics Report' in line: 2236 address = line.split(': ')[1].strip() 2237 report_received = address == peer_addr 2238 elif 'Received Link Metrics data in Enh Ack from neighbor' in line: 2239 # If the Enhanced-ACK Based Probing is enabled, the CLI will output the following 2240 # link metrics info after executing the `linkmetrics request` command. This case is 2241 # used to skip these Enhanced-ACK related link metrics info. 2242 # 2243 # Received Link Metrics data in Enh Ack from neighbor, short address:0x3400 , extended address:c6a24d6514cf9178 2244 # - LQI: 224 (Exponential Moving Average) 2245 # - Margin: 0 (dB) (Exponential Moving Average) 2246 # 2247 # Received Link Metrics Report from: fe80:0:0:0:3092:f334:1455:1ad2 2248 # 2249 # - PDU Counter: 2 (Count/Summation) 2250 # - LQI: 76 (Exponential Moving Average) 2251 # - Margin: 82 (dB) (Exponential Moving Average) 2252 # - RSSI: -18 (dBm) (Exponential Moving Average) 2253 # Done 2254 # 2255 report_received = False 2256 2257 if not report_received: 2258 continue 2259 2260 if '- LQI' in line: 2261 results['lqi'] = self.__parse_numbers(line)[0] 2262 elif '- Margin' in line: 2263 results['margin'] = self.__parse_numbers(line)[0] 2264 elif '- RSSI' in line: 2265 results['rssi'] = self.__parse_numbers(line)[0] 2266 elif '- PDU Counter' in line: 2267 results['pdu_counter'] = self.__parse_numbers(line)[0] 2268 2269 return results 2270 2271 def __parse_numbers(self, line: str) -> List[int]: 2272 values = re.findall("\-?\d+", line) 2273 return list(map(int, values)) 2274 2275 def __valid_flags(self, flags: str, flags_set: str): 2276 # check for duplicate chars 2277 if len(flags) != len(set(flags)): 2278 return False 2279 2280 return set(flags).issubset(set(flags_set)) 2281 2282 # 2283 # Logging 2284 # 2285 2286 def get_log_level(self) -> int: 2287 """Get the log level.""" 2288 return self.__parse_int(self.execute_command('log level')) 2289 2290 def set_log_level(self, level: int): 2291 """Set the log level.""" 2292 self.execute_command(f'log level {level}') 2293 2294 # 2295 # Device performance related information 2296 # 2297 2298 def get_message_buffer_info(self) -> dict: 2299 """Get the current message buffer information.""" 2300 output = self.execute_command('bufferinfo') 2301 2302 info = {} 2303 2304 def _parse_val(val): 2305 vals = val.split() 2306 return int(vals[0]) if len(vals) == 1 else tuple(map(int, vals)) 2307 2308 for line in output: 2309 key, val = line.split(':') 2310 key, val = key.strip(), val.strip() 2311 info[key.replace(' ', '_')] = _parse_val(val) 2312 2313 return info 2314 2315 @constant_property 2316 def counter_names(self): 2317 """Get the supported counter names.""" 2318 return tuple(self.execute_command('counters')) 2319 2320 def get_counter(self, name: str) -> Counter: 2321 """Reset the counter value.""" 2322 output = self.execute_command(f'counters {name}') 2323 2324 counter = Counter() 2325 for line in output: 2326 k, v = line.strip().split(': ') 2327 counter[k] = int(v) 2328 2329 return counter 2330 2331 def reset_counter(self, name: str): 2332 """Reset the counter value.""" 2333 self.execute_command(f'counters {name} reset') 2334 2335 def get_eidcache(self) -> Dict[Ip6Addr, Rloc16]: 2336 """Get the EID-to-RLOC cache entries.""" 2337 output = self.execute_command('eidcache') 2338 cache = {} 2339 2340 for line in output: 2341 ip, rloc16, _ = line.split(" ", 2) 2342 2343 cache[Ip6Addr(ip)] = Rloc16(rloc16, 16) 2344 2345 return cache 2346 2347 # 2348 # UDP utilities 2349 # 2350 2351 def udp_open(self): 2352 """Opens the example socket.""" 2353 self.execute_command('udp open') 2354 2355 def udp_close(self): 2356 """Opens the example socket.""" 2357 self.execute_command('udp close') 2358 2359 def udp_bind(self, ip: str, port: int, netif: NetifIdentifier = NetifIdentifier.THERAD): 2360 """Assigns a name (i.e. IPv6 address and port) to the example socket. 2361 2362 :param ip: the IPv6 address or the unspecified IPv6 address (::). 2363 :param port: the UDP port 2364 """ 2365 bindarg = '' 2366 if netif == NetifIdentifier.UNSPECIFIED: 2367 bindarg += ' -u' 2368 elif netif == NetifIdentifier.BACKBONE: 2369 bindarg += ' -b' 2370 2371 self.execute_command(f'udp bind{bindarg} {ip} {port}') 2372 2373 def udp_connect(self, ip: str, port: int): 2374 """Specifies the peer with which the socket is to be associated. 2375 2376 ip: the peer's IPv6 address. 2377 port: the peer's UDP port. 2378 """ 2379 self.execute_command(f'udp connect {ip} {port}') 2380 2381 def udp_send(self, 2382 ip: Optional[Union[str, Ip6Addr]] = None, 2383 port: Optional[int] = None, 2384 text: Optional[str] = None, 2385 random_bytes: Optional[int] = None, 2386 hex: Optional[str] = None): 2387 """Send a few bytes over UDP. 2388 2389 ip: the IPv6 destination address. 2390 port: the UDP destination port. 2391 type: the type of the message: _ -t: text payload in the value, same as without specifying the type. _ -s: autogenerated payload with specified length indicated in the value. 2392 * -x: binary data in hexadecimal representation in the value. 2393 """ 2394 if (ip is None) != (port is None): 2395 raise InvalidArgumentsError("Please specify both `ip` and `port`.") 2396 2397 if (text is not None) + (random_bytes is not None) + (hex is not None) != 1: 2398 raise InvalidArgumentsError("Please specify `text` or `random_bytes` or `hex`.") 2399 2400 cmd = 'udp send' 2401 2402 if ip is not None: 2403 cmd += f' {ip} {port}' 2404 2405 if text is not None: 2406 cmd += f' -t {text}' 2407 elif random_bytes is not None: 2408 cmd += f' -s {random_bytes}' 2409 elif hex is not None: 2410 self.__validate_hex(hex) 2411 cmd += f' -x {hex}' 2412 2413 self.execute_command(cmd) 2414 2415 def udp_get_link_security(self) -> bool: 2416 """Gets whether the link security is enabled or disabled.""" 2417 return self.__parse_Enabled_or_Disabled(self.execute_command('udp linksecurity')) 2418 2419 def udp_enable_link_security(self): 2420 """Enable link security.""" 2421 self.execute_command('udp linksecurity enable') 2422 2423 def udp_disable_link_security(self): 2424 """Disable link security.""" 2425 self.execute_command('udp linksecurity disable') 2426 2427 def netstat(self) -> List[Tuple[Tuple[Ip6Addr, int], Tuple[Ip6Addr, int]]]: 2428 cmd = 'netstat' 2429 output = self.execute_command(cmd) 2430 if len(output) < 2: 2431 raise UnexpectedCommandOutput(output) 2432 2433 socks = [] 2434 for line in output[2:]: 2435 _, sock_addr, peer_addr = line.strip().split('|')[:3] 2436 sock_addr = self.__parse_socket_addr(sock_addr.strip()) 2437 peer_addr = self.__parse_socket_addr(peer_addr.strip()) 2438 socks.append((sock_addr, peer_addr)) 2439 2440 return socks 2441 2442 @staticmethod 2443 def __parse_socket_addr(addr: str) -> Tuple[Ip6Addr, int]: 2444 addr, port = addr.rsplit(':', 1) 2445 if addr.startswith('[') and addr.endswith(']'): 2446 addr = addr[1:-1] 2447 2448 return Ip6Addr(addr), int(port) if port != '*' else 0 2449 2450 # 2451 # CoAP CLI (test) utilities 2452 # 2453 def coap_start(self): 2454 """Starts the application coap service.""" 2455 self.execute_command('coap start') 2456 2457 def coap_stop(self): 2458 """Stops the application coap service.""" 2459 self.execute_command('coap stop') 2460 2461 def coap_get(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con"): 2462 cmd = f'coap get {addr} {uri_path} {type}' 2463 self.execute_command(cmd) 2464 2465 def coap_put(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None): 2466 cmd = f'coap put {addr} {uri_path} {type}' 2467 2468 if payload is not None: 2469 cmd += f' {payload}' 2470 2471 self.execute_command(cmd) 2472 2473 def coap_post(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None): 2474 cmd = f'coap post {addr} {uri_path} {type}' 2475 2476 if payload is not None: 2477 cmd += f' {payload}' 2478 2479 self.execute_command(cmd) 2480 2481 def coap_delete(self, addr: Union[str, Ip6Addr], uri_path: str, type: str = "con", payload: Optional[str] = None): 2482 cmd = f'coap delete {addr} {uri_path} {type}' 2483 2484 if payload is not None: 2485 cmd += f' {payload}' 2486 2487 self.execute_command(cmd) 2488 2489 def coap_get_test_resource_path(self) -> str: 2490 """Gets the URI path for the test resource.""" 2491 return self.__parse_str(self.execute_command('coap resource')) 2492 2493 def coap_set_test_resource_path(self, path: str): 2494 """Sets the URI path for the test resource.""" 2495 self.execute_command(f'coap resource {path}') 2496 2497 def coap_test_set_resource_content(self, content: str): 2498 """Sets the content sent by the test resource. If a CoAP client is observing the resource, a notification is sent to that client.""" 2499 self.execute_command(f'coap set {content}') 2500 2501 # TODO: coap observe <address> <uri-path> [type] 2502 # TODO: coap cancel 2503 # TODO: coap parameters <type> ["default"|<ack_timeout> <ack_random_factor_numerator> <ack_random_factor_denominator> <max_retransmit>] 2504 # TODO: CoAP Secure utilities 2505 2506 # 2507 # Diag Utilities 2508 # 2509 def diag_start(self): 2510 """Start diagnostics mode.""" 2511 self.execute_command('diag start') 2512 2513 def diag_stop(self): 2514 """Stop diagnostics mode.""" 2515 self.execute_command('diag stop') 2516 2517 def diag_set_channel(self, channel: int): 2518 """Set the IEEE 802.15.4 Channel value for diagnostics module.""" 2519 self.execute_command(f'diag channel {channel}') 2520 2521 def diag_get_channel(self) -> int: 2522 """Get the IEEE 802.15.4 Channel value for diagnostics module.""" 2523 line = self.__parse_str(self.execute_command('diag channel')) 2524 return int(line.split()[1]) 2525 2526 def diag_set_power(self, power: int): 2527 """Set the tx power value(dBm) for diagnostics module.""" 2528 self.execute_command(f'diag power {power}') 2529 2530 def diag_get_power(self) -> int: 2531 """Get the tx power value(dBm) for diagnostics module.""" 2532 line = self.__parse_str(self.execute_command('diag power')) 2533 if not line.endswith(' dBm'): 2534 raise UnexpectedCommandOutput([line]) 2535 2536 return int(line.split()[2]) 2537 2538 def diag_cw_start(self): 2539 """Start transmitting continuous carrier wave.""" 2540 self.execute_command('diag cw start') 2541 2542 def diag_cw_stop(self): 2543 """Stop transmitting continuous carrier wave.""" 2544 self.execute_command('diag cw stop') 2545 2546 def diag_frame(self, frame: str): 2547 """Set the frame (hex encoded) to be used by `diag send` and `diag repeat`.""" 2548 self.execute_command(f'diag frame {frame}') 2549 2550 def diag_stream_start(self): 2551 """Start transmitting a stream of characters.""" 2552 self.execute_command('diag stream start') 2553 2554 def diag_stream_stop(self): 2555 """Stop transmitting a stream of characters.""" 2556 self.execute_command('diag stream stop') 2557 2558 def diag_send(self, packets: int, length: Optional[int] = None): 2559 """Transmit a fixed number of packets.""" 2560 if length is None: 2561 command = f'diag send {packets}' 2562 else: 2563 command = f'diag send {packets} {length}' 2564 self.execute_command(command) 2565 2566 def diag_repeat(self, delay: int, length: Optional[int] = None): 2567 """Transmit packets repeatedly with a fixed interval.""" 2568 if length is None: 2569 command = f'diag repeat {delay}' 2570 else: 2571 command = f'diag repeat {delay} {length}' 2572 self.execute_command(command) 2573 2574 def diag_repeat_stop(self): 2575 """Stop repeated packet transmission.""" 2576 self.execute_command('diag repeat stop') 2577 2578 def diag_radio_sleep(self): 2579 """Enter radio sleep mode.""" 2580 self.execute_command('diag radio sleep') 2581 2582 def diag_radio_receive(self): 2583 """Set radio to receive mode.""" 2584 self.execute_command('diag radio receive') 2585 2586 def diag_get_radio_state(self) -> str: 2587 """Get the state of the radio.""" 2588 return self.__parse_str(self.execute_command('diag radio state')) 2589 2590 def diag_get_stats(self) -> Dict[str, int]: 2591 """Get statistics during diagnostics mode.""" 2592 output = self.execute_command('diag stats') 2593 if len(output) < 4: 2594 raise UnexpectedCommandOutput(output) 2595 2596 result = {} 2597 2598 result['received_packets'] = int(output[0].split(":")[1]) 2599 result['sent_packets'] = int(output[1].split(":")[1]) 2600 2601 values = re.findall("\-?\d+", output[2]) 2602 result['first_received_packet_rssi'] = int(values[0]) 2603 result['first_received_packet_lqi'] = int(values[1]) 2604 2605 values = re.findall("\-?\d+", output[3]) 2606 result['last_received_packet_rssi'] = int(values[0]) 2607 result['last_received_packet_lqi'] = int(values[1]) 2608 2609 return result 2610 2611 def diag_stats_clear(self): 2612 """Clear statistics during diagnostics mode.""" 2613 self.execute_command('diag stats clear') 2614 2615 def diag_set_gpio_value(self, gpio: int, value: int): 2616 """Set the gpio value.""" 2617 self.execute_command(f'diag gpio set {gpio} {value}') 2618 2619 def diag_get_gpio_value(self, gpio: int) -> int: 2620 """Get the gpio value.""" 2621 return int(self.__parse_str(self.execute_command(f'diag gpio get {gpio}'))) 2622 2623 def diag_set_gpio_mode(self, gpio: int, mode: str): 2624 """Set the gpio mode.""" 2625 self.execute_command(f'diag gpio mode {gpio} {mode}') 2626 2627 def diag_get_gpio_mode(self, gpio: int) -> str: 2628 """Get the gpio mode.""" 2629 return self.__parse_str(self.execute_command(f'diag gpio mode {gpio}')) 2630 2631 def diag_echo(self, message: str) -> str: 2632 """RCP echoes the given message.""" 2633 return self.__parse_str(self.execute_command(f'diag echo {message}')) 2634 2635 def diag_echo_number(self, number: int) -> str: 2636 """RCP echoes the given message.""" 2637 return self.__parse_str(self.execute_command(f'diag echo -n {number}')) 2638 2639 def diag_get_powersettings(self) -> List[Dict[str, Any]]: 2640 """Get the currently used power settings table.""" 2641 result = [] 2642 output = self.execute_command(f'diag powersettings') 2643 2644 if len(output) < 3: 2645 raise UnexpectedCommandOutput(output) 2646 2647 if not output[-1].startswith('Done'): 2648 raise UnexpectedCommandOutput(output) 2649 2650 for line in output[2:-1]: 2651 data = line.split('|') 2652 2653 result.append({ 2654 'channel_start': int(data[1]), 2655 'channel_end': int(data[2]), 2656 'target_power': int(data[3]), 2657 'actual_power': int(data[4]), 2658 'raw_power_setting': self.__hex_to_bytes(data[5].lstrip().rstrip()), 2659 }) 2660 2661 return result 2662 2663 def diag_get_channel_powersettings(self, channel: int) -> Dict[str, Any]: 2664 """Gets the currently used power settings for the given channel.""" 2665 result = {} 2666 output = self.execute_command(f'diag powersettings {channel}') 2667 2668 if len(output) != 4: 2669 raise UnexpectedCommandOutput(output) 2670 2671 if not output[-1].startswith('Done'): 2672 raise UnexpectedCommandOutput(output) 2673 2674 result['target_power'] = int(output[0].split(':')[1]) 2675 result['actual_power'] = int(output[1].split(':')[1]) 2676 result['raw_power_setting'] = self.__hex_to_bytes(output[2].split(':')[1].lstrip().rstrip()) 2677 2678 return result 2679 2680 def diag_get_rawpowersetting(self) -> str: 2681 """Get the raw power setting.""" 2682 return self.__parse_str(self.execute_command('diag rawpowersetting')) 2683 2684 def diag_set_rawpowersetting(self, rawpowersetting: str): 2685 """Set the raw power setting.""" 2686 self.execute_command(f'diag rawpowersetting {rawpowersetting}') 2687 2688 def diag_enable_rawpowersetting(self): 2689 """Enable the raw power setting.""" 2690 self.execute_command('diag rawpowersetting enable') 2691 2692 def diag_disable_rawpowersetting(self): 2693 """Disable the raw power setting.""" 2694 self.execute_command('diag rawpowersetting disable') 2695 2696 def is_command_supported(self, command: str) -> bool: 2697 """Check whether the the given command is supported by the device.""" 2698 output = self.__otcmd.execute_command(command, timeout=10) 2699 2700 if re.match("Error \d+: \w*", output[-1]): 2701 return False 2702 2703 return True 2704 2705 # 2706 # Network management utilities 2707 # 2708 def create_dataset(self, 2709 active_timestamp: Optional[int] = None, 2710 channel: Optional[int] = None, 2711 channel_mask: Optional[int] = None, 2712 extpanid: Optional[str] = None, 2713 mesh_local_prefix: Optional[str] = None, 2714 network_key: Optional[str] = None, 2715 network_name: Optional[str] = None, 2716 panid: Optional[int] = None, 2717 pskc: Optional[str] = None, 2718 security_policy: Optional[tuple] = None, 2719 pending_timestamp: Optional[int] = None) -> bytes: 2720 """Creates a new Operational Dataset with given parameters.""" 2721 self.dataset_clear_buffer() 2722 self.dataset_init_buffer() 2723 self.dataset_set_buffer(active_timestamp, channel, channel_mask, extpanid, mesh_local_prefix, network_key, 2724 network_name, panid, pskc, security_policy, pending_timestamp) 2725 return self.get_dataset_tlvs_bytes() 2726 2727 def join(self, dataset: bytes) -> None: 2728 """Joins to a Thread network with given Active Operational Dataset.""" 2729 self.set_dataset_bytes('active', dataset) 2730 self.ifconfig_up() 2731 self.thread_start() 2732 2733 def leave(self) -> None: 2734 """Leaves from the Thread network.""" 2735 self.thread_stop() 2736 self.ifconfig_down() 2737 2738 def wait_for(self, command: str, expect_line: Optional[Union[str, Pattern, Collection[Any]]], timeout: float = 60): 2739 """Wait for the expected output by periodically executing the given command.""" 2740 success = False 2741 2742 while timeout > 0: 2743 output = self.execute_command(command) 2744 if any(match_line(line, expect_line) for line in output): 2745 success = True 2746 break 2747 2748 self.__otcmd.wait(1) 2749 timeout -= 1 2750 2751 if not success: 2752 raise ExpectLineTimeoutError(expect_line) 2753 2754 # 2755 # Other TODOs 2756 # 2757 # TODO: netstat 2758 # TODO: networkdiagnostic get <addr> <type> .. 2759 # TODO: networkdiagnostic reset <addr> <type> .. 2760 # TODO: parent 2761 # TODO: pskc [-p] <key>|<passphrase> 2762 # 2763 2764 # 2765 # Platform Commands Utilities 2766 # 2767 def support_iperf3(self) -> bool: 2768 """Check whether the platform supports iperf3.""" 2769 # 2770 # Command example: 2771 # 2772 # $ command -v iperf3 2773 # /usr/bin/iperf3 2774 # 2775 ret = False 2776 output = self.execute_platform_command('command -v iperf3') 2777 if len(output) > 0 and 'iperf3' in output[0]: 2778 ret = True 2779 2780 return ret 2781 2782 def iperf3_client(self, 2783 host: Union[str, Ip6Addr], 2784 ipv6: bool = True, 2785 udp: bool = True, 2786 bind_address: Optional[Union[str, Ip6Addr]] = None, 2787 bitrate: int = 10000, 2788 interval: int = 10, 2789 transmit_time: int = 10, 2790 length: Optional[int] = None) -> Dict[str, Dict[str, Any]]: 2791 """Run iperf3 in client mode. 2792 2793 :param host: The host IPv6 address to send iperf3 traffic. 2794 :param ipv6: True to use IPv6, False to use IPv4 (default IPv6). 2795 :param udp: True to use UDP, False to use TCP (default UDP). 2796 :param bind_address: The local address to be bound. 2797 :param bitrate: The target bitrate in bits/sec (default 10000 bit/sec). 2798 :param interval: Seconds between periodic throughput reports (default 10 sec). 2799 :param transmit_time: Time in seconds to transmit for (default 10 secs) 2800 :param length: Length of buffer to read or write (default None). 2801 """ 2802 # 2803 # Iperf3 client example: 2804 # 2805 # $ iperf3 -6 -c fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a -u -b 90000 -i 20 -t 10 -l 1232 -f k 2806 # Connecting to host fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a, port 5201 2807 # [ 5] local fdd6:f5cf:d32d:8d88:0:ff:fe00:fc00 port 59495 connected to fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a port 5201 2808 # [ ID] Interval Transfer Bitrate Total Datagrams 2809 # [ 5] 0.00-10.00 sec 111 KBytes 90.7 Kbits/sec 92 2810 # - - - - - - - - - - - - - - - - - - - - - - - - - 2811 # [ ID] Interval Transfer Bitrate Jitter Lost/Total Datagrams 2812 # [ 5] 0.00-10.00 sec 111 KBytes 90.7 Kbits/sec 0.000 ms 0/92 (0%) sender 2813 # [ 5] 0.00-10.96 sec 99.9 KBytes 74.7 Kbits/sec 30.157 ms 9/92 (9.8%) receiver 2814 # 2815 # iperf Done. 2816 # 2817 2818 wait_time = 10 2819 client_option = f'-c {host}' 2820 version_option = "-6" if ipv6 else "-4" 2821 udp_option = '-u' if udp else '' 2822 bind_option = f'-B {bind_address}' if bind_address else '' 2823 bitrate_option = f'-b {bitrate}' 2824 interval_option = f'-i {interval}' 2825 time_option = f'-t {transmit_time}' 2826 length_option = f'-l {length}' if length else '' 2827 format_option = '-f k' 2828 2829 cmd = f'iperf3 {version_option} {client_option} {udp_option} {bitrate_option} {interval_option} {time_option} {length_option} {format_option}' 2830 output = self.execute_platform_command(cmd, timeout=transmit_time + wait_time) 2831 2832 results = {} 2833 for line in output: 2834 fields = line.split() 2835 if len(fields) != 13: 2836 continue 2837 2838 if fields[-1] == 'sender': 2839 results['sender'] = self.__parse_iperf3_report(line) 2840 elif fields[-1] == 'receiver': 2841 results['receiver'] = self.__parse_iperf3_report(line) 2842 2843 return results 2844 2845 def iperf3_server(self, 2846 bind_address: Optional[Union[str, Ip6Addr]] = None, 2847 interval: int = 10, 2848 timeout: int = 60) -> Dict[str, Any]: 2849 """Run iperf3 in server mode. 2850 2851 :param bind_address: The local address to be bound. 2852 :param interval: Seconds between periodic throughput reports (default 10 sec). 2853 :param timeout: Timeout in seconds to abort the program (default 60 secs) 2854 """ 2855 # 2856 # Iperf3 server example: 2857 # 2858 # $ iperf3 -s -1 -B fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a -i 50 -f k 2859 # ----------------------------------------------------------- 2860 # Server listening on 5201 2861 # ----------------------------------------------------------- 2862 # Accepted connection from fdd6:f5cf:d32d:8d88:0:ff:fe00:fc00, port 44080 2863 # [ 5] local fdd6:f5cf:d32d:8d88:a98b:cf7c:2ed2:691a port 5201 connected to fdd6:f5cf:d32d:8d88:0:ff:fe00:fc00 port 59495 2864 # [ ID] Interval Transfer Bitrate Jitter Lost/Total Datagrams 2865 # [ 5] 0.00-10.96 sec 99.9 KBytes 74.7 Kbits/sec 30.157 ms 9/92 (9.8%) 2866 # - - - - - - - - - - - - - - - - - - - - - - - - - 2867 # [ ID] Interval Transfer Bitrate Jitter Lost/Total Datagrams 2868 # [ 5] 0.00-10.96 sec 99.9 KBytes 74.7 Kbits/sec 30.157 ms 9/92 (9.8%) receiver 2869 # 2870 2871 bind_option = f'-B {bind_address}' if bind_address else '' 2872 interval_option = f'-i {interval}' 2873 format_option = '-f k' 2874 2875 cmd = f'iperf3 -s -1 {bind_option} {interval_option} {format_option}' 2876 output = self.execute_platform_command(cmd, timeout) 2877 2878 results = {} 2879 for line in output: 2880 fields = line.split() 2881 if len(fields) == 13 and fields[-1] == 'receiver': 2882 results = self.__parse_iperf3_report(line) 2883 2884 return results 2885 2886 def __parse_iperf3_report(self, line: str) -> Dict[str, Any]: 2887 results = {} 2888 fields = line.split() 2889 format_unit = 1000 2890 2891 if len(fields) == 13 and (fields[-1] == 'sender' or fields[-1] == 'receiver'): 2892 results['id'] = int(fields[1].replace(']', '')) 2893 results['interval_start'] = float(fields[2].split('-')[0]) 2894 results['interval_end'] = float(fields[2].split('-')[1]) 2895 results['transfer'] = int(float(fields[4]) * format_unit) 2896 results['bitrate'] = int(float(fields[6]) * format_unit) 2897 results['jitter'] = float(fields[8]) 2898 results['lossrate'] = float(fields[11].replace('(', '').replace(')', '').replace('%', '')) / 100 2899 results['datagrams'] = fields[12] 2900 2901 return results 2902 2903 # 2904 # Private methods 2905 # 2906 2907 def __parse_str(self, output: List[str]) -> str: 2908 if len(output) != 1: 2909 raise UnexpectedCommandOutput(output) 2910 2911 return output[0] 2912 2913 def __parse_int_list(self, output: List[str]) -> List[int]: 2914 line = self.__parse_str(output) 2915 return list(map(int, line.strip().split())) 2916 2917 def __parse_ip6addr(self, output: List[str]) -> Ip6Addr: 2918 return Ip6Addr(self.__parse_str(output)) 2919 2920 def __parse_ip6addr_list(self, output: List[str]) -> List[Ip6Addr]: 2921 return [Ip6Addr(line) for line in output] 2922 2923 def __parse_int(self, output: List[str], base=10) -> int: 2924 if len(output) != 1: 2925 raise UnexpectedCommandOutput(output) 2926 2927 return int(output[0], base) 2928 2929 def __parse_network_key(self, output: List[str]) -> str: 2930 networkkey = self.__parse_str(output) 2931 2932 try: 2933 self.__validate_network_key(networkkey) 2934 except ValueError: 2935 raise UnexpectedCommandOutput(output) 2936 2937 return networkkey 2938 2939 def __validate_network_key(self, networkkey: str): 2940 if len(networkkey) != 32: 2941 raise ValueError(networkkey) 2942 2943 int(networkkey, 16) 2944 2945 def __parse_hex64b(self, output: List[str]) -> str: 2946 extaddr = self.__parse_str(output) 2947 2948 try: 2949 self.__validate_hex64b(extaddr) 2950 except ValueError: 2951 raise UnexpectedCommandOutput(output) 2952 2953 return extaddr 2954 2955 __parse_extaddr = __parse_hex64b 2956 __parse_extpanid = __parse_hex64b 2957 __parse_eui64 = __parse_hex64b 2958 __parse_joiner_id = __parse_hex64b 2959 2960 def __validate_hex64b(self, extaddr: str): 2961 if len(extaddr) != 16: 2962 raise ValueError(extaddr) 2963 2964 self.__validate_hex(extaddr) 2965 2966 def __validate_hex(self, hexstr: str): 2967 if len(hexstr) % 2 != 0: 2968 raise ValueError(hexstr) 2969 2970 for i in range(0, len(hexstr), 2): 2971 int(hexstr[i:i + 2], 16) 2972 2973 __validate_extaddr = __validate_hex64b 2974 __validate_extpanid = __validate_hex64b 2975 2976 def __parse_Enabled_or_Disabled(self, output: List[str]) -> bool: 2977 return self.__parse_values(output, Enabled=True, Disabled=False) 2978 2979 def __parse_values(self, output: List[str], **vals) -> Any: 2980 val = self.__parse_str(output) 2981 if val not in vals: 2982 raise UnexpectedCommandOutput(output) 2983 2984 return vals[val] 2985 2986 def __validate_hex_or_bytes(self, data: Union[str, bytes]) -> str: 2987 if isinstance(data, bytes): 2988 return ''.join('%02x' % c for c in data) 2989 else: 2990 self.__validate_hex(data) 2991 return data 2992 2993 def __hex_to_bytes(self, hexstr: str) -> bytes: 2994 self.__validate_hex(hexstr) 2995 return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2)) 2996 2997 def __bytes_to_hex(self, data: bytes) -> str: 2998 return ''.join('%02x' % b for b in data) 2999 3000 def __escape_escapable(self, s: str) -> str: 3001 """Escape CLI escapable characters in the given string. 3002 """ 3003 escapable_chars = '\\ \t\r\n' 3004 for char in escapable_chars: 3005 s = s.replace(char, '\\%s' % char) 3006 return s 3007 3008 def __txt_to_hex(self, txt: Dict[str, Union[str, bytes, bool]]) -> str: 3009 txt_bin = b'' 3010 for k, v in txt.items(): 3011 assert '=' not in k, 'TXT key must not contain `=`' 3012 3013 if isinstance(v, str): 3014 entry = f'{k}={v}'.encode('utf8') 3015 elif isinstance(v, bytes): 3016 entry = f'{k}='.encode('utf8') + v 3017 else: 3018 assert v is True, 'TXT val must be str or bytes or True' 3019 entry = k.encode('utf8') 3020 3021 assert len(entry) <= 255, 'TXT entry is too long' 3022 3023 txt_bin += bytes([len(entry)]) 3024 txt_bin += entry 3025 3026 return ''.join('%02x' % b for b in txt_bin) 3027 3028 3029def connect_cli_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI: 3030 cli_handler = connectors.OtCliSim(executable, nodeid, simulator=simulator) 3031 cmd_handler = OtCliCommandRunner(cli_handler) 3032 return OTCI(cmd_handler) 3033 3034 3035def connect_cli_serial(dev: str, baudrate=115200) -> OTCI: 3036 cli_handler = connectors.OtCliSerial(dev, baudrate) 3037 cmd_handler = OtCliCommandRunner(cli_handler) 3038 return OTCI(cmd_handler) 3039 3040 3041def connect_ncp_sim(executable: str, nodeid: int, simulator: Optional[Simulator] = None) -> OTCI: 3042 ncp_handler = connectors.OtNcpSim(executable, nodeid, simulator=simulator) 3043 cmd_handler = OtCliCommandRunner(ncp_handler, is_spinel_cli=True) 3044 return OTCI(cmd_handler) 3045 3046 3047def connect_otbr_ssh(host: str, port: int = 22, username='pi', password='raspberry', sudo=True): 3048 cmd_handler = OtbrSshCommandRunner(host, port, username, password, sudo=sudo) 3049 return OTCI(cmd_handler) 3050 3051 3052def connect_otbr_adb_tcp(host: str, port: int = 5555): 3053 cmd_handler = OtbrAdbTcpCommandRunner(host, port) 3054 return OTCI(cmd_handler) 3055 3056 3057def connect_otbr_adb_usb(serial: str): 3058 cmd_handler = OtbrAdbUsbCommandRunner(serial) 3059 return OTCI(cmd_handler) 3060 3061 3062def connect_cmd_handler(cmd_handler: OTCommandHandler) -> OTCI: 3063 return OTCI(cmd_handler) 3064