1#!/usr/bin/env python3 2# 3# Copyright (c) 2024, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import argparse 31import logging 32import os 33import sys 34import textwrap 35import threading 36 37from typing import List 38 39import otci 40from otci import OTCI 41from otci.types import Ip6Addr 42 43logging.basicConfig(level=logging.WARNING) 44 45 46class RcpCaps(object): 47 """ 48 This class represents an OpenThread RCP capability test instance. 49 """ 50 51 def __init__(self): 52 self.__dut = self.__connect_dut() 53 self.__ref = self.__connect_reference_device() 54 55 def test_diag_commands(self): 56 """Test all diag commands.""" 57 self.__dut.factory_reset() 58 self.__ref.factory_reset() 59 60 ret = self.__dut.is_command_supported('diag start') 61 if ret is False: 62 print('All diag commands are not supported') 63 return 64 65 self.__dut.diag_start() 66 self.__ref.diag_start() 67 68 self.__test_diag_channel() 69 self.__test_diag_power() 70 self.__test_diag_radio() 71 self.__test_diag_repeat() 72 self.__test_diag_send() 73 self.__test_diag_frame() 74 self.__test_diag_echo() 75 self.__test_diag_utils() 76 self.__test_diag_rawpowersetting() 77 self.__test_diag_powersettings() 78 self.__test_diag_gpio_mode() 79 self.__test_diag_gpio_value() 80 81 self.__ref.diag_stop() 82 self.__dut.diag_stop() 83 84 def test_csl(self): 85 """Test whether the DUT supports CSL transmitter.""" 86 self.__dataset = self.__get_default_dataset() 87 self.__test_csl_transmitter() 88 89 def test_data_poll(self): 90 """Test whether the DUT supports data poll parent and child.""" 91 self.__dataset = self.__get_default_dataset() 92 self.__test_data_poll_parent() 93 self.__test_data_poll_child() 94 95 def test_throughput(self): 96 """Test Thread network 1 hop throughput.""" 97 if not self.__dut.support_iperf3(): 98 print("The DUT doesn't support the tool iperf3") 99 return 100 101 if not self.__ref.support_iperf3(): 102 print("The reference device doesn't support the tool iperf3") 103 return 104 105 bitrate = 90000 106 length = 1232 107 transmit_time = 30 108 max_wait_time = 30 109 timeout = transmit_time + max_wait_time 110 111 self.__dut.factory_reset() 112 self.__ref.factory_reset() 113 114 dataset = self.__get_default_dataset() 115 116 self.__dut.join(dataset) 117 self.__dut.wait_for('state', 'leader') 118 119 self.__ref.set_router_selection_jitter(1) 120 self.__ref.join(dataset) 121 self.__ref.wait_for('state', ['child', 'router']) 122 123 ref_mleid = self.__ref.get_ipaddr_mleid() 124 125 ref_iperf3_server = threading.Thread(target=self.__ref_iperf3_server_task, 126 args=(ref_mleid, timeout), 127 daemon=True) 128 ref_iperf3_server.start() 129 self.__dut.wait(1) 130 131 results = self.__dut.iperf3_client(host=ref_mleid, bitrate=bitrate, transmit_time=transmit_time, length=length) 132 ref_iperf3_server.join() 133 134 if not results: 135 print('Failed to run the iperf3') 136 return 137 138 self.__output_format_string('Throughput', self.__bitrate_to_string(results['receiver']['bitrate'])) 139 140 def test_link_metrics(self): 141 """Test whether the DUT supports Link Metrics Initiator and Subject.""" 142 self.__dataset = self.__get_default_dataset() 143 144 self.__dut.factory_reset() 145 self.__ref.factory_reset() 146 147 self.__dut.join(self.__dataset) 148 self.__dut.wait_for('state', 'leader') 149 150 self.__ref.join(self.__dataset) 151 self.__ref.wait_for('state', ['child', 'router']) 152 153 test_case = 'Link Metrics Initiator' 154 ref_linklocal_address = self.__ref.get_ipaddr_linklocal() 155 ret = self.__run_link_metrics_test_commands(initiator=self.__dut, subject_address=ref_linklocal_address) 156 self.__output_format_bool(test_case, ret) 157 158 test_case = 'Link Metrics Subject' 159 dut_linklocal_address = self.__dut.get_ipaddr_linklocal() 160 ret = self.__run_link_metrics_test_commands(initiator=self.__ref, subject_address=dut_linklocal_address) 161 self.__output_format_bool(test_case, ret) 162 163 self.__ref.leave() 164 self.__dut.leave() 165 166 # 167 # Private methods 168 # 169 def __run_link_metrics_test_commands(self, initiator: OTCI, subject_address: Ip6Addr) -> bool: 170 seriesid = 1 171 series_flags = 'ldra' 172 link_metrics_flags = 'qr' 173 probe_length = 10 174 175 if not initiator.linkmetrics_config_enhanced_ack_register(subject_address, link_metrics_flags): 176 return False 177 178 if not initiator.linkmetrics_config_forward(subject_address, seriesid, series_flags, link_metrics_flags): 179 return False 180 181 initiator.linkmetrics_probe(subject_address, seriesid, probe_length) 182 183 results = initiator.linkmetrics_request_single(subject_address, link_metrics_flags) 184 if not ('lqi' in results.keys() and 'rssi' in results.keys()): 185 return False 186 187 results = initiator.linkmetrics_request_forward(subject_address, seriesid) 188 if not ('lqi' in results.keys() and 'rssi' in results.keys()): 189 return False 190 191 if not initiator.linkmetrics_config_enhanced_ack_clear(subject_address): 192 return False 193 194 return True 195 196 def __ref_iperf3_server_task(self, bind_address: str, timeout: int): 197 self.__ref.iperf3_server(bind_address, timeout=timeout) 198 199 def __bitrate_to_string(self, bitrate: float): 200 units = ['bits/sec', 'Kbits/sec', 'Mbits/sec', 'Gbits/sec', 'Tbits/sec'] 201 unit_index = 0 202 203 while bitrate >= 1000 and unit_index < len(units) - 1: 204 bitrate /= 1000 205 unit_index += 1 206 207 return f'{bitrate:.2f} {units[unit_index]}' 208 209 def __get_default_dataset(self): 210 return self.__dut.create_dataset(channel=20, network_key='00112233445566778899aabbccddcafe') 211 212 def __test_csl_transmitter(self): 213 packets = 10 214 215 self.__dut.factory_reset() 216 self.__ref.factory_reset() 217 218 self.__dut.join(self.__dataset) 219 self.__dut.wait_for('state', 'leader') 220 221 # Set the reference device as an SSED 222 self.__ref.set_mode('-') 223 self.__ref.config_csl(channel=15, period=320000, timeout=100) 224 self.__ref.join(self.__dataset) 225 self.__ref.wait_for('state', 'child') 226 227 child_table = self.__dut.get_child_table() 228 ret = len(child_table) == 1 and child_table[1]['csl'] 229 230 if ret: 231 ref_mleid = self.__ref.get_ipaddr_mleid() 232 result = self.__dut.ping(ref_mleid, count=packets, interval=1) 233 ret = result['transmitted_packets'] == result['received_packets'] == packets 234 235 self.__dut.leave() 236 self.__ref.leave() 237 238 self.__output_format_bool('CSL Transmitter', ret) 239 240 def __test_data_poll_parent(self): 241 packets = 10 242 243 self.__dut.factory_reset() 244 self.__ref.factory_reset() 245 246 self.__dut.join(self.__dataset) 247 self.__dut.wait_for('state', 'leader') 248 249 # Set the reference device as an SED 250 self.__ref.set_mode('-') 251 self.__ref.set_poll_period(500) 252 self.__ref.join(self.__dataset) 253 self.__ref.wait_for('state', 'child') 254 255 dut_mleid = self.__dut.get_ipaddr_mleid() 256 result = self.__ref.ping(dut_mleid, count=packets, interval=1) 257 258 self.__dut.leave() 259 self.__ref.leave() 260 261 ret = result['transmitted_packets'] == result['received_packets'] == packets 262 self.__output_format_bool('Data Poll Parent', ret) 263 264 def __test_data_poll_child(self): 265 packets = 10 266 267 self.__dut.factory_reset() 268 self.__ref.factory_reset() 269 270 self.__ref.join(self.__dataset) 271 self.__ref.wait_for('state', 'leader') 272 273 # Set the DUT as an SED 274 self.__dut.set_mode('-') 275 self.__dut.set_poll_period(500) 276 self.__dut.join(self.__dataset) 277 self.__dut.wait_for('state', 'child') 278 279 dut_mleid = self.__dut.get_ipaddr_mleid() 280 result = self.__ref.ping(dut_mleid, count=packets, interval=1) 281 282 self.__dut.leave() 283 self.__ref.leave() 284 285 ret = result['transmitted_packets'] == result['received_packets'] == packets 286 self.__output_format_bool('Data Poll Child', ret) 287 288 def __test_diag_channel(self): 289 channel = 20 290 commands = ['diag channel', f'diag channel {channel}'] 291 292 if self.__support_commands(commands): 293 self.__dut.diag_set_channel(channel) 294 value = self.__dut.diag_get_channel() 295 ret = value == channel 296 else: 297 ret = False 298 299 self.__output_results(commands, ret) 300 301 def __test_diag_power(self): 302 power = self.__get_dut_diag_power() 303 commands = ['diag power', f'diag power {power}'] 304 305 if self.__support_commands(commands): 306 self.__dut.diag_set_power(power) 307 value = self.__dut.diag_get_power() 308 ret = value == power 309 else: 310 ret = False 311 312 self.__output_results(commands, ret) 313 314 def __test_diag_radio(self): 315 commands = ['diag radio receive', 'diag radio sleep', 'diag radio state'] 316 317 if self.__support_commands(commands): 318 self.__dut.diag_radio_receive() 319 receive_state = self.__dut.diag_get_radio_state() 320 self.__dut.wait(0.1) 321 self.__dut.diag_radio_sleep() 322 sleep_state = self.__dut.diag_get_radio_state() 323 324 ret = sleep_state == 'sleep' and receive_state == 'receive' 325 else: 326 ret = False 327 328 self.__output_results(commands, ret) 329 330 def __test_diag_gpio_value(self): 331 gpio = self.__get_dut_diag_gpio() 332 commands = [f'diag gpio get {gpio}', f'diag gpio set {gpio} 0', f'diag gpio set {gpio} 1'] 333 334 if self.__support_commands(commands): 335 self.__dut.diag_set_gpio_value(gpio, 0) 336 value_0 = self.__dut.diag_get_gpio_value(gpio) 337 self.__dut.diag_set_gpio_value(gpio, 1) 338 value_1 = self.__dut.diag_get_gpio_value(gpio) 339 340 ret = value_0 == 0 and value_1 == 1 341 else: 342 ret = False 343 344 self.__output_results(commands, ret) 345 346 def __test_diag_gpio_mode(self): 347 gpio = self.__get_dut_diag_gpio() 348 commands = [f'diag gpio mode {gpio}', f'diag gpio mode {gpio} in', f'diag gpio mode {gpio} out'] 349 350 if self.__support_commands(commands): 351 self.__dut.diag_set_gpio_mode(gpio, 'in') 352 mode_in = self.__dut.diag_get_gpio_mode(gpio) 353 self.__dut.diag_set_gpio_value(gpio, 'out') 354 mode_out = self.__dut.diag_get_gpio_mode(gpio) 355 356 ret = mode_in == 'in' and mode_out == 'out' 357 else: 358 ret = False 359 360 self.__output_results(commands, ret) 361 362 def __test_diag_echo(self): 363 echo_msg = '0123456789' 364 cmd_diag_echo = f'diag echo {echo_msg}' 365 cmd_diag_echo_num = f'diag echo -n 10' 366 367 if self.__dut.is_command_supported(cmd_diag_echo): 368 reply = self.__dut.diag_echo(echo_msg) 369 ret = reply == echo_msg 370 else: 371 ret = False 372 self.__output_format_bool(cmd_diag_echo, ret) 373 374 if self.__dut.is_command_supported(cmd_diag_echo_num): 375 reply = self.__dut.diag_echo_number(10) 376 ret = reply == echo_msg 377 else: 378 ret = False 379 self.__output_format_bool(cmd_diag_echo_num, ret) 380 381 def __test_diag_utils(self): 382 commands = [ 383 'diag cw start', 'diag cw stop', 'diag stream start', 'diag stream stop', 'diag stats', 'diag stats clear' 384 ] 385 386 for command in commands: 387 ret = self.__dut.is_command_supported(command) 388 self.__output_format_bool(command, ret) 389 390 def __test_diag_rawpowersetting(self): 391 rawpowersetting = self.__get_dut_diag_raw_power_setting() 392 commands = [ 393 'diag rawpowersetting enable', f'diag rawpowersetting {rawpowersetting}', 'diag rawpowersetting', 394 'diag rawpowersetting disable' 395 ] 396 397 if self.__support_commands(commands): 398 self.__dut.diag_enable_rawpowersetting() 399 self.__dut.diag_set_rawpowersetting(rawpowersetting) 400 reply = self.__dut.diag_get_rawpowersetting() 401 self.__dut.diag_disable_rawpowersetting() 402 403 ret = reply == rawpowersetting 404 else: 405 ret = False 406 407 self.__output_results(commands, ret) 408 409 def __test_diag_powersettings(self): 410 commands = ['diag powersettings', 'diag powersettings 20'] 411 412 if self.__support_commands(commands): 413 powersettings = self.__dut.diag_get_powersettings() 414 ret = len(powersettings) > 0 415 else: 416 ret = False 417 418 self.__output_results(commands, ret) 419 420 def __test_diag_send(self): 421 packets = 100 422 threshold = 80 423 length = 64 424 channel = 20 425 commands = [f'diag send {packets} {length}', f'diag stats', f'diag stats clear'] 426 427 if self.__support_commands(commands): 428 self.__dut.wait(1) 429 self.__dut.diag_set_channel(channel) 430 self.__ref.diag_set_channel(channel) 431 self.__ref.diag_radio_receive() 432 433 self.__dut.diag_stats_clear() 434 self.__ref.diag_stats_clear() 435 436 self.__dut.diag_send(packets, length) 437 self.__dut.wait(1) 438 dut_stats = self.__dut.diag_get_stats() 439 ref_stats = self.__ref.diag_get_stats() 440 441 ret = dut_stats['sent_packets'] == packets and ref_stats['received_packets'] > threshold 442 else: 443 ret = False 444 445 self.__output_results(commands, ret) 446 447 def __test_diag_repeat(self): 448 delay = 10 449 threshold = 80 450 length = 64 451 channel = 20 452 cmd_diag_repeat = f'diag repeat {delay} {length}' 453 cmd_diag_repeat_stop = 'diag repeat stop' 454 commands = [cmd_diag_repeat, 'diag repeat stop', 'diag stats', 'diag stats clear'] 455 456 if self.__support_commands(commands): 457 self.__dut.diag_set_channel(channel) 458 self.__ref.diag_set_channel(channel) 459 self.__ref.diag_radio_receive() 460 461 self.__dut.diag_stats_clear() 462 self.__ref.diag_stats_clear() 463 464 self.__dut.diag_repeat(delay, length) 465 self.__dut.wait(1) 466 self.__dut.diag_repeat_stop() 467 dut_stats = self.__dut.diag_get_stats() 468 ref_stats = self.__ref.diag_get_stats() 469 470 ret = dut_stats['sent_packets'] > threshold and ref_stats['received_packets'] > threshold 471 else: 472 ret = False 473 474 self.__output_format_bool(cmd_diag_repeat, ret) 475 self.__output_format_bool(cmd_diag_repeat_stop, ret) 476 477 def __test_diag_frame(self): 478 packets = 100 479 threshold = 80 480 channel = 20 481 frame = '00010203040506070809' 482 cmd_diag_frame = f'diag frame {frame}' 483 commands = [cmd_diag_frame, f'diag send {packets}', f'diag stats', f'diag stats clear'] 484 485 if self.__support_commands(commands): 486 self.__dut.wait(1) 487 self.__dut.diag_set_channel(channel) 488 self.__ref.diag_set_channel(channel) 489 self.__ref.diag_radio_receive() 490 491 self.__dut.diag_stats_clear() 492 self.__ref.diag_stats_clear() 493 494 self.__ref.diag_frame(frame) 495 self.__dut.diag_send(packets, None) 496 self.__dut.wait(1) 497 dut_stats = self.__dut.diag_get_stats() 498 ref_stats = self.__ref.diag_get_stats() 499 500 ret = dut_stats['sent_packets'] == packets and ref_stats['received_packets'] > threshold 501 else: 502 ret = False 503 504 self.__output_format_bool(cmd_diag_frame, ret) 505 506 def __support_commands(self, commands: List[str]) -> bool: 507 ret = True 508 509 for command in commands: 510 if self.__dut.is_command_supported(command) is False: 511 ret = False 512 break 513 514 return ret 515 516 def __output_results(self, commands: List[str], support: bool): 517 for command in commands: 518 self.__output_format_bool(command, support) 519 520 def __get_dut_diag_power(self) -> int: 521 return int(os.getenv('DUT_DIAG_POWER', '10')) 522 523 def __get_dut_diag_gpio(self) -> int: 524 return int(os.getenv('DUT_DIAG_GPIO', '0')) 525 526 def __get_dut_diag_raw_power_setting(self) -> str: 527 return os.getenv('DUT_DIAG_RAW_POWER_SETTING', '112233') 528 529 def __connect_dut(self) -> OTCI: 530 if os.getenv('DUT_ADB_TCP'): 531 node = otci.connect_otbr_adb_tcp(os.getenv('DUT_ADB_TCP')) 532 elif os.getenv('DUT_ADB_USB'): 533 node = otci.connect_otbr_adb_usb(os.getenv('DUT_ADB_USB')) 534 elif os.getenv('DUT_SSH'): 535 node = otci.connect_otbr_ssh(os.getenv('DUT_SSH')) 536 else: 537 self.__fail("Please set DUT_ADB_TCP, DUT_ADB_USB or DUT_SSH to connect to the DUT device.") 538 539 return node 540 541 def __connect_reference_device(self) -> OTCI: 542 if os.getenv('REF_CLI_SERIAL'): 543 node = otci.connect_cli_serial(os.getenv('REF_CLI_SERIAL')) 544 elif os.getenv('REF_SSH'): 545 node = otci.connect_otbr_ssh(os.getenv('REF_SSH')) 546 elif os.getenv('REF_ADB_USB'): 547 node = otci.connect_otbr_adb_usb(os.getenv('REF_ADB_USB')) 548 else: 549 self.__fail("Please set REF_CLI_SERIAL, REF_SSH or REF_ADB_USB to connect to the reference device.") 550 551 return node 552 553 def __output_format_string(self, name: str, value: str): 554 prefix = '{0:-<58}'.format('{} '.format(name)) 555 print(f'{prefix} {value}') 556 557 def __output_format_bool(self, name: str, value: bool): 558 self.__output_format_string(name, 'OK' if value else 'NotSupported') 559 560 def __fail(self, value: str): 561 print(f'{value}') 562 sys.exit(1) 563 564 565def parse_arguments(): 566 """Parse all arguments.""" 567 description_msg = 'This script is used for testing RCP capabilities.' 568 epilog_msg = textwrap.dedent( 569 'Device Interfaces:\r\n' 570 ' DUT_SSH=<device_ip> Connect to the DUT via ssh\r\n' 571 ' DUT_ADB_TCP=<device_ip> Connect to the DUT via adb tcp\r\n' 572 ' DUT_ADB_USB=<serial_number> Connect to the DUT via adb usb\r\n' 573 ' REF_CLI_SERIAL=<serial_device> Connect to the reference device via cli serial port\r\n' 574 ' REF_ADB_USB=<serial_number> Connect to the reference device via adb usb\r\n' 575 ' REF_SSH=<device_ip> Connect to the reference device via ssh\r\n' 576 '\r\n' 577 'Example:\r\n' 578 f' DUT_ADB_USB=1169UC2F2T0M95OR REF_CLI_SERIAL=/dev/ttyACM0 python3 {sys.argv[0]} -d\r\n') 579 580 parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, 581 description=description_msg, 582 epilog=epilog_msg) 583 584 parser.add_argument( 585 '-c', 586 '--csl', 587 action='store_true', 588 default=False, 589 help='test whether the RCP supports CSL transmitter', 590 ) 591 592 parser.add_argument( 593 '-l', 594 '--link-metrics', 595 action='store_true', 596 default=False, 597 help='test whether the RCP supports link metrics', 598 ) 599 600 parser.add_argument( 601 '-d', 602 '--diag-commands', 603 action='store_true', 604 default=False, 605 help='test whether the RCP supports all diag commands', 606 ) 607 608 parser.add_argument( 609 '-p', 610 '--data-poll', 611 action='store_true', 612 default=False, 613 help='test whether the RCP supports data poll', 614 ) 615 616 parser.add_argument( 617 '-t', 618 '--throughput', 619 action='store_true', 620 default=False, 621 help='test Thread network 1-hop throughput', 622 ) 623 624 parser.add_argument( 625 '-v', 626 '--verbose', 627 action='store_true', 628 default=False, 629 help='output verbose information', 630 ) 631 632 return parser.parse_args() 633 634 635def main(): 636 arguments = parse_arguments() 637 638 if arguments.verbose is True: 639 logger = logging.getLogger() 640 logger.setLevel(logging.DEBUG) 641 642 rcp_caps = RcpCaps() 643 644 if arguments.diag_commands is True: 645 rcp_caps.test_diag_commands() 646 647 if arguments.csl is True: 648 rcp_caps.test_csl() 649 650 if arguments.data_poll is True: 651 rcp_caps.test_data_poll() 652 653 if arguments.link_metrics is True: 654 rcp_caps.test_link_metrics() 655 656 if arguments.throughput: 657 rcp_caps.test_throughput() 658 659 660if __name__ == '__main__': 661 main() 662