1#!/usr/bin/env python3
2#
3#  Copyright (c) 2024, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import argparse
31import logging
32import os
33import sys
34import textwrap
35import threading
36
37from typing import List
38
39import otci
40from otci import OTCI
41from otci.types import Ip6Addr
42
# Configure the root logger to show warnings and above by default; main()
# raises the root logger level to DEBUG when --verbose is given.
logging.basicConfig(level=logging.WARNING)
44
45
class RcpCaps(object):
    """
    This class represents an OpenThread RCP capability test instance.

    Two devices are driven through OTCI: the device under test (DUT) and a
    reference device that serves as its peer. Every test prints its results
    to stdout, one `<name> ---- <value>` line per test item.
    """

    def __init__(self):
        """Connect to the DUT and to the reference device selected via environment variables."""
        self.__dut = self.__connect_dut()
        self.__ref = self.__connect_reference_device()

    def test_diag_commands(self):
        """Test all diag commands."""
        self.__dut.factory_reset()
        self.__ref.factory_reset()

        # If `diag start` is unsupported, report it once and skip every other diag test.
        if self.__dut.is_command_supported('diag start') is False:
            print('All diag commands are not supported')
            return

        self.__dut.diag_start()
        self.__ref.diag_start()

        self.__test_diag_channel()
        self.__test_diag_power()
        self.__test_diag_radio()
        self.__test_diag_repeat()
        self.__test_diag_send()
        self.__test_diag_frame()
        self.__test_diag_echo()
        self.__test_diag_utils()
        self.__test_diag_rawpowersetting()
        self.__test_diag_powersettings()
        self.__test_diag_gpio_mode()
        self.__test_diag_gpio_value()

        self.__ref.diag_stop()
        self.__dut.diag_stop()

    def test_csl(self):
        """Test whether the DUT supports CSL transmitter."""
        self.__dataset = self.__get_default_dataset()
        self.__test_csl_transmitter()

    def test_data_poll(self):
        """Test whether the DUT supports data poll parent and child."""
        self.__dataset = self.__get_default_dataset()
        self.__test_data_poll_parent()
        self.__test_data_poll_child()

    def test_throughput(self):
        """Test Thread network 1 hop throughput."""
        if not self.__dut.support_iperf3():
            print("The DUT doesn't support the tool iperf3")
            return

        if not self.__ref.support_iperf3():
            print("The reference device doesn't support the tool iperf3")
            return

        bitrate = 90000
        length = 1232
        transmit_time = 30
        max_wait_time = 30
        # The server is given the transmit time plus some slack before it times out.
        timeout = transmit_time + max_wait_time

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        dataset = self.__get_default_dataset()

        self.__dut.join(dataset)
        self.__dut.wait_for('state', 'leader')

        self.__ref.set_router_selection_jitter(1)
        self.__ref.join(dataset)
        self.__ref.wait_for('state', ['child', 'router'])

        ref_mleid = self.__ref.get_ipaddr_mleid()

        # The iperf3 server runs on the reference device in a background thread
        # while the DUT acts as the iperf3 client.
        ref_iperf3_server = threading.Thread(target=self.__ref_iperf3_server_task,
                                             args=(ref_mleid, timeout),
                                             daemon=True)
        ref_iperf3_server.start()
        # Wait briefly before starting the client - presumably so the server is listening first.
        self.__dut.wait(1)

        results = self.__dut.iperf3_client(host=ref_mleid, bitrate=bitrate, transmit_time=transmit_time, length=length)
        ref_iperf3_server.join()

        if not results:
            print('Failed to run the iperf3')
            return

        self.__output_format_string('Throughput', self.__bitrate_to_string(results['receiver']['bitrate']))

    def test_link_metrics(self):
        """Test whether the DUT supports Link Metrics Initiator and Subject."""
        self.__dataset = self.__get_default_dataset()

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        self.__dut.join(self.__dataset)
        self.__dut.wait_for('state', 'leader')

        self.__ref.join(self.__dataset)
        self.__ref.wait_for('state', ['child', 'router'])

        # Initiator role: the DUT issues the link metrics commands towards the reference device.
        ref_linklocal_address = self.__ref.get_ipaddr_linklocal()
        ret = self.__run_link_metrics_test_commands(initiator=self.__dut, subject_address=ref_linklocal_address)
        self.__output_format_bool('Link Metrics Initiator', ret)

        # Subject role: the reference device issues the same commands towards the DUT.
        dut_linklocal_address = self.__dut.get_ipaddr_linklocal()
        ret = self.__run_link_metrics_test_commands(initiator=self.__ref, subject_address=dut_linklocal_address)
        self.__output_format_bool('Link Metrics Subject', ret)

        self.__ref.leave()
        self.__dut.leave()

    #
    # Private methods
    #
    def __run_link_metrics_test_commands(self, initiator: 'OTCI', subject_address: 'Ip6Addr') -> bool:
        """Run the link metrics command sequence from `initiator` towards `subject_address`.

        Returns True only if every configuration command succeeds and both the
        single and the forward-series query results contain 'lqi' and 'rssi'.
        """
        seriesid = 1
        series_flags = 'ldra'
        link_metrics_flags = 'qr'
        probe_length = 10
        required_metrics = ('lqi', 'rssi')

        if not initiator.linkmetrics_config_enhanced_ack_register(subject_address, link_metrics_flags):
            return False

        if not initiator.linkmetrics_config_forward(subject_address, seriesid, series_flags, link_metrics_flags):
            return False

        initiator.linkmetrics_probe(subject_address, seriesid, probe_length)

        results = initiator.linkmetrics_request_single(subject_address, link_metrics_flags)
        if not all(metric in results for metric in required_metrics):
            return False

        results = initiator.linkmetrics_request_forward(subject_address, seriesid)
        if not all(metric in results for metric in required_metrics):
            return False

        if not initiator.linkmetrics_config_enhanced_ack_clear(subject_address):
            return False

        return True

    def __ref_iperf3_server_task(self, bind_address: str, timeout: int):
        """Thread target: run the iperf3 server on the reference device."""
        self.__ref.iperf3_server(bind_address, timeout=timeout)

    def __bitrate_to_string(self, bitrate: float):
        """Format a bitrate given in bits/sec with the largest fitting unit, e.g. '1.50 Kbits/sec'."""
        units = ['bits/sec', 'Kbits/sec', 'Mbits/sec', 'Gbits/sec', 'Tbits/sec']
        unit_index = 0

        # Scale down by 1000 per step, stopping at the last known unit.
        while bitrate >= 1000 and unit_index < len(units) - 1:
            bitrate /= 1000
            unit_index += 1

        return f'{bitrate:.2f} {units[unit_index]}'

    def __get_default_dataset(self):
        """Create, on the DUT, the dataset shared by all network-based tests."""
        return self.__dut.create_dataset(channel=20, network_key='00112233445566778899aabbccddcafe')

    def __test_csl_transmitter(self):
        """Check that the DUT, as leader, can reach a CSL-synchronized child via ping."""
        packets = 10

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        self.__dut.join(self.__dataset)
        self.__dut.wait_for('state', 'leader')

        # Set the reference device as an SSED
        self.__ref.set_mode('-')
        self.__ref.config_csl(channel=15, period=320000, timeout=100)
        self.__ref.join(self.__dataset)
        self.__ref.wait_for('state', 'child')

        child_table = self.__dut.get_child_table()
        # NOTE(review): assumes the child table is keyed by child id and that the only
        # child after a factory reset has id 1 - confirm against otci.get_child_table().
        ret = len(child_table) == 1 and child_table[1]['csl']

        if ret:
            ref_mleid = self.__ref.get_ipaddr_mleid()
            result = self.__dut.ping(ref_mleid, count=packets, interval=1)
            ret = result['transmitted_packets'] == result['received_packets'] == packets

        self.__dut.leave()
        self.__ref.leave()

        self.__output_format_bool('CSL Transmitter', ret)

    def __test_data_poll_parent(self):
        """Check the DUT as parent: the reference device joins as SED and pings the DUT."""
        packets = 10

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        self.__dut.join(self.__dataset)
        self.__dut.wait_for('state', 'leader')

        # Set the reference device as an SED
        self.__ref.set_mode('-')
        self.__ref.set_poll_period(500)
        self.__ref.join(self.__dataset)
        self.__ref.wait_for('state', 'child')

        dut_mleid = self.__dut.get_ipaddr_mleid()
        result = self.__ref.ping(dut_mleid, count=packets, interval=1)

        self.__dut.leave()
        self.__ref.leave()

        ret = result['transmitted_packets'] == result['received_packets'] == packets
        self.__output_format_bool('Data Poll Parent', ret)

    def __test_data_poll_child(self):
        """Check the DUT as child: the DUT joins as SED and is pinged by the reference leader."""
        packets = 10

        self.__dut.factory_reset()
        self.__ref.factory_reset()

        self.__ref.join(self.__dataset)
        self.__ref.wait_for('state', 'leader')

        # Set the DUT as an SED
        self.__dut.set_mode('-')
        self.__dut.set_poll_period(500)
        self.__dut.join(self.__dataset)
        self.__dut.wait_for('state', 'child')

        dut_mleid = self.__dut.get_ipaddr_mleid()
        result = self.__ref.ping(dut_mleid, count=packets, interval=1)

        self.__dut.leave()
        self.__ref.leave()

        ret = result['transmitted_packets'] == result['received_packets'] == packets
        self.__output_format_bool('Data Poll Child', ret)

    def __test_diag_channel(self):
        """Set the diag channel on the DUT and verify that it reads back."""
        channel = 20
        commands = ['diag channel', f'diag channel {channel}']

        ret = False
        if self.__support_commands(commands):
            self.__dut.diag_set_channel(channel)
            ret = self.__dut.diag_get_channel() == channel

        self.__output_results(commands, ret)

    def __test_diag_power(self):
        """Set the diag power on the DUT and verify that it reads back."""
        power = self.__get_dut_diag_power()
        commands = ['diag power', f'diag power {power}']

        ret = False
        if self.__support_commands(commands):
            self.__dut.diag_set_power(power)
            ret = self.__dut.diag_get_power() == power

        self.__output_results(commands, ret)

    def __test_diag_radio(self):
        """Switch the DUT radio to receive and then to sleep, checking the reported state each time."""
        commands = ['diag radio receive', 'diag radio sleep', 'diag radio state']

        ret = False
        if self.__support_commands(commands):
            self.__dut.diag_radio_receive()
            receive_state = self.__dut.diag_get_radio_state()
            self.__dut.wait(0.1)
            self.__dut.diag_radio_sleep()
            sleep_state = self.__dut.diag_get_radio_state()

            ret = sleep_state == 'sleep' and receive_state == 'receive'

        self.__output_results(commands, ret)

    def __test_diag_gpio_value(self):
        """Write 0 and then 1 to the configured GPIO and verify both values read back."""
        gpio = self.__get_dut_diag_gpio()
        commands = [f'diag gpio get {gpio}', f'diag gpio set {gpio} 0', f'diag gpio set {gpio} 1']

        ret = False
        if self.__support_commands(commands):
            self.__dut.diag_set_gpio_value(gpio, 0)
            value_0 = self.__dut.diag_get_gpio_value(gpio)
            self.__dut.diag_set_gpio_value(gpio, 1)
            value_1 = self.__dut.diag_get_gpio_value(gpio)

            ret = value_0 == 0 and value_1 == 1

        self.__output_results(commands, ret)

    def __test_diag_gpio_mode(self):
        """Set the configured GPIO to 'in' and then to 'out' and verify both modes read back."""
        gpio = self.__get_dut_diag_gpio()
        commands = [f'diag gpio mode {gpio}', f'diag gpio mode {gpio} in', f'diag gpio mode {gpio} out']

        ret = False
        if self.__support_commands(commands):
            self.__dut.diag_set_gpio_mode(gpio, 'in')
            mode_in = self.__dut.diag_get_gpio_mode(gpio)
            # The mode (not the value) must be changed here, otherwise 'out' is never applied.
            self.__dut.diag_set_gpio_mode(gpio, 'out')
            mode_out = self.__dut.diag_get_gpio_mode(gpio)

            ret = mode_in == 'in' and mode_out == 'out'

        self.__output_results(commands, ret)

    def __test_diag_echo(self):
        """Test `diag echo <message>` and `diag echo -n <number>`, reporting each separately."""
        echo_msg = '0123456789'
        echo_number = 10
        cmd_diag_echo = f'diag echo {echo_msg}'
        cmd_diag_echo_num = f'diag echo -n {echo_number}'

        ret = False
        if self.__dut.is_command_supported(cmd_diag_echo):
            ret = self.__dut.diag_echo(echo_msg) == echo_msg
        self.__output_format_bool(cmd_diag_echo, ret)

        ret = False
        if self.__dut.is_command_supported(cmd_diag_echo_num):
            # The reply to `-n 10` is expected to equal the ten-character echo message.
            ret = self.__dut.diag_echo_number(echo_number) == echo_msg
        self.__output_format_bool(cmd_diag_echo_num, ret)

    def __test_diag_utils(self):
        """Report, command by command, whether the DUT supports the listed diag utilities."""
        commands = [
            'diag cw start', 'diag cw stop', 'diag stream start', 'diag stream stop', 'diag stats', 'diag stats clear'
        ]

        for command in commands:
            self.__output_format_bool(command, self.__dut.is_command_supported(command))

    def __test_diag_rawpowersetting(self):
        """Enable raw power setting, write a value, verify it reads back, then disable it again."""
        rawpowersetting = self.__get_dut_diag_raw_power_setting()
        commands = [
            'diag rawpowersetting enable', f'diag rawpowersetting {rawpowersetting}', 'diag rawpowersetting',
            'diag rawpowersetting disable'
        ]

        ret = False
        if self.__support_commands(commands):
            self.__dut.diag_enable_rawpowersetting()
            self.__dut.diag_set_rawpowersetting(rawpowersetting)
            reply = self.__dut.diag_get_rawpowersetting()
            self.__dut.diag_disable_rawpowersetting()

            ret = reply == rawpowersetting

        self.__output_results(commands, ret)

    def __test_diag_powersettings(self):
        """Verify that the DUT returns a non-empty power settings table."""
        commands = ['diag powersettings', 'diag powersettings 20']

        ret = False
        if self.__support_commands(commands):
            ret = len(self.__dut.diag_get_powersettings()) > 0

        self.__output_results(commands, ret)

    def __test_diag_send(self):
        """Send a fixed number of packets from the DUT and check that the reference device receives most of them."""
        packets = 100
        threshold = 80
        length = 64
        channel = 20
        commands = [f'diag send {packets} {length}', 'diag stats', 'diag stats clear']

        ret = False
        if self.__support_commands(commands):
            self.__dut.wait(1)
            self.__dut.diag_set_channel(channel)
            self.__ref.diag_set_channel(channel)
            self.__ref.diag_radio_receive()

            # Start both counters from zero so that the stats reflect this run only.
            self.__dut.diag_stats_clear()
            self.__ref.diag_stats_clear()

            self.__dut.diag_send(packets, length)
            self.__dut.wait(1)
            dut_stats = self.__dut.diag_get_stats()
            ref_stats = self.__ref.diag_get_stats()

            ret = dut_stats['sent_packets'] == packets and ref_stats['received_packets'] > threshold

        self.__output_results(commands, ret)

    def __test_diag_repeat(self):
        """Let the DUT transmit repeatedly for one second and check both the sent and received counters."""
        delay = 10
        threshold = 80
        length = 64
        channel = 20
        cmd_diag_repeat = f'diag repeat {delay} {length}'
        cmd_diag_repeat_stop = 'diag repeat stop'
        commands = [cmd_diag_repeat, cmd_diag_repeat_stop, 'diag stats', 'diag stats clear']

        ret = False
        if self.__support_commands(commands):
            self.__dut.diag_set_channel(channel)
            self.__ref.diag_set_channel(channel)
            self.__ref.diag_radio_receive()

            self.__dut.diag_stats_clear()
            self.__ref.diag_stats_clear()

            self.__dut.diag_repeat(delay, length)
            self.__dut.wait(1)
            self.__dut.diag_repeat_stop()
            dut_stats = self.__dut.diag_get_stats()
            ref_stats = self.__ref.diag_get_stats()

            ret = dut_stats['sent_packets'] > threshold and ref_stats['received_packets'] > threshold

        # Only the repeat commands are reported here; the stats commands are covered by other tests.
        self.__output_format_bool(cmd_diag_repeat, ret)
        self.__output_format_bool(cmd_diag_repeat_stop, ret)

    def __test_diag_frame(self):
        """Set a custom frame on the DUT, send it repeatedly and check that the reference device receives it."""
        packets = 100
        threshold = 80
        channel = 20
        frame = '00010203040506070809'
        cmd_diag_frame = f'diag frame {frame}'
        commands = [cmd_diag_frame, f'diag send {packets}', 'diag stats', 'diag stats clear']

        ret = False
        if self.__support_commands(commands):
            self.__dut.wait(1)
            self.__dut.diag_set_channel(channel)
            self.__ref.diag_set_channel(channel)
            self.__ref.diag_radio_receive()

            self.__dut.diag_stats_clear()
            self.__ref.diag_stats_clear()

            # The DUT is the sender, so the frame has to be configured on the DUT;
            # `diag send` without a length then transmits that frame.
            self.__dut.diag_frame(frame)
            self.__dut.diag_send(packets, None)
            self.__dut.wait(1)
            dut_stats = self.__dut.diag_get_stats()
            ref_stats = self.__ref.diag_get_stats()

            ret = dut_stats['sent_packets'] == packets and ref_stats['received_packets'] > threshold

        self.__output_format_bool(cmd_diag_frame, ret)

    def __support_commands(self, commands: List[str]) -> bool:
        """Return False as soon as the DUT reports one of `commands` as unsupported, True otherwise."""
        return all(self.__dut.is_command_supported(command) is not False for command in commands)

    def __output_results(self, commands: List[str], support: bool):
        """Print the same result for every command in `commands`."""
        for command in commands:
            self.__output_format_bool(command, support)

    def __get_dut_diag_power(self) -> int:
        """Return the power used by the diag power test (env DUT_DIAG_POWER, default 10)."""
        return int(os.getenv('DUT_DIAG_POWER', '10'))

    def __get_dut_diag_gpio(self) -> int:
        """Return the GPIO used by the diag gpio tests (env DUT_DIAG_GPIO, default 0)."""
        return int(os.getenv('DUT_DIAG_GPIO', '0'))

    def __get_dut_diag_raw_power_setting(self) -> str:
        """Return the raw power setting used by the diag test (env DUT_DIAG_RAW_POWER_SETTING, default '112233')."""
        return os.getenv('DUT_DIAG_RAW_POWER_SETTING', '112233')

    def __connect_dut(self) -> 'OTCI':
        """Connect to the DUT.

        The first non-empty variable of DUT_ADB_TCP, DUT_ADB_USB and DUT_SSH
        selects the transport; the script exits with status 1 if none is set.
        """
        adb_tcp = os.getenv('DUT_ADB_TCP')
        if adb_tcp:
            return otci.connect_otbr_adb_tcp(adb_tcp)

        adb_usb = os.getenv('DUT_ADB_USB')
        if adb_usb:
            return otci.connect_otbr_adb_usb(adb_usb)

        ssh = os.getenv('DUT_SSH')
        if ssh:
            return otci.connect_otbr_ssh(ssh)

        self.__fail("Please set DUT_ADB_TCP, DUT_ADB_USB or DUT_SSH to connect to the DUT device.")

    def __connect_reference_device(self) -> 'OTCI':
        """Connect to the reference device.

        The first non-empty variable of REF_CLI_SERIAL, REF_SSH and REF_ADB_USB
        selects the transport; the script exits with status 1 if none is set.
        """
        cli_serial = os.getenv('REF_CLI_SERIAL')
        if cli_serial:
            return otci.connect_cli_serial(cli_serial)

        ssh = os.getenv('REF_SSH')
        if ssh:
            return otci.connect_otbr_ssh(ssh)

        adb_usb = os.getenv('REF_ADB_USB')
        if adb_usb:
            return otci.connect_otbr_adb_usb(adb_usb)

        self.__fail("Please set REF_CLI_SERIAL, REF_SSH or REF_ADB_USB to connect to the reference device.")

    def __output_format_string(self, name: str, value: str):
        """Print `name` padded with dashes to 58 columns, followed by `value`."""
        prefix = f'{name} '.ljust(58, '-')
        print(f'{prefix} {value}')

    def __output_format_bool(self, name: str, value: bool):
        """Print `name` with 'OK' for a truthy `value` and 'NotSupported' otherwise."""
        self.__output_format_string(name, 'OK' if value else 'NotSupported')

    def __fail(self, value: str):
        """Print `value` and terminate the script with exit status 1."""
        print(f'{value}')
        sys.exit(1)
