#!/usr/bin/env python3
#
# Copyright (c) 2024, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

import argparse
import logging
import os
import sys
import textwrap
import threading

from typing import List

import otci
from otci import OTCI
from otci.types import Ip6Addr

CP_CAPABILITY_VERSION = "0.1.1-dev"

logging.basicConfig(level=logging.WARNING)


class RcpCaps(object):
    """
    This class represents an OpenThread RCP capability test instance.

    It drives two devices through OTCI: the device under test (DUT) and a
    reference device. Both connections are selected through environment
    variables (see `__connect_dut` and `__connect_reference_device`).
    """

    DEFAULT_FORMAT_ALIGN_LENGTH = 58  # The default formatted string alignment length

    def __init__(self):
        self.__dut = self.__connect_dut()
        self.__ref = self.__connect_reference_device()

    def test_diag_commands(self):
        """Test all diag commands."""
        self.__dut.factory_reset()
        self.__ref.factory_reset()

        ret = self.__dut.is_command_supported('diag start')
        if ret is False:
            print('All diag commands are not supported')
            return

        self.__dut.diag_start()
        self.__ref.diag_start()

        self.__test_diag_channel()
        self.__test_diag_power()
        self.__test_diag_radio()
        self.__test_diag_repeat()
        self.__test_diag_send()
        self.__test_diag_frame()
        self.__test_diag_echo()
        self.__test_diag_utils()
        self.__test_diag_rawpowersetting()
        self.__test_diag_powersettings()
        self.__test_diag_gpio_mode()
        self.__test_diag_gpio_value()

        self.__ref.diag_stop()
        self.__dut.diag_stop()

    def test_frame_format(self):
        """Test whether the DUT supports sending and receiving 802.15.4 frames of all formats."""
        frames = [
            {
                'name': 'ver:2003,Cmd,seq,dst[addr:short,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
                'psdu': '030800ffffffff070000'
            },
            {
                'name': 'ver:2003,Bcon,seq,dst[addr:no,pan:no],src[addr:extd,pan:id],sec:no,ie:no,plen:30',
                'psdu': '00c000eeee0102030405060708ff0f000003514f70656e54687265616400000000000001020304050607080000'
            },
            {
                'name': 'ver:2006,Cmd,seq,dst[addr:short,pan:id],src[addr:short,pan:no],sec:l5,ie:no,plen:0',
                'psdu': '4b98ddddddaaaabbbb0d708001020304050607081565'
            },
            {
                'name': 'ver:2006,Cmd,seq,dst[addr:extd,pan:id],src[addr:extd,pan:no],sec:l5,ie:no,plen:0',
                'psdu': '4bdcdddddd102030405060708001020304050607080d6e54687265046400820ee803'
            },
            {
                'name': 'ver:2006,Data,seq,dst[addr:extd,pan:id],src[addr:extd,pan:id],sec:no,ie:no,plen:0',
                'psdu': '01dcdddddd1020304050607080000001020304050607085468'
            },
            {
                'name': 'ver:2006,Data,seq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0',
                'psdu': '0198ddddddaaaaeeeebbbb7080'
            },
            {
                'name': 'ver:2006,Data,seq,dst[addr:extd,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
                'psdu': '011cdddddd10203040506070800000'
            },
            {
                'name': 'ver:2006,Data,seq,dst[addr:short,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
                'psdu': '0118ddddddaaaa3040'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:no,pan:no],src[addr:no,pan:no],sec:no,ie:no,plen:0',
                'psdu': '0120dddddd'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:no,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
                'psdu': '4120ddddddaaaa'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
                'psdu': '012cdddddd10203040506070800000'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:no],src[addr:no,pan:no],sec:no,ie:no,plen:0',
                'psdu': '412cdd10203040506070807080'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:no,pan:no],src[addr:extd,pan:id],sec:no,ie:no,plen:0',
                'psdu': '01e0ddeeee01020304050607080000'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:no,pan:no],src[addr:extd,pan:no],sec:no,ie:no,plen:0',
                'psdu': '41e0dd01020304050607080708'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:id],src[addr:extd,pan:no],sec:no,ie:no,plen:0',
                'psdu': '01ecdddddd102030405060708001020304050607080708'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:no],src[addr:extd,pan:no],sec:no,ie:no,plen:0',
                'psdu': '41ecdd102030405060708001020304050607080708'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0',
                'psdu': '01a8ddddddaaaaeeeebbbb0102'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:short,pan:id],src[addr:extd,pan:id],sec:no,ie:no,plen:0',
                'psdu': '01e8ddddddaaaaeeee01020304050607080708'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0',
                'psdu': '01acdddddd1020304050607080eeeebbbb0708'
            },
            {
                'name': 'ver:2015,Data,seq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie[csl],plen:0',
                'psdu': '01aaddddddaaaaeeeebbbb040dc800e8030708'
            },
            {
                'name': 'ver:2015,Data,noseq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0',
                'psdu': '01a9ddddaaaaeeeebbbbbb04'
            },
        ]

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        ret = self.__dut.is_command_supported('diag start')
        if ret is False:
            print('Diag commands are not supported')
            return

        self.__dut.diag_start()
        self.__ref.diag_start()

        # Each frame format is exercised in both directions: DUT -> reference ('TX') and reference -> DUT ('RX').
        for frame in frames:
            self.__test_send_formated_frame(self.__dut, self.__ref, 'TX ' + frame['name'], frame['psdu'], 100)
            self.__test_send_formated_frame(self.__ref, self.__dut, 'RX ' + frame['name'], frame['psdu'], 100)

        self.__ref.diag_stop()
        self.__dut.diag_stop()

    def test_csl(self):
        """Test whether the DUT supports CSL transmitter."""
        self.__dataset = self.__get_default_dataset()
        self.__test_csl_transmitter()

    def test_data_poll(self):
        """Test whether the DUT supports data poll parent and child."""
        self.__dataset = self.__get_default_dataset()
        self.__test_data_poll_parent()
        self.__test_data_poll_child()

    def test_throughput(self):
        """Test Thread network 1 hop throughput."""
        if not self.__dut.support_iperf3():
            print("The DUT doesn't support the tool iperf3")
            return

        if not self.__ref.support_iperf3():
            print("The reference device doesn't support the tool iperf3")
            return

        bitrate = 90000
        length = 1232
        transmit_time = 30
        max_wait_time = 30
        timeout = transmit_time + max_wait_time

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        dataset = self.__get_default_dataset()

        self.__dut.join(dataset)
        self.__dut.wait_for('state', 'leader')

        self.__ref.set_router_selection_jitter(1)
        self.__ref.join(dataset)
        self.__ref.wait_for('state', ['child', 'router'])

        ref_mleid = self.__ref.get_ipaddr_mleid()

        # The iperf3 server runs on the reference device in a background thread while the DUT acts as the client.
        ref_iperf3_server = threading.Thread(target=self.__ref_iperf3_server_task,
                                             args=(ref_mleid, timeout),
                                             daemon=True)
        ref_iperf3_server.start()
        # NOTE(review): presumably gives the iperf3 server time to start listening - confirm.
        self.__dut.wait(1)

        results = self.__dut.iperf3_client(host=ref_mleid, bitrate=bitrate, transmit_time=transmit_time, length=length)
        ref_iperf3_server.join()

        if not results:
            print('Failed to run the iperf3')
            return

        self.__output_format_string('Throughput', self.__bitrate_to_string(results['receiver']['bitrate']))

    def test_link_metrics(self):
        """Test whether the DUT supports Link Metrics Initiator and Subject."""
        self.__dataset = self.__get_default_dataset()

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        self.__dut.join(self.__dataset)
        self.__dut.wait_for('state', 'leader')

        self.__ref.join(self.__dataset)
        self.__ref.wait_for('state', ['child', 'router'])

        test_case = 'Link Metrics Initiator'
        ref_linklocal_address = self.__ref.get_ipaddr_linklocal()
        ret = self.__run_link_metrics_test_commands(initiator=self.__dut, subject_address=ref_linklocal_address)
        self.__output_format_bool(test_case, ret)

        test_case = 'Link Metrics Subject'
        dut_linklocal_address = self.__dut.get_ipaddr_linklocal()
        ret = self.__run_link_metrics_test_commands(initiator=self.__ref, subject_address=dut_linklocal_address)
        self.__output_format_bool(test_case, ret)

        self.__ref.leave()
        self.__dut.leave()

    #
    # Private methods
    #
    def __run_link_metrics_test_commands(self, initiator: OTCI, subject_address: Ip6Addr) -> bool:
        """Run the link metrics command sequence from `initiator` against `subject_address`.

        Returns True only if every configuration command succeeds and both query results contain the
        'lqi' and 'rssi' entries.
        """
        seriesid = 1
        series_flags = 'ldra'
        link_metrics_flags = 'qr'
        probe_length = 10

        if not initiator.linkmetrics_config_enhanced_ack_register(subject_address, link_metrics_flags):
            return False

        if not initiator.linkmetrics_config_forward(subject_address, seriesid, series_flags, link_metrics_flags):
            return False

        initiator.linkmetrics_probe(subject_address, seriesid, probe_length)

        results = initiator.linkmetrics_request_single(subject_address, link_metrics_flags)
        if not ('lqi' in results and 'rssi' in results):
            return False

        results = initiator.linkmetrics_request_forward(subject_address, seriesid)
        if not ('lqi' in results and 'rssi' in results):
            return False

        if not initiator.linkmetrics_config_enhanced_ack_clear(subject_address):
            return False

        return True

    def __ref_iperf3_server_task(self, bind_address: str, timeout: int):
        """Thread target: run the iperf3 server on the reference device."""
        self.__ref.iperf3_server(bind_address, timeout=timeout)

    def __bitrate_to_string(self, bitrate: float) -> str:
        """Format `bitrate` (bits/sec) with two decimals, scaling by 1000 up to 'Tbits/sec'."""
        units = ['bits/sec', 'Kbits/sec', 'Mbits/sec', 'Gbits/sec', 'Tbits/sec']
        unit_index = 0

        while bitrate >= 1000 and unit_index < len(units) - 1:
            bitrate /= 1000
            unit_index += 1

        return f'{bitrate:.2f} {units[unit_index]}'

    def __get_default_dataset(self):
        """Create the dataset (channel 20, fixed network key) shared by both devices via the DUT."""
        return self.__dut.create_dataset(channel=20, network_key='00112233445566778899aabbccddcafe')

    def __test_csl_transmitter(self):
        """Attach the reference device as a CSL child of the DUT and ping it from the DUT."""
        packets = 10

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        self.__dut.join(self.__dataset)
        self.__dut.wait_for('state', 'leader')

        # Set the reference device as an SSED
        self.__ref.set_mode('-')
        self.__ref.config_csl(channel=15, period=320000, timeout=100)
        self.__ref.join(self.__dataset)
        self.__ref.wait_for('state', 'child')

        child_table = self.__dut.get_child_table()
        # NOTE(review): assumes the only child is stored under key 1 in the child table - confirm
        # against OTCI.get_child_table().
        ret = len(child_table) == 1 and child_table[1]['csl']

        if ret:
            ref_mleid = self.__ref.get_ipaddr_mleid()
            result = self.__dut.ping(ref_mleid, count=packets, interval=1)
            ret = result['transmitted_packets'] == result['received_packets'] == packets

        self.__dut.leave()
        self.__ref.leave()

        self.__output_format_bool('CSL Transmitter', ret)

    def __test_data_poll_parent(self):
        """Use the DUT as the parent of a polling reference child and ping the DUT from the child."""
        packets = 10

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        self.__dut.join(self.__dataset)
        self.__dut.wait_for('state', 'leader')

        # Set the reference device as an SED
        self.__ref.set_mode('-')
        self.__ref.set_poll_period(500)
        self.__ref.join(self.__dataset)
        self.__ref.wait_for('state', 'child')

        dut_mleid = self.__dut.get_ipaddr_mleid()
        result = self.__ref.ping(dut_mleid, count=packets, interval=1)

        self.__dut.leave()
        self.__ref.leave()

        ret = result['transmitted_packets'] == result['received_packets'] == packets
        self.__output_format_bool('Data Poll Parent', ret)

    def __test_data_poll_child(self):
        """Use the DUT as a polling child of the reference device and ping the DUT from the parent."""
        packets = 10

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        self.__ref.join(self.__dataset)
        self.__ref.wait_for('state', 'leader')

        # Set the DUT as an SED
        self.__dut.set_mode('-')
        self.__dut.set_poll_period(500)
        self.__dut.join(self.__dataset)
        self.__dut.wait_for('state', 'child')

        dut_mleid = self.__dut.get_ipaddr_mleid()
        result = self.__ref.ping(dut_mleid, count=packets, interval=1)

        self.__dut.leave()
        self.__ref.leave()

        ret = result['transmitted_packets'] == result['received_packets'] == packets
        self.__output_format_bool('Data Poll Child', ret)

    def __test_diag_channel(self):
        """Set the diag channel on the DUT and check that it reads back unchanged."""
        channel = 20
        commands = ['diag channel', f'diag channel {channel}']

        if self.__support_commands(commands):
            self.__dut.diag_set_channel(channel)
            value = self.__dut.diag_get_channel()
            ret = value == channel
        else:
            ret = False

        self.__output_results(commands, ret)

    def __test_diag_power(self):
        """Set the diag power on the DUT and check that it reads back unchanged."""
        power = self.__get_dut_diag_power()
        commands = ['diag power', f'diag power {power}']

        if self.__support_commands(commands):
            self.__dut.diag_set_power(power)
            value = self.__dut.diag_get_power()
            ret = value == power
        else:
            ret = False

        self.__output_results(commands, ret)

    def __test_diag_radio(self):
        """Switch the DUT radio to receive and then sleep, checking the reported state each time."""
        commands = ['diag radio receive', 'diag radio sleep', 'diag radio state']

        if self.__support_commands(commands):
            self.__dut.diag_radio_receive()
            receive_state = self.__dut.diag_get_radio_state()
            self.__dut.wait(0.1)
            self.__dut.diag_radio_sleep()
            sleep_state = self.__dut.diag_get_radio_state()

            ret = sleep_state == 'sleep' and receive_state == 'receive'
        else:
            ret = False

        self.__output_results(commands, ret)

    def __test_diag_gpio_value(self):
        """Write 0 and then 1 to the DUT diag GPIO and check each value reads back."""
        gpio = self.__get_dut_diag_gpio()
        commands = [f'diag gpio get {gpio}', f'diag gpio set {gpio} 0', f'diag gpio set {gpio} 1']

        if self.__support_commands(commands):
            self.__dut.diag_set_gpio_value(gpio, 0)
            value_0 = self.__dut.diag_get_gpio_value(gpio)
            self.__dut.diag_set_gpio_value(gpio, 1)
            value_1 = self.__dut.diag_get_gpio_value(gpio)

            ret = value_0 == 0 and value_1 == 1
        else:
            ret = False

        self.__output_results(commands, ret)

    def __test_diag_gpio_mode(self):
        """Set the DUT diag GPIO mode to 'in' and then 'out' and check each mode reads back."""
        gpio = self.__get_dut_diag_gpio()
        commands = [f'diag gpio mode {gpio}', f'diag gpio mode {gpio} in', f'diag gpio mode {gpio} out']

        if self.__support_commands(commands):
            self.__dut.diag_set_gpio_mode(gpio, 'in')
            mode_in = self.__dut.diag_get_gpio_mode(gpio)
            # The mode (not the value) must be changed here so that the read-back below is meaningful.
            self.__dut.diag_set_gpio_mode(gpio, 'out')
            mode_out = self.__dut.diag_get_gpio_mode(gpio)

            ret = mode_in == 'in' and mode_out == 'out'
        else:
            ret = False

        self.__output_results(commands, ret)

    def __test_diag_echo(self):
        """Check both forms of 'diag echo' (explicit message and '-n <number>') on the DUT."""
        echo_msg = '0123456789'
        cmd_diag_echo = f'diag echo {echo_msg}'
        cmd_diag_echo_num = 'diag echo -n 10'

        if self.__dut.is_command_supported(cmd_diag_echo):
            reply = self.__dut.diag_echo(echo_msg)
            ret = reply == echo_msg
        else:
            ret = False
        self.__output_format_bool(cmd_diag_echo, ret)

        if self.__dut.is_command_supported(cmd_diag_echo_num):
            reply = self.__dut.diag_echo_number(10)
            ret = reply == echo_msg
        else:
            ret = False
        self.__output_format_bool(cmd_diag_echo_num, ret)

    def __test_diag_utils(self):
        """Report, per command, whether the DUT supports the listed diag utility commands."""
        commands = [
            'diag cw start', 'diag cw stop', 'diag stream start', 'diag stream stop', 'diag stats', 'diag stats clear'
        ]

        for command in commands:
            ret = self.__dut.is_command_supported(command)
            self.__output_format_bool(command, ret)

    def __test_diag_rawpowersetting(self):
        """Enable, set, read back and disable the raw power setting on the DUT."""
        rawpowersetting = self.__get_dut_diag_raw_power_setting()
        commands = [
            'diag rawpowersetting enable', f'diag rawpowersetting {rawpowersetting}', 'diag rawpowersetting',
            'diag rawpowersetting disable'
        ]

        if self.__support_commands(commands):
            self.__dut.diag_enable_rawpowersetting()
            self.__dut.diag_set_rawpowersetting(rawpowersetting)
            reply = self.__dut.diag_get_rawpowersetting()
            self.__dut.diag_disable_rawpowersetting()

            ret = reply == rawpowersetting
        else:
            ret = False

        self.__output_results(commands, ret)

    def __test_diag_powersettings(self):
        """Check that the DUT returns a non-empty power settings table."""
        commands = ['diag powersettings', 'diag powersettings 20']

        if self.__support_commands(commands):
            powersettings = self.__dut.diag_get_powersettings()
            ret = len(powersettings) > 0
        else:
            ret = False

        self.__output_results(commands, ret)

    def __test_diag_send(self):
        """Send a fixed number of diag packets from the DUT and check both devices' statistics."""
        packets = 100
        threshold = 80
        length = 64
        channel = 20
        commands = [f'diag send {packets} {length}', 'diag stats', 'diag stats clear']

        if self.__support_commands(commands):
            self.__dut.wait(1)
            self.__dut.diag_set_channel(channel)
            self.__ref.diag_set_channel(channel)
            self.__ref.diag_radio_receive()

            self.__dut.diag_stats_clear()
            self.__ref.diag_stats_clear()

            self.__dut.diag_send(packets, length)
            self.__dut.wait(1)
            dut_stats = self.__dut.diag_get_stats()
            ref_stats = self.__ref.diag_get_stats()

            ret = dut_stats['sent_success_packets'] == packets and ref_stats['received_packets'] > threshold
        else:
            ret = False

        self.__output_results(commands, ret)

    def __test_diag_repeat(self):
        """Run 'diag repeat' on the DUT for one second and check both devices' statistics."""
        delay = 10
        threshold = 80
        length = 64
        channel = 20
        cmd_diag_repeat = f'diag repeat {delay} {length}'
        cmd_diag_repeat_stop = 'diag repeat stop'
        commands = [cmd_diag_repeat, cmd_diag_repeat_stop, 'diag stats', 'diag stats clear']

        if self.__support_commands(commands):
            self.__dut.diag_set_channel(channel)
            self.__ref.diag_set_channel(channel)
            self.__ref.diag_radio_receive()

            self.__dut.diag_stats_clear()
            self.__ref.diag_stats_clear()

            self.__dut.diag_repeat(delay, length)
            self.__dut.wait(1)
            self.__dut.diag_repeat_stop()
            dut_stats = self.__dut.diag_get_stats()
            ref_stats = self.__ref.diag_get_stats()

            ret = dut_stats['sent_success_packets'] > threshold and ref_stats['received_packets'] > threshold
        else:
            ret = False

        # Only the two repeat commands are reported; the stats commands are reported by other tests.
        self.__output_format_bool(cmd_diag_repeat, ret)
        self.__output_format_bool(cmd_diag_repeat_stop, ret)

    def __test_send_formated_frame(self,
                                   sender: OTCI,
                                   receiver: OTCI,
                                   format_name: str,
                                   frame: str,
                                   align_length: int = DEFAULT_FORMAT_ALIGN_LENGTH):
        """Send `frame` repeatedly from `sender` to `receiver` and report the result under `format_name`.

        The result is OK when the sender reports all packets as sent successfully and the receiver
        reports more than the threshold number of received packets.
        """
        packets = 100
        threshold = 80
        channel = 20
        cmd_diag_frame = f'diag frame {frame}'
        commands = [cmd_diag_frame, f'diag send {packets}', 'diag stats', 'diag stats clear']

        # NOTE(review): command support is always checked on the DUT, even when the DUT is the receiver.
        if self.__support_commands(commands):
            sender.wait(1)
            sender.diag_set_channel(channel)
            receiver.diag_set_channel(channel)
            receiver.diag_radio_receive()

            # Both sides must start from zero, otherwise packets counted during a previous call would
            # leak into this call's 'received_packets' check.
            sender.diag_stats_clear()
            receiver.diag_stats_clear()

            sender.diag_frame(frame)
            sender.diag_send(packets, None)
            sender.wait(1)
            sender_stats = sender.diag_get_stats()
            receiver_stats = receiver.diag_get_stats()

            ret = sender_stats['sent_success_packets'] == packets and receiver_stats['received_packets'] > threshold
        else:
            ret = False

        self.__output_format_bool(format_name, ret, align_length)

    def __test_diag_frame(self):
        """Check 'diag frame' by sending one fixed frame from the DUT to the reference device."""
        frame = '00010203040506070809'
        cmd_diag_frame = f'diag frame {frame}'

        self.__test_send_formated_frame(self.__dut, self.__ref, cmd_diag_frame, frame)

    def __support_commands(self, commands: List[str]) -> bool:
        """Return False as soon as the DUT reports one of `commands` as unsupported, True otherwise."""
        return not any(self.__dut.is_command_supported(command) is False for command in commands)

    def __output_results(self, commands: List[str], support: bool):
        """Print the same result line for every command in `commands`."""
        for command in commands:
            self.__output_format_bool(command, support)

    def __get_dut_diag_power(self) -> int:
        """Return the diag power from the DUT_DIAG_POWER environment variable (default 10)."""
        return int(os.getenv('DUT_DIAG_POWER', '10'))

    def __get_dut_diag_gpio(self) -> int:
        """Return the diag GPIO number from the DUT_DIAG_GPIO environment variable (default 0)."""
        return int(os.getenv('DUT_DIAG_GPIO', '0'))

    def __get_dut_diag_raw_power_setting(self) -> str:
        """Return the raw power setting from DUT_DIAG_RAW_POWER_SETTING (default '112233')."""
        return os.getenv('DUT_DIAG_RAW_POWER_SETTING', '112233')

    def __connect_dut(self) -> OTCI:
        """Connect to the DUT using the first set variable of DUT_ADB_TCP, DUT_ADB_USB, DUT_CLI_SERIAL, DUT_SSH.

        Exits the process with status 1 if none of them is set.
        """
        if os.getenv('DUT_ADB_TCP'):
            node = otci.connect_otbr_adb_tcp(os.getenv('DUT_ADB_TCP'))
        elif os.getenv('DUT_ADB_USB'):
            node = otci.connect_otbr_adb_usb(os.getenv('DUT_ADB_USB'))
        elif os.getenv('DUT_CLI_SERIAL'):
            node = otci.connect_cli_serial(os.getenv('DUT_CLI_SERIAL'))
        elif os.getenv('DUT_SSH'):
            node = otci.connect_otbr_ssh(os.getenv('DUT_SSH'))
        else:
            # __fail() never returns, so `node` is always bound when the return below is reached.
            self.__fail("Please set DUT_ADB_TCP, DUT_ADB_USB, DUT_CLI_SERIAL or DUT_SSH to connect to the DUT device.")

        return node

    def __connect_reference_device(self) -> OTCI:
        """Connect to the reference device using the first set variable of REF_CLI_SERIAL, REF_SSH, REF_ADB_USB.

        Exits the process with status 1 if none of them is set.
        """
        if os.getenv('REF_CLI_SERIAL'):
            node = otci.connect_cli_serial(os.getenv('REF_CLI_SERIAL'))
        elif os.getenv('REF_SSH'):
            node = otci.connect_otbr_ssh(os.getenv('REF_SSH'))
        elif os.getenv('REF_ADB_USB'):
            node = otci.connect_otbr_adb_usb(os.getenv('REF_ADB_USB'))
        else:
            # __fail() never returns, so `node` is always bound when the return below is reached.
            self.__fail("Please set REF_CLI_SERIAL, REF_SSH or REF_ADB_USB to connect to the reference device.")

        return node

    def __output_format_string(self, name: str, value: str, align_length: int = DEFAULT_FORMAT_ALIGN_LENGTH):
        """Print `name` padded with '-' up to `align_length` characters, followed by `value`."""
        prefix = (name + ' ').ljust(align_length, '-')
        print(f'{prefix} {value}')

    def __output_format_bool(self, name: str, value: bool, align_length: int = DEFAULT_FORMAT_ALIGN_LENGTH):
        """Print a result line for `name`: 'OK' when `value` is truthy, 'NotSupported' otherwise."""
        self.__output_format_string(name, 'OK' if value else 'NotSupported', align_length)

    def __fail(self, value: str):
        """Print `value` and terminate the process with exit status 1."""
        print(value)
        sys.exit(1)


