1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import json
31import binascii
32import ipaddress
33import logging
34import os
35import re
36import shlex
37import socket
38import subprocess
39import sys
40import time
41import traceback
42import typing
43import unittest
44from ipaddress import IPv6Address, IPv6Network
45from typing import Union, Dict, Optional, List, Any
46
47import pexpect
48import pexpect.popen_spawn
49
50import config
51import simulator
52import thread_cert
53
54PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0"))
55
56INFRA_DNS64 = int(os.getenv('NAT64', 0))
57
58
59class OtbrDocker:
60    RESET_DELAY = 3
61
62    _socat_proc = None
63    _ot_rcp_proc = None
64    _docker_proc = None
65    _border_routing_counters = None
66
67    def __init__(self, nodeid: int, backbone_network: str, **kwargs):
68        self.verbose = int(float(os.getenv('VERBOSE', 0)))
69
70        assert backbone_network is not None
71        self.backbone_network = backbone_network
72        try:
73            self._docker_name = config.OTBR_DOCKER_NAME_PREFIX + str(nodeid)
74            self._prepare_ot_rcp_sim(nodeid)
75            self._launch_docker()
76        except Exception:
77            traceback.print_exc()
78            self.destroy()
79            raise
80
81    def _prepare_ot_rcp_sim(self, nodeid: int):
82        self._socat_proc = subprocess.Popen(['socat', '-d', '-d', 'pty,raw,echo=0', 'pty,raw,echo=0'],
83                                            stderr=subprocess.PIPE,
84                                            stdin=subprocess.DEVNULL,
85                                            stdout=subprocess.DEVNULL)
86
87        line = self._socat_proc.stderr.readline().decode('ascii').strip()
88        self._rcp_device_pty = rcp_device_pty = line[line.index('PTY is /dev') + 7:]
89        line = self._socat_proc.stderr.readline().decode('ascii').strip()
90        self._rcp_device = rcp_device = line[line.index('PTY is /dev') + 7:]
91        logging.info(f"socat running: device PTY: {rcp_device_pty}, device: {rcp_device}")
92
93        ot_rcp_path = self._get_ot_rcp_path()
94        self._ot_rcp_proc = subprocess.Popen(f"{ot_rcp_path} {nodeid} > {rcp_device_pty} < {rcp_device_pty}",
95                                             shell=True,
96                                             stdin=subprocess.DEVNULL,
97                                             stdout=subprocess.DEVNULL,
98                                             stderr=subprocess.DEVNULL)
99
100        try:
101            self._ot_rcp_proc.wait(1)
102        except subprocess.TimeoutExpired:
103            # We expect ot-rcp not to quit in 1 second.
104            pass
105        else:
106            raise Exception(f"ot-rcp {nodeid} exited unexpectedly!")
107
108    def _get_ot_rcp_path(self) -> str:
109        srcdir = os.environ['top_builddir']
110        path = '%s/examples/apps/ncp/ot-rcp' % srcdir
111        logging.info("ot-rcp path: %s", path)
112        return path
113
114    def _launch_docker(self):
115        logging.info(f'Docker image: {config.OTBR_DOCKER_IMAGE}')
116        subprocess.check_call(f"docker rm -f {self._docker_name} || true", shell=True)
117        CI_ENV = os.getenv('CI_ENV', '').split()
118        dns = ['--dns=127.0.0.1'] if INFRA_DNS64 == 1 else ['--dns=8.8.8.8']
119        nat64_prefix = ['--nat64-prefix', '2001:db8:1:ffff::/96'] if INFRA_DNS64 == 1 else []
120        os.makedirs('/tmp/coverage/', exist_ok=True)
121
122        cmd = ['docker', 'run'] + CI_ENV + [
123            '--rm',
124            '--name',
125            self._docker_name,
126            '--network',
127            self.backbone_network,
128        ] + dns + [
129            '-i',
130            '--sysctl',
131            'net.ipv6.conf.all.disable_ipv6=0 net.ipv4.conf.all.forwarding=1 net.ipv6.conf.all.forwarding=1',
132            '--privileged',
133            '--cap-add=NET_ADMIN',
134            '--volume',
135            f'{self._rcp_device}:/dev/ttyUSB0',
136            '-v',
137            '/tmp/coverage/:/tmp/coverage/',
138            config.OTBR_DOCKER_IMAGE,
139            '-B',
140            config.BACKBONE_IFNAME,
141            '--trel-url',
142            f'trel://{config.BACKBONE_IFNAME}',
143        ] + nat64_prefix
144        logging.info(' '.join(cmd))
145        self._docker_proc = subprocess.Popen(cmd,
146                                             stdin=subprocess.DEVNULL,
147                                             stdout=sys.stdout if self.verbose else subprocess.DEVNULL,
148                                             stderr=sys.stderr if self.verbose else subprocess.DEVNULL)
149
150        launch_docker_deadline = time.time() + 300
151        launch_ok = False
152
153        while time.time() < launch_docker_deadline:
154            try:
155                subprocess.check_call(f'docker exec -i {self._docker_name} ot-ctl state', shell=True)
156                launch_ok = True
157                logging.info("OTBR Docker %s on %s Is Ready!", self._docker_name, self.backbone_network)
158                break
159            except subprocess.CalledProcessError:
160                time.sleep(5)
161                continue
162
163        assert launch_ok
164
165        self.start_ot_ctl()
166
167    def __repr__(self):
168        return f'OtbrDocker<{self.nodeid}>'
169
170    def start_otbr_service(self):
171        self.bash('service otbr-agent start')
172        self.simulator.go(3)
173        self.start_ot_ctl()
174
175    def stop_otbr_service(self):
176        self.stop_ot_ctl()
177        self.bash('service otbr-agent stop')
178
179    def stop_mdns_service(self):
180        self.bash('service avahi-daemon stop; service mdns stop; !(cat /proc/net/udp | grep -i :14E9)')
181
182    def start_mdns_service(self):
183        self.bash('service avahi-daemon start; service mdns start; cat /proc/net/udp | grep -i :14E9')
184
185    def start_ot_ctl(self):
186        cmd = f'docker exec -i {self._docker_name} ot-ctl'
187        self.pexpect = pexpect.popen_spawn.PopenSpawn(cmd, timeout=30)
188        if self.verbose:
189            self.pexpect.logfile_read = sys.stdout.buffer
190
191        # Add delay to ensure that the process is ready to receive commands.
192        timeout = 0.4
193        while timeout > 0:
194            self.pexpect.send('\r\n')
195            try:
196                self.pexpect.expect('> ', timeout=0.1)
197                break
198            except pexpect.TIMEOUT:
199                timeout -= 0.1
200
201    def stop_ot_ctl(self):
202        self.pexpect.sendeof()
203        self.pexpect.wait()
204        self.pexpect.proc.kill()
205
206    def reserve_udp_port(self, port):
207        self.bash(f'socat -u UDP6-LISTEN:{port},bindtodevice=wpan0 - &')
208
209    def destroy(self):
210        logging.info("Destroying %s", self)
211        self._shutdown_docker()
212        self._shutdown_ot_rcp()
213        self._shutdown_socat()
214
215    def _shutdown_docker(self):
216        if self._docker_proc is None:
217            return
218
219        try:
220            COVERAGE = int(os.getenv('COVERAGE', '0'))
221            OTBR_COVERAGE = int(os.getenv('OTBR_COVERAGE', '0'))
222            test_name = os.getenv('TEST_NAME')
223            unique_node_id = f'{test_name}-{PORT_OFFSET}-{self.nodeid}'
224
225            if COVERAGE or OTBR_COVERAGE:
226                self.bash('service otbr-agent stop')
227
228                cov_file_path = f'/tmp/coverage/coverage-{unique_node_id}.info'
229                # Upload OTBR code coverage if OTBR_COVERAGE=1, otherwise OpenThread code coverage.
230                if OTBR_COVERAGE:
231                    codecov_cmd = f'lcov --directory . --capture --output-file {cov_file_path}'
232                else:
233                    codecov_cmd = ('lcov --directory build/otbr/third_party/openthread/repo --capture '
234                                   f'--output-file {cov_file_path}')
235
236                self.bash(codecov_cmd)
237
238            copyCore = subprocess.run(f'docker cp {self._docker_name}:/core ./coredump_{unique_node_id}', shell=True)
239            if copyCore.returncode == 0:
240                subprocess.check_call(
241                    f'docker cp {self._docker_name}:/usr/sbin/otbr-agent ./otbr-agent_{unique_node_id}', shell=True)
242
243        finally:
244            subprocess.check_call(f"docker rm -f {self._docker_name}", shell=True)
245            self._docker_proc.wait()
246            del self._docker_proc
247
248    def _shutdown_ot_rcp(self):
249        if self._ot_rcp_proc is not None:
250            self._ot_rcp_proc.kill()
251            self._ot_rcp_proc.wait()
252            del self._ot_rcp_proc
253
254    def _shutdown_socat(self):
255        if self._socat_proc is not None:
256            self._socat_proc.stderr.close()
257            self._socat_proc.kill()
258            self._socat_proc.wait()
259            del self._socat_proc
260
261    def bash(self, cmd: str, encoding='ascii') -> List[str]:
262        logging.info("%s $ %s", self, cmd)
263        proc = subprocess.Popen(['docker', 'exec', '-i', self._docker_name, 'bash', '-c', cmd],
264                                stdin=subprocess.DEVNULL,
265                                stdout=subprocess.PIPE,
266                                stderr=sys.stderr,
267                                encoding=encoding)
268
269        with proc:
270
271            lines = []
272
273            while True:
274                line = proc.stdout.readline()
275
276                if not line:
277                    break
278
279                lines.append(line)
280                logging.info("%s $ %r", self, line.rstrip('\r\n'))
281
282            proc.wait()
283
284            if proc.returncode != 0:
285                raise subprocess.CalledProcessError(proc.returncode, cmd, ''.join(lines))
286            else:
287                return lines
288
289    def dns_dig(self, server: str, name: str, qtype: str):
290        """
291        Run dig command to query a DNS server.
292
293        Args:
294            server: the server address.
295            name: the name to query.
296            qtype: the query type (e.g. AAAA, PTR, TXT, SRV).
297
298        Returns:
299            The dig result similar as below:
300            {
301                "opcode": "QUERY",
302                "status": "NOERROR",
303                "id": "64144",
304                "QUESTION": [
305                    ('google.com.', 'IN', 'AAAA')
306                ],
307                "ANSWER": [
308                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::71'),
309                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::8a'),
310                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::66'),
311                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::8b'),
312                ],
313                "ADDITIONAL": [
314                ],
315            }
316        """
317        output = self.bash(f'dig -6 @{server} \'{name}\' {qtype}', encoding='raw_unicode_escape')
318
319        section = None
320        dig_result = {
321            'QUESTION': [],
322            'ANSWER': [],
323            'ADDITIONAL': [],
324        }
325
326        for line in output:
327            line = line.strip()
328
329            if line.startswith(';; ->>HEADER<<- '):
330                headers = line[len(';; ->>HEADER<<- '):].split(', ')
331                for header in headers:
332                    key, val = header.split(': ')
333                    dig_result[key] = val
334
335                continue
336
337            if line == ';; QUESTION SECTION:':
338                section = 'QUESTION'
339                continue
340            elif line == ';; ANSWER SECTION:':
341                section = 'ANSWER'
342                continue
343            elif line == ';; ADDITIONAL SECTION:':
344                section = 'ADDITIONAL'
345                continue
346            elif section and not line:
347                section = None
348                continue
349
350            if section:
351                assert line
352
353                if section == 'QUESTION':
354                    assert line.startswith(';')
355                    line = line[1:]
356                record = list(line.split())
357
358                if section == 'QUESTION':
359                    if record[2] in ('SRV', 'TXT'):
360                        record[0] = self.__unescape_dns_instance_name(record[0])
361                else:
362                    record[1] = int(record[1])
363                    if record[3] == 'SRV':
364                        record[0] = self.__unescape_dns_instance_name(record[0])
365                        record[4], record[5], record[6] = map(int, [record[4], record[5], record[6]])
366                    elif record[3] == 'TXT':
367                        record[0] = self.__unescape_dns_instance_name(record[0])
368                        record[4:] = [self.__parse_dns_dig_txt(line)]
369                    elif record[3] == 'PTR':
370                        record[4] = self.__unescape_dns_instance_name(record[4])
371
372                dig_result[section].append(tuple(record))
373
374        return dig_result
375
376    def call_dbus_method(self, *args):
377        args = shlex.join([args[0], args[1], json.dumps(args[2:])])
378        return json.loads(
379            self.bash(f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/call_dbus_method.py {args}')
380            [0])
381
382    def get_dbus_property(self, property_name):
383        return self.call_dbus_method('org.freedesktop.DBus.Properties', 'Get', 'io.openthread.BorderRouter',
384                                     property_name)
385
386    def set_dbus_property(self, property_name, property_value):
387        return self.call_dbus_method('org.freedesktop.DBus.Properties', 'Set', 'io.openthread.BorderRouter',
388                                     property_name, property_value)
389
390    def get_border_routing_counters(self):
391        counters = self.get_dbus_property('BorderRoutingCounters')
392        counters = {
393            'inbound_unicast': counters[0],
394            'inbound_multicast': counters[1],
395            'outbound_unicast': counters[2],
396            'outbound_multicast': counters[3],
397            'ra_rx': counters[4],
398            'ra_tx_success': counters[5],
399            'ra_tx_failure': counters[6],
400            'rs_rx': counters[7],
401            'rs_tx_success': counters[8],
402            'rs_tx_failure': counters[9],
403        }
404        logging.info(f'border routing counters: {counters}')
405        return counters
406
407    def _process_traffic_counters(self, counter):
408        return {
409            '4to6': {
410                'packets': counter[0],
411                'bytes': counter[1],
412            },
413            '6to4': {
414                'packets': counter[2],
415                'bytes': counter[3],
416            }
417        }
418
419    def _process_packet_counters(self, counter):
420        return {'4to6': {'packets': counter[0]}, '6to4': {'packets': counter[1]}}
421
422    def nat64_set_enabled(self, enable):
423        return self.call_dbus_method('io.openthread.BorderRouter', 'SetNat64Enabled', enable)
424
425    def activate_ephemeral_key_mode(self, lifetime):
426        return self.call_dbus_method('io.openthread.BorderRouter', 'ActivateEphemeralKeyMode', lifetime)
427
428    def deactivate_ephemeral_key_mode(self):
429        return self.call_dbus_method('io.openthread.BorderRouter', 'DeactivateEphemeralKeyMode')
430
431    @property
432    def nat64_cidr(self):
433        self.send_command('nat64 cidr')
434        cidr = self._expect_command_output()[0].strip()
435        return ipaddress.IPv4Network(cidr, strict=False)
436
437    @nat64_cidr.setter
438    def nat64_cidr(self, cidr: ipaddress.IPv4Network):
439        if not isinstance(cidr, ipaddress.IPv4Network):
440            raise ValueError("cidr is expected to be an instance of ipaddress.IPv4Network")
441        self.send_command(f'nat64 cidr {cidr}')
442        self._expect_done()
443
444    @property
445    def nat64_state(self):
446        state = self.get_dbus_property('Nat64State')
447        return {'PrefixManager': state[0], 'Translator': state[1]}
448
449    @property
450    def nat64_mappings(self):
451        return [{
452            'id': row[0],
453            'ip4': row[1],
454            'ip6': row[2],
455            'expiry': row[3],
456            'counters': {
457                'total': self._process_traffic_counters(row[4][0]),
458                'ICMP': self._process_traffic_counters(row[4][1]),
459                'UDP': self._process_traffic_counters(row[4][2]),
460                'TCP': self._process_traffic_counters(row[4][3]),
461            }
462        } for row in self.get_dbus_property('Nat64Mappings')]
463
464    @property
465    def nat64_counters(self):
466        res_error = self.get_dbus_property('Nat64ErrorCounters')
467        res_proto = self.get_dbus_property('Nat64ProtocolCounters')
468        return {
469            'protocol': {
470                'Total': self._process_traffic_counters(res_proto[0]),
471                'ICMP': self._process_traffic_counters(res_proto[1]),
472                'UDP': self._process_traffic_counters(res_proto[2]),
473                'TCP': self._process_traffic_counters(res_proto[3]),
474            },
475            'errors': {
476                'Unknown': self._process_packet_counters(res_error[0]),
477                'Illegal Pkt': self._process_packet_counters(res_error[1]),
478                'Unsup Proto': self._process_packet_counters(res_error[2]),
479                'No Mapping': self._process_packet_counters(res_error[3]),
480            }
481        }
482
483    @property
484    def nat64_traffic_counters(self):
485        res = self.get_dbus_property('Nat64TrafficCounters')
486        return {
487            'Total': self._process_traffic_counters(res[0]),
488            'ICMP': self._process_traffic_counters(res[1]),
489            'UDP': self._process_traffic_counters(res[2]),
490            'TCP': self._process_traffic_counters(res[3]),
491        }
492
493    @property
494    def dns_upstream_query_state(self):
495        return bool(self.get_dbus_property('DnsUpstreamQueryState'))
496
497    @dns_upstream_query_state.setter
498    def dns_upstream_query_state(self, value):
499        if type(value) is not bool:
500            raise ValueError("dns_upstream_query_state must be a bool")
501        return self.set_dbus_property('DnsUpstreamQueryState', value)
502
503    @property
504    def ephemeral_key_enabled(self):
505        return bool(self.get_dbus_property('EphemeralKeyEnabled'))
506
507    @ephemeral_key_enabled.setter
508    def ephemeral_key_enabled(self, value):
509        if type(value) is not bool:
510            raise ValueError("ephemeral_key_enabled must be a bool")
511        return self.set_dbus_property('EphemeralKeyEnabled', value)
512
513    def read_border_routing_counters_delta(self):
514        old_counters = self._border_routing_counters
515        new_counters = self.get_border_routing_counters()
516        self._border_routing_counters = new_counters
517        delta_counters = {}
518        if old_counters is None:
519            delta_counters = new_counters
520        else:
521            for i in ('inbound', 'outbound'):
522                for j in ('unicast', 'multicast'):
523                    key = f'{i}_{j}'
524                    assert (key in old_counters)
525                    assert (key in new_counters)
526                    value = [new_counters[key][0] - old_counters[key][0], new_counters[key][1] - old_counters[key][1]]
527                    delta_counters[key] = value
528        delta_counters = {
529            key: value for key, value in delta_counters.items() if not isinstance(value, int) and value[0] and value[1]
530        }
531
532        return delta_counters
533
534    @staticmethod
535    def __unescape_dns_instance_name(name: str) -> str:
536        new_name = []
537        i = 0
538        while i < len(name):
539            c = name[i]
540
541            if c == '\\':
542                assert i + 1 < len(name), name
543                if name[i + 1].isdigit():
544                    assert i + 3 < len(name) and name[i + 2].isdigit() and name[i + 3].isdigit(), name
545                    new_name.append(chr(int(name[i + 1:i + 4])))
546                    i += 3
547                else:
548                    new_name.append(name[i + 1])
549                    i += 1
550            else:
551                new_name.append(c)
552
553            i += 1
554
555        return ''.join(new_name)
556
557    def __parse_dns_dig_txt(self, line: str):
558        # Example TXT entry:
559        # "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
560        txt = {}
561        for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
562            if entry == "":
563                continue
564
565            k, v = entry.split('=', 1)
566            txt[k] = v
567
568        return txt
569
570    def _setup_sysctl(self):
571        self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra=2')
572        self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra_rt_info_max_plen=64')
573
574
575class OtCli:
576    RESET_DELAY = 0.1
577
578    def __init__(self, nodeid, is_mtd=False, version=None, is_bbr=False, **kwargs):
579        self.verbose = int(float(os.getenv('VERBOSE', 0)))
580        self.node_type = os.getenv('NODE_TYPE', 'sim')
581        self.env_version = os.getenv('THREAD_VERSION', '1.1')
582        self.is_bbr = is_bbr
583        self._initialized = False
584        if os.getenv('COVERAGE', 0) and os.getenv('CC', 'gcc') == 'gcc':
585            self._cmd_prefix = '/usr/bin/env GCOV_PREFIX=%s/ot-run/%s/ot-gcda.%d ' % (os.getenv(
586                'top_srcdir', '.'), sys.argv[0], nodeid)
587        else:
588            self._cmd_prefix = ''
589
590        if version is not None:
591            self.version = version
592        else:
593            self.version = self.env_version
594
595        mode = os.environ.get('USE_MTD') == '1' and is_mtd and 'mtd' or 'ftd'
596
597        if self.node_type == 'soc':
598            self.__init_soc(nodeid)
599        elif self.node_type == 'ncp-sim':
600            # TODO use mode after ncp-mtd is available.
601            self.__init_ncp_sim(nodeid, 'ftd')
602        else:
603            self.__init_sim(nodeid, mode)
604
605        if self.verbose:
606            self.pexpect.logfile_read = sys.stdout.buffer
607
608        self._initialized = True
609
610    def __init_sim(self, nodeid, mode):
611        """ Initialize a simulation node. """
612
613        # Default command if no match below, will be overridden if below conditions are met.
614        cmd = './ot-cli-%s' % (mode)
615
616        # For Thread 1.2 MTD node, use ot-cli-mtd build regardless of OT_CLI_PATH
617        if self.version != '1.1' and mode == 'mtd' and 'top_builddir' in os.environ:
618            srcdir = os.environ['top_builddir']
619            cmd = '%s/examples/apps/cli/ot-cli-%s %d' % (srcdir, mode, nodeid)
620
621        # If Thread version of node matches the testing environment version.
622        elif self.version == self.env_version:
623            # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
624            # which requires device with Backbone functionality.
625            if self.version != '1.1' and self.is_bbr:
626                if 'OT_CLI_PATH_BBR' in os.environ:
627                    cmd = os.environ['OT_CLI_PATH_BBR']
628                elif 'top_builddir_1_4_bbr' in os.environ:
629                    srcdir = os.environ['top_builddir_1_4_bbr']
630                    cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
631
632            # Load Thread device of the testing environment version (may be 1.1 or 1.2)
633            else:
634                if 'OT_CLI_PATH' in os.environ:
635                    cmd = os.environ['OT_CLI_PATH']
636                elif 'top_builddir' in os.environ:
637                    srcdir = os.environ['top_builddir']
638                    cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
639
640            if 'RADIO_DEVICE' in os.environ:
641                cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
642                                                                                           nodeid)
643                self.is_posix = True
644            else:
645                cmd += ' %d' % nodeid
646
647        # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability
648        elif self.version == '1.1':
649            # Posix app
650            if 'OT_CLI_PATH_1_1' in os.environ:
651                cmd = os.environ['OT_CLI_PATH_1_1']
652            elif 'top_builddir_1_1' in os.environ:
653                srcdir = os.environ['top_builddir_1_1']
654                cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
655
656            if 'RADIO_DEVICE_1_1' in os.environ:
657                cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (
658                    os.environ['RADIO_DEVICE_1_1'], nodeid)
659                self.is_posix = True
660            else:
661                cmd += ' %d' % nodeid
662
663        print("%s" % cmd)
664
665        self.pexpect = pexpect.popen_spawn.PopenSpawn(self._cmd_prefix + cmd, timeout=10)
666
667        # Add delay to ensure that the process is ready to receive commands.
668        timeout = 0.4
669        while timeout > 0:
670            self.pexpect.send('\r\n')
671            try:
672                self.pexpect.expect('> ', timeout=0.1)
673                break
674            except pexpect.TIMEOUT:
675                timeout -= 0.1
676
677    def __init_ncp_sim(self, nodeid, mode):
678        """ Initialize an NCP simulation node. """
679
680        # Default command if no match below, will be overridden if below conditions are met.
681        cmd = 'spinel-cli.py -p ./ot-ncp-%s -n' % mode
682
683        # If Thread version of node matches the testing environment version.
684        if self.version == self.env_version:
685            if 'RADIO_DEVICE' in os.environ:
686                args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
687                                                                                        nodeid)
688                self.is_posix = True
689            else:
690                args = ''
691
692            # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
693            # which requires device with Backbone functionality.
694            if self.version != '1.1' and self.is_bbr:
695                if 'OT_NCP_PATH_1_4_BBR' in os.environ:
696                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
697                        os.environ['OT_NCP_PATH_1_4_BBR'],
698                        args,
699                    )
700                elif 'top_builddir_1_4_bbr' in os.environ:
701                    srcdir = os.environ['top_builddir_1_4_bbr']
702                    cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
703                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
704                        cmd,
705                        args,
706                    )
707
708            # Load Thread device of the testing environment version (may be 1.1 or 1.2).
709            else:
710                if 'OT_NCP_PATH' in os.environ:
711                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
712                        os.environ['OT_NCP_PATH'],
713                        args,
714                    )
715                elif 'top_builddir' in os.environ:
716                    srcdir = os.environ['top_builddir']
717                    cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
718                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
719                        cmd,
720                        args,
721                    )
722
723        # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability.
724        elif self.version == '1.1':
725            if 'RADIO_DEVICE_1_1' in os.environ:
726                args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE_1_1'],
727                                                                                        nodeid)
728                self.is_posix = True
729            else:
730                args = ''
731
732            if 'OT_NCP_PATH_1_1' in os.environ:
733                cmd = 'spinel-cli.py -p "%s%s" -n' % (
734                    os.environ['OT_NCP_PATH_1_1'],
735                    args,
736                )
737            elif 'top_builddir_1_1' in os.environ:
738                srcdir = os.environ['top_builddir_1_1']
739                cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
740                cmd = 'spinel-cli.py -p "%s%s" -n' % (
741                    cmd,
742                    args,
743                )
744
745        cmd += ' %d' % nodeid
746        print("%s" % cmd)
747
748        self.pexpect = pexpect.spawn(self._cmd_prefix + cmd, timeout=10)
749
750        # Add delay to ensure that the process is ready to receive commands.
751        time.sleep(0.2)
752        self._expect('spinel-cli >')
753        self.debug(int(os.getenv('DEBUG', '0')))
754
755    def __init_soc(self, nodeid):
756        """ Initialize a System-on-a-chip node connected via UART. """
757        import fdpexpect
758
759        serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)
760        self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY))
761
762    def destroy(self):
763        if not self._initialized:
764            return
765
766        if (hasattr(self.pexpect, 'proc') and self.pexpect.proc.poll() is None or
767                not hasattr(self.pexpect, 'proc') and self.pexpect.isalive()):
768            print("%d: exit" % self.nodeid)
769            self.pexpect.send('exit\n')
770            self.pexpect.expect(pexpect.EOF)
771            self.pexpect.wait()
772            self._initialized = False
773
774
775class NodeImpl:
776    is_host = False
777    is_otbr = False
778
779    def __init__(self, nodeid, name=None, simulator=None, **kwargs):
780        self.nodeid = nodeid
781        self.name = name or ('Node%d' % nodeid)
782        self.is_posix = False
783
784        self.simulator = simulator
785        if self.simulator:
786            self.simulator.add_node(self)
787
788        super().__init__(nodeid, **kwargs)
789
790        self.set_addr64('%016x' % (thread_cert.EXTENDED_ADDRESS_BASE + nodeid))
791
792    def _expect(self, pattern, timeout=-1, *args, **kwargs):
793        """ Process simulator events until expected the pattern. """
794        if timeout == -1:
795            timeout = self.pexpect.timeout
796
797        assert timeout > 0
798
799        while timeout > 0:
800            try:
801                return self.pexpect.expect(pattern, 0.1, *args, **kwargs)
802            except pexpect.TIMEOUT:
803                timeout -= 0.1
804                self.simulator.go(0)
805                if timeout <= 0:
806                    raise
807
808    def _expect_done(self, timeout=-1):
809        self._expect('Done', timeout)
810
811    def _expect_result(self, pattern, *args, **kwargs):
812        """Expect a single matching result.
813
814        The arguments are identical to pexpect.expect().
815
816        Returns:
817            The matched line.
818        """
819        results = self._expect_results(pattern, *args, **kwargs)
820        assert len(results) == 1, results
821        return results[0]
822
823    def _expect_results(self, pattern, *args, **kwargs):
824        """Expect multiple matching results.
825
826        The arguments are identical to pexpect.expect().
827
828        Returns:
829            The matched lines.
830        """
831        output = self._expect_command_output()
832        results = [line for line in output if self._match_pattern(line, pattern)]
833        return results
834
835    def _expect_key_value_pairs(self, pattern, separator=': '):
836        """Expect 'key: value' in multiple lines.
837
838        Returns:
839            Dictionary of the key:value pairs.
840        """
841        result = {}
842        for line in self._expect_results(pattern):
843            key, val = line.split(separator)
844            result.update({key: val})
845        return result
846
847    @staticmethod
848    def _match_pattern(line, pattern):
849        if isinstance(pattern, str):
850            pattern = re.compile(pattern)
851
852        if isinstance(pattern, typing.Pattern):
853            return pattern.match(line)
854        else:
855            return any(NodeImpl._match_pattern(line, p) for p in pattern)
856
857    def _expect_command_output(self, ignore_logs=True):
858        lines = []
859
860        while True:
861            line = self.__readline(ignore_logs=ignore_logs)
862
863            if line == 'Done':
864                break
865            elif line.startswith('Error '):
866                raise Exception(line)
867            else:
868                lines.append(line)
869
870        print(f'_expect_command_output() returns {lines!r}')
871        return lines
872
873    def __is_logging_line(self, line: str) -> bool:
874        return len(line) >= 3 and line[:3] in {'[D]', '[I]', '[N]', '[W]', '[C]', '[-]'}
875
876    def read_cert_messages_in_commissioning_log(self, timeout=-1):
877        """Get the log of the traffic after DTLS handshake.
878        """
879        format_str = br"=+?\[\[THCI\].*?type=%s.*?\].*?=+?[\s\S]+?-{40,}"
880        join_fin_req = format_str % br"JOIN_FIN\.req"
881        join_fin_rsp = format_str % br"JOIN_FIN\.rsp"
882        dummy_format_str = br"\[THCI\].*?type=%s.*?"
883        join_ent_ntf = dummy_format_str % br"JOIN_ENT\.ntf"
884        join_ent_rsp = dummy_format_str % br"JOIN_ENT\.rsp"
885        pattern = (b"(" + join_fin_req + b")|(" + join_fin_rsp + b")|(" + join_ent_ntf + b")|(" + join_ent_rsp + b")")
886
887        messages = []
888        # There are at most 4 cert messages both for joiner and commissioner
889        for _ in range(0, 4):
890            try:
891                self._expect(pattern, timeout=timeout)
892                log = self.pexpect.match.group(0)
893                messages.append(self._extract_cert_message(log))
894            except BaseException:
895                break
896        return messages
897
898    def _extract_cert_message(self, log):
899        res = re.search(br"direction=\w+", log)
900        assert res
901        direction = res.group(0).split(b'=')[1].strip()
902
903        res = re.search(br"type=\S+", log)
904        assert res
905        type = res.group(0).split(b'=')[1].strip()
906
907        payload = bytearray([])
908        payload_len = 0
909        if type in [b"JOIN_FIN.req", b"JOIN_FIN.rsp"]:
910            res = re.search(br"len=\d+", log)
911            assert res
912            payload_len = int(res.group(0).split(b'=')[1].strip())
913
914            hex_pattern = br"\|(\s([0-9a-fA-F]{2}|\.\.))+?\s+?\|"
915            while True:
916                res = re.search(hex_pattern, log)
917                if not res:
918                    break
919                data = [int(hex, 16) for hex in res.group(0)[1:-1].split(b' ') if hex and hex != b'..']
920                payload += bytearray(data)
921                log = log[res.end() - 1:]
922        assert len(payload) == payload_len
923        return (direction, type, payload)
924
925    def send_command(self, cmd, go=True, expect_command_echo=True):
926        print("%d: %s" % (self.nodeid, cmd))
927        self.pexpect.send(cmd + '\n')
928        if go:
929            self.simulator.go(0, nodeid=self.nodeid)
930        sys.stdout.flush()
931
932        if expect_command_echo:
933            self._expect_command_echo(cmd)
934
935    def _expect_command_echo(self, cmd):
936        cmd = cmd.strip()
937        while True:
938            line = self.__readline()
939            if line == cmd:
940                break
941
942            logging.warning("expecting echo %r, but read %r", cmd, line)
943
944    def __readline(self, ignore_logs=True):
945        PROMPT = 'spinel-cli > ' if self.node_type == 'ncp-sim' else '> '
946        while True:
947            self._expect(r"[^\n]+\n")
948            line = self.pexpect.match.group(0).decode('utf8').strip()
949            while line.startswith(PROMPT):
950                line = line[len(PROMPT):]
951
952            if line == '':
953                continue
954
955            if ignore_logs and self.__is_logging_line(line):
956                continue
957
958            return line
959
960    def get_commands(self):
961        self.send_command('?')
962        self._expect('Commands:')
963        return self._expect_results(r'\S+')
964
965    def set_mode(self, mode):
966        cmd = 'mode %s' % mode
967        self.send_command(cmd)
968        self._expect_done()
969
970    def debug(self, level):
971        # `debug` command will not trigger interaction with simulator
972        self.send_command('debug %d' % level, go=False)
973
974    def start(self):
975        self.interface_up()
976        self.thread_start()
977
978    def stop(self):
979        self.thread_stop()
980        self.interface_down()
981
982    def set_log_level(self, level: int):
983        self.send_command(f'log level {level}')
984        self._expect_done()
985
986    def interface_up(self):
987        self.send_command('ifconfig up')
988        self._expect_done()
989
990    def interface_down(self):
991        self.send_command('ifconfig down')
992        self._expect_done()
993
994    def thread_start(self):
995        self.send_command('thread start')
996        self._expect_done()
997
998    def thread_stop(self):
999        self.send_command('thread stop')
1000        self._expect_done()
1001
1002    def detach(self, is_async=False):
1003        cmd = 'detach'
1004        if is_async:
1005            cmd += ' async'
1006
1007        self.send_command(cmd)
1008
1009        if is_async:
1010            self._expect_done()
1011            return
1012
1013        end = self.simulator.now() + 4
1014        while True:
1015            self.simulator.go(1)
1016            try:
1017                self._expect_done(timeout=0.1)
1018                return
1019            except (pexpect.TIMEOUT, socket.timeout):
1020                if self.simulator.now() > end:
1021                    raise
1022
1023    def expect_finished_detaching(self):
1024        self._expect('Finished detaching')
1025
1026    def commissioner_start(self):
1027        cmd = 'commissioner start'
1028        self.send_command(cmd)
1029        self._expect_done()
1030
1031    def commissioner_stop(self):
1032        cmd = 'commissioner stop'
1033        self.send_command(cmd)
1034        self._expect_done()
1035
1036    def commissioner_state(self):
1037        states = [r'disabled', r'petitioning', r'active']
1038        self.send_command('commissioner state')
1039        return self._expect_result(states)
1040
1041    def commissioner_add_joiner(self, addr, psk):
1042        cmd = 'commissioner joiner add %s %s' % (addr, psk)
1043        self.send_command(cmd)
1044        self._expect_done()
1045
1046    def commissioner_set_provisioning_url(self, provisioning_url=''):
1047        cmd = 'commissioner provisioningurl %s' % provisioning_url
1048        self.send_command(cmd)
1049        self._expect_done()
1050
1051    def joiner_start(self, pskd='', provisioning_url=''):
1052        cmd = 'joiner start %s %s' % (pskd, provisioning_url)
1053        self.send_command(cmd)
1054        self._expect_done()
1055
1056    def clear_allowlist(self):
1057        cmd = 'macfilter addr clear'
1058        self.send_command(cmd)
1059        self._expect_done()
1060
1061    def enable_allowlist(self):
1062        cmd = 'macfilter addr allowlist'
1063        self.send_command(cmd)
1064        self._expect_done()
1065
1066    def disable_allowlist(self):
1067        cmd = 'macfilter addr disable'
1068        self.send_command(cmd)
1069        self._expect_done()
1070
1071    def add_allowlist(self, addr, rssi=None):
1072        cmd = 'macfilter addr add %s' % addr
1073
1074        if rssi is not None:
1075            cmd += ' %s' % rssi
1076
1077        self.send_command(cmd)
1078        self._expect_done()
1079
1080    def radiofilter_is_enabled(self) -> bool:
1081        states = [r'Disabled', r'Enabled']
1082        self.send_command('radiofilter')
1083        return self._expect_result(states) == 'Enabled'
1084
1085    def radiofilter_enable(self):
1086        cmd = 'radiofilter enable'
1087        self.send_command(cmd)
1088        self._expect_done()
1089
1090    def radiofilter_disable(self):
1091        cmd = 'radiofilter disable'
1092        self.send_command(cmd)
1093        self._expect_done()
1094
1095    def get_bbr_registration_jitter(self):
1096        self.send_command('bbr jitter')
1097        return int(self._expect_result(r'\d+'))
1098
1099    def set_bbr_registration_jitter(self, jitter):
1100        cmd = 'bbr jitter %d' % jitter
1101        self.send_command(cmd)
1102        self._expect_done()
1103
1104    def get_rcp_version(self) -> str:
1105        self.send_command('rcp version')
1106        rcp_version = self._expect_command_output()[0].strip()
1107        return rcp_version
1108
1109    def srp_server_get_state(self):
1110        states = ['disabled', 'running', 'stopped']
1111        self.send_command('srp server state')
1112        return self._expect_result(states)
1113
1114    def srp_server_get_addr_mode(self):
1115        modes = [r'unicast', r'anycast']
1116        self.send_command(f'srp server addrmode')
1117        return self._expect_result(modes)
1118
1119    def srp_server_set_addr_mode(self, mode):
1120        self.send_command(f'srp server addrmode {mode}')
1121        self._expect_done()
1122
1123    def srp_server_get_anycast_seq_num(self):
1124        self.send_command(f'srp server seqnum')
1125        return int(self._expect_result(r'\d+'))
1126
1127    def srp_server_set_anycast_seq_num(self, seqnum):
1128        self.send_command(f'srp server seqnum {seqnum}')
1129        self._expect_done()
1130
1131    def srp_server_set_enabled(self, enable):
1132        cmd = f'srp server {"enable" if enable else "disable"}'
1133        self.send_command(cmd)
1134        self._expect_done()
1135
1136    def srp_server_set_lease_range(self, min_lease, max_lease, min_key_lease, max_key_lease):
1137        self.send_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}')
1138        self._expect_done()
1139
1140    def srp_server_set_ttl_range(self, min_ttl, max_ttl):
1141        self.send_command(f'srp server ttl {min_ttl} {max_ttl}')
1142        self._expect_done()
1143
1144    def srp_server_get_hosts(self):
1145        """Returns the host list on the SRP server as a list of property
1146           dictionary.
1147
1148           Example output:
1149           [{
1150               'fullname': 'my-host.default.service.arpa.',
1151               'name': 'my-host',
1152               'deleted': 'false',
1153               'addresses': ['2001::1', '2001::2']
1154           }]
1155        """
1156
1157        cmd = 'srp server host'
1158        self.send_command(cmd)
1159        lines = self._expect_command_output()
1160        host_list = []
1161        while lines:
1162            host = {}
1163
1164            host['fullname'] = lines.pop(0).strip()
1165            host['name'] = host['fullname'].split('.')[0]
1166
1167            host['deleted'] = lines.pop(0).strip().split(':')[1].strip()
1168            if host['deleted'] == 'true':
1169                host_list.append(host)
1170                continue
1171
1172            addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1173            map(str.strip, addresses)
1174            host['addresses'] = [addr.strip() for addr in addresses if addr]
1175
1176            host_list.append(host)
1177
1178        return host_list
1179
1180    def srp_server_get_host(self, host_name):
1181        """Returns host on the SRP server that matches given host name.
1182
1183           Example usage:
1184           self.srp_server_get_host("my-host")
1185        """
1186
1187        for host in self.srp_server_get_hosts():
1188            if host_name == host['name']:
1189                return host
1190
1191    def srp_server_get_services(self):
1192        """Returns the service list on the SRP server as a list of property
1193           dictionary.
1194
1195           Example output:
1196           [{
1197               'fullname': 'my-service._ipps._tcp.default.service.arpa.',
1198               'instance': 'my-service',
1199               'name': '_ipps._tcp',
1200               'deleted': 'false',
1201               'port': '12345',
1202               'priority': '0',
1203               'weight': '0',
1204               'ttl': '7200',
1205               'lease': '7200',
1206               'key-lease': '7200',
1207               'TXT': ['abc=010203'],
1208               'host_fullname': 'my-host.default.service.arpa.',
1209               'host': 'my-host',
1210               'addresses': ['2001::1', '2001::2']
1211           }]
1212
1213           Note that the TXT data is output as a HEX string.
1214        """
1215
1216        cmd = 'srp server service'
1217        self.send_command(cmd)
1218        lines = self._expect_command_output()
1219
1220        service_list = []
1221        while lines:
1222            service = {}
1223
1224            service['fullname'] = lines.pop(0).strip()
1225            name_labels = service['fullname'].split('.')
1226            service['instance'] = name_labels[0]
1227            service['name'] = '.'.join(name_labels[1:3])
1228
1229            service['deleted'] = lines.pop(0).strip().split(':')[1].strip()
1230            if service['deleted'] == 'true':
1231                service_list.append(service)
1232                continue
1233
1234            # 'subtypes', port', 'priority', 'weight', 'ttl', 'lease', and 'key-lease'
1235            for i in range(0, 7):
1236                key_value = lines.pop(0).strip().split(':')
1237                service[key_value[0].strip()] = key_value[1].strip()
1238
1239            txt_entries = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1240            txt_entries = map(str.strip, txt_entries)
1241            service['TXT'] = [txt for txt in txt_entries if txt]
1242
1243            service['host_fullname'] = lines.pop(0).strip().split(':')[1].strip()
1244            service['host'] = service['host_fullname'].split('.')[0]
1245
1246            addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1247            addresses = map(str.strip, addresses)
1248            service['addresses'] = [addr for addr in addresses if addr]
1249
1250            service_list.append(service)
1251
1252        return service_list
1253
1254    def srp_server_get_service(self, instance_name, service_name):
1255        """Returns service on the SRP server that matches given instance
1256           name and service name.
1257
1258           Example usage:
1259           self.srp_server_get_service("my-service", "_ipps._tcp")
1260        """
1261
1262        for service in self.srp_server_get_services():
1263            if (instance_name == service['instance'] and service_name == service['name']):
1264                return service
1265
1266    def get_srp_server_port(self):
1267        """Returns the SRP server UDP port by parsing
1268           the SRP Server Data in Network Data.
1269        """
1270
1271        for service in self.get_services():
1272            # TODO: for now, we are using 0xfd as the SRP service data.
1273            #       May use a dedicated bit flag for SRP server.
1274            if int(service[1], 16) == 0x5d:
1275                # The SRP server data contains IPv6 address (16 bytes)
1276                # followed by UDP port number.
1277                return int(service[2][2 * 16:], 16)
1278
1279    def srp_client_start(self, server_address, server_port):
1280        self.send_command(f'srp client start {server_address} {server_port}')
1281        self._expect_done()
1282
1283    def srp_client_stop(self):
1284        self.send_command(f'srp client stop')
1285        self._expect_done()
1286
1287    def srp_client_get_state(self):
1288        cmd = 'srp client state'
1289        self.send_command(cmd)
1290        return self._expect_command_output()[0]
1291
1292    def srp_client_get_auto_start_mode(self):
1293        cmd = 'srp client autostart'
1294        self.send_command(cmd)
1295        return self._expect_command_output()[0]
1296
1297    def srp_client_enable_auto_start_mode(self):
1298        self.send_command(f'srp client autostart enable')
1299        self._expect_done()
1300
1301    def srp_client_disable_auto_start_mode(self):
1302        self.send_command(f'srp client autostart disable')
1303        self._expect_done()
1304
1305    def srp_client_get_server_address(self):
1306        cmd = 'srp client server address'
1307        self.send_command(cmd)
1308        return self._expect_command_output()[0]
1309
1310    def srp_client_get_server_port(self):
1311        cmd = 'srp client server port'
1312        self.send_command(cmd)
1313        return int(self._expect_command_output()[0])
1314
1315    def srp_client_get_host_state(self):
1316        cmd = 'srp client host state'
1317        self.send_command(cmd)
1318        return self._expect_command_output()[0]
1319
1320    def srp_client_set_host_name(self, name):
1321        self.send_command(f'srp client host name {name}')
1322        self._expect_done()
1323
1324    def srp_client_get_host_name(self):
1325        self.send_command(f'srp client host name')
1326        self._expect_done()
1327
1328    def srp_client_remove_host(self, remove_key=False, send_unreg_to_server=False):
1329        self.send_command(f'srp client host remove {int(remove_key)} {int(send_unreg_to_server)}')
1330        self._expect_done()
1331
1332    def srp_client_clear_host(self):
1333        self.send_command(f'srp client host clear')
1334        self._expect_done()
1335
1336    def srp_client_enable_auto_host_address(self):
1337        self.send_command(f'srp client host address auto')
1338        self._expect_done()
1339
1340    def srp_client_set_host_address(self, *addrs: str):
1341        self.send_command(f'srp client host address {" ".join(addrs)}')
1342        self._expect_done()
1343
1344    def srp_client_get_host_address(self):
1345        self.send_command(f'srp client host address')
1346        self._expect_done()
1347
1348    def srp_client_add_service(self,
1349                               instance_name,
1350                               service_name,
1351                               port,
1352                               priority=0,
1353                               weight=0,
1354                               txt_entries=[],
1355                               lease=0,
1356                               key_lease=0):
1357        txt_record = "".join(self._encode_txt_entry(entry) for entry in txt_entries)
1358        if txt_record == '':
1359            txt_record = '-'
1360        instance_name = self._escape_escapable(instance_name)
1361        self.send_command(
1362            f'srp client service add {instance_name} {service_name} {port} {priority} {weight} {txt_record} {lease} {key_lease}'
1363        )
1364        self._expect_done()
1365
1366    def srp_client_remove_service(self, instance_name, service_name):
1367        self.send_command(f'srp client service remove {instance_name} {service_name}')
1368        self._expect_done()
1369
1370    def srp_client_clear_service(self, instance_name, service_name):
1371        self.send_command(f'srp client service clear {instance_name} {service_name}')
1372        self._expect_done()
1373
1374    def srp_client_get_services(self):
1375        cmd = 'srp client service'
1376        self.send_command(cmd)
1377        service_lines = self._expect_command_output()
1378        return [self._parse_srp_client_service(line) for line in service_lines]
1379
1380    def srp_client_set_lease_interval(self, leaseinterval: int):
1381        cmd = f'srp client leaseinterval {leaseinterval}'
1382        self.send_command(cmd)
1383        self._expect_done()
1384
1385    def srp_client_get_lease_interval(self) -> int:
1386        cmd = 'srp client leaseinterval'
1387        self.send_command(cmd)
1388        return int(self._expect_result('\d+'))
1389
1390    def srp_client_set_key_lease_interval(self, leaseinterval: int):
1391        cmd = f'srp client keyleaseinterval {leaseinterval}'
1392        self.send_command(cmd)
1393        self._expect_done()
1394
1395    def srp_client_get_key_lease_interval(self) -> int:
1396        cmd = 'srp client keyleaseinterval'
1397        self.send_command(cmd)
1398        return int(self._expect_result('\d+'))
1399
1400    def srp_client_set_ttl(self, ttl: int):
1401        cmd = f'srp client ttl {ttl}'
1402        self.send_command(cmd)
1403        self._expect_done()
1404
1405    def srp_client_get_ttl(self) -> int:
1406        cmd = 'srp client ttl'
1407        self.send_command(cmd)
1408        return int(self._expect_result('\d+'))
1409
1410    #
1411    # TREL utilities
1412    #
1413
1414    def enable_trel(self):
1415        cmd = 'trel enable'
1416        self.send_command(cmd)
1417        self._expect_done()
1418
1419    def is_trel_enabled(self) -> Union[None, bool]:
1420        states = [r'Disabled', r'Enabled']
1421        self.send_command('trel')
1422        try:
1423            return self._expect_result(states) == 'Enabled'
1424        except Exception as ex:
1425            if 'InvalidCommand' in str(ex):
1426                return None
1427
1428            raise
1429
1430    def get_trel_counters(self):
1431        cmd = 'trel counters'
1432        self.send_command(cmd)
1433        result = self._expect_command_output()
1434
1435        counters = {}
1436        for line in result:
1437            m = re.match(r'(\w+)\:[^\d]+(\d+)[^\d]+(\d+)(?:[^\d]+(\d+))?', line)
1438            if m:
1439                groups = m.groups()
1440                sub_counters = {
1441                    'packets': int(groups[1]),
1442                    'bytes': int(groups[2]),
1443                }
1444                if groups[3]:
1445                    sub_counters['failures'] = int(groups[3])
1446                counters[groups[0]] = sub_counters
1447        return counters
1448
1449    def reset_trel_counters(self):
1450        cmd = 'trel counters reset'
1451        self.send_command(cmd)
1452        self._expect_done()
1453
1454    def set_epskc(self, keystring: str, timeout=120000, port=0):
1455        cmd = 'ba ephemeralkey set ' + keystring + ' ' + str(timeout) + ' ' + str(port)
1456        self.send_command(cmd)
1457        self._expect(r"(Done|Error .*)")
1458
1459    def clear_epskc(self):
1460        cmd = 'ba ephemeralkey clear'
1461        self.send_command(cmd)
1462        self._expect_done()
1463
1464    def get_border_agent_counters(self):
1465        cmd = 'ba counters'
1466        self.send_command(cmd)
1467        result = self._expect_command_output()
1468
1469        counters = {}
1470        for line in result:
1471            m = re.match(r'(\w+)\: (\d+)', line)
1472            if m:
1473                counter_name = m.group(1)
1474                counter_value = m.group(2)
1475
1476                counters[counter_name] = int(counter_value)
1477        return counters
1478
1479    def _encode_txt_entry(self, entry):
1480        """Encodes the TXT entry to the DNS-SD TXT record format as a HEX string.
1481
1482           Example usage:
1483           self._encode_txt_entries(['abc'])     -> '03616263'
1484           self._encode_txt_entries(['def='])    -> '046465663d'
1485           self._encode_txt_entries(['xyz=XYZ']) -> '0778797a3d58595a'
1486        """
1487        return '{:02x}'.format(len(entry)) + "".join("{:02x}".format(ord(c)) for c in entry)
1488
1489    def _parse_srp_client_service(self, line: str):
1490        """Parse one line of srp service list into a dictionary which
1491           maps string keys to string values.
1492
1493           Example output for input
1494           'instance:\"%s\", name:\"%s\", state:%s, port:%d, priority:%d, weight:%d"'
1495           {
1496               'instance': 'my-service',
1497               'name': '_ipps._udp',
1498               'state': 'ToAdd',
1499               'port': '12345',
1500               'priority': '0',
1501               'weight': '0'
1502           }
1503
1504           Note that value of 'port', 'priority' and 'weight' are represented
1505           as strings but not integers.
1506        """
1507        key_values = [word.strip().split(':') for word in line.split(', ')]
1508        keys = [key_value[0] for key_value in key_values]
1509        values = [key_value[1].strip('"') for key_value in key_values]
1510        return dict(zip(keys, values))
1511
1512    def locate(self, anycast_addr):
1513        cmd = 'locate ' + anycast_addr
1514        self.send_command(cmd)
1515        self.simulator.go(5)
1516        return self._parse_locate_result(self._expect_command_output()[0])
1517
1518    def _parse_locate_result(self, line: str):
1519        """Parse anycast locate result as list of ml-eid and rloc16.
1520
1521           Example output for input
1522           'fd00:db8:0:0:acf9:9d0:7f3c:b06e 0xa800'
1523
1524           [ 'fd00:db8:0:0:acf9:9d0:7f3c:b06e', '0xa800' ]
1525        """
1526        return line.split(' ')
1527
1528    def enable_backbone_router(self):
1529        cmd = 'bbr enable'
1530        self.send_command(cmd)
1531        self._expect_done()
1532
1533    def disable_backbone_router(self):
1534        cmd = 'bbr disable'
1535        self.send_command(cmd)
1536        self._expect_done()
1537
1538    def register_backbone_router(self):
1539        cmd = 'bbr register'
1540        self.send_command(cmd)
1541        self._expect_done()
1542
1543    def get_backbone_router_state(self):
1544        states = [r'Disabled', r'Primary', r'Secondary']
1545        self.send_command('bbr state')
1546        return self._expect_result(states)
1547
1548    @property
1549    def is_primary_backbone_router(self) -> bool:
1550        return self.get_backbone_router_state() == 'Primary'
1551
1552    def get_backbone_router(self):
1553        cmd = 'bbr config'
1554        self.send_command(cmd)
1555        self._expect(r'(.*)Done')
1556        g = self.pexpect.match.groups()
1557        output = g[0].decode("utf-8")
1558        lines = output.strip().split('\n')
1559        lines = [l.strip() for l in lines]
1560        ret = {}
1561        for l in lines:
1562            z = re.search(r'seqno:\s+([0-9]+)', l)
1563            if z:
1564                ret['seqno'] = int(z.groups()[0])
1565
1566            z = re.search(r'delay:\s+([0-9]+)', l)
1567            if z:
1568                ret['delay'] = int(z.groups()[0])
1569
1570            z = re.search(r'timeout:\s+([0-9]+)', l)
1571            if z:
1572                ret['timeout'] = int(z.groups()[0])
1573
1574        return ret
1575
1576    def set_backbone_router(self, seqno=None, reg_delay=None, mlr_timeout=None):
1577        cmd = 'bbr config'
1578
1579        if seqno is not None:
1580            cmd += ' seqno %d' % seqno
1581
1582        if reg_delay is not None:
1583            cmd += ' delay %d' % reg_delay
1584
1585        if mlr_timeout is not None:
1586            cmd += ' timeout %d' % mlr_timeout
1587
1588        self.send_command(cmd)
1589        self._expect_done()
1590
1591    def set_domain_prefix(self, prefix, flags='prosD'):
1592        self.add_prefix(prefix, flags)
1593        self.register_netdata()
1594
1595    def remove_domain_prefix(self, prefix):
1596        self.remove_prefix(prefix)
1597        self.register_netdata()
1598
1599    def set_next_dua_response(self, status: Union[str, int], iid=None):
1600        # Convert 5.00 to COAP CODE 160
1601        if isinstance(status, str):
1602            assert '.' in status
1603            status = status.split('.')
1604            status = (int(status[0]) << 5) + int(status[1])
1605
1606        cmd = 'bbr mgmt dua {}'.format(status)
1607        if iid is not None:
1608            cmd += ' ' + str(iid)
1609        self.send_command(cmd)
1610        self._expect_done()
1611
1612    def set_dua_iid(self, iid: str):
1613        assert len(iid) == 16
1614        int(iid, 16)
1615
1616        cmd = 'dua iid {}'.format(iid)
1617        self.send_command(cmd)
1618        self._expect_done()
1619
1620    def clear_dua_iid(self):
1621        cmd = 'dua iid clear'
1622        self.send_command(cmd)
1623        self._expect_done()
1624
1625    def multicast_listener_list(self) -> Dict[IPv6Address, int]:
1626        cmd = 'bbr mgmt mlr listener'
1627        self.send_command(cmd)
1628
1629        table = {}
1630        for line in self._expect_results("\S+ \d+"):
1631            line = line.split()
1632            assert len(line) == 2, line
1633            ip = IPv6Address(line[0])
1634            timeout = int(line[1])
1635            assert ip not in table
1636
1637            table[ip] = timeout
1638
1639        return table
1640
1641    def multicast_listener_clear(self):
1642        cmd = f'bbr mgmt mlr listener clear'
1643        self.send_command(cmd)
1644        self._expect_done()
1645
1646    def multicast_listener_add(self, ip: Union[IPv6Address, str], timeout: int = 0):
1647        if not isinstance(ip, IPv6Address):
1648            ip = IPv6Address(ip)
1649
1650        cmd = f'bbr mgmt mlr listener add {ip.compressed} {timeout}'
1651        self.send_command(cmd)
1652        self._expect(r"(Done|Error .*)")
1653
1654    def set_next_mlr_response(self, status: int):
1655        cmd = 'bbr mgmt mlr response {}'.format(status)
1656        self.send_command(cmd)
1657        self._expect_done()
1658
1659    def register_multicast_listener(self, *ipaddrs: Union[IPv6Address, str], timeout=None):
1660        assert len(ipaddrs) > 0, ipaddrs
1661
1662        ipaddrs = map(str, ipaddrs)
1663        cmd = f'mlr reg {" ".join(ipaddrs)}'
1664        if timeout is not None:
1665            cmd += f' {int(timeout)}'
1666        self.send_command(cmd)
1667        self.simulator.go(3)
1668        lines = self._expect_command_output()
1669        m = re.match(r'status (\d+), (\d+) failed', lines[0])
1670        assert m is not None, lines
1671        status = int(m.group(1))
1672        failed_num = int(m.group(2))
1673        assert failed_num == len(lines) - 1
1674        failed_ips = list(map(IPv6Address, lines[1:]))
1675        print(f"register_multicast_listener {ipaddrs} => status: {status}, failed ips: {failed_ips}")
1676        return status, failed_ips
1677
1678    def set_link_quality(self, addr, lqi):
1679        cmd = 'macfilter rss add-lqi %s %s' % (addr, lqi)
1680        self.send_command(cmd)
1681        self._expect_done()
1682
1683    def set_outbound_link_quality(self, lqi):
1684        cmd = 'macfilter rss add-lqi * %s' % (lqi)
1685        self.send_command(cmd)
1686        self._expect_done()
1687
1688    def remove_allowlist(self, addr):
1689        cmd = 'macfilter addr remove %s' % addr
1690        self.send_command(cmd)
1691        self._expect_done()
1692
1693    def get_addr16(self):
1694        self.send_command('rloc16')
1695        rloc16 = self._expect_result(r'[0-9a-fA-F]{4}')
1696        return int(rloc16, 16)
1697
1698    def get_router_id(self):
1699        rloc16 = self.get_addr16()
1700        return rloc16 >> 10
1701
1702    def get_addr64(self):
1703        self.send_command('extaddr')
1704        return self._expect_result('[0-9a-fA-F]{16}')
1705
1706    def set_addr64(self, addr64: str):
1707        # Make sure `addr64` is a hex string of length 16
1708        assert len(addr64) == 16
1709        int(addr64, 16)
1710        self.send_command('extaddr %s' % addr64)
1711        self._expect_done()
1712
1713    def get_eui64(self):
1714        self.send_command('eui64')
1715        return self._expect_result('[0-9a-fA-F]{16}')
1716
1717    def set_extpanid(self, extpanid):
1718        self.send_command('extpanid %s' % extpanid)
1719        self._expect_done()
1720
1721    def get_extpanid(self):
1722        self.send_command('extpanid')
1723        return self._expect_result('[0-9a-fA-F]{16}')
1724
1725    def get_mesh_local_prefix(self):
1726        self.send_command('prefix meshlocal')
1727        return self._expect_command_output()[0]
1728
1729    def set_mesh_local_prefix(self, mesh_local_prefix):
1730        self.send_command('prefix meshlocal %s' % mesh_local_prefix)
1731        self._expect_done()
1732
1733    def get_joiner_id(self):
1734        self.send_command('joiner id')
1735        return self._expect_result('[0-9a-fA-F]{16}')
1736
1737    def get_channel(self):
1738        self.send_command('channel')
1739        return int(self._expect_result(r'\d+'))
1740
1741    def set_channel(self, channel):
1742        cmd = 'channel %d' % channel
1743        self.send_command(cmd)
1744        self._expect_done()
1745
1746    def get_networkkey(self):
1747        self.send_command('networkkey')
1748        return self._expect_result('[0-9a-fA-F]{32}')
1749
1750    def set_networkkey(self, networkkey):
1751        cmd = 'networkkey %s' % networkkey
1752        self.send_command(cmd)
1753        self._expect_done()
1754
1755    def get_key_sequence_counter(self):
1756        self.send_command('keysequence counter')
1757        result = self._expect_result(r'\d+')
1758        return int(result)
1759
1760    def set_key_sequence_counter(self, key_sequence_counter):
1761        cmd = 'keysequence counter %d' % key_sequence_counter
1762        self.send_command(cmd)
1763        self._expect_done()
1764
1765    def get_key_switch_guardtime(self):
1766        self.send_command('keysequence guardtime')
1767        return int(self._expect_result(r'\d+'))
1768
1769    def set_key_switch_guardtime(self, key_switch_guardtime):
1770        cmd = 'keysequence guardtime %d' % key_switch_guardtime
1771        self.send_command(cmd)
1772        self._expect_done()
1773
1774    def set_network_id_timeout(self, network_id_timeout):
1775        cmd = 'networkidtimeout %d' % network_id_timeout
1776        self.send_command(cmd)
1777        self._expect_done()
1778
1779    def _escape_escapable(self, string):
1780        """Escape CLI escapable characters in the given string.
1781
1782        Args:
1783            string (str): UTF-8 input string.
1784
1785        Returns:
1786            [str]: The modified string with escaped characters.
1787        """
1788        escapable_chars = '\\ \t\r\n'
1789        for char in escapable_chars:
1790            string = string.replace(char, '\\%s' % char)
1791        return string
1792
1793    def get_network_name(self):
1794        self.send_command('networkname')
1795        return self._expect_result([r'\S+'])
1796
1797    def set_network_name(self, network_name):
1798        cmd = 'networkname %s' % self._escape_escapable(network_name)
1799        self.send_command(cmd)
1800        self._expect_done()
1801
1802    def get_panid(self):
1803        self.send_command('panid')
1804        result = self._expect_result('0x[0-9a-fA-F]{4}')
1805        return int(result, 16)
1806
1807    def set_panid(self, panid=config.PANID):
1808        cmd = 'panid %d' % panid
1809        self.send_command(cmd)
1810        self._expect_done()
1811
1812    def set_parent_priority(self, priority):
1813        cmd = 'parentpriority %d' % priority
1814        self.send_command(cmd)
1815        self._expect_done()
1816
1817    def get_partition_id(self):
1818        self.send_command('partitionid')
1819        return self._expect_result(r'\d+')
1820
1821    def get_preferred_partition_id(self):
1822        self.send_command('partitionid preferred')
1823        return self._expect_result(r'\d+')
1824
1825    def set_preferred_partition_id(self, partition_id):
1826        cmd = 'partitionid preferred %d' % partition_id
1827        self.send_command(cmd)
1828        self._expect_done()
1829
1830    def get_pollperiod(self):
1831        self.send_command('pollperiod')
1832        return self._expect_result(r'\d+')
1833
1834    def set_pollperiod(self, pollperiod):
1835        self.send_command('pollperiod %d' % pollperiod)
1836        self._expect_done()
1837
1838    def get_child_supervision_interval(self):
1839        self.send_command('childsupervision interval')
1840        return self._expect_result(r'\d+')
1841
1842    def set_child_supervision_interval(self, interval):
1843        self.send_command('childsupervision interval %d' % interval)
1844        self._expect_done()
1845
1846    def get_child_supervision_check_timeout(self):
1847        self.send_command('childsupervision checktimeout')
1848        return self._expect_result(r'\d+')
1849
1850    def set_child_supervision_check_timeout(self, timeout):
1851        self.send_command('childsupervision checktimeout %d' % timeout)
1852        self._expect_done()
1853
1854    def get_child_supervision_check_failure_counter(self):
1855        self.send_command('childsupervision failcounter')
1856        return self._expect_result(r'\d+')
1857
1858    def reset_child_supervision_check_failure_counter(self):
1859        self.send_command('childsupervision failcounter reset')
1860        self._expect_done()
1861
1862    def get_csl_info(self):
1863        self.send_command('csl')
1864        return self._expect_key_value_pairs(r'\S+')
1865
1866    def set_csl_channel(self, csl_channel):
1867        self.send_command('csl channel %d' % csl_channel)
1868        self._expect_done()
1869
1870    def set_csl_period(self, csl_period):
1871        self.send_command('csl period %d' % csl_period)
1872        self._expect_done()
1873
1874    def set_csl_timeout(self, csl_timeout):
1875        self.send_command('csl timeout %d' % csl_timeout)
1876        self._expect_done()
1877
1878    def send_mac_emptydata(self):
1879        self.send_command('mac send emptydata')
1880        self._expect_done()
1881
1882    def send_mac_datarequest(self):
1883        self.send_command('mac send datarequest')
1884        self._expect_done()
1885
1886    def set_router_upgrade_threshold(self, threshold):
1887        cmd = 'routerupgradethreshold %d' % threshold
1888        self.send_command(cmd)
1889        self._expect_done()
1890
1891    def set_router_downgrade_threshold(self, threshold):
1892        cmd = 'routerdowngradethreshold %d' % threshold
1893        self.send_command(cmd)
1894        self._expect_done()
1895
1896    def get_router_downgrade_threshold(self) -> int:
1897        self.send_command('routerdowngradethreshold')
1898        return int(self._expect_result(r'\d+'))
1899
1900    def set_router_eligible(self, enable: bool):
1901        cmd = f'routereligible {"enable" if enable else "disable"}'
1902        self.send_command(cmd)
1903        self._expect_done()
1904
1905    def get_router_eligible(self) -> bool:
1906        states = [r'Disabled', r'Enabled']
1907        self.send_command('routereligible')
1908        return self._expect_result(states) == 'Enabled'
1909
1910    def prefer_router_id(self, router_id):
1911        cmd = 'preferrouterid %d' % router_id
1912        self.send_command(cmd)
1913        self._expect_done()
1914
1915    def release_router_id(self, router_id):
1916        cmd = 'releaserouterid %d' % router_id
1917        self.send_command(cmd)
1918        self._expect_done()
1919
1920    def get_state(self):
1921        states = [r'detached', r'child', r'router', r'leader', r'disabled']
1922        self.send_command('state')
1923        return self._expect_result(states)
1924
1925    def set_state(self, state):
1926        cmd = 'state %s' % state
1927        self.send_command(cmd)
1928        self._expect_done()
1929
1930    def get_ephemeral_key_state(self):
1931        cmd = 'ba ephemeralkey'
1932        states = [r'inactive', r'active']
1933        self.send_command(cmd)
1934        return self._expect_result(states)
1935
1936    def get_timeout(self):
1937        self.send_command('childtimeout')
1938        return self._expect_result(r'\d+')
1939
1940    def set_timeout(self, timeout):
1941        cmd = 'childtimeout %d' % timeout
1942        self.send_command(cmd)
1943        self._expect_done()
1944
1945    def set_max_children(self, number):
1946        cmd = 'childmax %d' % number
1947        self.send_command(cmd)
1948        self._expect_done()
1949
1950    def get_weight(self):
1951        self.send_command('leaderweight')
1952        return self._expect_result(r'\d+')
1953
1954    def set_weight(self, weight):
1955        cmd = 'leaderweight %d' % weight
1956        self.send_command(cmd)
1957        self._expect_done()
1958
1959    def add_ipaddr(self, ipaddr):
1960        cmd = 'ipaddr add %s' % ipaddr
1961        self.send_command(cmd)
1962        self._expect_done()
1963
1964    def del_ipaddr(self, ipaddr):
1965        cmd = 'ipaddr del %s' % ipaddr
1966        self.send_command(cmd)
1967        self._expect_done()
1968
1969    def add_ipmaddr(self, ipmaddr):
1970        cmd = 'ipmaddr add %s' % ipmaddr
1971        self.send_command(cmd)
1972        self._expect_done()
1973
1974    def del_ipmaddr(self, ipmaddr):
1975        cmd = 'ipmaddr del %s' % ipmaddr
1976        self.send_command(cmd)
1977        self._expect_done()
1978
1979    def get_addrs(self, verbose=False):
1980        self.send_command('ipaddr' + (' -v' if verbose else ''))
1981
1982        return self._expect_results(r'\S+(:\S*)+')
1983
1984    def get_mleid(self):
1985        self.send_command('ipaddr mleid')
1986        return self._expect_result(r'\S+(:\S*)+')
1987
1988    def get_linklocal(self):
1989        self.send_command('ipaddr linklocal')
1990        return self._expect_result(r'\S+(:\S*)+')
1991
1992    def get_rloc(self):
1993        self.send_command('ipaddr rloc')
1994        return self._expect_result(r'\S+(:\S*)+')
1995
1996    def get_addr(self, prefix):
1997        network = ipaddress.ip_network(u'%s' % str(prefix))
1998        addrs = self.get_addrs()
1999
2000        for addr in addrs:
2001            if isinstance(addr, bytearray):
2002                addr = bytes(addr)
2003            ipv6_address = ipaddress.ip_address(addr)
2004            if ipv6_address in network:
2005                return ipv6_address.exploded
2006
2007        return None
2008
2009    def has_ipaddr(self, address):
2010        ipaddr = ipaddress.ip_address(address)
2011        ipaddrs = self.get_addrs()
2012        for addr in ipaddrs:
2013            if isinstance(addr, bytearray):
2014                addr = bytes(addr)
2015            if ipaddress.ip_address(addr) == ipaddr:
2016                return True
2017        return False
2018
2019    def get_ipmaddrs(self):
2020        self.send_command('ipmaddr')
2021        return self._expect_results(r'\S+(:\S*)+')
2022
2023    def has_ipmaddr(self, address):
2024        ipmaddr = ipaddress.ip_address(address)
2025        ipmaddrs = self.get_ipmaddrs()
2026        for addr in ipmaddrs:
2027            if isinstance(addr, bytearray):
2028                addr = bytes(addr)
2029            if ipaddress.ip_address(addr) == ipmaddr:
2030                return True
2031        return False
2032
2033    def get_addr_leader_aloc(self):
2034        addrs = self.get_addrs()
2035        for addr in addrs:
2036            segs = addr.split(':')
2037            if (segs[4] == '0' and segs[5] == 'ff' and segs[6] == 'fe00' and segs[7] == 'fc00'):
2038                return addr
2039        return None
2040
2041    def get_mleid_iid(self):
2042        ml_eid = IPv6Address(self.get_mleid())
2043        return ml_eid.packed[8:].hex()
2044
2045    def get_eidcaches(self):
2046        eidcaches = []
2047        self.send_command('eidcache')
2048        for line in self._expect_results(r'([a-fA-F0-9\:]+) ([a-fA-F0-9]+)'):
2049            eidcaches.append(line.split())
2050
2051        return eidcaches
2052
2053    def add_service(self, enterpriseNumber, serviceData, serverData):
2054        cmd = 'service add %s %s %s' % (
2055            enterpriseNumber,
2056            serviceData,
2057            serverData,
2058        )
2059        self.send_command(cmd)
2060        self._expect_done()
2061
2062    def remove_service(self, enterpriseNumber, serviceData):
2063        cmd = 'service remove %s %s' % (enterpriseNumber, serviceData)
2064        self.send_command(cmd)
2065        self._expect_done()
2066
2067    def get_child_table(self) -> Dict[int, Dict[str, Any]]:
2068        """Get the table of attached children."""
2069        cmd = 'child table'
2070        self.send_command(cmd)
2071        output = self._expect_command_output()
2072
2073        #
2074        # Example output:
2075        # | ID  | RLOC16 | Timeout    | Age        | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt|Suprvsn| Extended MAC     |
2076        # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+-------+------------------+
2077        # |   1 | 0xc801 |        240 |         24 |     3 |  131 |1|0|0|  3| 0 |     0 |   129 | 4ecede68435358ac |
2078        # |   2 | 0xc802 |        240 |          2 |     3 |  131 |0|0|0|  3| 1 |     0 |     0 | a672a601d2ce37d8 |
2079        # Done
2080        #
2081
2082        headers = self.__split_table_row(output[0])
2083
2084        table = {}
2085        for line in output[2:]:
2086            line = line.strip()
2087            if not line:
2088                continue
2089
2090            fields = self.__split_table_row(line)
2091            col = lambda colname: self.__get_table_col(colname, headers, fields)
2092
2093            id = int(col("ID"))
2094            r, d, n = int(col("R")), int(col("D")), int(col("N"))
2095            mode = f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}'
2096
2097            table[int(id)] = {
2098                'id': int(id),
2099                'rloc16': int(col('RLOC16'), 16),
2100                'timeout': int(col('Timeout')),
2101                'age': int(col('Age')),
2102                'lq_in': int(col('LQ In')),
2103                'c_vn': int(col('C_VN')),
2104                'mode': mode,
2105                'extaddr': col('Extended MAC'),
2106                'ver': int(col('Ver')),
2107                'csl': bool(int(col('CSL'))),
2108                'qmsgcnt': int(col('QMsgCnt')),
2109                'suprvsn': int(col('Suprvsn'))
2110            }
2111
2112        return table
2113
2114    def __split_table_row(self, row: str) -> List[str]:
2115        if not (row.startswith('|') and row.endswith('|')):
2116            raise ValueError(row)
2117
2118        fields = row.split('|')
2119        fields = [x.strip() for x in fields[1:-1]]
2120        return fields
2121
2122    def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str:
2123        return fields[headers.index(colname)]
2124
2125    def __getOmrAddress(self):
2126        prefixes = [prefix.split('::')[0] for prefix in self.get_prefixes()]
2127        omr_addrs = []
2128        for addr in self.get_addrs():
2129            for prefix in prefixes:
2130                if (addr.startswith(prefix)) and (addr != self.__getDua()):
2131                    omr_addrs.append(addr)
2132                    break
2133
2134        return omr_addrs
2135
2136    def __getLinkLocalAddress(self):
2137        for ip6Addr in self.get_addrs():
2138            if re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I):
2139                return ip6Addr
2140
2141        return None
2142
2143    def __getGlobalAddress(self):
2144        global_address = []
2145        for ip6Addr in self.get_addrs():
2146            if ((not re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I)) and
2147                (not re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)) and
2148                (not re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I))):
2149                global_address.append(ip6Addr)
2150
2151        return global_address
2152
2153    def __getRloc(self):
2154        for ip6Addr in self.get_addrs():
2155            if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
2156                    re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
2157                    not (re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I))):
2158                return ip6Addr
2159        return None
2160
2161    def __getAloc(self):
2162        aloc = []
2163        for ip6Addr in self.get_addrs():
2164            if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
2165                    re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
2166                    re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I)):
2167                aloc.append(ip6Addr)
2168
2169        return aloc
2170
2171    def __getMleid(self):
2172        for ip6Addr in self.get_addrs():
2173            if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr,
2174                        re.I) and not (re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I)):
2175                return ip6Addr
2176
2177        return None
2178
2179    def __getDua(self) -> Optional[str]:
2180        for ip6Addr in self.get_addrs():
2181            if re.match(config.DOMAIN_PREFIX_REGEX_PATTERN, ip6Addr, re.I):
2182                return ip6Addr
2183
2184        return None
2185
2186    def get_ip6_address_by_prefix(self, prefix: Union[str, IPv6Network]) -> List[IPv6Address]:
2187        """Get addresses matched with given prefix.
2188
2189        Args:
2190            prefix: the prefix to match against.
2191                    Can be either a string or ipaddress.IPv6Network.
2192
2193        Returns:
2194            The IPv6 address list.
2195        """
2196        if isinstance(prefix, str):
2197            prefix = IPv6Network(prefix)
2198        addrs = map(IPv6Address, self.get_addrs())
2199
2200        return [addr for addr in addrs if addr in prefix]
2201
2202    def get_ip6_address(self, address_type):
2203        """Get specific type of IPv6 address configured on thread device.
2204
2205        Args:
2206            address_type: the config.ADDRESS_TYPE type of IPv6 address.
2207
2208        Returns:
2209            IPv6 address string.
2210        """
2211        if address_type == config.ADDRESS_TYPE.LINK_LOCAL:
2212            return self.__getLinkLocalAddress()
2213        elif address_type == config.ADDRESS_TYPE.GLOBAL:
2214            return self.__getGlobalAddress()
2215        elif address_type == config.ADDRESS_TYPE.RLOC:
2216            return self.__getRloc()
2217        elif address_type == config.ADDRESS_TYPE.ALOC:
2218            return self.__getAloc()
2219        elif address_type == config.ADDRESS_TYPE.ML_EID:
2220            return self.__getMleid()
2221        elif address_type == config.ADDRESS_TYPE.DUA:
2222            return self.__getDua()
2223        elif address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
2224            return self._getBackboneGua()
2225        elif address_type == config.ADDRESS_TYPE.OMR:
2226            return self.__getOmrAddress()
2227        else:
2228            return None
2229
2230    def get_context_reuse_delay(self):
2231        self.send_command('contextreusedelay')
2232        return self._expect_result(r'\d+')
2233
2234    def set_context_reuse_delay(self, delay):
2235        cmd = 'contextreusedelay %d' % delay
2236        self.send_command(cmd)
2237        self._expect_done()
2238
2239    def add_prefix(self, prefix, flags='paosr', prf='med'):
2240        cmd = 'prefix add %s %s %s' % (prefix, flags, prf)
2241        self.send_command(cmd)
2242        self._expect_done()
2243
2244    def remove_prefix(self, prefix):
2245        cmd = 'prefix remove %s' % prefix
2246        self.send_command(cmd)
2247        self._expect_done()
2248
2249    #
2250    # BR commands
2251    #
2252    def enable_br(self):
2253        self.send_command('br enable')
2254        self._expect_done()
2255
2256    def disable_br(self):
2257        self.send_command('br disable')
2258        self._expect_done()
2259
2260    def get_br_omr_prefix(self):
2261        cmd = 'br omrprefix local'
2262        self.send_command(cmd)
2263        return self._expect_command_output()[0]
2264
2265    def get_br_peers(self) -> List[str]:
2266        # Example output of `br peers` command:
2267        #   rloc16:0xa800 age:00:00:50
2268        #   rloc16:0x6800 age:00:00:51
2269        #   Done
2270        self.send_command('br peers')
2271        return self._expect_command_output()
2272
2273    def get_br_peers_rloc16s(self) -> List[int]:
2274        """parse `br peers` output and return the list of RLOC16s"""
2275        return [
2276            int(pair.split(':')[1], 16)
2277            for line in self.get_br_peers()
2278            for pair in line.split()
2279            if pair.split(':')[0] == 'rloc16'
2280        ]
2281
2282    def get_br_routers(self) -> List[str]:
2283        # Example output of `br routers` command:
2284        #   fe80:0:0:0:42:acff:fe14:3 (M:0 O:0 Stub:1) ms-since-rx:144160 reachable:yes age:00:17:36 (peer BR)
2285        #   fe80:0:0:0:42:acff:fe14:2 (M:0 O:0 Stub:1) ms-since-rx:45179 reachable:yes age:00:17:36
2286        #   Done
2287        self.send_command('br routers')
2288        return self._expect_command_output()
2289
2290    def get_br_routers_ip_addresses(self) -> List[IPv6Address]:
2291        """parse `br routers` output and return the list of IPv6 addresses"""
2292        return [IPv6Address(line.split()[0]) for line in self.get_br_routers()]
2293
2294    def get_netdata_omr_prefixes(self):
2295        omr_prefixes = []
2296        for prefix in self.get_prefixes():
2297            prefix, flags = prefix.split()[:2]
2298            if 'a' in flags and 'o' in flags and 's' in flags and 'D' not in flags:
2299                omr_prefixes.append(prefix)
2300
2301        return omr_prefixes
2302
2303    def get_br_on_link_prefix(self):
2304        cmd = 'br onlinkprefix local'
2305        self.send_command(cmd)
2306        return self._expect_command_output()[0]
2307
2308    def get_netdata_non_nat64_routes(self):
2309        nat64_routes = []
2310        routes = self.get_routes()
2311        for route in routes:
2312            if 'n' not in route.split(' ')[1]:
2313                nat64_routes.append(route.split(' ')[0])
2314        return nat64_routes
2315
2316    def get_netdata_nat64_routes(self):
2317        nat64_routes = []
2318        routes = self.get_routes()
2319        for route in routes:
2320            if 'n' in route.split(' ')[1]:
2321                nat64_routes.append(route.split(' ')[0])
2322        return nat64_routes
2323
2324    def get_br_nat64_prefix(self):
2325        cmd = 'br nat64prefix local'
2326        self.send_command(cmd)
2327        return self._expect_command_output()[0]
2328
2329    def get_br_favored_nat64_prefix(self):
2330        cmd = 'br nat64prefix favored'
2331        self.send_command(cmd)
2332        return self._expect_command_output()[0].split(' ')[0]
2333
2334    def enable_nat64(self):
2335        self.send_command(f'nat64 enable')
2336        self._expect_done()
2337
2338    def disable_nat64(self):
2339        self.send_command(f'nat64 disable')
2340        self._expect_done()
2341
2342    def get_nat64_state(self):
2343        self.send_command('nat64 state')
2344        res = {}
2345        for line in self._expect_command_output():
2346            state = line.split(':')
2347            res[state[0].strip()] = state[1].strip()
2348        return res
2349
    def get_nat64_mappings(self):
        """Send `nat64 mappings` and parse the output into a list of sessions.

        Returns:
            A list of dicts, one per output line that matches the session-row
            pattern, with the keys 'id', 'ip6', 'ip4', 'expiry' (int) and
            'counters'. 'counters' maps 'total' (taken from the session row
            itself) and the name found on each following sub-row to
            {'4to6': {'packets', 'bytes'}, '6to4': {'packets', 'bytes'}}
            with int values.
        """
        cmd = 'nat64 mappings'
        self.send_command(cmd)
        result = self._expect_command_output()
        # `session` / `session_counters` hold the session currently being
        # assembled; it is appended once the next session row (or the end of
        # the output) is reached.
        session = None
        session_counters = None
        sessions = []

        for line in result:
            # Session row: eight '|'-separated columns - a hex id, two
            # free-form columns, an expiry written as a number followed by
            # 's', then four integer counters.
            m = re.match(
                r'\|\s+([a-f0-9]+)\s+\|\s+(.+)\s+\|\s+(.+)\s+\|\s+(\d+)s\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|',
                line)
            if m:
                groups = m.groups()
                # A new session starts here: flush the previous one first.
                if session:
                    session['counters'] = session_counters
                    sessions.append(session)
                session = {
                    'id': groups[0],
                    'ip6': groups[1],
                    'ip4': groups[2],
                    'expiry': int(groups[3]),
                }
                session_counters = {}
                session_counters['total'] = {
                    '4to6': {
                        'packets': int(groups[4]),
                        'bytes': int(groups[5]),
                    },
                    '6to4': {
                        'packets': int(groups[6]),
                        'bytes': int(groups[7]),
                    },
                }
                continue
            # Lines seen before the first session row (e.g. table headers)
            # are ignored.
            if not session:
                continue
            # Sub-row of the current session: an empty first column, a
            # free-form name, then four integer counters.
            m = re.match(r'\|\s+\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
            if m:
                groups = m.groups()
                session_counters[groups[0]] = {
                    '4to6': {
                        'packets': int(groups[1]),
                        'bytes': int(groups[2]),
                    },
                    '6to4': {
                        'packets': int(groups[3]),
                        'bytes': int(groups[4]),
                    },
                }
        # Flush the last session, which has no following session row.
        if session:
            session['counters'] = session_counters
            sessions.append(session)
        return sessions
2404
2405    def get_nat64_counters(self):
2406        cmd = 'nat64 counters'
2407        self.send_command(cmd)
2408        result = self._expect_command_output()
2409
2410        protocol_counters = {}
2411        error_counters = {}
2412        for line in result:
2413            m = re.match(r'\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
2414            if m:
2415                groups = m.groups()
2416                protocol_counters[groups[0]] = {
2417                    '4to6': {
2418                        'packets': int(groups[1]),
2419                        'bytes': int(groups[2]),
2420                    },
2421                    '6to4': {
2422                        'packets': int(groups[3]),
2423                        'bytes': int(groups[4]),
2424                    },
2425                }
2426                continue
2427            m = re.match(r'\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
2428            if m:
2429                groups = m.groups()
2430                error_counters[groups[0]] = {
2431                    '4to6': {
2432                        'packets': int(groups[1]),
2433                    },
2434                    '6to4': {
2435                        'packets': int(groups[2]),
2436                    },
2437                }
2438                continue
2439        return {'protocol': protocol_counters, 'errors': error_counters}
2440
2441    def get_prefixes(self):
2442        return self.get_netdata()['Prefixes']
2443
2444    def get_routes(self):
2445        return self.get_netdata()['Routes']
2446
2447    def get_services(self):
2448        netdata = self.netdata_show()
2449        services = []
2450        services_section = False
2451
2452        for line in netdata:
2453            if line.startswith('Services:'):
2454                services_section = True
2455            elif line.startswith('Contexts'):
2456                services_section = False
2457            elif services_section:
2458                services.append(line.strip().split(' '))
2459        return services
2460
2461    def netdata_show(self):
2462        self.send_command('netdata show')
2463        return self._expect_command_output()
2464
2465    def get_netdata(self):
2466        raw_netdata = self.netdata_show()
2467        netdata = {'Prefixes': [], 'Routes': [], 'Services': [], 'Contexts': [], 'Commissioning': []}
2468        key_list = ['Prefixes', 'Routes', 'Services', 'Contexts', 'Commissioning']
2469        key = None
2470
2471        for i in range(0, len(raw_netdata)):
2472            keys = list(filter(raw_netdata[i].startswith, key_list))
2473            if keys != []:
2474                key = keys[0]
2475            elif key is not None:
2476                netdata[key].append(raw_netdata[i])
2477
2478        return netdata
2479
2480    def add_route(self, prefix, stable=False, nat64=False, prf='med'):
2481        cmd = 'route add %s ' % prefix
2482        if stable:
2483            cmd += 's'
2484        if nat64:
2485            cmd += 'n'
2486        cmd += ' %s' % prf
2487        self.send_command(cmd)
2488        self._expect_done()
2489
2490    def remove_route(self, prefix):
2491        cmd = 'route remove %s' % prefix
2492        self.send_command(cmd)
2493        self._expect_done()
2494
2495    def register_netdata(self):
2496        self.send_command('netdata register')
2497        self._expect_done()
2498
2499    def netdata_publish_dnssrp_anycast(self, seqnum):
2500        self.send_command(f'netdata publish dnssrp anycast {seqnum}')
2501        self._expect_done()
2502
2503    def netdata_publish_dnssrp_unicast(self, address, port):
2504        self.send_command(f'netdata publish dnssrp unicast {address} {port}')
2505        self._expect_done()
2506
2507    def netdata_publish_dnssrp_unicast_mleid(self, port):
2508        self.send_command(f'netdata publish dnssrp unicast {port}')
2509        self._expect_done()
2510
2511    def netdata_unpublish_dnssrp(self):
2512        self.send_command('netdata unpublish dnssrp')
2513        self._expect_done()
2514
2515    def netdata_publish_prefix(self, prefix, flags='paosr', prf='med'):
2516        self.send_command(f'netdata publish prefix {prefix} {flags} {prf}')
2517        self._expect_done()
2518
2519    def netdata_publish_route(self, prefix, flags='s', prf='med'):
2520        self.send_command(f'netdata publish route {prefix} {flags} {prf}')
2521        self._expect_done()
2522
2523    def netdata_publish_replace(self, old_prefix, prefix, flags='s', prf='med'):
2524        self.send_command(f'netdata publish replace {old_prefix} {prefix} {flags} {prf}')
2525        self._expect_done()
2526
2527    def netdata_unpublish_prefix(self, prefix):
2528        self.send_command(f'netdata unpublish {prefix}')
2529        self._expect_done()
2530
2531    def send_network_diag_get(self, addr, tlv_types):
2532        self.send_command('networkdiagnostic get %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
2533
2534        if isinstance(self.simulator, simulator.VirtualTime):
2535            self.simulator.go(8)
2536            timeout = 1
2537        else:
2538            timeout = 8
2539
2540        self._expect_done(timeout=timeout)
2541
2542    def send_network_diag_reset(self, addr, tlv_types):
2543        self.send_command('networkdiagnostic reset %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
2544
2545        if isinstance(self.simulator, simulator.VirtualTime):
2546            self.simulator.go(8)
2547            timeout = 1
2548        else:
2549            timeout = 8
2550
2551        self._expect_done(timeout=timeout)
2552
2553    def energy_scan(self, mask, count, period, scan_duration, ipaddr):
2554        cmd = 'commissioner energy %d %d %d %d %s' % (
2555            mask,
2556            count,
2557            period,
2558            scan_duration,
2559            ipaddr,
2560        )
2561        self.send_command(cmd)
2562
2563        if isinstance(self.simulator, simulator.VirtualTime):
2564            self.simulator.go(8)
2565            timeout = 1
2566        else:
2567            timeout = 8
2568
2569        self._expect('Energy:', timeout=timeout)
2570
2571    def panid_query(self, panid, mask, ipaddr):
2572        cmd = 'commissioner panid %d %d %s' % (panid, mask, ipaddr)
2573        self.send_command(cmd)
2574
2575        if isinstance(self.simulator, simulator.VirtualTime):
2576            self.simulator.go(8)
2577            timeout = 1
2578        else:
2579            timeout = 8
2580
2581        self._expect('Conflict:', timeout=timeout)
2582
2583    def scan(self, result=1, timeout=10):
2584        self.send_command('scan')
2585
2586        self.simulator.go(timeout)
2587
2588        if result == 1:
2589            networks = []
2590            for line in self._expect_command_output()[2:]:
2591                _, panid, extaddr, channel, dbm, lqi, _ = map(str.strip, line.split('|'))
2592                panid = int(panid, 16)
2593                channel, dbm, lqi = map(int, (channel, dbm, lqi))
2594
2595                networks.append({
2596                    'panid': panid,
2597                    'extaddr': extaddr,
2598                    'channel': channel,
2599                    'dbm': dbm,
2600                    'lqi': lqi,
2601                })
2602            return networks
2603
2604    def scan_energy(self, timeout=10):
2605        self.send_command('scan energy')
2606        self.simulator.go(timeout)
2607        rssi_list = []
2608        for line in self._expect_command_output()[2:]:
2609            _, channel, rssi, _ = line.split('|')
2610            rssi_list.append({
2611                'channel': int(channel.strip()),
2612                'rssi': int(rssi.strip()),
2613            })
2614        return rssi_list
2615
    def ping(self, ipaddr, num_responses=1, size=8, timeout=5, count=1, interval=1, hoplimit=64, interface=None):
        """
        Send a `ping` CLI command and wait for its replies.

        The command is `ping [-I <interface>] <ipaddr> <size> <count> <interval>
        <hoplimit> <timeout>`. The method then steps the simulator one unit at a
        time, looking for `from <addr>:` reply lines and for `Done`.

        Returns:
            True once at least `num_responses` distinct responder addresses have
            been seen and `Done` has been matched (for 'ncp-sim' nodes `Done` is
            treated as already seen). False if an expect times out after the
            simulator clock has reached `timeout + 3` past the time of sending.
        """
        args = f'{ipaddr} {size} {count} {interval} {hoplimit} {timeout}'
        if interface is not None:
            # The interface selector goes in front of the positional arguments.
            args = f'-I {interface} {args}'
        cmd = f'ping {args}'

        self.send_command(cmd)

        # Deadline on the simulator clock: ping timeout plus a fixed allowance.
        wait_allowance = 3
        end = self.simulator.now() + timeout + wait_allowance

        # Keyed by responder address so repeated replies from one address count once.
        responders = {}

        result = True
        # ncp-sim doesn't print Done
        done = (self.node_type == 'ncp-sim')
        while len(responders) < num_responses or not done:
            self.simulator.go(1)
            try:
                i = self._expect([r'from (\S+):', r'Done'], timeout=0.1)
            except (pexpect.TIMEOUT, socket.timeout):
                # Nothing matched this step: keep polling until the deadline passes.
                if self.simulator.now() < end:
                    continue
                result = False
                if isinstance(self.simulator, simulator.VirtualTime):
                    self.simulator.sync_devices()
                break
            else:
                if i == 0:
                    # Matched a reply line; record the captured source address.
                    responders[self.pexpect.match.groups()[0]] = 1
                elif i == 1:
                    done = True
        return result
2649
2650    def reset(self):
2651        self._reset('reset')
2652
2653    def factory_reset(self):
2654        self._reset('factoryreset')
2655
2656    def _reset(self, cmd):
2657        self.send_command(cmd, expect_command_echo=False)
2658        time.sleep(self.RESET_DELAY)
2659        # Send a "version" command and drain the CLI output after reset
2660        self.send_command('version', expect_command_echo=False)
2661        while True:
2662            try:
2663                self._expect(r"[^\n]+\n", timeout=0.1)
2664                continue
2665            except pexpect.TIMEOUT:
2666                break
2667
2668        if self.is_otbr:
2669            self.set_log_level(5)
2670
2671    def set_router_selection_jitter(self, jitter):
2672        cmd = 'routerselectionjitter %d' % jitter
2673        self.send_command(cmd)
2674        self._expect_done()
2675
2676    def set_active_dataset(
2677        self,
2678        timestamp=None,
2679        channel=None,
2680        channel_mask=None,
2681        extended_panid=None,
2682        mesh_local_prefix=None,
2683        network_key=None,
2684        network_name=None,
2685        panid=None,
2686        pskc=None,
2687        security_policy=[],
2688        updateExisting=False,
2689    ):
2690
2691        if updateExisting:
2692            self.send_command('dataset init active', go=False)
2693        else:
2694            self.send_command('dataset clear', go=False)
2695        self._expect_done()
2696
2697        if timestamp is not None:
2698            cmd = 'dataset activetimestamp %d' % timestamp
2699            self.send_command(cmd, go=False)
2700            self._expect_done()
2701
2702        if channel is not None:
2703            cmd = 'dataset channel %d' % channel
2704            self.send_command(cmd, go=False)
2705            self._expect_done()
2706
2707        if channel_mask is not None:
2708            cmd = 'dataset channelmask %d' % channel_mask
2709            self.send_command(cmd, go=False)
2710            self._expect_done()
2711
2712        if extended_panid is not None:
2713            cmd = 'dataset extpanid %s' % extended_panid
2714            self.send_command(cmd, go=False)
2715            self._expect_done()
2716
2717        if mesh_local_prefix is not None:
2718            cmd = 'dataset meshlocalprefix %s' % mesh_local_prefix
2719            self.send_command(cmd, go=False)
2720            self._expect_done()
2721
2722        if network_key is not None:
2723            cmd = 'dataset networkkey %s' % network_key
2724            self.send_command(cmd, go=False)
2725            self._expect_done()
2726
2727        if network_name is not None:
2728            cmd = 'dataset networkname %s' % network_name
2729            self.send_command(cmd, go=False)
2730            self._expect_done()
2731
2732        if panid is not None:
2733            cmd = 'dataset panid %d' % panid
2734            self.send_command(cmd, go=False)
2735            self._expect_done()
2736
2737        if pskc is not None:
2738            cmd = 'dataset pskc %s' % pskc
2739            self.send_command(cmd, go=False)
2740            self._expect_done()
2741
2742        if security_policy is not None:
2743            if len(security_policy) >= 2:
2744                cmd = 'dataset securitypolicy %s %s' % (
2745                    str(security_policy[0]),
2746                    security_policy[1],
2747                )
2748            if len(security_policy) >= 3:
2749                cmd += ' %s' % (str(security_policy[2]))
2750            self.send_command(cmd, go=False)
2751            self._expect_done()
2752
2753        self.send_command('dataset commit active', go=False)
2754        self._expect_done()
2755
2756    def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None, delay=None):
2757        self.send_command('dataset clear')
2758        self._expect_done()
2759
2760        cmd = 'dataset pendingtimestamp %d' % pendingtimestamp
2761        self.send_command(cmd)
2762        self._expect_done()
2763
2764        cmd = 'dataset activetimestamp %d' % activetimestamp
2765        self.send_command(cmd)
2766        self._expect_done()
2767
2768        if panid is not None:
2769            cmd = 'dataset panid %d' % panid
2770            self.send_command(cmd)
2771            self._expect_done()
2772
2773        if channel is not None:
2774            cmd = 'dataset channel %d' % channel
2775            self.send_command(cmd)
2776            self._expect_done()
2777
2778        if delay is not None:
2779            cmd = 'dataset delay %d' % delay
2780            self.send_command(cmd)
2781            self._expect_done()
2782
2783        # Set the meshlocal prefix in config.py
2784        self.send_command('dataset meshlocalprefix %s' % config.MESH_LOCAL_PREFIX.split('/')[0])
2785        self._expect_done()
2786
2787        self.send_command('dataset commit pending')
2788        self._expect_done()
2789
2790    def start_dataset_updater(self, panid=None, channel=None, security_policy=None, delay=None):
2791        self.send_command('dataset clear')
2792        self._expect_done()
2793
2794        if panid is not None:
2795            cmd = 'dataset panid %d' % panid
2796            self.send_command(cmd)
2797            self._expect_done()
2798
2799        if channel is not None:
2800            cmd = 'dataset channel %d' % channel
2801            self.send_command(cmd)
2802            self._expect_done()
2803
2804        if security_policy is not None:
2805            cmd = 'dataset securitypolicy %d %s ' % (security_policy[0], security_policy[1])
2806            if (len(security_policy) >= 3):
2807                cmd += '%d ' % (security_policy[2])
2808            self.send_command(cmd)
2809            self._expect_done()
2810
2811        if delay is not None:
2812            cmd = 'dataset delay %d ' % delay
2813            self.send_command(cmd)
2814            self._expect_done()
2815
2816        self.send_command('dataset updater start')
2817        self._expect_done()
2818
2819    def announce_begin(self, mask, count, period, ipaddr):
2820        cmd = 'commissioner announce %d %d %d %s' % (
2821            mask,
2822            count,
2823            period,
2824            ipaddr,
2825        )
2826        self.send_command(cmd)
2827        self._expect_done()
2828
2829    def send_mgmt_active_set(
2830        self,
2831        active_timestamp=None,
2832        channel=None,
2833        channel_mask=None,
2834        extended_panid=None,
2835        panid=None,
2836        network_key=None,
2837        mesh_local=None,
2838        network_name=None,
2839        security_policy=None,
2840        binary=None,
2841    ):
2842        cmd = 'dataset mgmtsetcommand active '
2843
2844        if active_timestamp is not None:
2845            cmd += 'activetimestamp %d ' % active_timestamp
2846
2847        if channel is not None:
2848            cmd += 'channel %d ' % channel
2849
2850        if channel_mask is not None:
2851            cmd += 'channelmask %d ' % channel_mask
2852
2853        if extended_panid is not None:
2854            cmd += 'extpanid %s ' % extended_panid
2855
2856        if panid is not None:
2857            cmd += 'panid %d ' % panid
2858
2859        if network_key is not None:
2860            cmd += 'networkkey %s ' % network_key
2861
2862        if mesh_local is not None:
2863            cmd += 'localprefix %s ' % mesh_local
2864
2865        if network_name is not None:
2866            cmd += 'networkname %s ' % self._escape_escapable(network_name)
2867
2868        if security_policy is not None:
2869            cmd += 'securitypolicy %d %s ' % (security_policy[0], security_policy[1])
2870            if (len(security_policy) >= 3):
2871                cmd += '%d ' % (security_policy[2])
2872
2873        if binary is not None:
2874            cmd += '-x %s ' % binary
2875
2876        self.send_command(cmd)
2877        self._expect_done()
2878
2879    def send_mgmt_active_get(self, addr='', tlvs=[]):
2880        cmd = 'dataset mgmtgetcommand active'
2881
2882        if addr != '':
2883            cmd += ' address '
2884            cmd += addr
2885
2886        if len(tlvs) != 0:
2887            tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
2888            cmd += ' -x '
2889            cmd += tlv_str
2890
2891        self.send_command(cmd)
2892        self._expect_done()
2893
2894    def send_mgmt_pending_get(self, addr='', tlvs=[]):
2895        cmd = 'dataset mgmtgetcommand pending'
2896
2897        if addr != '':
2898            cmd += ' address '
2899            cmd += addr
2900
2901        if len(tlvs) != 0:
2902            tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
2903            cmd += ' -x '
2904            cmd += tlv_str
2905
2906        self.send_command(cmd)
2907        self._expect_done()
2908
2909    def send_mgmt_pending_set(
2910        self,
2911        pending_timestamp=None,
2912        active_timestamp=None,
2913        delay_timer=None,
2914        channel=None,
2915        panid=None,
2916        network_key=None,
2917        mesh_local=None,
2918        network_name=None,
2919    ):
2920        cmd = 'dataset mgmtsetcommand pending '
2921        if pending_timestamp is not None:
2922            cmd += 'pendingtimestamp %d ' % pending_timestamp
2923
2924        if active_timestamp is not None:
2925            cmd += 'activetimestamp %d ' % active_timestamp
2926
2927        if delay_timer is not None:
2928            cmd += 'delaytimer %d ' % delay_timer
2929
2930        if channel is not None:
2931            cmd += 'channel %d ' % channel
2932
2933        if panid is not None:
2934            cmd += 'panid %d ' % panid
2935
2936        if network_key is not None:
2937            cmd += 'networkkey %s ' % network_key
2938
2939        if mesh_local is not None:
2940            cmd += 'localprefix %s ' % mesh_local
2941
2942        if network_name is not None:
2943            cmd += 'networkname %s ' % self._escape_escapable(network_name)
2944
2945        self.send_command(cmd)
2946        self._expect_done()
2947
2948    def coap_cancel(self):
2949        """
2950        Cancel a CoAP subscription.
2951        """
2952        cmd = 'coap cancel'
2953        self.send_command(cmd)
2954        self._expect_done()
2955
2956    def coap_delete(self, ipaddr, uri, con=False, payload=None):
2957        """
2958        Send a DELETE request via CoAP.
2959        """
2960        return self._coap_rq('delete', ipaddr, uri, con, payload)
2961
2962    def coap_get(self, ipaddr, uri, con=False, payload=None):
2963        """
2964        Send a GET request via CoAP.
2965        """
2966        return self._coap_rq('get', ipaddr, uri, con, payload)
2967
2968    def coap_get_block(self, ipaddr, uri, size=16, count=0):
2969        """
2970        Send a GET request via CoAP.
2971        """
2972        return self._coap_rq_block('get', ipaddr, uri, size, count)
2973
2974    def coap_observe(self, ipaddr, uri, con=False, payload=None):
2975        """
2976        Send a GET request via CoAP with Observe set.
2977        """
2978        return self._coap_rq('observe', ipaddr, uri, con, payload)
2979
2980    def coap_post(self, ipaddr, uri, con=False, payload=None):
2981        """
2982        Send a POST request via CoAP.
2983        """
2984        return self._coap_rq('post', ipaddr, uri, con, payload)
2985
2986    def coap_post_block(self, ipaddr, uri, size=16, count=0):
2987        """
2988        Send a POST request via CoAP.
2989        """
2990        return self._coap_rq_block('post', ipaddr, uri, size, count)
2991
2992    def coap_put(self, ipaddr, uri, con=False, payload=None):
2993        """
2994        Send a PUT request via CoAP.
2995        """
2996        return self._coap_rq('put', ipaddr, uri, con, payload)
2997
2998    def coap_put_block(self, ipaddr, uri, size=16, count=0):
2999        """
3000        Send a PUT request via CoAP.
3001        """
3002        return self._coap_rq_block('put', ipaddr, uri, size, count)
3003
3004    def _coap_rq(self, method, ipaddr, uri, con=False, payload=None):
3005        """
3006        Issue a GET/POST/PUT/DELETE/GET OBSERVE request.
3007        """
3008        cmd = 'coap %s %s %s' % (method, ipaddr, uri)
3009        if con:
3010            cmd += ' con'
3011        else:
3012            cmd += ' non'
3013
3014        if payload is not None:
3015            cmd += ' %s' % payload
3016
3017        self.send_command(cmd)
3018        return self.coap_wait_response()
3019
3020    def _coap_rq_block(self, method, ipaddr, uri, size=16, count=0):
3021        """
3022        Issue a GET/POST/PUT/DELETE/GET OBSERVE BLOCK request.
3023        """
3024        cmd = 'coap %s %s %s' % (method, ipaddr, uri)
3025
3026        cmd += ' block-%d' % size
3027
3028        if count != 0:
3029            cmd += ' %d' % count
3030
3031        self.send_command(cmd)
3032        return self.coap_wait_response()
3033
3034    def coap_wait_response(self):
3035        """
3036        Wait for a CoAP response, and return it.
3037        """
3038        if isinstance(self.simulator, simulator.VirtualTime):
3039            self.simulator.go(5)
3040            timeout = 1
3041        else:
3042            timeout = 5
3043
3044        self._expect(r'coap response from ([\da-f:]+)(?: OBS=(\d+))?'
3045                     r'(?: with payload: ([\da-f]+))?\b',
3046                     timeout=timeout)
3047        (source, observe, payload) = self.pexpect.match.groups()
3048        source = source.decode('UTF-8')
3049
3050        if observe is not None:
3051            observe = int(observe, base=10)
3052
3053        if payload is not None:
3054            try:
3055                payload = binascii.a2b_hex(payload).decode('UTF-8')
3056            except UnicodeDecodeError:
3057                pass
3058
3059        # Return the values received
3060        return dict(source=source, observe=observe, payload=payload)
3061
3062    def coap_wait_request(self):
3063        """
3064        Wait for a CoAP request to be made.
3065        """
3066        if isinstance(self.simulator, simulator.VirtualTime):
3067            self.simulator.go(5)
3068            timeout = 1
3069        else:
3070            timeout = 5
3071
3072        self._expect(r'coap request from ([\da-f:]+)(?: OBS=(\d+))?'
3073                     r'(?: with payload: ([\da-f]+))?\b',
3074                     timeout=timeout)
3075        (source, observe, payload) = self.pexpect.match.groups()
3076        source = source.decode('UTF-8')
3077
3078        if observe is not None:
3079            observe = int(observe, base=10)
3080
3081        if payload is not None:
3082            payload = binascii.a2b_hex(payload).decode('UTF-8')
3083
3084        # Return the values received
3085        return dict(source=source, observe=observe, payload=payload)
3086
3087    def coap_wait_subscribe(self):
3088        """
3089        Wait for a CoAP client to be subscribed.
3090        """
3091        if isinstance(self.simulator, simulator.VirtualTime):
3092            self.simulator.go(5)
3093            timeout = 1
3094        else:
3095            timeout = 5
3096
3097        self._expect(r'Subscribing client\b', timeout=timeout)
3098
3099    def coap_wait_ack(self):
3100        """
3101        Wait for a CoAP notification ACK.
3102        """
3103        if isinstance(self.simulator, simulator.VirtualTime):
3104            self.simulator.go(5)
3105            timeout = 1
3106        else:
3107            timeout = 5
3108
3109        self._expect(r'Received ACK in reply to notification from ([\da-f:]+)\b', timeout=timeout)
3110        (source,) = self.pexpect.match.groups()
3111        source = source.decode('UTF-8')
3112
3113        return source
3114
3115    def coap_set_resource_path(self, path):
3116        """
3117        Set the path for the CoAP resource.
3118        """
3119        cmd = 'coap resource %s' % path
3120        self.send_command(cmd)
3121        self._expect_done()
3122
3123    def coap_set_resource_path_block(self, path, count=0):
3124        """
3125        Set the path for the CoAP resource and how many blocks can be received from this resource.
3126        """
3127        cmd = 'coap resource %s %d' % (path, count)
3128        self.send_command(cmd)
3129        self._expect('Done')
3130
3131    def coap_set_content(self, content):
3132        """
3133        Set the content of the CoAP resource.
3134        """
3135        cmd = 'coap set %s' % content
3136        self.send_command(cmd)
3137        self._expect_done()
3138
3139    def coap_start(self):
3140        """
3141        Start the CoAP service.
3142        """
3143        cmd = 'coap start'
3144        self.send_command(cmd)
3145        self._expect_done()
3146
3147    def coap_stop(self):
3148        """
3149        Stop the CoAP service.
3150        """
3151        cmd = 'coap stop'
3152        self.send_command(cmd)
3153
3154        if isinstance(self.simulator, simulator.VirtualTime):
3155            self.simulator.go(5)
3156            timeout = 1
3157        else:
3158            timeout = 5
3159
3160        self._expect_done(timeout=timeout)
3161
3162    def coaps_start_psk(self, psk, pskIdentity):
3163        cmd = 'coaps psk %s %s' % (psk, pskIdentity)
3164        self.send_command(cmd)
3165        self._expect_done()
3166
3167        cmd = 'coaps start'
3168        self.send_command(cmd)
3169        self._expect_done()
3170
3171    def coaps_start_x509(self):
3172        cmd = 'coaps x509'
3173        self.send_command(cmd)
3174        self._expect_done()
3175
3176        cmd = 'coaps start'
3177        self.send_command(cmd)
3178        self._expect_done()
3179
3180    def coaps_set_resource_path(self, path):
3181        cmd = 'coaps resource %s' % path
3182        self.send_command(cmd)
3183        self._expect_done()
3184
3185    def coaps_stop(self):
3186        cmd = 'coaps stop'
3187        self.send_command(cmd)
3188
3189        if isinstance(self.simulator, simulator.VirtualTime):
3190            self.simulator.go(5)
3191            timeout = 1
3192        else:
3193            timeout = 5
3194
3195        self._expect_done(timeout=timeout)
3196
3197    def coaps_connect(self, ipaddr):
3198        cmd = 'coaps connect %s' % ipaddr
3199        self.send_command(cmd)
3200
3201        if isinstance(self.simulator, simulator.VirtualTime):
3202            self.simulator.go(5)
3203            timeout = 1
3204        else:
3205            timeout = 5
3206
3207        self._expect('coaps connected', timeout=timeout)
3208
3209    def coaps_disconnect(self):
3210        cmd = 'coaps disconnect'
3211        self.send_command(cmd)
3212        self._expect_done()
3213        self.simulator.go(5)
3214
3215    def coaps_get(self):
3216        cmd = 'coaps get test'
3217        self.send_command(cmd)
3218
3219        if isinstance(self.simulator, simulator.VirtualTime):
3220            self.simulator.go(5)
3221            timeout = 1
3222        else:
3223            timeout = 5
3224
3225        self._expect('coaps response', timeout=timeout)
3226
3227    def commissioner_mgmtget(self, tlvs_binary=None):
3228        cmd = 'commissioner mgmtget'
3229        if tlvs_binary is not None:
3230            cmd += ' -x %s' % tlvs_binary
3231        self.send_command(cmd)
3232        self._expect_done()
3233
3234    def commissioner_mgmtset(self, tlvs_binary):
3235        cmd = 'commissioner mgmtset -x %s' % tlvs_binary
3236        self.send_command(cmd)
3237        self._expect_done()
3238
3239    def bytes_to_hex_str(self, src):
3240        return ''.join(format(x, '02x') for x in src)
3241
3242    def commissioner_mgmtset_with_tlvs(self, tlvs):
3243        payload = bytearray()
3244        for tlv in tlvs:
3245            payload += tlv.to_hex()
3246        self.commissioner_mgmtset(self.bytes_to_hex_str(payload))
3247
3248    def udp_start(self, local_ipaddr, local_port, bind_unspecified=False):
3249        cmd = 'udp open'
3250        self.send_command(cmd)
3251        self._expect_done()
3252
3253        cmd = 'udp bind %s %s %s' % ("-u" if bind_unspecified else "", local_ipaddr, local_port)
3254        self.send_command(cmd)
3255        self._expect_done()
3256
3257    def udp_stop(self):
3258        cmd = 'udp close'
3259        self.send_command(cmd)
3260        self._expect_done()
3261
3262    def udp_send(self, bytes, ipaddr, port, success=True):
3263        cmd = 'udp send %s %d -s %d ' % (ipaddr, port, bytes)
3264        self.send_command(cmd)
3265        if success:
3266            self._expect_done()
3267        else:
3268            self._expect('Error')
3269
3270    def udp_check_rx(self, bytes_should_rx):
3271        self._expect('%d bytes' % bytes_should_rx)
3272
3273    def set_routereligible(self, enable: bool):
3274        cmd = f'routereligible {"enable" if enable else "disable"}'
3275        self.send_command(cmd)
3276        self._expect_done()
3277
3278    def router_list(self):
3279        cmd = 'router list'
3280        self.send_command(cmd)
3281        self._expect([r'(\d+)((\s\d+)*)'])
3282
3283        g = self.pexpect.match.groups()
3284        router_list = g[0].decode('utf8') + ' ' + g[1].decode('utf8')
3285        router_list = [int(x) for x in router_list.split()]
3286        self._expect_done()
3287        return router_list
3288
3289    def router_table(self):
3290        cmd = 'router table'
3291        self.send_command(cmd)
3292
3293        self._expect(r'(.*)Done')
3294        g = self.pexpect.match.groups()
3295        output = g[0].decode('utf8')
3296        lines = output.strip().split('\n')
3297        lines = [l.strip() for l in lines]
3298        router_table = {}
3299        for i, line in enumerate(lines):
3300            if not line.startswith('|') or not line.endswith('|'):
3301                if i not in (0, 2):
3302                    # should not happen
3303                    print("unexpected line %d: %s" % (i, line))
3304
3305                continue
3306
3307            line = line[1:][:-1]
3308            line = [x.strip() for x in line.split('|')]
3309            if len(line) < 9:
3310                print("unexpected line %d: %s" % (i, line))
3311                continue
3312
3313            try:
3314                int(line[0])
3315            except ValueError:
3316                if i != 1:
3317                    print("unexpected line %d: %s" % (i, line))
3318                continue
3319
3320            id = int(line[0])
3321            rloc16 = int(line[1], 16)
3322            nexthop = int(line[2])
3323            pathcost = int(line[3])
3324            lqin = int(line[4])
3325            lqout = int(line[5])
3326            age = int(line[6])
3327            emac = str(line[7])
3328            link = int(line[8])
3329
3330            router_table[id] = {
3331                'rloc16': rloc16,
3332                'nexthop': nexthop,
3333                'pathcost': pathcost,
3334                'lqin': lqin,
3335                'lqout': lqout,
3336                'age': age,
3337                'emac': emac,
3338                'link': link,
3339            }
3340
3341        return router_table
3342
3343    def link_metrics_request_single_probe(self, dst_addr: str, linkmetrics_flags: str, mode: str = ''):
3344        cmd = 'linkmetrics request %s %s single %s' % (mode, dst_addr, linkmetrics_flags)
3345        self.send_command(cmd)
3346        self.simulator.go(5)
3347        return self._parse_linkmetrics_query_result(self._expect_command_output())
3348
3349    def link_metrics_request_forward_tracking_series(self, dst_addr: str, series_id: int, mode: str = ''):
3350        cmd = 'linkmetrics request %s %s forward %d' % (mode, dst_addr, series_id)
3351        self.send_command(cmd)
3352        self.simulator.go(5)
3353        return self._parse_linkmetrics_query_result(self._expect_command_output())
3354
3355    def _parse_linkmetrics_query_result(self, lines):
3356        """Parse link metrics query result"""
3357
3358        # Example of command output:
3359        # ['Received Link Metrics Report from: fe80:0:0:0:146e:a00:0:1',
3360        #  '- PDU Counter: 1 (Count/Summation)',
3361        #  '- LQI: 0 (Exponential Moving Average)',
3362        #  '- Margin: 80 (dB) (Exponential Moving Average)',
3363        #  '- RSSI: -20 (dBm) (Exponential Moving Average)']
3364        #
3365        # Or 'Link Metrics Report, status: {status}'
3366
3367        result = {}
3368        for line in lines:
3369            if line.startswith('- '):
3370                k, v = line[2:].split(': ')
3371                result[k] = v.split(' ')[0]
3372            elif line.startswith('Link Metrics Report, status: '):
3373                result['Status'] = line[29:]
3374        return result
3375
3376    def link_metrics_config_req_enhanced_ack_based_probing(self,
3377                                                           dst_addr: str,
3378                                                           enable: bool,
3379                                                           metrics_flags: str,
3380                                                           ext_flags='',
3381                                                           mode: str = ''):
3382        cmd = "linkmetrics config %s %s enhanced-ack" % (mode, dst_addr)
3383        if enable:
3384            cmd = cmd + (" register %s %s" % (metrics_flags, ext_flags))
3385        else:
3386            cmd = cmd + " clear"
3387        self.send_command(cmd)
3388        self._expect_done()
3389
3390    def link_metrics_config_req_forward_tracking_series(self,
3391                                                        dst_addr: str,
3392                                                        series_id: int,
3393                                                        series_flags: str,
3394                                                        metrics_flags: str,
3395                                                        mode: str = ''):
3396        cmd = "linkmetrics config %s %s forward %d %s %s" % (mode, dst_addr, series_id, series_flags, metrics_flags)
3397        self.send_command(cmd)
3398        self._expect_done()
3399
3400    def link_metrics_send_link_probe(self, dst_addr: str, series_id: int, length: int):
3401        cmd = "linkmetrics probe %s %d %d" % (dst_addr, series_id, length)
3402        self.send_command(cmd)
3403        self._expect_done()
3404
3405    def link_metrics_mgr_set_enabled(self, enable: bool):
3406        op_str = "enable" if enable else "disable"
3407        cmd = f'linkmetricsmgr {op_str}'
3408        self.send_command(cmd)
3409        self._expect_done()
3410
3411    def send_address_notification(self, dst: str, target: str, mliid: str):
3412        cmd = f'fake /a/an {dst} {target} {mliid}'
3413        self.send_command(cmd)
3414        self._expect_done()
3415
3416    def send_proactive_backbone_notification(self, target: str, mliid: str, ltt: int):
3417        cmd = f'fake /b/ba {target} {mliid} {ltt}'
3418        self.send_command(cmd)
3419        self._expect_done()
3420
3421    def dns_get_config(self):
3422        """
3423        Returns the DNS config as a list of property dictionary (string key and string value).
3424
3425        Example output:
3426        {
3427            'Server': '[fd00:0:0:0:0:0:0:1]:1234'
3428            'ResponseTimeout': '5000 ms'
3429            'MaxTxAttempts': '2'
3430            'RecursionDesired': 'no'
3431        }
3432        """
3433        cmd = f'dns config'
3434        self.send_command(cmd)
3435        output = self._expect_command_output()
3436        config = {}
3437        for line in output:
3438            k, v = line.split(': ')
3439            config[k] = v
3440        return config
3441
3442    def dns_set_config(self, config):
3443        cmd = f'dns config {config}'
3444        self.send_command(cmd)
3445        self._expect_done()
3446
3447    def dns_resolve(self, hostname, server=None, port=53):
3448        cmd = f'dns resolve {hostname}'
3449        if server is not None:
3450            cmd += f' {server} {port}'
3451
3452        self.send_command(cmd)
3453        self.simulator.go(10)
3454        output = self._expect_command_output()
3455        dns_resp = output[0]
3456        # example output: "DNS response for host1.default.service.arpa. - fd00:db8:0:0:fd3d:d471:1e8c:b60 TTL:7190 "
3457        #                 " fd00:db8:0:0:0:ff:fe00:9000 TTL:7190"
3458        addrs = dns_resp.strip().split(' - ')[1].split(' ')
3459        ip = [item.strip() for item in addrs[::2]]
3460        ttl = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
3461
3462        return list(zip(ip, ttl))
3463
3464    def _parse_dns_service_info(self, output):
3465        # Example of `output`
3466        #   Port:22222, Priority:2, Weight:2, TTL:7155
3467        #   Host:host2.default.service.arpa.
3468        #   HostAddress:0:0:0:0:0:0:0:0 TTL:0
3469        #   TXT:[a=00, b=02bb] TTL:7155
3470
3471        m = re.match(
3472            r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:\[(.*?)\] TTL:(\d+)',
3473            '\r'.join(output))
3474        if not m:
3475            return {}
3476        port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups()
3477        return {
3478            'port': int(port),
3479            'priority': int(priority),
3480            'weight': int(weight),
3481            'host': hostname,
3482            'address': address,
3483            'txt_data': txt_data,
3484            'srv_ttl': int(srv_ttl),
3485            'txt_ttl': int(txt_ttl),
3486            'aaaa_ttl': int(aaaa_ttl),
3487        }
3488
3489    def dns_resolve_service(self, instance, service, server=None, port=53):
3490        """
3491        Resolves the service instance and returns the instance information as a dict.
3492
3493        Example return value:
3494            {
3495                'port': 12345,
3496                'priority': 0,
3497                'weight': 0,
3498                'host': 'ins1._ipps._tcp.default.service.arpa.',
3499                'address': '2001::1',
3500                'txt_data': 'a=00, b=02bb',
3501                'srv_ttl': 7100,
3502                'txt_ttl': 7100,
3503                'aaaa_ttl': 7100,
3504            }
3505        """
3506        instance = self._escape_escapable(instance)
3507        cmd = f'dns service {instance} {service}'
3508        if server is not None:
3509            cmd += f' {server} {port}'
3510
3511        self.send_command(cmd)
3512        self.simulator.go(10)
3513        output = self._expect_command_output()
3514        info = self._parse_dns_service_info(output)
3515        if not info:
3516            raise Exception('dns resolve service failed: %s.%s' % (instance, service))
3517        return info
3518
3519    @staticmethod
3520    def __parse_hex_string(hexstr: str) -> bytes:
3521        assert (len(hexstr) % 2 == 0)
3522        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
3523
3524    def dns_browse(self, service_name, server=None, port=53):
3525        """
3526        Browse the service and returns the instances.
3527
3528        Example return value:
3529            {
3530                'ins1': {
3531                    'port': 12345,
3532                    'priority': 1,
3533                    'weight': 1,
3534                    'host': 'ins1._ipps._tcp.default.service.arpa.',
3535                    'address': '2001::1',
3536                    'txt_data': 'a=00, b=11cf',
3537                    'srv_ttl': 7100,
3538                    'txt_ttl': 7100,
3539                    'aaaa_ttl': 7100,
3540                },
3541                'ins2': {
3542                    'port': 12345,
3543                    'priority': 2,
3544                    'weight': 2,
3545                    'host': 'ins2._ipps._tcp.default.service.arpa.',
3546                    'address': '2001::2',
3547                    'txt_data': 'a=01, b=23dd',
3548                    'srv_ttl': 7100,
3549                    'txt_ttl': 7100,
3550                    'aaaa_ttl': 7100,
3551                }
3552            }
3553        """
3554        cmd = f'dns browse {service_name}'
3555        if server is not None:
3556            cmd += f' {server} {port}'
3557
3558        self.send_command(cmd)
3559        self.simulator.go(10)
3560        output = self._expect_command_output()
3561
3562        # Example output:
3563        # DNS browse response for _ipps._tcp.default.service.arpa.
3564        # ins2
3565        #     Port:22222, Priority:2, Weight:2, TTL:7175
3566        #     Host:host2.default.service.arpa.
3567        #     HostAddress:fd00:db8:0:0:3205:28dd:5b87:6a63 TTL:7175
3568        #     TXT:[a=00, b=11cf] TTL:7175
3569        # ins1
3570        #     Port:11111, Priority:1, Weight:1, TTL:7170
3571        #     Host:host1.default.service.arpa.
3572        #     HostAddress:fd00:db8:0:0:39f4:d9:eb4f:778 TTL:7170
3573        #     TXT:[a=01, b=23dd] TTL:7170
3574        # Done
3575
3576        result = {}
3577        index = 1  # skip first line
3578        while index < len(output):
3579            ins = output[index].strip()
3580            result[ins] = self._parse_dns_service_info(output[index + 1:index + 6])
3581            index = index + (5 if result[ins] else 1)
3582        return result
3583
3584    def set_mliid(self, mliid: str):
3585        cmd = f'mliid {mliid}'
3586        self.send_command(cmd)
3587        self._expect_command_output()
3588
3589    def history_netinfo(self, num_entries=0):
3590        """
3591        Get the `netinfo` history list, parse each entry and return
3592        a list of dictionary (string key and string value) entries.
3593
3594        Example of return value:
3595        [
3596            {
3597                'age': '00:00:00.000 ago',
3598                'role': 'disabled',
3599                'mode': 'rdn',
3600                'rloc16': '0x7400',
3601                'partition-id': '1318093703'
3602            },
3603            {
3604                'age': '00:00:02.588 ago',
3605                'role': 'leader',
3606                'mode': 'rdn',
3607                'rloc16': '0x7400',
3608                'partition-id': '1318093703'
3609            }
3610        ]
3611        """
3612        cmd = f'history netinfo list {num_entries}'
3613        self.send_command(cmd)
3614        output = self._expect_command_output()
3615        netinfos = []
3616        for entry in output:
3617            netinfo = {}
3618            age, info = entry.split(' -> ')
3619            netinfo['age'] = age
3620            for item in info.split(' '):
3621                k, v = item.split(':')
3622                netinfo[k] = v
3623            netinfos.append(netinfo)
3624        return netinfos
3625
3626    def history_rx(self, num_entries=0):
3627        """
3628        Get the IPv6 RX history list, parse each entry and return
3629        a list of dictionary (string key and string value) entries.
3630
3631        Example of return value:
3632        [
3633            {
3634                'age': '00:00:01.999',
3635                'type': 'ICMP6(EchoReqst)',
3636                'len': '16',
3637                'sec': 'yes',
3638                'prio': 'norm',
3639                'rss': '-20',
3640                'from': '0xac00',
3641                'radio': '15.4',
3642                'src': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
3643                'dst': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
3644            }
3645        ]
3646        """
3647        cmd = f'history rx list {num_entries}'
3648        self.send_command(cmd)
3649        return self._parse_history_rx_tx_ouput(self._expect_command_output())
3650
3651    def history_tx(self, num_entries=0):
3652        """
3653        Get the IPv6 TX history list, parse each entry and return
3654        a list of dictionary (string key and string value) entries.
3655
3656        Example of return value:
3657        [
3658            {
3659                'age': '00:00:01.999',
3660                'type': 'ICMP6(EchoReply)',
3661                'len': '16',
3662                'sec': 'yes',
3663                'prio': 'norm',
3664                'to': '0xac00',
3665                'tx-success': 'yes',
3666                'radio': '15.4',
3667                'src': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
3668                'dst': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
3669
3670            }
3671        ]
3672        """
3673        cmd = f'history tx list {num_entries}'
3674        self.send_command(cmd)
3675        return self._parse_history_rx_tx_ouput(self._expect_command_output())
3676
3677    def _parse_history_rx_tx_ouput(self, lines):
3678        rxtx_list = []
3679        for line in lines:
3680            if line.strip().startswith('type:'):
3681                for item in line.strip().split(' '):
3682                    k, v = item.split(':')
3683                    entry[k] = v
3684            elif line.strip().startswith('src:'):
3685                entry['src'] = line[4:]
3686            elif line.strip().startswith('dst:'):
3687                entry['dst'] = line[4:]
3688                rxtx_list.append(entry)
3689            else:
3690                entry = {}
3691                entry['age'] = line
3692
3693        return rxtx_list
3694
3695    def set_router_id_range(self, min_router_id: int, max_router_id: int):
3696        cmd = f'routeridrange {min_router_id} {max_router_id}'
3697        self.send_command(cmd)
3698        self._expect_command_output()
3699
3700    def get_router_id_range(self):
3701        cmd = 'routeridrange'
3702        self.send_command(cmd)
3703        line = self._expect_command_output()[0]
3704        return [int(item) for item in line.split()]
3705
3706    def get_channel_monitor_info(self) -> Dict:
3707        """
3708        Returns:
3709            Dict of channel monitor info, e.g.
3710                {'enabled': '1',
3711                 'interval': '41000',
3712                 'threshold': '-75',
3713                 'window': '960',
3714                 'count': '985',
3715                 'occupancies': {
3716                    '11': '0.00%',
3717                    '12': '3.50%',
3718                    '13': '9.89%',
3719                    '14': '15.36%',
3720                    '15': '20.02%',
3721                    '16': '21.95%',
3722                    '17': '32.71%',
3723                    '18': '35.76%',
3724                    '19': '37.97%',
3725                    '20': '43.68%',
3726                    '21': '48.95%',
3727                    '22': '54.05%',
3728                    '23': '58.65%',
3729                    '24': '68.26%',
3730                    '25': '66.73%',
3731                    '26': '73.12%'
3732                    }
3733                }
3734        """
3735        config = {}
3736        self.send_command('channel monitor')
3737
3738        for line in self._expect_results(r'\S+'):
3739            if re.match(r'.*:\s.*', line):
3740                key, val = line.split(':')
3741                config.update({key: val.strip()})
3742            elif re.match(r'.*:', line):  # occupancy
3743                occ_key, val = line.split(':')
3744                val = {}
3745                config.update({occ_key: val})
3746            elif 'busy' in line:
3747                # channel occupancies
3748                key = line.split()[1]
3749                val = line.split()[3]
3750                config[occ_key].update({key: val})
3751        return config
3752
3753    def set_channel_manager_auto_enable(self, enable: bool):
3754        self.send_command(f'channel manager auto {int(enable)}')
3755        self._expect_done()
3756
3757    def set_channel_manager_autocsl_enable(self, enable: bool):
3758        self.send_command(f'channel manager autocsl {int(enable)}')
3759        self._expect_done()
3760
3761    def set_channel_manager_supported(self, channel_mask: int):
3762        self.send_command(f'channel manager supported {int(channel_mask)}')
3763        self._expect_done()
3764
3765    def set_channel_manager_favored(self, channel_mask: int):
3766        self.send_command(f'channel manager favored {int(channel_mask)}')
3767        self._expect_done()
3768
3769    def set_channel_manager_interval(self, interval: int):
3770        self.send_command(f'channel manager interval {interval}')
3771        self._expect_done()
3772
3773    def set_channel_manager_cca_threshold(self, hex_value: str):
3774        self.send_command(f'channel manager threshold {hex_value}')
3775        self._expect_done()
3776
3777    def get_channel_manager_config(self):
3778        self.send_command('channel manager')
3779        return self._expect_key_value_pairs(r'\S+')
3780
3781
class Node(NodeImpl, OtCli):
    """Node type that combines `NodeImpl` and `OtCli`; it adds no members of its own."""
3784
3785
class LinuxHost():
    """Helpers that drive a Linux host's backbone (Ethernet) interface via shell commands.

    Every command is executed through `self.bash`, which is not defined in this class
    and has to be supplied by another class in the MRO. `ping` and `add_ipmaddr`
    likewise delegate to `super()` for the non-backbone case.
    """
    # Matches a reply line of `ping` output ("<n> bytes from <addr>: ...").
    PING_RESPONSE_PATTERN = re.compile(r'\d+ bytes from .*:.*')
    # Name of the network interface the `*_ether` helpers operate on.
    ETH_DEV = config.BACKBONE_IFNAME

    def enable_ether(self):
        """Enable the ethernet interface.
        """

        self.bash(f'ip link set {self.ETH_DEV} up')

    def disable_ether(self):
        """Disable the ethernet interface.
        """

        self.bash(f'ip link set {self.ETH_DEV} down')

    def get_ether_addrs(self):
        """Return the IPv6 addresses listed for `ETH_DEV`, without their prefix length.
        """
        output = self.bash(f'ip -6 addr list dev {self.ETH_DEV}')

        addrs = []
        for line in output:
            # line example: "inet6 fe80::42:c0ff:fea8:903/64 scope link"
            line = line.strip().split()

            if line and line[0] == 'inet6':
                addr = line[1]
                if '/' in addr:
                    addr = addr.split('/')[0]
                addrs.append(addr)

        logging.debug('%s: get_ether_addrs: %r', self, addrs)
        return addrs

    def get_ether_mac(self):
        """Return the MAC address reported on the `link/ether` line for `ETH_DEV`.

        Raises:
            AssertionError: if the `ip addr list` output has no `link/ether` line;
                the output is attached to the error.
        """
        output = self.bash(f'ip addr list dev {self.ETH_DEV}')
        for line in output:
            # link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
            line = line.strip().split()
            if line and line[0] == 'link/ether':
                return line[1]

        # Raised explicitly (rather than `assert False`) so the failure is not
        # optimized away when Python runs with -O.
        raise AssertionError(output)

    def add_ipmaddr_ether(self, ip: str):
        """Start the `mcast6.py` helper script in the background for `ETH_DEV` and `ip`.
        """
        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.ETH_DEV} {ip} &'
        self.bash(cmd)

    def ping_ether(self, ipaddr, num_responses=1, size=None, timeout=5, ttl=None, interface='eth0') -> int:
        """Run `ping -6` towards `ipaddr` and return the number of reply lines seen.

        Args:
            ipaddr: address to ping.
            num_responses: value passed to ping's `-c` option.
            size: value for `-s`; the option is omitted when None.
            timeout: value passed to `-W`.
            ttl: value for `-t`; the option is omitted when None.
            interface: value passed to `-I`.

        Returns:
            Count of output lines matching `PING_RESPONSE_PATTERN`. A
            `subprocess.CalledProcessError` raised by `bash` is ignored and the
            count gathered so far is returned.
        """

        cmd = f'ping -6 {ipaddr} -I {interface} -c {num_responses} -W {timeout}'
        if size is not None:
            cmd += f' -s {size}'

        if ttl is not None:
            cmd += f' -t {ttl}'

        resp_count = 0

        try:
            for line in self.bash(cmd):
                if self.PING_RESPONSE_PATTERN.match(line):
                    resp_count += 1
        except subprocess.CalledProcessError:
            pass

        return resp_count

    def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
        """Get specific type of IPv6 address configured on thread device.

        Args:
            address_type: the config.ADDRESS_TYPE type of IPv6 address.

        Returns:
            For BACKBONE_GUA and BACKBONE_LINK_LOCAL: the first matching IPv6 address
            string, or None when there is none.
            For ONLINK_ULA and ONLINK_GUA: a list of all matching IPv6 address strings.

        Raises:
            ValueError: if `address_type` is none of the four types above.
        """
        if address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
            return self._getBackboneGua()
        elif address_type == config.ADDRESS_TYPE.BACKBONE_LINK_LOCAL:
            return self._getInfraLinkLocalAddress()
        elif address_type == config.ADDRESS_TYPE.ONLINK_ULA:
            return self._getInfraUla()
        elif address_type == config.ADDRESS_TYPE.ONLINK_GUA:
            return self._getInfraGua()
        else:
            raise ValueError(f'unsupported address type: {address_type}')

    def _getBackboneGua(self) -> Optional[str]:
        """ Returns the first `ETH_DEV` address matching `config.BACKBONE_PREFIX_REGEX_PATTERN`, or None.
        """
        for addr in self.get_ether_addrs():
            if re.match(config.BACKBONE_PREFIX_REGEX_PATTERN, addr, re.I):
                return addr

        return None

    def _getInfraUla(self) -> List[str]:
        """ Returns the ULA addresses autoconfigured on the infra link.
        """
        addrs = []
        for addr in self.get_ether_addrs():
            if re.match(config.ONLINK_PREFIX_REGEX_PATTERN, addr, re.I):
                addrs.append(addr)

        return addrs

    def _getInfraGua(self) -> List[str]:
        """ Returns the GUA addresses autoconfigured on the infra link.
        """

        # Keep only the part of the configured prefix that precedes '::/'.
        gua_prefix = config.ONLINK_GUA_PREFIX.split('::/')[0]
        return [addr for addr in self.get_ether_addrs() if addr.startswith(gua_prefix)]

    def _getInfraLinkLocalAddress(self) -> Optional[str]:
        """ Returns the link-local address autoconfigured on the infra link, which is started with "fe80".
        """
        for addr in self.get_ether_addrs():
            if re.match(config.LINK_LOCAL_REGEX_PATTERN, addr, re.I):
                return addr

        return None

    def ping(self, *args, **kwargs):
        """Ping via `ping_ether` when `backbone=True` is passed, else via the superclass `ping`.

        The `backbone` keyword is removed before the remaining arguments are forwarded.
        """
        backbone = kwargs.pop('backbone', False)
        if backbone:
            return self.ping_ether(*args, **kwargs)
        else:
            return super().ping(*args, **kwargs)

    def udp_send_host(self, ipaddr, port, data, hop_limit=None):
        """Run the `udp_send_host.py` helper script with the given destination and data.

        When `hop_limit` is None it defaults to 10 for a multicast `ipaddr` and to 64
        otherwise.
        """
        if hop_limit is None:
            if ipaddress.ip_address(ipaddr).is_multicast:
                hop_limit = 10
            else:
                hop_limit = 64
        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/udp_send_host.py {ipaddr} {port} "{data}" {hop_limit}'
        self.bash(cmd)

    def add_ipmaddr(self, *args, **kwargs):
        """Call `add_ipmaddr_ether` when `backbone=True` is passed, else the superclass `add_ipmaddr`.

        The `backbone` keyword is removed before the remaining arguments are forwarded.
        """
        backbone = kwargs.pop('backbone', False)
        if backbone:
            return self.add_ipmaddr_ether(*args, **kwargs)
        else:
            return super().add_ipmaddr(*args, **kwargs)

    def ip_neighbors_flush(self):
        """Flush and delete the IPv6 neighbor entries of `ETH_DEV`, listing them before and after.
        """
        # clear neigh cache on linux
        self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')
        self.bash(f'ip -6 neigh flush nud all nud failed nud noarp dev {self.ETH_DEV}')
        self.bash('ip -6 neigh list nud all dev %s | cut -d " " -f1 | sudo xargs -I{} ip -6 neigh delete {} dev %s' %
                  (self.ETH_DEV, self.ETH_DEV))
        self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')

    def publish_mdns_service(self, instance_name, service_type, port, host_name, txt):
        """Publish an mDNS service on the Ethernet.

        :param instance_name: the service instance name.
        :param service_type: the service type in format of '<service_type>.<protocol>'.
        :param port: the port the service is at.
        :param host_name: the host name this service points to. The domain
                          should not be included.
        :param txt: a dictionary containing the key-value pairs of the TXT record.
        """
        txt_string = ' '.join([f'{key}={value}' for key, value in txt.items()])
        self.bash(f'avahi-publish -s {instance_name}  {service_type} {port} -H {host_name}.local {txt_string} &')

    def publish_mdns_host(self, hostname, addresses):
        """Publish an mDNS host on the Ethernet

        :param hostname: the host name to publish. The domain
                         should not be included.
        :param addresses: a list of strings representing the addresses to
                          be registered with the host.
        """
        for address in addresses:
            self.bash(f'avahi-publish -a {hostname}.local {address} &')

    def browse_mdns_services(self, name, timeout=2):
        """ Browse mDNS services on the ethernet.

        :param name: the service type name in format of '<service-name>.<protocol>'.
        :param timeout: timeout value in seconds before returning.
        :return: A list of service instance names.
        """

        self.bash(f'dns-sd -Z {name} local. > /tmp/{name} 2>&1 &')
        time.sleep(timeout)
        self.bash('pkill dns-sd')

        instances = []
        for line in self.bash(f'cat /tmp/{name}', encoding='raw_unicode_escape'):
            elements = line.split()
            if len(elements) >= 3 and elements[0] == name and elements[1] == 'PTR':
                # Drop the trailing '.<name>' to keep only the instance label.
                instances.append(elements[2][:-len('.' + name)])
        return instances

    def discover_mdns_service(self, instance, name, host_name, timeout=2):
        """ Discover/resolve the mDNS service on ethernet.

        :param instance: the service instance name.
        :param name: the service name in format of '<service-name>.<protocol>'.
        :param host_name: the host name this service points to. The domain
                          should not be included. When None, the host name is
                          taken from the SRV record found while browsing.
        :param timeout: timeout value in seconds before returning.
        :return: a dict of service properties or None.

        The return value is a dict with the same key/values of srp_server_get_service
        except that we don't have a `deleted` field here.
        """
        host_name_file = self.bash('mktemp')[0].strip()
        service_data_file = self.bash('mktemp')[0].strip()

        self.bash(f'dns-sd -Z {name} local. > {service_data_file} 2>&1 &')
        time.sleep(timeout)

        full_service_name = f'{instance}.{name}'
        # When hostname is unspecified, extract hostname from browse result
        if host_name is None:
            for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
                elements = line.split()
                if len(elements) >= 6 and elements[0] == full_service_name and elements[1] == 'SRV':
                    host_name = elements[5].split('.')[0]
                    break

        assert (host_name is not None)
        self.bash(f'dns-sd -G v6 {host_name}.local. > {host_name_file} 2>&1 &')
        time.sleep(timeout)

        self.bash('pkill dns-sd')
        addresses = []
        service = {}

        # dns-sd has been killed, so each result file is read once and the same
        # lines are used for both logging and parsing.
        host_lines = self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape')
        service_lines = self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape')
        logging.debug(host_lines)
        logging.debug(service_lines)

        # example output in the host file:
        # Timestamp     A/R Flags if Hostname                               Address                                     TTL
        # 9:38:09.274  Add     23 48 my-host.local.                         2001:0000:0000:0000:0000:0000:0000:0002%<0>  120
        #
        fullname = f'{host_name}.local.'
        for line in host_lines:
            elements = line.split()
            if fullname not in elements:
                continue
            if 'Add' not in elements:
                continue
            # The address follows the host name; anything after '%' is dropped.
            addresses.append(elements[elements.index(fullname) + 1].split('%')[0])

        logging.debug('addresses of %s: %s', host_name, addresses)

        # example output of in the service file:
        # _ipps._tcp                                      PTR     my-service._ipps._tcp
        # my-service._ipps._tcp                           SRV     0 0 12345 my-host.local. ; Replace with unicast FQDN of target host
        # my-service._ipps._tcp                           TXT     ""
        #
        is_txt = False
        txt = ''
        for line in service_lines:
            elements = line.split()
            if len(elements) >= 2 and elements[0] == full_service_name and elements[1] == 'TXT':
                is_txt = True
            if is_txt:
                # A TXT record may span several lines; it ends on a line closing with '"'.
                txt += line.strip()
                if line.strip().endswith('"'):
                    is_txt = False
                    txt_dict = self.__parse_dns_sd_txt(txt)
                    logging.info('txt = %s', txt_dict)
                    service['txt'] = txt_dict

            # Require all six SRV fields before indexing them (same guard as the
            # host name lookup above), so a short or truncated line is skipped
            # instead of raising IndexError.
            if len(elements) >= 6 and elements[0] == full_service_name and elements[1] == 'SRV':
                service['fullname'] = elements[0]
                service['instance'] = instance
                service['name'] = name
                service['priority'] = int(elements[2])
                service['weight'] = int(elements[3])
                service['port'] = int(elements[4])
                service['host_fullname'] = elements[5]
                assert (service['host_fullname'] == f'{host_name}.local.')
                service['host'] = host_name
                service['addresses'] = addresses
        return service or None

    def start_radvd_service(self, prefix, slaac):
        """Write `/etc/radvd.conf` for `prefix` and start the radvd service.

        `AdvAutonomous` is set to `on` when `slaac` is truthy and to `off` otherwise.
        """
        self.bash("""cat >/etc/radvd.conf <<EOF
interface eth0
{
    AdvSendAdvert on;

    AdvReachableTime 200;
    AdvRetransTimer 200;
    AdvDefaultLifetime 1800;
    MinRtrAdvInterval 1200;
    MaxRtrAdvInterval 1800;
    AdvDefaultPreference low;

    prefix %s
    {
        AdvOnLink on;
        AdvAutonomous %s;
        AdvRouterAddr off;
        AdvPreferredLifetime 1800;
        AdvValidLifetime 1800;
    };
};
EOF
""" % (prefix, 'on' if slaac else 'off'))
        self.bash('service radvd start')
        self.bash('service radvd status')  # Make sure radvd service is running

    def stop_radvd_service(self):
        """Run `service radvd stop`.
        """
        self.bash('service radvd stop')

    def kill_radvd_service(self):
        """Run `pkill radvd`.
        """
        self.bash('pkill radvd')

    def __parse_dns_sd_txt(self, line: str):
        """Extract `key=value` pairs from the double-quoted strings found in `line`.

        Quoted strings without '=' are ignored; the value is everything after the
        first '='. Returns a dict of string keys and string values.
        """
        # Example TXT entry:
        # "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
        txt = {}
        for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
            if '=' not in entry:
                continue

            k, v = entry.split('=', 1)
            txt[k] = v

        return txt
4113
4114
class OtbrNode(LinuxHost, NodeImpl, OtbrDocker):
    """Node class built from `LinuxHost`, `NodeImpl` and `OtbrDocker`.

    Adds helpers that act on the `TUN_DEV` interface through `self.bash`.
    """
    TUN_DEV = config.THREAD_IFNAME
    is_otbr = True
    is_bbr = True  # OTBR is also BBR
    node_type = 'otbr-docker'

    def __repr__(self):
        return 'Otbr<{}>'.format(self.nodeid)

    def start(self):
        """Call `_setup_sysctl()`, set the log level to 5, then run the superclass `start()`."""
        self._setup_sysctl()
        self.set_log_level(5)
        super().start()

    def add_ipaddr(self, addr):
        """Add `addr` with a /64 prefix length to `TUN_DEV` using `ip -6 addr add`."""
        self.bash('ip -6 addr add {}/64 dev {}'.format(addr, self.TUN_DEV))

    def add_ipmaddr_tun(self, ip: str):
        """Start the `mcast6.py` helper script in the background for `TUN_DEV` and `ip`."""
        script = '/app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py'
        self.bash('python3 {} {} {} &'.format(script, self.TUN_DEV, ip))

    def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
        """Look up an address via `LinuxHost` first, falling back to the next class in the MRO.

        Any `Exception` raised by the first lookup triggers the fallback.
        """
        try:
            return super().get_ip6_address(address_type)
        except Exception:
            # Skip LinuxHost in the MRO and ask the class that follows it.
            return super(LinuxHost, self).get_ip6_address(address_type)
4142
4143
class HostNode(LinuxHost, OtbrDocker):
    """Host node built from `LinuxHost` and `OtbrDocker`; `service otbr-agent stop` is run on creation."""
    is_host = True

    def __init__(self, nodeid, name=None, **kwargs):
        self.nodeid = nodeid
        # Fall back to 'Host<nodeid>' when no (truthy) name is given.
        self.name = name if name else 'Host%d' % nodeid
        super().__init__(nodeid, **kwargs)
        self.bash('service otbr-agent stop')

    def start(self, start_radvd=True, prefix=config.DOMAIN_PREFIX, slaac=False):
        """Call `_setup_sysctl()`, then start radvd with `prefix`/`slaac` or stop it.

        radvd is started when `start_radvd` is truthy and stopped otherwise.
        """
        self._setup_sysctl()
        if not start_radvd:
            self.stop_radvd_service()
            return
        self.start_radvd_service(prefix, slaac)

    def stop(self):
        """Stop the radvd service."""
        self.stop_radvd_service()

    def get_addrs(self) -> List[str]:
        """Return the addresses reported by `get_ether_addrs()`."""
        return self.get_ether_addrs()

    def __repr__(self):
        return 'Host<{}>'.format(self.nodeid)

    def get_matched_ula_addresses(self, prefix):
        """Get the IPv6 addresses that matches given prefix.

        Only the addresses returned for `config.ADDRESS_TYPE.ONLINK_ULA` are considered.
        """
        candidates = self.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)
        return [addr for addr in candidates if IPv6Address(addr) in IPv6Network(prefix)]
4179
4180
# Hand control to unittest's command-line entry point when this file is run as a script.
if __name__ == '__main__':
    unittest.main()
4183