563
564
def parse_arguments():
    """Build the command line parser and return the parsed arguments.

    Every option is an independent on/off switch selecting one test group
    (or verbose output); the help epilog lists the environment variables
    used to select the device connections.
    """
    epilog_parts = (
        'Device Interfaces:\r\n',
        '  DUT_SSH=<device_ip>            Connect to the DUT via ssh\r\n',
        '  DUT_ADB_TCP=<device_ip>        Connect to the DUT via adb tcp\r\n',
        '  DUT_ADB_USB=<serial_number>    Connect to the DUT via adb usb\r\n',
        '  REF_CLI_SERIAL=<serial_device> Connect to the reference device via cli serial port\r\n',
        '  REF_ADB_USB=<serial_number>    Connect to the reference device via adb usb\r\n',
        '  REF_SSH=<device_ip>            Connect to the reference device via ssh\r\n',
        '\r\n',
        'Example:\r\n',
        f'  DUT_ADB_USB=1169UC2F2T0M95OR REF_CLI_SERIAL=/dev/ttyACM0 python3 {sys.argv[0]} -d\r\n',
    )

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description='This script is used for testing RCP capabilities.',
        epilog=textwrap.dedent(''.join(epilog_parts)),
    )

    # (short flag, long flag, help text) - registered in this order, which is
    # also the order in which they appear in the help output.
    switches = (
        ('-c', '--csl', 'test whether the RCP supports CSL transmitter'),
        ('-l', '--link-metrics', 'test whether the RCP supports link metrics'),
        ('-d', '--diag-commands', 'test whether the RCP supports all diag commands'),
        ('-p', '--data-poll', 'test whether the RCP supports data poll'),
        ('-t', '--throughput', 'test Thread network 1-hop throughput'),
        ('-v', '--verbose', 'output verbose information'),
    )

    for short_flag, long_flag, help_text in switches:
        parser.add_argument(short_flag, long_flag, action='store_true', default=False, help=help_text)

    return parser.parse_args()
633
634
def main():
    """Parse the command line, connect to the devices and run the selected tests.

    The tests run in a fixed order: diag commands, CSL, data poll, link
    metrics and finally throughput.
    """
    options = parse_arguments()

    if options.verbose is True:
        logging.getLogger().setLevel(logging.DEBUG)

    tester = RcpCaps()

    # Pair each selection condition with the test it enables; order matters.
    plan = (
        (options.diag_commands is True, tester.test_diag_commands),
        (options.csl is True, tester.test_csl),
        (options.data_poll is True, tester.test_data_poll),
        (options.link_metrics is True, tester.test_link_metrics),
        (bool(options.throughput), tester.test_throughput),
    )

    for selected, run_test in plan:
        if selected:
            run_test()
658
659
# Run the capability tests only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
662