1#!/usr/bin/env python3
2#
3#  Copyright (c) 2024, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import argparse
31import logging
32import os
33import sys
34import textwrap
35import threading
36
37from typing import List
38
39import otci
40from otci import OTCI
41from otci.types import Ip6Addr
42
43CP_CAPABILITY_VERSION = "0.1.1-dev"
44
45logging.basicConfig(level=logging.WARNING)
46
47
48class RcpCaps(object):
49    """
50    This class represents an OpenThread RCP capability test instance.
51    """
52
53    DEFAULT_FORMAT_ALIGN_LENGTH = 58  # The default formatted string alignment length
54
55    def __init__(self):
56        self.__dut = self.__connect_dut()
57        self.__ref = self.__connect_reference_device()
58
59    def test_diag_commands(self):
60        """Test all diag commands."""
61        self.__dut.factory_reset()
62        self.__ref.factory_reset()
63
64        ret = self.__dut.is_command_supported('diag start')
65        if ret is False:
66            print('All diag commands are not supported')
67            return
68
69        self.__dut.diag_start()
70        self.__ref.diag_start()
71
72        self.__test_diag_channel()
73        self.__test_diag_power()
74        self.__test_diag_radio()
75        self.__test_diag_repeat()
76        self.__test_diag_send()
77        self.__test_diag_frame()
78        self.__test_diag_echo()
79        self.__test_diag_utils()
80        self.__test_diag_rawpowersetting()
81        self.__test_diag_powersettings()
82        self.__test_diag_gpio_mode()
83        self.__test_diag_gpio_value()
84
85        self.__ref.diag_stop()
86        self.__dut.diag_stop()
87
88    def test_frame_format(self):
89        """Test whether the DUT supports sending and receiving 802.15.4 frames of all formats."""
90        frames = [
91            {
92                'name': 'ver:2003,Cmd,seq,dst[addr:short,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
93                'psdu': '030800ffffffff070000'
94            },
95            {
96                'name': 'ver:2003,Bcon,seq,dst[addr:no,pan:no],src[addr:extd,pan:id],sec:no,ie:no,plen:30',
97                'psdu': '00c000eeee0102030405060708ff0f000003514f70656e54687265616400000000000001020304050607080000'
98            },
99            {
100                'name': 'ver:2006,Cmd,seq,dst[addr:short,pan:id],src[addr:short,pan:no],sec:l5,ie:no,plen:0',
101                'psdu': '4b98ddddddaaaabbbb0d708001020304050607081565'
102            },
103            {
104                'name': 'ver:2006,Cmd,seq,dst[addr:extd,pan:id],src[addr:extd,pan:no],sec:l5,ie:no,plen:0',
105                'psdu': '4bdcdddddd102030405060708001020304050607080d6e54687265046400820ee803'
106            },
107            {
108                'name': 'ver:2006,Data,seq,dst[addr:extd,pan:id],src[addr:extd,pan:id],sec:no,ie:no,plen:0',
109                'psdu': '01dcdddddd1020304050607080000001020304050607085468'
110            },
111            {
112                'name': 'ver:2006,Data,seq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0',
113                'psdu': '0198ddddddaaaaeeeebbbb7080'
114            },
115            {
116                'name': 'ver:2006,Data,seq,dst[addr:extd,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
117                'psdu': '011cdddddd10203040506070800000'
118            },
119            {
120                'name': 'ver:2006,Data,seq,dst[addr:short,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
121                'psdu': '0118ddddddaaaa3040'
122            },
123            {
124                'name': 'ver:2015,Data,seq,dst[addr:no,pan:no],src[addr:no,pan:no],sec:no,ie:no,plen:0',
125                'psdu': '0120dddddd'
126            },
127            {
128                'name': 'ver:2015,Data,seq,dst[addr:no,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
129                'psdu': '4120ddddddaaaa'
130            },
131            {
132                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:id],src[addr:no,pan:no],sec:no,ie:no,plen:0',
133                'psdu': '012cdddddd10203040506070800000'
134            },
135            {
136                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:no],src[addr:no,pan:no],sec:no,ie:no,plen:0',
137                'psdu': '412cdd10203040506070807080'
138            },
139            {
140                'name': 'ver:2015,Data,seq,dst[addr:no,pan:no],src[addr:extd,pan:id],sec:no,ie:no,plen:0',
141                'psdu': '01e0ddeeee01020304050607080000'
142            },
143            {
144                'name': 'ver:2015,Data,seq,dst[addr:no,pan:no],src[addr:extd,pan:no],sec:no,ie:no,plen:0',
145                'psdu': '41e0dd01020304050607080708'
146            },
147            {
148                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:id],src[addr:extd,pan:no],sec:no,ie:no,plen:0',
149                'psdu': '01ecdddddd102030405060708001020304050607080708'
150            },
151            {
152                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:no],src[addr:extd,pan:no],sec:no,ie:no,plen:0',
153                'psdu': '41ecdd102030405060708001020304050607080708'
154            },
155            {
156                'name': 'ver:2015,Data,seq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0',
157                'psdu': '01a8ddddddaaaaeeeebbbb0102'
158            },
159            {
160                'name': 'ver:2015,Data,seq,dst[addr:short,pan:id],src[addr:extd,pan:id],sec:no,ie:no,plen:0',
161                'psdu': '01e8ddddddaaaaeeee01020304050607080708'
162            },
163            {
164                'name': 'ver:2015,Data,seq,dst[addr:extd,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0',
165                'psdu': '01acdddddd1020304050607080eeeebbbb0708'
166            },
167            {
168                'name': 'ver:2015,Data,seq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie[csl],plen:0',
169                'psdu': '01aaddddddaaaaeeeebbbb040dc800e8030708'
170            },
171            {
172                'name': 'ver:2015,Data,noseq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0',
173                'psdu': '01a9ddddaaaaeeeebbbbbb04'
174            },
175        ]
176
177        self.__dut.factory_reset()
178        self.__ref.factory_reset()
179
180        ret = self.__dut.is_command_supported('diag start')
181        if ret is False:
182            print('Diag commands are not supported')
183            return
184
185        self.__dut.diag_start()
186        self.__ref.diag_start()
187
188        for frame in frames:
189            self.__test_send_formated_frame(self.__dut, self.__ref, 'TX ' + frame['name'], frame['psdu'], 100)
190            self.__test_send_formated_frame(self.__ref, self.__dut, 'RX ' + frame['name'], frame['psdu'], 100)
191
192        self.__ref.diag_stop()
193        self.__dut.diag_stop()
194
195    def test_csl(self):
196        """Test whether the DUT supports CSL transmitter."""
197        self.__dataset = self.__get_default_dataset()
198        self.__test_csl_transmitter()
199
200    def test_data_poll(self):
201        """Test whether the DUT supports data poll parent and child."""
202        self.__dataset = self.__get_default_dataset()
203        self.__test_data_poll_parent()
204        self.__test_data_poll_child()
205
206    def test_throughput(self):
207        """Test Thread network 1 hop throughput."""
208        if not self.__dut.support_iperf3():
209            print("The DUT doesn't support the tool iperf3")
210            return
211
212        if not self.__ref.support_iperf3():
213            print("The reference device doesn't support the tool iperf3")
214            return
215
216        bitrate = 90000
217        length = 1232
218        transmit_time = 30
219        max_wait_time = 30
220        timeout = transmit_time + max_wait_time
221
222        self.__dut.factory_reset()
223        self.__ref.factory_reset()
224
225        dataset = self.__get_default_dataset()
226
227        self.__dut.join(dataset)
228        self.__dut.wait_for('state', 'leader')
229
230        self.__ref.set_router_selection_jitter(1)
231        self.__ref.join(dataset)
232        self.__ref.wait_for('state', ['child', 'router'])
233
234        ref_mleid = self.__ref.get_ipaddr_mleid()
235
236        ref_iperf3_server = threading.Thread(target=self.__ref_iperf3_server_task,
237                                             args=(ref_mleid, timeout),
238                                             daemon=True)
239        ref_iperf3_server.start()
240        self.__dut.wait(1)
241
242        results = self.__dut.iperf3_client(host=ref_mleid, bitrate=bitrate, transmit_time=transmit_time, length=length)
243        ref_iperf3_server.join()
244
245        if not results:
246            print('Failed to run the iperf3')
247            return
248
249        self.__output_format_string('Throughput', self.__bitrate_to_string(results['receiver']['bitrate']))
250
251    def test_link_metrics(self):
252        """Test whether the DUT supports Link Metrics Initiator and Subject."""
253        self.__dataset = self.__get_default_dataset()
254
255        self.__dut.factory_reset()
256        self.__ref.factory_reset()
257
258        self.__dut.join(self.__dataset)
259        self.__dut.wait_for('state', 'leader')
260
261        self.__ref.join(self.__dataset)
262        self.__ref.wait_for('state', ['child', 'router'])
263
264        test_case = 'Link Metrics Initiator'
265        ref_linklocal_address = self.__ref.get_ipaddr_linklocal()
266        ret = self.__run_link_metrics_test_commands(initiator=self.__dut, subject_address=ref_linklocal_address)
267        self.__output_format_bool(test_case, ret)
268
269        test_case = 'Link Metrics Subject'
270        dut_linklocal_address = self.__dut.get_ipaddr_linklocal()
271        ret = self.__run_link_metrics_test_commands(initiator=self.__ref, subject_address=dut_linklocal_address)
272        self.__output_format_bool(test_case, ret)
273
274        self.__ref.leave()
275        self.__dut.leave()
276
277    #
278    # Private methods
279    #
280    def __run_link_metrics_test_commands(self, initiator: OTCI, subject_address: Ip6Addr) -> bool:
281        seriesid = 1
282        series_flags = 'ldra'
283        link_metrics_flags = 'qr'
284        probe_length = 10
285
286        if not initiator.linkmetrics_config_enhanced_ack_register(subject_address, link_metrics_flags):
287            return False
288
289        if not initiator.linkmetrics_config_forward(subject_address, seriesid, series_flags, link_metrics_flags):
290            return False
291
292        initiator.linkmetrics_probe(subject_address, seriesid, probe_length)
293
294        results = initiator.linkmetrics_request_single(subject_address, link_metrics_flags)
295        if not ('lqi' in results.keys() and 'rssi' in results.keys()):
296            return False
297
298        results = initiator.linkmetrics_request_forward(subject_address, seriesid)
299        if not ('lqi' in results.keys() and 'rssi' in results.keys()):
300            return False
301
302        if not initiator.linkmetrics_config_enhanced_ack_clear(subject_address):
303            return False
304
305        return True
306
307    def __ref_iperf3_server_task(self, bind_address: str, timeout: int):
308        self.__ref.iperf3_server(bind_address, timeout=timeout)
309
310    def __bitrate_to_string(self, bitrate: float):
311        units = ['bits/sec', 'Kbits/sec', 'Mbits/sec', 'Gbits/sec', 'Tbits/sec']
312        unit_index = 0
313
314        while bitrate >= 1000 and unit_index < len(units) - 1:
315            bitrate /= 1000
316            unit_index += 1
317
318        return f'{bitrate:.2f} {units[unit_index]}'
319
320    def __get_default_dataset(self):
321        return self.__dut.create_dataset(channel=20, network_key='00112233445566778899aabbccddcafe')
322
323    def __test_csl_transmitter(self):
324        packets = 10
325
326        self.__dut.factory_reset()
327        self.__ref.factory_reset()
328
329        self.__dut.join(self.__dataset)
330        self.__dut.wait_for('state', 'leader')
331
332        # Set the reference device as an SSED
333        self.__ref.set_mode('-')
334        self.__ref.config_csl(channel=15, period=320000, timeout=100)
335        self.__ref.join(self.__dataset)
336        self.__ref.wait_for('state', 'child')
337
338        child_table = self.__dut.get_child_table()
339        ret = len(child_table) == 1 and child_table[1]['csl']
340
341        if ret:
342            ref_mleid = self.__ref.get_ipaddr_mleid()
343            result = self.__dut.ping(ref_mleid, count=packets, interval=1)
344            ret = result['transmitted_packets'] == result['received_packets'] == packets
345
346        self.__dut.leave()
347        self.__ref.leave()
348
349        self.__output_format_bool('CSL Transmitter', ret)
350
351    def __test_data_poll_parent(self):
352        packets = 10
353
354        self.__dut.factory_reset()
355        self.__ref.factory_reset()
356
357        self.__dut.join(self.__dataset)
358        self.__dut.wait_for('state', 'leader')
359
360        # Set the reference device as an SED
361        self.__ref.set_mode('-')
362        self.__ref.set_poll_period(500)
363        self.__ref.join(self.__dataset)
364        self.__ref.wait_for('state', 'child')
365
366        dut_mleid = self.__dut.get_ipaddr_mleid()
367        result = self.__ref.ping(dut_mleid, count=packets, interval=1)
368
369        self.__dut.leave()
370        self.__ref.leave()
371
372        ret = result['transmitted_packets'] == result['received_packets'] == packets
373        self.__output_format_bool('Data Poll Parent', ret)
374
375    def __test_data_poll_child(self):
376        packets = 10
377
378        self.__dut.factory_reset()
379        self.__ref.factory_reset()
380
381        self.__ref.join(self.__dataset)
382        self.__ref.wait_for('state', 'leader')
383
384        # Set the DUT as an SED
385        self.__dut.set_mode('-')
386        self.__dut.set_poll_period(500)
387        self.__dut.join(self.__dataset)
388        self.__dut.wait_for('state', 'child')
389
390        dut_mleid = self.__dut.get_ipaddr_mleid()
391        result = self.__ref.ping(dut_mleid, count=packets, interval=1)
392
393        self.__dut.leave()
394        self.__ref.leave()
395
396        ret = result['transmitted_packets'] == result['received_packets'] == packets
397        self.__output_format_bool('Data Poll Child', ret)
398
399    def __test_diag_channel(self):
400        channel = 20
401        commands = ['diag channel', f'diag channel {channel}']
402
403        if self.__support_commands(commands):
404            self.__dut.diag_set_channel(channel)
405            value = self.__dut.diag_get_channel()
406            ret = value == channel
407        else:
408            ret = False
409
410        self.__output_results(commands, ret)
411
412    def __test_diag_power(self):
413        power = self.__get_dut_diag_power()
414        commands = ['diag power', f'diag power {power}']
415
416        if self.__support_commands(commands):
417            self.__dut.diag_set_power(power)
418            value = self.__dut.diag_get_power()
419            ret = value == power
420        else:
421            ret = False
422
423        self.__output_results(commands, ret)
424
425    def __test_diag_radio(self):
426        commands = ['diag radio receive', 'diag radio sleep', 'diag radio state']
427
428        if self.__support_commands(commands):
429            self.__dut.diag_radio_receive()
430            receive_state = self.__dut.diag_get_radio_state()
431            self.__dut.wait(0.1)
432            self.__dut.diag_radio_sleep()
433            sleep_state = self.__dut.diag_get_radio_state()
434
435            ret = sleep_state == 'sleep' and receive_state == 'receive'
436        else:
437            ret = False
438
439        self.__output_results(commands, ret)
440
441    def __test_diag_gpio_value(self):
442        gpio = self.__get_dut_diag_gpio()
443        commands = [f'diag gpio get {gpio}', f'diag gpio set {gpio} 0', f'diag gpio set {gpio} 1']
444
445        if self.__support_commands(commands):
446            self.__dut.diag_set_gpio_value(gpio, 0)
447            value_0 = self.__dut.diag_get_gpio_value(gpio)
448            self.__dut.diag_set_gpio_value(gpio, 1)
449            value_1 = self.__dut.diag_get_gpio_value(gpio)
450
451            ret = value_0 == 0 and value_1 == 1
452        else:
453            ret = False
454
455        self.__output_results(commands, ret)
456
457    def __test_diag_gpio_mode(self):
458        gpio = self.__get_dut_diag_gpio()
459        commands = [f'diag gpio mode {gpio}', f'diag gpio mode {gpio} in', f'diag gpio mode {gpio} out']
460
461        if self.__support_commands(commands):
462            self.__dut.diag_set_gpio_mode(gpio, 'in')
463            mode_in = self.__dut.diag_get_gpio_mode(gpio)
464            self.__dut.diag_set_gpio_value(gpio, 'out')
465            mode_out = self.__dut.diag_get_gpio_mode(gpio)
466
467            ret = mode_in == 'in' and mode_out == 'out'
468        else:
469            ret = False
470
471        self.__output_results(commands, ret)
472
473    def __test_diag_echo(self):
474        echo_msg = '0123456789'
475        cmd_diag_echo = f'diag echo {echo_msg}'
476        cmd_diag_echo_num = f'diag echo -n 10'
477
478        if self.__dut.is_command_supported(cmd_diag_echo):
479            reply = self.__dut.diag_echo(echo_msg)
480            ret = reply == echo_msg
481        else:
482            ret = False
483        self.__output_format_bool(cmd_diag_echo, ret)
484
485        if self.__dut.is_command_supported(cmd_diag_echo_num):
486            reply = self.__dut.diag_echo_number(10)
487            ret = reply == echo_msg
488        else:
489            ret = False
490        self.__output_format_bool(cmd_diag_echo_num, ret)
491
492    def __test_diag_utils(self):
493        commands = [
494            'diag cw start', 'diag cw stop', 'diag stream start', 'diag stream stop', 'diag stats', 'diag stats clear'
495        ]
496
497        for command in commands:
498            ret = self.__dut.is_command_supported(command)
499            self.__output_format_bool(command, ret)
500
501    def __test_diag_rawpowersetting(self):
502        rawpowersetting = self.__get_dut_diag_raw_power_setting()
503        commands = [
504            'diag rawpowersetting enable', f'diag rawpowersetting {rawpowersetting}', 'diag rawpowersetting',
505            'diag rawpowersetting disable'
506        ]
507
508        if self.__support_commands(commands):
509            self.__dut.diag_enable_rawpowersetting()
510            self.__dut.diag_set_rawpowersetting(rawpowersetting)
511            reply = self.__dut.diag_get_rawpowersetting()
512            self.__dut.diag_disable_rawpowersetting()
513
514            ret = reply == rawpowersetting
515        else:
516            ret = False
517
518        self.__output_results(commands, ret)
519
520    def __test_diag_powersettings(self):
521        commands = ['diag powersettings', 'diag powersettings 20']
522
523        if self.__support_commands(commands):
524            powersettings = self.__dut.diag_get_powersettings()
525            ret = len(powersettings) > 0
526        else:
527            ret = False
528
529        self.__output_results(commands, ret)
530
531    def __test_diag_send(self):
532        packets = 100
533        threshold = 80
534        length = 64
535        channel = 20
536        commands = [f'diag send {packets} {length}', f'diag stats', f'diag stats clear']
537
538        if self.__support_commands(commands):
539            self.__dut.wait(1)
540            self.__dut.diag_set_channel(channel)
541            self.__ref.diag_set_channel(channel)
542            self.__ref.diag_radio_receive()
543
544            self.__dut.diag_stats_clear()
545            self.__ref.diag_stats_clear()
546
547            self.__dut.diag_send(packets, length)
548            self.__dut.wait(1)
549            dut_stats = self.__dut.diag_get_stats()
550            ref_stats = self.__ref.diag_get_stats()
551
552            ret = dut_stats['sent_success_packets'] == packets and ref_stats['received_packets'] > threshold
553        else:
554            ret = False
555
556        self.__output_results(commands, ret)
557
558    def __test_diag_repeat(self):
559        delay = 10
560        threshold = 80
561        length = 64
562        channel = 20
563        cmd_diag_repeat = f'diag repeat {delay} {length}'
564        cmd_diag_repeat_stop = 'diag repeat stop'
565        commands = [cmd_diag_repeat, 'diag repeat stop', 'diag stats', 'diag stats clear']
566
567        if self.__support_commands(commands):
568            self.__dut.diag_set_channel(channel)
569            self.__ref.diag_set_channel(channel)
570            self.__ref.diag_radio_receive()
571
572            self.__dut.diag_stats_clear()
573            self.__ref.diag_stats_clear()
574
575            self.__dut.diag_repeat(delay, length)
576            self.__dut.wait(1)
577            self.__dut.diag_repeat_stop()
578            dut_stats = self.__dut.diag_get_stats()
579            ref_stats = self.__ref.diag_get_stats()
580
581            ret = dut_stats['sent_success_packets'] > threshold and ref_stats['received_packets'] > threshold
582        else:
583            ret = False
584
585        self.__output_format_bool(cmd_diag_repeat, ret)
586        self.__output_format_bool(cmd_diag_repeat_stop, ret)
587
588    def __test_send_formated_frame(self,
589                                   sender: OTCI,
590                                   receiver: OTCI,
591                                   format_name: str,
592                                   frame: str,
593                                   align_length: int = DEFAULT_FORMAT_ALIGN_LENGTH):
594        packets = 100
595        threshold = 80
596        channel = 20
597        cmd_diag_frame = f'diag frame {frame}'
598        commands = [cmd_diag_frame, f'diag send {packets}', f'diag stats', f'diag stats clear']
599
600        if self.__support_commands(commands):
601            sender.wait(1)
602            sender.diag_set_channel(channel)
603            receiver.diag_set_channel(channel)
604            receiver.diag_radio_receive()
605
606            sender.diag_stats_clear()
607            sender.diag_stats_clear()
608
609            sender.diag_frame(frame)
610            sender.diag_send(packets, None)
611            sender.wait(1)
612            sender_stats = sender.diag_get_stats()
613            receiver_stats = receiver.diag_get_stats()
614
615            ret = sender_stats['sent_success_packets'] == packets and receiver_stats['received_packets'] > threshold
616        else:
617            ret = False
618
619        self.__output_format_bool(format_name, ret, align_length)
620
621    def __test_diag_frame(self):
622        frame = '00010203040506070809'
623        cmd_diag_frame = f'diag frame {frame}'
624
625        self.__test_send_formated_frame(self.__dut, self.__ref, cmd_diag_frame, frame)
626
627    def __support_commands(self, commands: List[str]) -> bool:
628        ret = True
629
630        for command in commands:
631            if self.__dut.is_command_supported(command) is False:
632                ret = False
633                break
634
635        return ret
636
637    def __output_results(self, commands: List[str], support: bool):
638        for command in commands:
639            self.__output_format_bool(command, support)
640
641    def __get_dut_diag_power(self) -> int:
642        return int(os.getenv('DUT_DIAG_POWER', '10'))
643
644    def __get_dut_diag_gpio(self) -> int:
645        return int(os.getenv('DUT_DIAG_GPIO', '0'))
646
647    def __get_dut_diag_raw_power_setting(self) -> str:
648        return os.getenv('DUT_DIAG_RAW_POWER_SETTING', '112233')
649
650    def __connect_dut(self) -> OTCI:
651        if os.getenv('DUT_ADB_TCP'):
652            node = otci.connect_otbr_adb_tcp(os.getenv('DUT_ADB_TCP'))
653        elif os.getenv('DUT_ADB_USB'):
654            node = otci.connect_otbr_adb_usb(os.getenv('DUT_ADB_USB'))
655        elif os.getenv('DUT_CLI_SERIAL'):
656            node = otci.connect_cli_serial(os.getenv('DUT_CLI_SERIAL'))
657        elif os.getenv('DUT_SSH'):
658            node = otci.connect_otbr_ssh(os.getenv('DUT_SSH'))
659        else:
660            self.__fail("Please set DUT_ADB_TCP, DUT_ADB_USB, DUT_CLI_SERIAL or DUT_SSH to connect to the DUT device.")
661
662        return node
663
664    def __connect_reference_device(self) -> OTCI:
665        if os.getenv('REF_CLI_SERIAL'):
666            node = otci.connect_cli_serial(os.getenv('REF_CLI_SERIAL'))
667        elif os.getenv('REF_SSH'):
668            node = otci.connect_otbr_ssh(os.getenv('REF_SSH'))
669        elif os.getenv('REF_ADB_USB'):
670            node = otci.connect_otbr_adb_usb(os.getenv('REF_ADB_USB'))
671        else:
672            self.__fail("Please set REF_CLI_SERIAL, REF_SSH or REF_ADB_USB to connect to the reference device.")
673
674        return node
675
676    def __output_format_string(self, name: str, value: str, align_length: int = DEFAULT_FORMAT_ALIGN_LENGTH):
677        prefix = (name + ' ').ljust(align_length, '-')
678        print(f'{prefix} {value}')
679
680    def __output_format_bool(self, name: str, value: bool, align_length: int = DEFAULT_FORMAT_ALIGN_LENGTH):
681        self.__output_format_string(name, 'OK' if value else 'NotSupported', align_length)
682
683    def __fail(self, value: str):
684        print(f'{value}')
685        sys.exit(1)
686
687
688def parse_arguments():
689    """Parse all arguments."""
690    description_msg = 'This script is used for testing RCP capabilities.'
691    epilog_msg = textwrap.dedent(
692        'Device Interfaces:\r\n'
693        '  DUT_ADB_TCP=<device_ip>        Connect to the DUT via adb tcp\r\n'
694        '  DUT_ADB_USB=<serial_number>    Connect to the DUT via adb usb\r\n'
695        '  DUT_CLI_SERIAL=<serial_device> Connect to the DUT via cli serial port\r\n'
696        '  DUT_SSH=<device_ip>            Connect to the DUT via ssh\r\n'
697        '  REF_ADB_USB=<serial_number>    Connect to the reference device via adb usb\r\n'
698        '  REF_CLI_SERIAL=<serial_device> Connect to the reference device via cli serial port\r\n'
699        '  REF_SSH=<device_ip>            Connect to the reference device via ssh\r\n'
700        '\r\n'
701        'Example:\r\n'
702        f'  DUT_ADB_USB=1169UC2F2T0M95OR REF_CLI_SERIAL=/dev/ttyACM0 python3 {sys.argv[0]} -d\r\n')
703
704    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
705                                     description=description_msg,
706                                     epilog=epilog_msg)
707
708    parser.add_argument(
709        '-c',
710        '--csl',
711        action='store_true',
712        default=False,
713        help='test whether the RCP supports CSL transmitter',
714    )
715
716    parser.add_argument(
717        '-l',
718        '--link-metrics',
719        action='store_true',
720        default=False,
721        help='test whether the RCP supports link metrics',
722    )
723
724    parser.add_argument(
725        '-d',
726        '--diag-commands',
727        action='store_true',
728        default=False,
729        help='test whether the RCP supports all diag commands',
730    )
731
732    parser.add_argument(
733        '-f',
734        '--frame-format',
735        action='store_true',
736        default=False,
737        help='test whether the RCP supports 802.15.4 frames of all formats',
738    )
739
740    parser.add_argument(
741        '-p',
742        '--data-poll',
743        action='store_true',
744        default=False,
745        help='test whether the RCP supports data poll',
746    )
747
748    parser.add_argument(
749        '-t',
750        '--throughput',
751        action='store_true',
752        default=False,
753        help='test Thread network 1-hop throughput',
754    )
755
756    parser.add_argument(
757        '-v',
758        '--version',
759        action='store_true',
760        default=False,
761        help='output version',
762    )
763
764    parser.add_argument(
765        '-D',
766        '--debug',
767        action='store_true',
768        default=False,
769        help='output debug information',
770    )
771
772    return parser.parse_args()
773
774
775def main():
776    arguments = parse_arguments()
777
778    if arguments.version:
779        print(f'Version: {CP_CAPABILITY_VERSION}')
780        exit()
781
782    if arguments.debug:
783        logger = logging.getLogger()
784        logger.setLevel(logging.DEBUG)
785
786    rcp_caps = RcpCaps()
787
788    if arguments.diag_commands:
789        rcp_caps.test_diag_commands()
790
791    if arguments.csl:
792        rcp_caps.test_csl()
793
794    if arguments.data_poll:
795        rcp_caps.test_data_poll()
796
797    if arguments.link_metrics:
798        rcp_caps.test_link_metrics()
799
800    if arguments.throughput:
801        rcp_caps.test_throughput()
802
803    if arguments.frame_format:
804        rcp_caps.test_frame_format()
805
806
807if __name__ == '__main__':
808    main()
809