def parse_arguments():
    """Parse all arguments."""
    description_msg = 'This script is used for testing RCP capabilities.'
    epilog_msg = textwrap.dedent(
        'Device Interfaces:\r\n'
        ' DUT_ADB_TCP=<device_ip> Connect to the DUT via adb tcp\r\n'
        ' DUT_ADB_USB=<serial_number> Connect to the DUT via adb usb\r\n'
        ' DUT_CLI_SERIAL=<serial_device> Connect to the DUT via cli serial port\r\n'
        ' DUT_SSH=<device_ip> Connect to the DUT via ssh\r\n'
        ' REF_ADB_USB=<serial_number> Connect to the reference device via adb usb\r\n'
        ' REF_CLI_SERIAL=<serial_device> Connect to the reference device via cli serial port\r\n'
        ' REF_SSH=<device_ip> Connect to the reference device via ssh\r\n'
        '\r\n'
        'Example:\r\n'
        f' DUT_ADB_USB=1169UC2F2T0M95OR REF_CLI_SERIAL=/dev/ttyACM0 python3 {sys.argv[0]} -d\r\n')

    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
                                     description=description_msg,
                                     epilog=epilog_msg)

    # Every option is a plain boolean switch; the order here is the order shown in --help.
    switches = [
        ('-c', '--csl', 'test whether the RCP supports CSL transmitter'),
        ('-l', '--link-metrics', 'test whether the RCP supports link metrics'),
        ('-d', '--diag-commands', 'test whether the RCP supports all diag commands'),
        ('-f', '--frame-format', 'test whether the RCP supports 802.15.4 frames of all formats'),
        ('-p', '--data-poll', 'test whether the RCP supports data poll'),
        ('-t', '--throughput', 'test Thread network 1-hop throughput'),
        ('-v', '--version', 'output version'),
        ('-D', '--debug', 'output debug information'),
    ]

    for short_flag, long_flag, help_msg in switches:
        parser.add_argument(
            short_flag,
            long_flag,
            action='store_true',
            default=False,
            help=help_msg,
        )

    return parser.parse_args()


def main():
    """Parse the command line and run each selected capability test against the connected devices."""
    arguments = parse_arguments()

    if arguments.version:
        print(f'Version: {CP_CAPABILITY_VERSION}')
        # sys.exit() rather than the site-provided exit(), which is not guaranteed to exist.
        sys.exit()

    if arguments.debug:
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)

    rcp_caps = RcpCaps()

    if arguments.diag_commands:
        rcp_caps.test_diag_commands()

    if arguments.csl:
        rcp_caps.test_csl()

    if arguments.data_poll:
        rcp_caps.test_data_poll()

    if arguments.link_metrics:
        rcp_caps.test_link_metrics()

    if arguments.throughput:
        rcp_caps.test_throughput()

    if arguments.frame_format:
        rcp_caps.test_frame_format()


if __name__ == '__main__':
    main()