1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import json
31import binascii
32import ipaddress
33import logging
34import os
35import re
36import shlex
37import socket
38import subprocess
39import sys
40import time
41import traceback
42import typing
43import unittest
44from ipaddress import IPv6Address, IPv6Network
45from typing import Union, Dict, Optional, List, Any
46
47import pexpect
48import pexpect.popen_spawn
49
50import config
51import simulator
52import thread_cert
53
# Integer read from the PORT_OFFSET environment variable (defaults to 0).
# In this file it is used to build unique per-node artifact names.
PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0"))

# Integer read from the NAT64 environment variable (defaults to 0).
# When it equals 1, OtbrDocker._launch_docker uses 127.0.0.1 as the container
# DNS server and passes a NAT64 prefix to the container.
INFRA_DNS64 = int(os.getenv('NAT64', 0))
57
58
class OtbrDocker:
    """Runs an OTBR Docker container attached to a simulated ot-rcp radio.

    The radio is an ``ot-rcp`` process connected to the container through a
    ``socat`` PTY pair (see ``_prepare_ot_rcp_sim`` and ``_launch_docker``).
    """
    # NOTE(review): not referenced in the visible part of this file;
    # presumably a delay (seconds?) used by reset logic elsewhere -- confirm.
    RESET_DELAY = 3

    # Class-level defaults: the _shutdown_* helpers see None when the matching
    # process was never started, and again after they `del` the instance
    # attribute that shadows these.
    _socat_proc = None
    _ot_rcp_proc = None
    _docker_proc = None
    # Last snapshot stored by read_border_routing_counters_delta().
    _border_routing_counters = None
66
67    def __init__(self, nodeid: int, backbone_network: str, **kwargs):
68        self.verbose = int(float(os.getenv('VERBOSE', 0)))
69
70        assert backbone_network is not None
71        self.backbone_network = backbone_network
72        try:
73            self._docker_name = config.OTBR_DOCKER_NAME_PREFIX + str(nodeid)
74            self._prepare_ot_rcp_sim(nodeid)
75            self._launch_docker()
76        except Exception:
77            traceback.print_exc()
78            self.destroy()
79            raise
80
81    def _prepare_ot_rcp_sim(self, nodeid: int):
82        self._socat_proc = subprocess.Popen(['socat', '-d', '-d', 'pty,raw,echo=0', 'pty,raw,echo=0'],
83                                            stderr=subprocess.PIPE,
84                                            stdin=subprocess.DEVNULL,
85                                            stdout=subprocess.DEVNULL)
86
87        line = self._socat_proc.stderr.readline().decode('ascii').strip()
88        self._rcp_device_pty = rcp_device_pty = line[line.index('PTY is /dev') + 7:]
89        line = self._socat_proc.stderr.readline().decode('ascii').strip()
90        self._rcp_device = rcp_device = line[line.index('PTY is /dev') + 7:]
91        logging.info(f"socat running: device PTY: {rcp_device_pty}, device: {rcp_device}")
92
93        ot_rcp_path = self._get_ot_rcp_path()
94        self._ot_rcp_proc = subprocess.Popen(f"{ot_rcp_path} {nodeid} > {rcp_device_pty} < {rcp_device_pty}",
95                                             shell=True,
96                                             stdin=subprocess.DEVNULL,
97                                             stdout=subprocess.DEVNULL,
98                                             stderr=subprocess.DEVNULL)
99
100        try:
101            self._ot_rcp_proc.wait(1)
102        except subprocess.TimeoutExpired:
103            # We expect ot-rcp not to quit in 1 second.
104            pass
105        else:
106            raise Exception(f"ot-rcp {nodeid} exited unexpectedly!")
107
108    def _get_ot_rcp_path(self) -> str:
109        srcdir = os.environ['top_builddir']
110        path = '%s/examples/apps/ncp/ot-rcp' % srcdir
111        logging.info("ot-rcp path: %s", path)
112        return path
113
114    def _launch_docker(self):
115        logging.info(f'Docker image: {config.OTBR_DOCKER_IMAGE}')
116        subprocess.check_call(f"docker rm -f {self._docker_name} || true", shell=True)
117        CI_ENV = os.getenv('CI_ENV', '').split()
118        dns = ['--dns=127.0.0.1'] if INFRA_DNS64 == 1 else ['--dns=8.8.8.8']
119        nat64_prefix = ['--nat64-prefix', '2001:db8:1:ffff::/96'] if INFRA_DNS64 == 1 else []
120        os.makedirs('/tmp/coverage/', exist_ok=True)
121
122        cmd = ['docker', 'run'] + CI_ENV + [
123            '--rm',
124            '--name',
125            self._docker_name,
126            '--network',
127            self.backbone_network,
128        ] + dns + [
129            '-i',
130            '--sysctl',
131            'net.ipv6.conf.all.disable_ipv6=0 net.ipv4.conf.all.forwarding=1 net.ipv6.conf.all.forwarding=1',
132            '--privileged',
133            '--cap-add=NET_ADMIN',
134            '--volume',
135            f'{self._rcp_device}:/dev/ttyUSB0',
136            '-v',
137            '/tmp/coverage/:/tmp/coverage/',
138            config.OTBR_DOCKER_IMAGE,
139            '-B',
140            config.BACKBONE_IFNAME,
141            '--trel-url',
142            f'trel://{config.BACKBONE_IFNAME}',
143        ] + nat64_prefix
144        logging.info(' '.join(cmd))
145        self._docker_proc = subprocess.Popen(cmd,
146                                             stdin=subprocess.DEVNULL,
147                                             stdout=sys.stdout if self.verbose else subprocess.DEVNULL,
148                                             stderr=sys.stderr if self.verbose else subprocess.DEVNULL)
149
150        launch_docker_deadline = time.time() + 300
151        launch_ok = False
152
153        while time.time() < launch_docker_deadline:
154            try:
155                subprocess.check_call(f'docker exec -i {self._docker_name} ot-ctl state', shell=True)
156                launch_ok = True
157                logging.info("OTBR Docker %s on %s Is Ready!", self._docker_name, self.backbone_network)
158                break
159            except subprocess.CalledProcessError:
160                time.sleep(5)
161                continue
162
163        assert launch_ok
164
165        self.start_ot_ctl()
166
167    def __repr__(self):
168        return f'OtbrDocker<{self.nodeid}>'
169
170    def start_otbr_service(self):
171        self.bash('service otbr-agent start')
172        self.simulator.go(3)
173        self.start_ot_ctl()
174
175    def stop_otbr_service(self):
176        self.stop_ot_ctl()
177        self.bash('service otbr-agent stop')
178
179    def stop_mdns_service(self):
180        self.bash('service avahi-daemon stop; service mdns stop; !(cat /proc/net/udp | grep -i :14E9)')
181
182    def start_mdns_service(self):
183        self.bash('service avahi-daemon start; service mdns start; cat /proc/net/udp | grep -i :14E9')
184
185    def start_ot_ctl(self):
186        cmd = f'docker exec -i {self._docker_name} ot-ctl'
187        self.pexpect = pexpect.popen_spawn.PopenSpawn(cmd, timeout=30)
188        if self.verbose:
189            self.pexpect.logfile_read = sys.stdout.buffer
190
191        # Add delay to ensure that the process is ready to receive commands.
192        timeout = 0.4
193        while timeout > 0:
194            self.pexpect.send('\r\n')
195            try:
196                self.pexpect.expect('> ', timeout=0.1)
197                break
198            except pexpect.TIMEOUT:
199                timeout -= 0.1
200
201    def stop_ot_ctl(self):
202        self.pexpect.sendeof()
203        self.pexpect.wait()
204        self.pexpect.proc.kill()
205
206    def reserve_udp_port(self, port):
207        self.bash(f'socat -u UDP6-LISTEN:{port},bindtodevice=wpan0 - &')
208
209    def destroy(self):
210        logging.info("Destroying %s", self)
211        self._shutdown_docker()
212        self._shutdown_ot_rcp()
213        self._shutdown_socat()
214
215    def _shutdown_docker(self):
216        if self._docker_proc is None:
217            return
218
219        try:
220            COVERAGE = int(os.getenv('COVERAGE', '0'))
221            OTBR_COVERAGE = int(os.getenv('OTBR_COVERAGE', '0'))
222            test_name = os.getenv('TEST_NAME')
223            unique_node_id = f'{test_name}-{PORT_OFFSET}-{self.nodeid}'
224
225            if COVERAGE or OTBR_COVERAGE:
226                self.bash('service otbr-agent stop')
227
228                cov_file_path = f'/tmp/coverage/coverage-{unique_node_id}.info'
229                # Upload OTBR code coverage if OTBR_COVERAGE=1, otherwise OpenThread code coverage.
230                if OTBR_COVERAGE:
231                    codecov_cmd = f'lcov --directory . --capture --output-file {cov_file_path}'
232                else:
233                    codecov_cmd = ('lcov --directory build/otbr/third_party/openthread/repo --capture '
234                                   f'--output-file {cov_file_path}')
235
236                self.bash(codecov_cmd)
237
238            copyCore = subprocess.run(f'docker cp {self._docker_name}:/core ./coredump_{unique_node_id}', shell=True)
239            if copyCore.returncode == 0:
240                subprocess.check_call(
241                    f'docker cp {self._docker_name}:/usr/sbin/otbr-agent ./otbr-agent_{unique_node_id}', shell=True)
242
243        finally:
244            subprocess.check_call(f"docker rm -f {self._docker_name}", shell=True)
245            self._docker_proc.wait()
246            del self._docker_proc
247
248    def _shutdown_ot_rcp(self):
249        if self._ot_rcp_proc is not None:
250            self._ot_rcp_proc.kill()
251            self._ot_rcp_proc.wait()
252            del self._ot_rcp_proc
253
254    def _shutdown_socat(self):
255        if self._socat_proc is not None:
256            self._socat_proc.stderr.close()
257            self._socat_proc.kill()
258            self._socat_proc.wait()
259            del self._socat_proc
260
261    def bash(self, cmd: str, encoding='ascii') -> List[str]:
262        logging.info("%s $ %s", self, cmd)
263        proc = subprocess.Popen(['docker', 'exec', '-i', self._docker_name, 'bash', '-c', cmd],
264                                stdin=subprocess.DEVNULL,
265                                stdout=subprocess.PIPE,
266                                stderr=sys.stderr,
267                                encoding=encoding)
268
269        with proc:
270
271            lines = []
272
273            while True:
274                line = proc.stdout.readline()
275
276                if not line:
277                    break
278
279                lines.append(line)
280                logging.info("%s $ %r", self, line.rstrip('\r\n'))
281
282            proc.wait()
283
284            if proc.returncode != 0:
285                raise subprocess.CalledProcessError(proc.returncode, cmd, ''.join(lines))
286            else:
287                return lines
288
289    def dns_dig(self, server: str, name: str, qtype: str):
290        """
291        Run dig command to query a DNS server.
292
293        Args:
294            server: the server address.
295            name: the name to query.
296            qtype: the query type (e.g. AAAA, PTR, TXT, SRV).
297
298        Returns:
299            The dig result similar as below:
300            {
301                "opcode": "QUERY",
302                "status": "NOERROR",
303                "id": "64144",
304                "QUESTION": [
305                    ('google.com.', 'IN', 'AAAA')
306                ],
307                "ANSWER": [
308                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::71'),
309                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::8a'),
310                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::66'),
311                    ('google.com.', 107,	'IN', 'AAAA', '2404:6800:4008:c00::8b'),
312                ],
313                "ADDITIONAL": [
314                ],
315            }
316        """
317        output = self.bash(f'dig -6 @{server} \'{name}\' {qtype}', encoding='raw_unicode_escape')
318
319        section = None
320        dig_result = {
321            'QUESTION': [],
322            'ANSWER': [],
323            'ADDITIONAL': [],
324        }
325
326        for line in output:
327            line = line.strip()
328
329            if line.startswith(';; ->>HEADER<<- '):
330                headers = line[len(';; ->>HEADER<<- '):].split(', ')
331                for header in headers:
332                    key, val = header.split(': ')
333                    dig_result[key] = val
334
335                continue
336
337            if line == ';; QUESTION SECTION:':
338                section = 'QUESTION'
339                continue
340            elif line == ';; ANSWER SECTION:':
341                section = 'ANSWER'
342                continue
343            elif line == ';; ADDITIONAL SECTION:':
344                section = 'ADDITIONAL'
345                continue
346            elif section and not line:
347                section = None
348                continue
349
350            if section:
351                assert line
352
353                if section == 'QUESTION':
354                    assert line.startswith(';')
355                    line = line[1:]
356                record = list(line.split())
357
358                if section == 'QUESTION':
359                    if record[2] in ('SRV', 'TXT'):
360                        record[0] = self.__unescape_dns_instance_name(record[0])
361                else:
362                    record[1] = int(record[1])
363                    if record[3] == 'SRV':
364                        record[0] = self.__unescape_dns_instance_name(record[0])
365                        record[4], record[5], record[6] = map(int, [record[4], record[5], record[6]])
366                    elif record[3] == 'TXT':
367                        record[0] = self.__unescape_dns_instance_name(record[0])
368                        record[4:] = [self.__parse_dns_dig_txt(line)]
369                    elif record[3] == 'PTR':
370                        record[4] = self.__unescape_dns_instance_name(record[4])
371
372                dig_result[section].append(tuple(record))
373
374        return dig_result
375
376    def call_dbus_method(self, *args):
377        args = shlex.join([args[0], args[1], json.dumps(args[2:])])
378        return json.loads(
379            self.bash(f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/call_dbus_method.py {args}')
380            [0])
381
382    def get_dbus_property(self, property_name):
383        return self.call_dbus_method('org.freedesktop.DBus.Properties', 'Get', 'io.openthread.BorderRouter',
384                                     property_name)
385
386    def set_dbus_property(self, property_name, property_value):
387        return self.call_dbus_method('org.freedesktop.DBus.Properties', 'Set', 'io.openthread.BorderRouter',
388                                     property_name, property_value)
389
390    def get_border_routing_counters(self):
391        counters = self.get_dbus_property('BorderRoutingCounters')
392        counters = {
393            'inbound_unicast': counters[0],
394            'inbound_multicast': counters[1],
395            'outbound_unicast': counters[2],
396            'outbound_multicast': counters[3],
397            'ra_rx': counters[4],
398            'ra_tx_success': counters[5],
399            'ra_tx_failure': counters[6],
400            'rs_rx': counters[7],
401            'rs_tx_success': counters[8],
402            'rs_tx_failure': counters[9],
403        }
404        logging.info(f'border routing counters: {counters}')
405        return counters
406
407    def _process_traffic_counters(self, counter):
408        return {
409            '4to6': {
410                'packets': counter[0],
411                'bytes': counter[1],
412            },
413            '6to4': {
414                'packets': counter[2],
415                'bytes': counter[3],
416            }
417        }
418
419    def _process_packet_counters(self, counter):
420        return {'4to6': {'packets': counter[0]}, '6to4': {'packets': counter[1]}}
421
422    def nat64_set_enabled(self, enable):
423        return self.call_dbus_method('io.openthread.BorderRouter', 'SetNat64Enabled', enable)
424
425    def activate_ephemeral_key_mode(self, lifetime):
426        return self.call_dbus_method('io.openthread.BorderRouter', 'ActivateEphemeralKeyMode', lifetime)
427
428    def deactivate_ephemeral_key_mode(self, retain_active_session):
429        return self.call_dbus_method('io.openthread.BorderRouter', 'DeactivateEphemeralKeyMode', retain_active_session)
430
431    @property
432    def nat64_cidr(self):
433        self.send_command('nat64 cidr')
434        cidr = self._expect_command_output()[0].strip()
435        return ipaddress.IPv4Network(cidr, strict=False)
436
437    @nat64_cidr.setter
438    def nat64_cidr(self, cidr: ipaddress.IPv4Network):
439        if not isinstance(cidr, ipaddress.IPv4Network):
440            raise ValueError("cidr is expected to be an instance of ipaddress.IPv4Network")
441        self.send_command(f'nat64 cidr {cidr}')
442        self._expect_done()
443
444    @property
445    def nat64_state(self):
446        state = self.get_dbus_property('Nat64State')
447        return {'PrefixManager': state[0], 'Translator': state[1]}
448
449    @property
450    def nat64_mappings(self):
451        return [{
452            'id': row[0],
453            'ip4': row[1],
454            'ip6': row[2],
455            'expiry': row[3],
456            'counters': {
457                'total': self._process_traffic_counters(row[4][0]),
458                'ICMP': self._process_traffic_counters(row[4][1]),
459                'UDP': self._process_traffic_counters(row[4][2]),
460                'TCP': self._process_traffic_counters(row[4][3]),
461            }
462        } for row in self.get_dbus_property('Nat64Mappings')]
463
464    @property
465    def nat64_counters(self):
466        res_error = self.get_dbus_property('Nat64ErrorCounters')
467        res_proto = self.get_dbus_property('Nat64ProtocolCounters')
468        return {
469            'protocol': {
470                'Total': self._process_traffic_counters(res_proto[0]),
471                'ICMP': self._process_traffic_counters(res_proto[1]),
472                'UDP': self._process_traffic_counters(res_proto[2]),
473                'TCP': self._process_traffic_counters(res_proto[3]),
474            },
475            'errors': {
476                'Unknown': self._process_packet_counters(res_error[0]),
477                'Illegal Pkt': self._process_packet_counters(res_error[1]),
478                'Unsup Proto': self._process_packet_counters(res_error[2]),
479                'No Mapping': self._process_packet_counters(res_error[3]),
480            }
481        }
482
483    @property
484    def nat64_traffic_counters(self):
485        res = self.get_dbus_property('Nat64TrafficCounters')
486        return {
487            'Total': self._process_traffic_counters(res[0]),
488            'ICMP': self._process_traffic_counters(res[1]),
489            'UDP': self._process_traffic_counters(res[2]),
490            'TCP': self._process_traffic_counters(res[3]),
491        }
492
493    @property
494    def dns_upstream_query_state(self):
495        return bool(self.get_dbus_property('DnsUpstreamQueryState'))
496
497    @dns_upstream_query_state.setter
498    def dns_upstream_query_state(self, value):
499        if type(value) is not bool:
500            raise ValueError("dns_upstream_query_state must be a bool")
501        return self.set_dbus_property('DnsUpstreamQueryState', value)
502
503    @property
504    def ephemeral_key_enabled(self):
505        return bool(self.get_dbus_property('EphemeralKeyEnabled'))
506
507    @ephemeral_key_enabled.setter
508    def ephemeral_key_enabled(self, value):
509        if type(value) is not bool:
510            raise ValueError("ephemeral_key_enabled must be a bool")
511        return self.set_dbus_property('EphemeralKeyEnabled', value)
512
513    def read_border_routing_counters_delta(self):
514        old_counters = self._border_routing_counters
515        new_counters = self.get_border_routing_counters()
516        self._border_routing_counters = new_counters
517        delta_counters = {}
518        if old_counters is None:
519            delta_counters = new_counters
520        else:
521            for i in ('inbound', 'outbound'):
522                for j in ('unicast', 'multicast'):
523                    key = f'{i}_{j}'
524                    assert (key in old_counters)
525                    assert (key in new_counters)
526                    value = [new_counters[key][0] - old_counters[key][0], new_counters[key][1] - old_counters[key][1]]
527                    delta_counters[key] = value
528        delta_counters = {
529            key: value for key, value in delta_counters.items() if not isinstance(value, int) and value[0] and value[1]
530        }
531
532        return delta_counters
533
534    @staticmethod
535    def __unescape_dns_instance_name(name: str) -> str:
536        new_name = []
537        i = 0
538        while i < len(name):
539            c = name[i]
540
541            if c == '\\':
542                assert i + 1 < len(name), name
543                if name[i + 1].isdigit():
544                    assert i + 3 < len(name) and name[i + 2].isdigit() and name[i + 3].isdigit(), name
545                    new_name.append(chr(int(name[i + 1:i + 4])))
546                    i += 3
547                else:
548                    new_name.append(name[i + 1])
549                    i += 1
550            else:
551                new_name.append(c)
552
553            i += 1
554
555        return ''.join(new_name)
556
557    def __parse_dns_dig_txt(self, line: str):
558        # Example TXT entry:
559        # "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
560        txt = {}
561        for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
562            if entry == "":
563                continue
564
565            k, v = entry.split('=', 1)
566            txt[k] = v
567
568        return txt
569
570    def _setup_sysctl(self):
571        self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra=2')
572        self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra_rt_info_max_plen=64')
573
574
class OtCli:
    """Spawns an OpenThread CLI/NCP process (or opens a SoC UART) and holds its pexpect session."""
    # NOTE(review): not referenced in the visible part of this file;
    # presumably a delay (seconds?) used by reset logic elsewhere -- confirm.
    RESET_DELAY = 0.1
577
578    def __init__(self, nodeid, is_mtd=False, version=None, is_bbr=False, **kwargs):
579        self.verbose = int(float(os.getenv('VERBOSE', 0)))
580        self.node_type = os.getenv('NODE_TYPE', 'sim')
581        self.env_version = os.getenv('THREAD_VERSION', '1.1')
582        self.is_bbr = is_bbr
583        self._initialized = False
584        if os.getenv('COVERAGE', 0) and os.getenv('CC', 'gcc') == 'gcc':
585            self._cmd_prefix = '/usr/bin/env GCOV_PREFIX=%s/ot-run/%s/ot-gcda.%d ' % (os.getenv(
586                'top_srcdir', '.'), sys.argv[0], nodeid)
587        else:
588            self._cmd_prefix = ''
589
590        if version is not None:
591            self.version = version
592        else:
593            self.version = self.env_version
594
595        mode = os.environ.get('USE_MTD') == '1' and is_mtd and 'mtd' or 'ftd'
596
597        if self.node_type == 'soc':
598            self.__init_soc(nodeid)
599        elif self.node_type == 'ncp-sim':
600            # TODO use mode after ncp-mtd is available.
601            self.__init_ncp_sim(nodeid, 'ftd')
602        else:
603            self.__init_sim(nodeid, mode)
604
605        if self.verbose:
606            self.pexpect.logfile_read = sys.stdout.buffer
607
608        self._initialized = True
609
610    def __init_sim(self, nodeid, mode):
611        """ Initialize a simulation node. """
612
613        # Default command if no match below, will be overridden if below conditions are met.
614        cmd = './ot-cli-%s' % (mode)
615
616        # For Thread 1.2 MTD node, use ot-cli-mtd build regardless of OT_CLI_PATH
617        if self.version != '1.1' and mode == 'mtd' and 'top_builddir' in os.environ:
618            srcdir = os.environ['top_builddir']
619            cmd = '%s/examples/apps/cli/ot-cli-%s %d' % (srcdir, mode, nodeid)
620
621        # If Thread version of node matches the testing environment version.
622        elif self.version == self.env_version:
623            # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
624            # which requires device with Backbone functionality.
625            if self.version != '1.1' and self.is_bbr:
626                if 'OT_CLI_PATH_BBR' in os.environ:
627                    cmd = os.environ['OT_CLI_PATH_BBR']
628                elif 'top_builddir_1_4_bbr' in os.environ:
629                    srcdir = os.environ['top_builddir_1_4_bbr']
630                    cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
631
632            # Load Thread device of the testing environment version (may be 1.1 or 1.2)
633            else:
634                if 'OT_CLI_PATH' in os.environ:
635                    cmd = os.environ['OT_CLI_PATH']
636                elif 'top_builddir' in os.environ:
637                    srcdir = os.environ['top_builddir']
638                    cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
639
640            if 'RADIO_DEVICE' in os.environ:
641                cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
642                                                                                           nodeid)
643                self.is_posix = True
644            else:
645                cmd += ' %d' % nodeid
646
647        # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability
648        elif self.version == '1.1':
649            # Posix app
650            if 'OT_CLI_PATH_1_1' in os.environ:
651                cmd = os.environ['OT_CLI_PATH_1_1']
652            elif 'top_builddir_1_1' in os.environ:
653                srcdir = os.environ['top_builddir_1_1']
654                cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
655
656            if 'RADIO_DEVICE_1_1' in os.environ:
657                cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (
658                    os.environ['RADIO_DEVICE_1_1'], nodeid)
659                self.is_posix = True
660            else:
661                cmd += ' %d' % nodeid
662
663        print("%s" % cmd)
664
665        self.pexpect = pexpect.popen_spawn.PopenSpawn(self._cmd_prefix + cmd, timeout=10)
666
667        # Add delay to ensure that the process is ready to receive commands.
668        timeout = 0.4
669        while timeout > 0:
670            self.pexpect.send('\r\n')
671            try:
672                self.pexpect.expect('> ', timeout=0.1)
673                break
674            except pexpect.TIMEOUT:
675                timeout -= 0.1
676
677    def __init_ncp_sim(self, nodeid, mode):
678        """ Initialize an NCP simulation node. """
679
680        # Default command if no match below, will be overridden if below conditions are met.
681        cmd = 'spinel-cli.py -p ./ot-ncp-%s -n' % mode
682
683        # If Thread version of node matches the testing environment version.
684        if self.version == self.env_version:
685            if 'RADIO_DEVICE' in os.environ:
686                args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
687                                                                                        nodeid)
688                self.is_posix = True
689            else:
690                args = ''
691
692            # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
693            # which requires device with Backbone functionality.
694            if self.version != '1.1' and self.is_bbr:
695                if 'OT_NCP_PATH_1_4_BBR' in os.environ:
696                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
697                        os.environ['OT_NCP_PATH_1_4_BBR'],
698                        args,
699                    )
700                elif 'top_builddir_1_4_bbr' in os.environ:
701                    srcdir = os.environ['top_builddir_1_4_bbr']
702                    cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
703                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
704                        cmd,
705                        args,
706                    )
707
708            # Load Thread device of the testing environment version (may be 1.1 or 1.2).
709            else:
710                if 'OT_NCP_PATH' in os.environ:
711                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
712                        os.environ['OT_NCP_PATH'],
713                        args,
714                    )
715                elif 'top_builddir' in os.environ:
716                    srcdir = os.environ['top_builddir']
717                    cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
718                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
719                        cmd,
720                        args,
721                    )
722
723        # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability.
724        elif self.version == '1.1':
725            if 'RADIO_DEVICE_1_1' in os.environ:
726                args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE_1_1'],
727                                                                                        nodeid)
728                self.is_posix = True
729            else:
730                args = ''
731
732            if 'OT_NCP_PATH_1_1' in os.environ:
733                cmd = 'spinel-cli.py -p "%s%s" -n' % (
734                    os.environ['OT_NCP_PATH_1_1'],
735                    args,
736                )
737            elif 'top_builddir_1_1' in os.environ:
738                srcdir = os.environ['top_builddir_1_1']
739                cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
740                cmd = 'spinel-cli.py -p "%s%s" -n' % (
741                    cmd,
742                    args,
743                )
744
745        cmd += ' %d' % nodeid
746        print("%s" % cmd)
747
748        self.pexpect = pexpect.spawn(self._cmd_prefix + cmd, timeout=10)
749
750        # Add delay to ensure that the process is ready to receive commands.
751        time.sleep(0.2)
752        self._expect('spinel-cli >')
753        self.debug(int(os.getenv('DEBUG', '0')))
754
755    def __init_soc(self, nodeid):
756        """ Initialize a System-on-a-chip node connected via UART. """
757        import fdpexpect
758
759        serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)
760        self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY))
761
762    def destroy(self):
763        if not self._initialized:
764            return
765
766        if (hasattr(self.pexpect, 'proc') and self.pexpect.proc.poll() is None or
767                not hasattr(self.pexpect, 'proc') and self.pexpect.isalive()):
768            print("%d: exit" % self.nodeid)
769            self.pexpect.send('exit\n')
770            self.pexpect.expect(pexpect.EOF)
771            self.pexpect.wait()
772            self._initialized = False
773
774
775class NodeImpl:
776    is_host = False
777    is_otbr = False
778
779    def __init__(self, nodeid, name=None, simulator=None, **kwargs):
780        self.nodeid = nodeid
781        self.name = name or ('Node%d' % nodeid)
782        self.is_posix = False
783
784        self.simulator = simulator
785        if self.simulator:
786            self.simulator.add_node(self)
787
788        super().__init__(nodeid, **kwargs)
789
790        self.set_addr64('%016x' % (thread_cert.EXTENDED_ADDRESS_BASE + nodeid))
791
792    def _expect(self, pattern, timeout=-1, *args, **kwargs):
793        """ Process simulator events until expected the pattern. """
794        if timeout == -1:
795            timeout = self.pexpect.timeout
796
797        assert timeout > 0
798
799        while timeout > 0:
800            try:
801                return self.pexpect.expect(pattern, 0.1, *args, **kwargs)
802            except pexpect.TIMEOUT:
803                timeout -= 0.1
804                self.simulator.go(0)
805                if timeout <= 0:
806                    raise
807
808    def _expect_done(self, timeout=-1):
809        self._expect('Done', timeout)
810
811    def _expect_result(self, pattern, *args, **kwargs):
812        """Expect a single matching result.
813
814        The arguments are identical to pexpect.expect().
815
816        Returns:
817            The matched line.
818        """
819        results = self._expect_results(pattern, *args, **kwargs)
820        assert len(results) == 1, results
821        return results[0]
822
823    def _expect_results(self, pattern, *args, **kwargs):
824        """Expect multiple matching results.
825
826        The arguments are identical to pexpect.expect().
827
828        Returns:
829            The matched lines.
830        """
831        output = self._expect_command_output()
832        results = [line for line in output if self._match_pattern(line, pattern)]
833        return results
834
835    def _expect_key_value_pairs(self, pattern, separator=': '):
836        """Expect 'key: value' in multiple lines.
837
838        Returns:
839            Dictionary of the key:value pairs.
840        """
841        result = {}
842        for line in self._expect_results(pattern):
843            key, val = line.split(separator)
844            result.update({key: val})
845        return result
846
847    @staticmethod
848    def _match_pattern(line, pattern):
849        if isinstance(pattern, str):
850            pattern = re.compile(pattern)
851
852        if isinstance(pattern, typing.Pattern):
853            return pattern.match(line)
854        else:
855            return any(NodeImpl._match_pattern(line, p) for p in pattern)
856
857    def _expect_command_output(self, ignore_logs=True):
858        lines = []
859
860        while True:
861            line = self.__readline(ignore_logs=ignore_logs)
862
863            if line == 'Done':
864                break
865            elif line.startswith('Error '):
866                raise Exception(line)
867            else:
868                lines.append(line)
869
870        print(f'_expect_command_output() returns {lines!r}')
871        return lines
872
873    def __is_logging_line(self, line: str) -> bool:
874        return len(line) >= 3 and line[:3] in {'[D]', '[I]', '[N]', '[W]', '[C]', '[-]'}
875
876    def read_cert_messages_in_commissioning_log(self, timeout=-1):
877        """Get the log of the traffic after DTLS handshake.
878        """
879        format_str = br"=+?\[\[THCI\].*?type=%s.*?\].*?=+?[\s\S]+?-{40,}"
880        join_fin_req = format_str % br"JOIN_FIN\.req"
881        join_fin_rsp = format_str % br"JOIN_FIN\.rsp"
882        dummy_format_str = br"\[THCI\].*?type=%s.*?"
883        join_ent_ntf = dummy_format_str % br"JOIN_ENT\.ntf"
884        join_ent_rsp = dummy_format_str % br"JOIN_ENT\.rsp"
885        pattern = (b"(" + join_fin_req + b")|(" + join_fin_rsp + b")|(" + join_ent_ntf + b")|(" + join_ent_rsp + b")")
886
887        messages = []
888        # There are at most 4 cert messages both for joiner and commissioner
889        for _ in range(0, 4):
890            try:
891                self._expect(pattern, timeout=timeout)
892                log = self.pexpect.match.group(0)
893                messages.append(self._extract_cert_message(log))
894            except BaseException:
895                break
896        return messages
897
898    def _extract_cert_message(self, log):
899        res = re.search(br"direction=\w+", log)
900        assert res
901        direction = res.group(0).split(b'=')[1].strip()
902
903        res = re.search(br"type=\S+", log)
904        assert res
905        type = res.group(0).split(b'=')[1].strip()
906
907        payload = bytearray([])
908        payload_len = 0
909        if type in [b"JOIN_FIN.req", b"JOIN_FIN.rsp"]:
910            res = re.search(br"len=\d+", log)
911            assert res
912            payload_len = int(res.group(0).split(b'=')[1].strip())
913
914            hex_pattern = br"\|(\s([0-9a-fA-F]{2}|\.\.))+?\s+?\|"
915            while True:
916                res = re.search(hex_pattern, log)
917                if not res:
918                    break
919                data = [int(hex, 16) for hex in res.group(0)[1:-1].split(b' ') if hex and hex != b'..']
920                payload += bytearray(data)
921                log = log[res.end() - 1:]
922        assert len(payload) == payload_len
923        return (direction, type, payload)
924
925    def send_command(self, cmd, go=True, expect_command_echo=True):
926        print("%d: %s" % (self.nodeid, cmd))
927        self.pexpect.send(cmd + '\n')
928        if go:
929            self.simulator.go(0, nodeid=self.nodeid)
930        sys.stdout.flush()
931
932        if expect_command_echo:
933            self._expect_command_echo(cmd)
934
935    def _expect_command_echo(self, cmd):
936        cmd = cmd.strip()
937        while True:
938            line = self.__readline()
939            if line.strip() == cmd:
940                break
941
942            logging.warning("expecting echo %r, but read %r", cmd, line)
943
944    def __readline(self, ignore_logs=True):
945        PROMPT = 'spinel-cli > ' if self.node_type == 'ncp-sim' else '> '
946        while True:
947            self._expect(r"[^\n]+\n")
948            line = self.pexpect.match.group(0).decode('utf8').strip()
949            while line.startswith(PROMPT):
950                line = line[len(PROMPT):]
951
952            if line == '':
953                continue
954
955            if ignore_logs and self.__is_logging_line(line):
956                continue
957
958            return line
959
960    def get_commands(self):
961        self.send_command('?')
962        self._expect('Commands:')
963        return self._expect_results(r'\S+')
964
965    def set_mode(self, mode):
966        cmd = 'mode %s' % mode
967        self.send_command(cmd)
968        self._expect_done()
969
970    def debug(self, level):
971        # `debug` command will not trigger interaction with simulator
972        self.send_command('debug %d' % level, go=False)
973
974    def start(self):
975        self.interface_up()
976        self.thread_start()
977
978    def stop(self):
979        self.thread_stop()
980        self.interface_down()
981
982    def set_log_level(self, level: int):
983        self.send_command(f'log level {level}')
984        self._expect_done()
985
986    def interface_up(self):
987        self.send_command('ifconfig up')
988        self._expect_done()
989
990    def interface_down(self):
991        self.send_command('ifconfig down')
992        self._expect_done()
993
994    def thread_start(self):
995        self.send_command('thread start')
996        self._expect_done()
997
998    def thread_stop(self):
999        self.send_command('thread stop')
1000        self._expect_done()
1001
1002    def detach(self, is_async=False):
1003        cmd = 'detach'
1004        if is_async:
1005            cmd += ' async'
1006
1007        self.send_command(cmd)
1008
1009        if is_async:
1010            self._expect_done()
1011            return
1012
1013        end = self.simulator.now() + 4
1014        while True:
1015            self.simulator.go(1)
1016            try:
1017                self._expect_done(timeout=0.1)
1018                return
1019            except (pexpect.TIMEOUT, socket.timeout):
1020                if self.simulator.now() > end:
1021                    raise
1022
1023    def expect_finished_detaching(self):
1024        self._expect('Finished detaching')
1025
1026    def commissioner_start(self):
1027        cmd = 'commissioner start'
1028        self.send_command(cmd)
1029        self._expect_done()
1030
1031    def commissioner_stop(self):
1032        cmd = 'commissioner stop'
1033        self.send_command(cmd)
1034        self._expect_done()
1035
1036    def commissioner_state(self):
1037        states = [r'disabled', r'petitioning', r'active']
1038        self.send_command('commissioner state')
1039        return self._expect_result(states)
1040
1041    def commissioner_add_joiner(self, addr, psk):
1042        cmd = 'commissioner joiner add %s %s' % (addr, psk)
1043        self.send_command(cmd)
1044        self._expect_done()
1045
1046    def commissioner_set_provisioning_url(self, provisioning_url=''):
1047        cmd = 'commissioner provisioningurl %s' % provisioning_url
1048        self.send_command(cmd)
1049        self._expect_done()
1050
1051    def joiner_start(self, pskd='', provisioning_url=''):
1052        cmd = 'joiner start %s %s' % (pskd, provisioning_url)
1053        self.send_command(cmd)
1054        self._expect_done()
1055
1056    def clear_allowlist(self):
1057        cmd = 'macfilter addr clear'
1058        self.send_command(cmd)
1059        self._expect_done()
1060
1061    def enable_allowlist(self):
1062        cmd = 'macfilter addr allowlist'
1063        self.send_command(cmd)
1064        self._expect_done()
1065
1066    def disable_allowlist(self):
1067        cmd = 'macfilter addr disable'
1068        self.send_command(cmd)
1069        self._expect_done()
1070
1071    def add_allowlist(self, addr, rssi=None):
1072        cmd = 'macfilter addr add %s' % addr
1073
1074        if rssi is not None:
1075            cmd += ' %s' % rssi
1076
1077        self.send_command(cmd)
1078        self._expect_done()
1079
1080    def radiofilter_is_enabled(self) -> bool:
1081        states = [r'Disabled', r'Enabled']
1082        self.send_command('radiofilter')
1083        return self._expect_result(states) == 'Enabled'
1084
1085    def radiofilter_enable(self):
1086        cmd = 'radiofilter enable'
1087        self.send_command(cmd)
1088        self._expect_done()
1089
1090    def radiofilter_disable(self):
1091        cmd = 'radiofilter disable'
1092        self.send_command(cmd)
1093        self._expect_done()
1094
1095    def get_bbr_registration_jitter(self):
1096        self.send_command('bbr jitter')
1097        return int(self._expect_result(r'\d+'))
1098
1099    def set_bbr_registration_jitter(self, jitter):
1100        cmd = 'bbr jitter %d' % jitter
1101        self.send_command(cmd)
1102        self._expect_done()
1103
1104    def get_rcp_version(self) -> str:
1105        self.send_command('rcp version')
1106        rcp_version = self._expect_command_output()[0].strip()
1107        return rcp_version
1108
1109    def srp_server_get_state(self):
1110        states = ['disabled', 'running', 'stopped']
1111        self.send_command('srp server state')
1112        return self._expect_result(states)
1113
1114    def srp_server_get_addr_mode(self):
1115        modes = [r'unicast', r'anycast']
1116        self.send_command(f'srp server addrmode')
1117        return self._expect_result(modes)
1118
1119    def srp_server_set_addr_mode(self, mode):
1120        self.send_command(f'srp server addrmode {mode}')
1121        self._expect_done()
1122
1123    def srp_server_get_anycast_seq_num(self):
1124        self.send_command(f'srp server seqnum')
1125        return int(self._expect_result(r'\d+'))
1126
1127    def srp_server_set_anycast_seq_num(self, seqnum):
1128        self.send_command(f'srp server seqnum {seqnum}')
1129        self._expect_done()
1130
1131    def srp_server_set_enabled(self, enable):
1132        cmd = f'srp server {"enable" if enable else "disable"}'
1133        self.send_command(cmd)
1134        self._expect_done()
1135
1136    def srp_server_set_lease_range(self, min_lease, max_lease, min_key_lease, max_key_lease):
1137        self.send_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}')
1138        self._expect_done()
1139
1140    def srp_server_set_ttl_range(self, min_ttl, max_ttl):
1141        self.send_command(f'srp server ttl {min_ttl} {max_ttl}')
1142        self._expect_done()
1143
1144    def srp_server_get_hosts(self):
1145        """Returns the host list on the SRP server as a list of property
1146           dictionary.
1147
1148           Example output:
1149           [{
1150               'fullname': 'my-host.default.service.arpa.',
1151               'name': 'my-host',
1152               'deleted': 'false',
1153               'addresses': ['2001::1', '2001::2']
1154           }]
1155        """
1156
1157        cmd = 'srp server host'
1158        self.send_command(cmd)
1159        lines = self._expect_command_output()
1160        host_list = []
1161        while lines:
1162            host = {}
1163
1164            host['fullname'] = lines.pop(0).strip()
1165            host['name'] = host['fullname'].split('.')[0]
1166
1167            host['deleted'] = lines.pop(0).strip().split(':')[1].strip()
1168            if host['deleted'] == 'true':
1169                host_list.append(host)
1170                continue
1171
1172            addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1173            map(str.strip, addresses)
1174            host['addresses'] = [addr.strip() for addr in addresses if addr]
1175
1176            host_list.append(host)
1177
1178        return host_list
1179
1180    def srp_server_get_host(self, host_name):
1181        """Returns host on the SRP server that matches given host name.
1182
1183           Example usage:
1184           self.srp_server_get_host("my-host")
1185        """
1186
1187        for host in self.srp_server_get_hosts():
1188            if host_name == host['name']:
1189                return host
1190
1191    def srp_server_get_services(self):
1192        """Returns the service list on the SRP server as a list of property
1193           dictionary.
1194
1195           Example output:
1196           [{
1197               'fullname': 'my-service._ipps._tcp.default.service.arpa.',
1198               'instance': 'my-service',
1199               'name': '_ipps._tcp',
1200               'deleted': 'false',
1201               'port': '12345',
1202               'priority': '0',
1203               'weight': '0',
1204               'ttl': '7200',
1205               'lease': '7200',
1206               'key-lease': '7200',
1207               'TXT': ['abc=010203'],
1208               'host_fullname': 'my-host.default.service.arpa.',
1209               'host': 'my-host',
1210               'addresses': ['2001::1', '2001::2']
1211           }]
1212
1213           Note that the TXT data is output as a HEX string.
1214        """
1215
1216        cmd = 'srp server service'
1217        self.send_command(cmd)
1218        lines = self._expect_command_output()
1219
1220        service_list = []
1221        while lines:
1222            service = {}
1223
1224            service['fullname'] = lines.pop(0).strip()
1225            name_labels = service['fullname'].split('.')
1226            service['instance'] = name_labels[0]
1227            service['name'] = '.'.join(name_labels[1:3])
1228
1229            service['deleted'] = lines.pop(0).strip().split(':')[1].strip()
1230            if service['deleted'] == 'true':
1231                service_list.append(service)
1232                continue
1233
1234            # 'subtypes', port', 'priority', 'weight', 'ttl', 'lease', and 'key-lease'
1235            for i in range(0, 7):
1236                key_value = lines.pop(0).strip().split(':')
1237                service[key_value[0].strip()] = key_value[1].strip()
1238
1239            txt_entries = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1240            txt_entries = map(str.strip, txt_entries)
1241            service['TXT'] = [txt for txt in txt_entries if txt]
1242
1243            service['host_fullname'] = lines.pop(0).strip().split(':')[1].strip()
1244            service['host'] = service['host_fullname'].split('.')[0]
1245
1246            addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1247            addresses = map(str.strip, addresses)
1248            service['addresses'] = [addr for addr in addresses if addr]
1249
1250            service_list.append(service)
1251
1252        return service_list
1253
1254    def srp_server_get_service(self, instance_name, service_name):
1255        """Returns service on the SRP server that matches given instance
1256           name and service name.
1257
1258           Example usage:
1259           self.srp_server_get_service("my-service", "_ipps._tcp")
1260        """
1261
1262        for service in self.srp_server_get_services():
1263            if (instance_name == service['instance'] and service_name == service['name']):
1264                return service
1265
1266    def get_srp_server_port(self):
1267        """Returns the SRP server UDP port by parsing
1268           the SRP Server Data in Network Data.
1269        """
1270
1271        for service in self.get_services():
1272            # TODO: for now, we are using 0xfd as the SRP service data.
1273            #       May use a dedicated bit flag for SRP server.
1274            if int(service[1], 16) == 0x5d:
1275                # The SRP server data contains IPv6 address (16 bytes)
1276                # followed by UDP port number.
1277                return int(service[2][2 * 16:], 16)
1278
1279    def srp_client_start(self, server_address, server_port):
1280        self.send_command(f'srp client start {server_address} {server_port}')
1281        self._expect_done()
1282
1283    def srp_client_stop(self):
1284        self.send_command(f'srp client stop')
1285        self._expect_done()
1286
1287    def srp_client_get_state(self):
1288        cmd = 'srp client state'
1289        self.send_command(cmd)
1290        return self._expect_command_output()[0]
1291
1292    def srp_client_get_auto_start_mode(self):
1293        cmd = 'srp client autostart'
1294        self.send_command(cmd)
1295        return self._expect_command_output()[0]
1296
1297    def srp_client_enable_auto_start_mode(self):
1298        self.send_command(f'srp client autostart enable')
1299        self._expect_done()
1300
1301    def srp_client_disable_auto_start_mode(self):
1302        self.send_command(f'srp client autostart disable')
1303        self._expect_done()
1304
1305    def srp_client_get_server_address(self):
1306        cmd = 'srp client server address'
1307        self.send_command(cmd)
1308        return self._expect_command_output()[0]
1309
1310    def srp_client_get_server_port(self):
1311        cmd = 'srp client server port'
1312        self.send_command(cmd)
1313        return int(self._expect_command_output()[0])
1314
1315    def srp_client_get_host_state(self):
1316        cmd = 'srp client host state'
1317        self.send_command(cmd)
1318        return self._expect_command_output()[0]
1319
1320    def srp_client_set_host_name(self, name):
1321        self.send_command(f'srp client host name {name}')
1322        self._expect_done()
1323
1324    def srp_client_get_host_name(self):
1325        self.send_command(f'srp client host name')
1326        self._expect_done()
1327
1328    def srp_client_remove_host(self, remove_key=False, send_unreg_to_server=False):
1329        self.send_command(f'srp client host remove {int(remove_key)} {int(send_unreg_to_server)}')
1330        self._expect_done()
1331
1332    def srp_client_clear_host(self):
1333        self.send_command(f'srp client host clear')
1334        self._expect_done()
1335
1336    def srp_client_enable_auto_host_address(self):
1337        self.send_command(f'srp client host address auto')
1338        self._expect_done()
1339
1340    def srp_client_set_host_address(self, *addrs: str):
1341        self.send_command(f'srp client host address {" ".join(addrs)}')
1342        self._expect_done()
1343
1344    def srp_client_get_host_address(self):
1345        self.send_command(f'srp client host address')
1346        self._expect_done()
1347
1348    def srp_client_add_service(self,
1349                               instance_name,
1350                               service_name,
1351                               port,
1352                               priority=0,
1353                               weight=0,
1354                               txt_entries=[],
1355                               lease=0,
1356                               key_lease=0):
1357        txt_record = "".join(self._encode_txt_entry(entry) for entry in txt_entries)
1358        if txt_record == '':
1359            txt_record = '-'
1360        instance_name = self._escape_escapable(instance_name)
1361        self.send_command(
1362            f'srp client service add {instance_name} {service_name} {port} {priority} {weight} {txt_record} {lease} {key_lease}'
1363        )
1364        self._expect_done()
1365
1366    def srp_client_remove_service(self, instance_name, service_name):
1367        self.send_command(f'srp client service remove {instance_name} {service_name}')
1368        self._expect_done()
1369
1370    def srp_client_clear_service(self, instance_name, service_name):
1371        self.send_command(f'srp client service clear {instance_name} {service_name}')
1372        self._expect_done()
1373
1374    def srp_client_get_services(self):
1375        cmd = 'srp client service'
1376        self.send_command(cmd)
1377        service_lines = self._expect_command_output()
1378        return [self._parse_srp_client_service(line) for line in service_lines]
1379
1380    def srp_client_set_lease_interval(self, leaseinterval: int):
1381        cmd = f'srp client leaseinterval {leaseinterval}'
1382        self.send_command(cmd)
1383        self._expect_done()
1384
1385    def srp_client_get_lease_interval(self) -> int:
1386        cmd = 'srp client leaseinterval'
1387        self.send_command(cmd)
1388        return int(self._expect_result('\d+'))
1389
1390    def srp_client_set_key_lease_interval(self, leaseinterval: int):
1391        cmd = f'srp client keyleaseinterval {leaseinterval}'
1392        self.send_command(cmd)
1393        self._expect_done()
1394
1395    def srp_client_get_key_lease_interval(self) -> int:
1396        cmd = 'srp client keyleaseinterval'
1397        self.send_command(cmd)
1398        return int(self._expect_result('\d+'))
1399
1400    def srp_client_set_ttl(self, ttl: int):
1401        cmd = f'srp client ttl {ttl}'
1402        self.send_command(cmd)
1403        self._expect_done()
1404
1405    def srp_client_get_ttl(self) -> int:
1406        cmd = 'srp client ttl'
1407        self.send_command(cmd)
1408        return int(self._expect_result('\d+'))
1409
1410    #
1411    # TREL utilities
1412    #
1413
1414    def enable_trel(self):
1415        cmd = 'trel enable'
1416        self.send_command(cmd)
1417        self._expect_done()
1418
1419    def is_trel_enabled(self) -> Union[None, bool]:
1420        states = [r'Disabled', r'Enabled']
1421        self.send_command('trel')
1422        try:
1423            return self._expect_result(states) == 'Enabled'
1424        except Exception as ex:
1425            if 'InvalidCommand' in str(ex):
1426                return None
1427
1428            raise
1429
1430    def get_trel_counters(self):
1431        cmd = 'trel counters'
1432        self.send_command(cmd)
1433        result = self._expect_command_output()
1434
1435        counters = {}
1436        for line in result:
1437            m = re.match(r'(\w+)\:[^\d]+(\d+)[^\d]+(\d+)(?:[^\d]+(\d+))?', line)
1438            if m:
1439                groups = m.groups()
1440                sub_counters = {
1441                    'packets': int(groups[1]),
1442                    'bytes': int(groups[2]),
1443                }
1444                if groups[3]:
1445                    sub_counters['failures'] = int(groups[3])
1446                counters[groups[0]] = sub_counters
1447        return counters
1448
1449    def reset_trel_counters(self):
1450        cmd = 'trel counters reset'
1451        self.send_command(cmd)
1452        self._expect_done()
1453
1454    def get_trel_port(self):
1455        cmd = 'trel port'
1456        self.send_command(cmd)
1457        return int(self._expect_command_output()[0])
1458
1459    def set_epskc(self, keystring: str, timeout=120000, port=0):
1460        cmd = 'ba ephemeralkey set ' + keystring + ' ' + str(timeout) + ' ' + str(port)
1461        self.send_command(cmd)
1462        self._expect(r"(Done|Error .*)")
1463
1464    def clear_epskc(self):
1465        cmd = 'ba ephemeralkey clear'
1466        self.send_command(cmd)
1467        self._expect_done()
1468
1469    def get_border_agent_counters(self):
1470        cmd = 'ba counters'
1471        self.send_command(cmd)
1472        result = self._expect_command_output()
1473
1474        counters = {}
1475        for line in result:
1476            m = re.match(r'(\w+)\: (\d+)', line)
1477            if m:
1478                counter_name = m.group(1)
1479                counter_value = m.group(2)
1480
1481                counters[counter_name] = int(counter_value)
1482        return counters
1483
1484    def _encode_txt_entry(self, entry):
1485        """Encodes the TXT entry to the DNS-SD TXT record format as a HEX string.
1486
1487           Example usage:
1488           self._encode_txt_entries(['abc'])     -> '03616263'
1489           self._encode_txt_entries(['def='])    -> '046465663d'
1490           self._encode_txt_entries(['xyz=XYZ']) -> '0778797a3d58595a'
1491        """
1492        return '{:02x}'.format(len(entry)) + "".join("{:02x}".format(ord(c)) for c in entry)
1493
1494    def _parse_srp_client_service(self, line: str):
1495        """Parse one line of srp service list into a dictionary which
1496           maps string keys to string values.
1497
1498           Example output for input
1499           'instance:\"%s\", name:\"%s\", state:%s, port:%d, priority:%d, weight:%d"'
1500           {
1501               'instance': 'my-service',
1502               'name': '_ipps._udp',
1503               'state': 'ToAdd',
1504               'port': '12345',
1505               'priority': '0',
1506               'weight': '0'
1507           }
1508
1509           Note that value of 'port', 'priority' and 'weight' are represented
1510           as strings but not integers.
1511        """
1512        key_values = [word.strip().split(':') for word in line.split(', ')]
1513        keys = [key_value[0] for key_value in key_values]
1514        values = [key_value[1].strip('"') for key_value in key_values]
1515        return dict(zip(keys, values))
1516
1517    def locate(self, anycast_addr):
1518        cmd = 'locate ' + anycast_addr
1519        self.send_command(cmd)
1520        self.simulator.go(5)
1521        return self._parse_locate_result(self._expect_command_output()[0])
1522
1523    def _parse_locate_result(self, line: str):
1524        """Parse anycast locate result as list of ml-eid and rloc16.
1525
1526           Example output for input
1527           'fd00:db8:0:0:acf9:9d0:7f3c:b06e 0xa800'
1528
1529           [ 'fd00:db8:0:0:acf9:9d0:7f3c:b06e', '0xa800' ]
1530        """
1531        return line.split(' ')
1532
1533    def enable_backbone_router(self):
1534        cmd = 'bbr enable'
1535        self.send_command(cmd)
1536        self._expect_done()
1537
1538    def disable_backbone_router(self):
1539        cmd = 'bbr disable'
1540        self.send_command(cmd)
1541        self._expect_done()
1542
1543    def register_backbone_router(self):
1544        cmd = 'bbr register'
1545        self.send_command(cmd)
1546        self._expect_done()
1547
1548    def get_backbone_router_state(self):
1549        states = [r'Disabled', r'Primary', r'Secondary']
1550        self.send_command('bbr state')
1551        return self._expect_result(states)
1552
1553    @property
1554    def is_primary_backbone_router(self) -> bool:
1555        return self.get_backbone_router_state() == 'Primary'
1556
1557    def get_backbone_router(self):
1558        cmd = 'bbr config'
1559        self.send_command(cmd)
1560        self._expect(r'(.*)Done')
1561        g = self.pexpect.match.groups()
1562        output = g[0].decode("utf-8")
1563        lines = output.strip().split('\n')
1564        lines = [l.strip() for l in lines]
1565        ret = {}
1566        for l in lines:
1567            z = re.search(r'seqno:\s+([0-9]+)', l)
1568            if z:
1569                ret['seqno'] = int(z.groups()[0])
1570
1571            z = re.search(r'delay:\s+([0-9]+)', l)
1572            if z:
1573                ret['delay'] = int(z.groups()[0])
1574
1575            z = re.search(r'timeout:\s+([0-9]+)', l)
1576            if z:
1577                ret['timeout'] = int(z.groups()[0])
1578
1579        return ret
1580
1581    def set_backbone_router(self, seqno=None, reg_delay=None, mlr_timeout=None):
1582        cmd = 'bbr config'
1583
1584        if seqno is not None:
1585            cmd += ' seqno %d' % seqno
1586
1587        if reg_delay is not None:
1588            cmd += ' delay %d' % reg_delay
1589
1590        if mlr_timeout is not None:
1591            cmd += ' timeout %d' % mlr_timeout
1592
1593        self.send_command(cmd)
1594        self._expect_done()
1595
1596    def set_domain_prefix(self, prefix, flags='prosD'):
1597        self.add_prefix(prefix, flags)
1598        self.register_netdata()
1599
1600    def remove_domain_prefix(self, prefix):
1601        self.remove_prefix(prefix)
1602        self.register_netdata()
1603
1604    def set_next_dua_response(self, status: Union[str, int], iid=None):
1605        # Convert 5.00 to COAP CODE 160
1606        if isinstance(status, str):
1607            assert '.' in status
1608            status = status.split('.')
1609            status = (int(status[0]) << 5) + int(status[1])
1610
1611        cmd = 'bbr mgmt dua {}'.format(status)
1612        if iid is not None:
1613            cmd += ' ' + str(iid)
1614        self.send_command(cmd)
1615        self._expect_done()
1616
1617    def set_dua_iid(self, iid: str):
1618        assert len(iid) == 16
1619        int(iid, 16)
1620
1621        cmd = 'dua iid {}'.format(iid)
1622        self.send_command(cmd)
1623        self._expect_done()
1624
1625    def clear_dua_iid(self):
1626        cmd = 'dua iid clear'
1627        self.send_command(cmd)
1628        self._expect_done()
1629
1630    def multicast_listener_list(self) -> Dict[IPv6Address, int]:
1631        cmd = 'bbr mgmt mlr listener'
1632        self.send_command(cmd)
1633
1634        table = {}
1635        for line in self._expect_results("\S+ \d+"):
1636            line = line.split()
1637            assert len(line) == 2, line
1638            ip = IPv6Address(line[0])
1639            timeout = int(line[1])
1640            assert ip not in table
1641
1642            table[ip] = timeout
1643
1644        return table
1645
1646    def multicast_listener_clear(self):
1647        cmd = f'bbr mgmt mlr listener clear'
1648        self.send_command(cmd)
1649        self._expect_done()
1650
1651    def multicast_listener_add(self, ip: Union[IPv6Address, str], timeout: int = 0):
1652        if not isinstance(ip, IPv6Address):
1653            ip = IPv6Address(ip)
1654
1655        cmd = f'bbr mgmt mlr listener add {ip.compressed} {timeout}'
1656        self.send_command(cmd)
1657        self._expect(r"(Done|Error .*)")
1658
1659    def set_next_mlr_response(self, status: int):
1660        cmd = 'bbr mgmt mlr response {}'.format(status)
1661        self.send_command(cmd)
1662        self._expect_done()
1663
1664    def register_multicast_listener(self, *ipaddrs: Union[IPv6Address, str], timeout=None):
1665        assert len(ipaddrs) > 0, ipaddrs
1666
1667        ipaddrs = map(str, ipaddrs)
1668        cmd = f'mlr reg {" ".join(ipaddrs)}'
1669        if timeout is not None:
1670            cmd += f' {int(timeout)}'
1671        self.send_command(cmd)
1672        self.simulator.go(3)
1673        lines = self._expect_command_output()
1674        m = re.match(r'status (\d+), (\d+) failed', lines[0])
1675        assert m is not None, lines
1676        status = int(m.group(1))
1677        failed_num = int(m.group(2))
1678        assert failed_num == len(lines) - 1
1679        failed_ips = list(map(IPv6Address, lines[1:]))
1680        print(f"register_multicast_listener {ipaddrs} => status: {status}, failed ips: {failed_ips}")
1681        return status, failed_ips
1682
1683    def set_link_quality(self, addr, lqi):
1684        cmd = 'macfilter rss add-lqi %s %s' % (addr, lqi)
1685        self.send_command(cmd)
1686        self._expect_done()
1687
1688    def set_outbound_link_quality(self, lqi):
1689        cmd = 'macfilter rss add-lqi * %s' % (lqi)
1690        self.send_command(cmd)
1691        self._expect_done()
1692
1693    def remove_allowlist(self, addr):
1694        cmd = 'macfilter addr remove %s' % addr
1695        self.send_command(cmd)
1696        self._expect_done()
1697
1698    def get_addr16(self):
1699        self.send_command('rloc16')
1700        rloc16 = self._expect_result(r'[0-9a-fA-F]{4}')
1701        return int(rloc16, 16)
1702
1703    def get_router_id(self):
1704        rloc16 = self.get_addr16()
1705        return rloc16 >> 10
1706
1707    def get_addr64(self):
1708        self.send_command('extaddr')
1709        return self._expect_result('[0-9a-fA-F]{16}')
1710
1711    def set_addr64(self, addr64: str):
1712        # Make sure `addr64` is a hex string of length 16
1713        assert len(addr64) == 16
1714        int(addr64, 16)
1715        self.send_command('extaddr %s' % addr64)
1716        self._expect_done()
1717
1718    def get_eui64(self):
1719        self.send_command('eui64')
1720        return self._expect_result('[0-9a-fA-F]{16}')
1721
1722    def set_extpanid(self, extpanid):
1723        self.send_command('extpanid %s' % extpanid)
1724        self._expect_done()
1725
1726    def get_extpanid(self):
1727        self.send_command('extpanid')
1728        return self._expect_result('[0-9a-fA-F]{16}')
1729
1730    def get_mesh_local_prefix(self):
1731        self.send_command('prefix meshlocal')
1732        return self._expect_command_output()[0]
1733
1734    def set_mesh_local_prefix(self, mesh_local_prefix):
1735        self.send_command('prefix meshlocal %s' % mesh_local_prefix)
1736        self._expect_done()
1737
1738    def get_joiner_id(self):
1739        self.send_command('joiner id')
1740        return self._expect_result('[0-9a-fA-F]{16}')
1741
1742    def get_channel(self):
1743        self.send_command('channel')
1744        return int(self._expect_result(r'\d+'))
1745
1746    def set_channel(self, channel):
1747        cmd = 'channel %d' % channel
1748        self.send_command(cmd)
1749        self._expect_done()
1750
1751    def get_networkkey(self):
1752        self.send_command('networkkey')
1753        return self._expect_result('[0-9a-fA-F]{32}')
1754
1755    def set_networkkey(self, networkkey):
1756        cmd = 'networkkey %s' % networkkey
1757        self.send_command(cmd)
1758        self._expect_done()
1759
1760    def get_key_sequence_counter(self):
1761        self.send_command('keysequence counter')
1762        result = self._expect_result(r'\d+')
1763        return int(result)
1764
1765    def set_key_sequence_counter(self, key_sequence_counter):
1766        cmd = 'keysequence counter %d' % key_sequence_counter
1767        self.send_command(cmd)
1768        self._expect_done()
1769
1770    def get_key_switch_guardtime(self):
1771        self.send_command('keysequence guardtime')
1772        return int(self._expect_result(r'\d+'))
1773
1774    def set_key_switch_guardtime(self, key_switch_guardtime):
1775        cmd = 'keysequence guardtime %d' % key_switch_guardtime
1776        self.send_command(cmd)
1777        self._expect_done()
1778
1779    def set_network_id_timeout(self, network_id_timeout):
1780        cmd = 'networkidtimeout %d' % network_id_timeout
1781        self.send_command(cmd)
1782        self._expect_done()
1783
1784    def _escape_escapable(self, string):
1785        """Escape CLI escapable characters in the given string.
1786
1787        Args:
1788            string (str): UTF-8 input string.
1789
1790        Returns:
1791            [str]: The modified string with escaped characters.
1792        """
1793        escapable_chars = '\\ \t\r\n'
1794        for char in escapable_chars:
1795            string = string.replace(char, '\\%s' % char)
1796        return string
1797
1798    def get_network_name(self):
1799        self.send_command('networkname')
1800        return self._expect_result([r'\S+'])
1801
1802    def set_network_name(self, network_name):
1803        cmd = 'networkname %s' % self._escape_escapable(network_name)
1804        self.send_command(cmd)
1805        self._expect_done()
1806
1807    def get_panid(self):
1808        self.send_command('panid')
1809        result = self._expect_result('0x[0-9a-fA-F]{4}')
1810        return int(result, 16)
1811
1812    def set_panid(self, panid=config.PANID):
1813        cmd = 'panid %d' % panid
1814        self.send_command(cmd)
1815        self._expect_done()
1816
1817    def set_parent_priority(self, priority):
1818        cmd = 'parentpriority %d' % priority
1819        self.send_command(cmd)
1820        self._expect_done()
1821
1822    def get_partition_id(self):
1823        self.send_command('partitionid')
1824        return self._expect_result(r'\d+')
1825
1826    def get_preferred_partition_id(self):
1827        self.send_command('partitionid preferred')
1828        return self._expect_result(r'\d+')
1829
1830    def set_preferred_partition_id(self, partition_id):
1831        cmd = 'partitionid preferred %d' % partition_id
1832        self.send_command(cmd)
1833        self._expect_done()
1834
1835    def get_pollperiod(self):
1836        self.send_command('pollperiod')
1837        return self._expect_result(r'\d+')
1838
1839    def set_pollperiod(self, pollperiod):
1840        self.send_command('pollperiod %d' % pollperiod)
1841        self._expect_done()
1842
1843    def get_child_supervision_interval(self):
1844        self.send_command('childsupervision interval')
1845        return self._expect_result(r'\d+')
1846
1847    def set_child_supervision_interval(self, interval):
1848        self.send_command('childsupervision interval %d' % interval)
1849        self._expect_done()
1850
1851    def get_child_supervision_check_timeout(self):
1852        self.send_command('childsupervision checktimeout')
1853        return self._expect_result(r'\d+')
1854
1855    def set_child_supervision_check_timeout(self, timeout):
1856        self.send_command('childsupervision checktimeout %d' % timeout)
1857        self._expect_done()
1858
1859    def get_child_supervision_check_failure_counter(self):
1860        self.send_command('childsupervision failcounter')
1861        return self._expect_result(r'\d+')
1862
1863    def reset_child_supervision_check_failure_counter(self):
1864        self.send_command('childsupervision failcounter reset')
1865        self._expect_done()
1866
1867    def get_csl_info(self):
1868        self.send_command('csl')
1869        return self._expect_key_value_pairs(r'\S+')
1870
1871    def set_csl_channel(self, csl_channel):
1872        self.send_command('csl channel %d' % csl_channel)
1873        self._expect_done()
1874
1875    def set_csl_period(self, csl_period):
1876        self.send_command('csl period %d' % csl_period)
1877        self._expect_done()
1878
1879    def set_csl_timeout(self, csl_timeout):
1880        self.send_command('csl timeout %d' % csl_timeout)
1881        self._expect_done()
1882
1883    def send_mac_emptydata(self):
1884        self.send_command('mac send emptydata')
1885        self._expect_done()
1886
1887    def send_mac_datarequest(self):
1888        self.send_command('mac send datarequest')
1889        self._expect_done()
1890
1891    def set_router_upgrade_threshold(self, threshold):
1892        cmd = 'routerupgradethreshold %d' % threshold
1893        self.send_command(cmd)
1894        self._expect_done()
1895
1896    def set_router_downgrade_threshold(self, threshold):
1897        cmd = 'routerdowngradethreshold %d' % threshold
1898        self.send_command(cmd)
1899        self._expect_done()
1900
1901    def get_router_downgrade_threshold(self) -> int:
1902        self.send_command('routerdowngradethreshold')
1903        return int(self._expect_result(r'\d+'))
1904
1905    def set_router_eligible(self, enable: bool):
1906        cmd = f'routereligible {"enable" if enable else "disable"}'
1907        self.send_command(cmd)
1908        self._expect_done()
1909
1910    def get_router_eligible(self) -> bool:
1911        states = [r'Disabled', r'Enabled']
1912        self.send_command('routereligible')
1913        return self._expect_result(states) == 'Enabled'
1914
1915    def prefer_router_id(self, router_id):
1916        cmd = 'preferrouterid %d' % router_id
1917        self.send_command(cmd)
1918        self._expect_done()
1919
1920    def release_router_id(self, router_id):
1921        cmd = 'releaserouterid %d' % router_id
1922        self.send_command(cmd)
1923        self._expect_done()
1924
1925    def get_state(self):
1926        states = [r'detached', r'child', r'router', r'leader', r'disabled']
1927        self.send_command('state')
1928        return self._expect_result(states)
1929
1930    def set_state(self, state):
1931        cmd = 'state %s' % state
1932        self.send_command(cmd)
1933        self._expect_done()
1934
1935    def get_ephemeral_key_state(self):
1936        cmd = 'ba ephemeralkey'
1937        states = [r'inactive', r'active']
1938        self.send_command(cmd)
1939        return self._expect_result(states)
1940
1941    def get_timeout(self):
1942        self.send_command('childtimeout')
1943        return self._expect_result(r'\d+')
1944
1945    def set_timeout(self, timeout):
1946        cmd = 'childtimeout %d' % timeout
1947        self.send_command(cmd)
1948        self._expect_done()
1949
1950    def set_max_children(self, number):
1951        cmd = 'childmax %d' % number
1952        self.send_command(cmd)
1953        self._expect_done()
1954
1955    def get_weight(self):
1956        self.send_command('leaderweight')
1957        return self._expect_result(r'\d+')
1958
1959    def set_weight(self, weight):
1960        cmd = 'leaderweight %d' % weight
1961        self.send_command(cmd)
1962        self._expect_done()
1963
1964    def add_ipaddr(self, ipaddr):
1965        cmd = 'ipaddr add %s' % ipaddr
1966        self.send_command(cmd)
1967        self._expect_done()
1968
1969    def del_ipaddr(self, ipaddr):
1970        cmd = 'ipaddr del %s' % ipaddr
1971        self.send_command(cmd)
1972        self._expect_done()
1973
1974    def add_ipmaddr(self, ipmaddr):
1975        cmd = 'ipmaddr add %s' % ipmaddr
1976        self.send_command(cmd)
1977        self._expect_done()
1978
1979    def del_ipmaddr(self, ipmaddr):
1980        cmd = 'ipmaddr del %s' % ipmaddr
1981        self.send_command(cmd)
1982        self._expect_done()
1983
1984    def get_addrs(self, verbose=False):
1985        self.send_command('ipaddr' + (' -v' if verbose else ''))
1986
1987        return self._expect_results(r'\S+(:\S*)+')
1988
1989    def get_mleid(self):
1990        self.send_command('ipaddr mleid')
1991        return self._expect_result(r'\S+(:\S*)+')
1992
1993    def get_linklocal(self):
1994        self.send_command('ipaddr linklocal')
1995        return self._expect_result(r'\S+(:\S*)+')
1996
1997    def get_rloc(self):
1998        self.send_command('ipaddr rloc')
1999        return self._expect_result(r'\S+(:\S*)+')
2000
2001    def get_addr(self, prefix):
2002        network = ipaddress.ip_network(u'%s' % str(prefix))
2003        addrs = self.get_addrs()
2004
2005        for addr in addrs:
2006            if isinstance(addr, bytearray):
2007                addr = bytes(addr)
2008            ipv6_address = ipaddress.ip_address(addr)
2009            if ipv6_address in network:
2010                return ipv6_address.exploded
2011
2012        return None
2013
2014    def has_ipaddr(self, address):
2015        ipaddr = ipaddress.ip_address(address)
2016        ipaddrs = self.get_addrs()
2017        for addr in ipaddrs:
2018            if isinstance(addr, bytearray):
2019                addr = bytes(addr)
2020            if ipaddress.ip_address(addr) == ipaddr:
2021                return True
2022        return False
2023
2024    def get_ipmaddrs(self):
2025        self.send_command('ipmaddr')
2026        return self._expect_results(r'\S+(:\S*)+')
2027
2028    def has_ipmaddr(self, address):
2029        ipmaddr = ipaddress.ip_address(address)
2030        ipmaddrs = self.get_ipmaddrs()
2031        for addr in ipmaddrs:
2032            if isinstance(addr, bytearray):
2033                addr = bytes(addr)
2034            if ipaddress.ip_address(addr) == ipmaddr:
2035                return True
2036        return False
2037
2038    def get_addr_leader_aloc(self):
2039        addrs = self.get_addrs()
2040        for addr in addrs:
2041            segs = addr.split(':')
2042            if (segs[4] == '0' and segs[5] == 'ff' and segs[6] == 'fe00' and segs[7] == 'fc00'):
2043                return addr
2044        return None
2045
2046    def get_mleid_iid(self):
2047        ml_eid = IPv6Address(self.get_mleid())
2048        return ml_eid.packed[8:].hex()
2049
2050    def get_eidcaches(self):
2051        eidcaches = []
2052        self.send_command('eidcache')
2053        for line in self._expect_results(r'([a-fA-F0-9\:]+) ([a-fA-F0-9]+)'):
2054            eidcaches.append(line.split())
2055
2056        return eidcaches
2057
2058    def add_service(self, enterpriseNumber, serviceData, serverData):
2059        cmd = 'service add %s %s %s' % (
2060            enterpriseNumber,
2061            serviceData,
2062            serverData,
2063        )
2064        self.send_command(cmd)
2065        self._expect_done()
2066
2067    def remove_service(self, enterpriseNumber, serviceData):
2068        cmd = 'service remove %s %s' % (enterpriseNumber, serviceData)
2069        self.send_command(cmd)
2070        self._expect_done()
2071
2072    def get_child_table(self) -> Dict[int, Dict[str, Any]]:
2073        """Get the table of attached children."""
2074        cmd = 'child table'
2075        self.send_command(cmd)
2076        output = self._expect_command_output()
2077
2078        #
2079        # Example output:
2080        # | ID  | RLOC16 | Timeout    | Age        | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt|Suprvsn| Extended MAC     |
2081        # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+-------+------------------+
2082        # |   1 | 0xc801 |        240 |         24 |     3 |  131 |1|0|0|  3| 0 |     0 |   129 | 4ecede68435358ac |
2083        # |   2 | 0xc802 |        240 |          2 |     3 |  131 |0|0|0|  3| 1 |     0 |     0 | a672a601d2ce37d8 |
2084        # Done
2085        #
2086
2087        headers = self.__split_table_row(output[0])
2088
2089        table = {}
2090        for line in output[2:]:
2091            line = line.strip()
2092            if not line:
2093                continue
2094
2095            fields = self.__split_table_row(line)
2096            col = lambda colname: self.__get_table_col(colname, headers, fields)
2097
2098            id = int(col("ID"))
2099            r, d, n = int(col("R")), int(col("D")), int(col("N"))
2100            mode = f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}'
2101
2102            table[int(id)] = {
2103                'id': int(id),
2104                'rloc16': int(col('RLOC16'), 16),
2105                'timeout': int(col('Timeout')),
2106                'age': int(col('Age')),
2107                'lq_in': int(col('LQ In')),
2108                'c_vn': int(col('C_VN')),
2109                'mode': mode,
2110                'extaddr': col('Extended MAC'),
2111                'ver': int(col('Ver')),
2112                'csl': bool(int(col('CSL'))),
2113                'qmsgcnt': int(col('QMsgCnt')),
2114                'suprvsn': int(col('Suprvsn'))
2115            }
2116
2117        return table
2118
2119    def __split_table_row(self, row: str) -> List[str]:
2120        if not (row.startswith('|') and row.endswith('|')):
2121            raise ValueError(row)
2122
2123        fields = row.split('|')
2124        fields = [x.strip() for x in fields[1:-1]]
2125        return fields
2126
2127    def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str:
2128        return fields[headers.index(colname)]
2129
2130    def __getOmrAddress(self):
2131        prefixes = [prefix.split('::')[0] for prefix in self.get_prefixes()]
2132        omr_addrs = []
2133        for addr in self.get_addrs():
2134            for prefix in prefixes:
2135                if (addr.startswith(prefix)) and (addr != self.__getDua()):
2136                    omr_addrs.append(addr)
2137                    break
2138
2139        return omr_addrs
2140
2141    def __getLinkLocalAddress(self):
2142        for ip6Addr in self.get_addrs():
2143            if re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I):
2144                return ip6Addr
2145
2146        return None
2147
2148    def __getGlobalAddress(self):
2149        global_address = []
2150        for ip6Addr in self.get_addrs():
2151            if ((not re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I)) and
2152                (not re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)) and
2153                (not re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I))):
2154                global_address.append(ip6Addr)
2155
2156        return global_address
2157
2158    def __getRloc(self):
2159        for ip6Addr in self.get_addrs():
2160            if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
2161                    re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
2162                    not (re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I))):
2163                return ip6Addr
2164        return None
2165
2166    def __getAloc(self):
2167        aloc = []
2168        for ip6Addr in self.get_addrs():
2169            if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
2170                    re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
2171                    re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I)):
2172                aloc.append(ip6Addr)
2173
2174        return aloc
2175
2176    def __getMleid(self):
2177        for ip6Addr in self.get_addrs():
2178            if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr,
2179                        re.I) and not (re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I)):
2180                return ip6Addr
2181
2182        return None
2183
2184    def __getDua(self) -> Optional[str]:
2185        for ip6Addr in self.get_addrs():
2186            if re.match(config.DOMAIN_PREFIX_REGEX_PATTERN, ip6Addr, re.I):
2187                return ip6Addr
2188
2189        return None
2190
2191    def get_ip6_address_by_prefix(self, prefix: Union[str, IPv6Network]) -> List[IPv6Address]:
2192        """Get addresses matched with given prefix.
2193
2194        Args:
2195            prefix: the prefix to match against.
2196                    Can be either a string or ipaddress.IPv6Network.
2197
2198        Returns:
2199            The IPv6 address list.
2200        """
2201        if isinstance(prefix, str):
2202            prefix = IPv6Network(prefix)
2203        addrs = map(IPv6Address, self.get_addrs())
2204
2205        return [addr for addr in addrs if addr in prefix]
2206
    def get_ip6_address(self, address_type):
        """Get specific type of IPv6 address configured on thread device.

        Dispatches on `address_type` to the matching private getter.

        Args:
            address_type: the config.ADDRESS_TYPE type of IPv6 address.

        Returns:
            For LINK_LOCAL, RLOC, ML_EID and DUA: a single address string, or
            None when no address matches. For GLOBAL, ALOC and OMR: a list of
            address strings (possibly empty). For BACKBONE_GUA: whatever
            `_getBackboneGua()` returns. None for any other `address_type`.
        """
        if address_type == config.ADDRESS_TYPE.LINK_LOCAL:
            return self.__getLinkLocalAddress()
        elif address_type == config.ADDRESS_TYPE.GLOBAL:
            return self.__getGlobalAddress()
        elif address_type == config.ADDRESS_TYPE.RLOC:
            return self.__getRloc()
        elif address_type == config.ADDRESS_TYPE.ALOC:
            return self.__getAloc()
        elif address_type == config.ADDRESS_TYPE.ML_EID:
            return self.__getMleid()
        elif address_type == config.ADDRESS_TYPE.DUA:
            return self.__getDua()
        elif address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
            # NOTE(review): `_getBackboneGua` is not defined in this part of the file;
            # presumably only some node types provide it - confirm.
            return self._getBackboneGua()
        elif address_type == config.ADDRESS_TYPE.OMR:
            return self.__getOmrAddress()
        else:
            return None
2234
2235    def get_context_reuse_delay(self):
2236        self.send_command('contextreusedelay')
2237        return self._expect_result(r'\d+')
2238
2239    def set_context_reuse_delay(self, delay):
2240        cmd = 'contextreusedelay %d' % delay
2241        self.send_command(cmd)
2242        self._expect_done()
2243
2244    def add_prefix(self, prefix, flags='paosr', prf='med'):
2245        cmd = 'prefix add %s %s %s' % (prefix, flags, prf)
2246        self.send_command(cmd)
2247        self._expect_done()
2248
2249    def remove_prefix(self, prefix):
2250        cmd = 'prefix remove %s' % prefix
2251        self.send_command(cmd)
2252        self._expect_done()
2253
2254    #
2255    # BR commands
2256    #
2257    def enable_br(self):
2258        self.send_command('br enable')
2259        self._expect_done()
2260
2261    def disable_br(self):
2262        self.send_command('br disable')
2263        self._expect_done()
2264
2265    def get_br_omr_prefix(self):
2266        cmd = 'br omrprefix local'
2267        self.send_command(cmd)
2268        return self._expect_command_output()[0]
2269
2270    def get_br_peers(self) -> List[str]:
2271        # Example output of `br peers` command:
2272        #   rloc16:0xa800 age:00:00:50
2273        #   rloc16:0x6800 age:00:00:51
2274        #   Done
2275        self.send_command('br peers')
2276        return self._expect_command_output()
2277
2278    def get_br_peers_rloc16s(self) -> List[int]:
2279        """parse `br peers` output and return the list of RLOC16s"""
2280        return [
2281            int(pair.split(':')[1], 16)
2282            for line in self.get_br_peers()
2283            for pair in line.split()
2284            if pair.split(':')[0] == 'rloc16'
2285        ]
2286
2287    def get_br_routers(self) -> List[str]:
2288        # Example output of `br routers` command:
2289        #   fe80:0:0:0:42:acff:fe14:3 (M:0 O:0 S:1) ms-since-rx:144160 reachable:yes age:00:17:36 (peer BR)
2290        #   fe80:0:0:0:42:acff:fe14:2 (M:0 O:0 S:1) ms-since-rx:45179 reachable:yes age:00:17:36
2291        #   Done
2292        self.send_command('br routers')
2293        return self._expect_command_output()
2294
2295    def get_br_routers_ip_addresses(self) -> List[IPv6Address]:
2296        """parse `br routers` output and return the list of IPv6 addresses"""
2297        return [IPv6Address(line.split()[0]) for line in self.get_br_routers()]
2298
2299    def get_netdata_omr_prefixes(self):
2300        omr_prefixes = []
2301        for prefix in self.get_prefixes():
2302            prefix, flags = prefix.split()[:2]
2303            if 'a' in flags and 'o' in flags and 's' in flags and 'D' not in flags:
2304                omr_prefixes.append(prefix)
2305
2306        return omr_prefixes
2307
2308    def get_br_on_link_prefix(self):
2309        cmd = 'br onlinkprefix local'
2310        self.send_command(cmd)
2311        return self._expect_command_output()[0]
2312
2313    def pd_get_prefix(self):
2314        cmd = 'br pd omrprefix'
2315        self.send_command(cmd)
2316        return self._expect_command_output()[0].split(" ")[0]
2317
2318    def pd_set_enabled(self, enable):
2319        self.send_command('br pd {}'.format("enable" if enable else "disable"))
2320        self._expect_done()
2321
2322    @property
2323    def pd_state(self):
2324        self.send_command('br pd state')
2325        return self._expect_command_output()[0].strip()
2326
2327    def get_netdata_non_nat64_routes(self):
2328        nat64_routes = []
2329        routes = self.get_routes()
2330        for route in routes:
2331            if 'n' not in route.split(' ')[1]:
2332                nat64_routes.append(route.split(' ')[0])
2333        return nat64_routes
2334
2335    def get_netdata_nat64_routes(self):
2336        nat64_routes = []
2337        routes = self.get_routes()
2338        for route in routes:
2339            if 'n' in route.split(' ')[1]:
2340                nat64_routes.append(route.split(' ')[0])
2341        return nat64_routes
2342
2343    def get_br_nat64_prefix(self):
2344        cmd = 'br nat64prefix local'
2345        self.send_command(cmd)
2346        return self._expect_command_output()[0]
2347
2348    def get_br_favored_nat64_prefix(self):
2349        cmd = 'br nat64prefix favored'
2350        self.send_command(cmd)
2351        return self._expect_command_output()[0].split(' ')[0]
2352
2353    def enable_nat64(self):
2354        self.send_command(f'nat64 enable')
2355        self._expect_done()
2356
2357    def disable_nat64(self):
2358        self.send_command(f'nat64 disable')
2359        self._expect_done()
2360
2361    def get_nat64_state(self):
2362        self.send_command('nat64 state')
2363        res = {}
2364        for line in self._expect_command_output():
2365            state = line.split(':')
2366            res[state[0].strip()] = state[1].strip()
2367        return res
2368
    def get_nat64_mappings(self):
        """Send `nat64 mappings` and parse the printed table into a list of sessions.

        Returns:
            A list of dicts, one per session row, with the keys `id`, `ip6`,
            `ip4`, `expiry` (int) and `counters`. `counters` maps `'total'`,
            plus the name in each following sub-row, to
            `{'4to6': {'packets', 'bytes'}, '6to4': {'packets', 'bytes'}}`.
        """
        cmd = 'nat64 mappings'
        self.send_command(cmd)
        result = self._expect_command_output()
        # `session` / `session_counters` hold the session row currently being
        # filled; they are flushed into `sessions` when the next session row
        # starts and once more after the loop.
        session = None
        session_counters = None
        sessions = []

        for line in result:
            # Session row: id, ip6, ip4, expiry ("<n>s") and the four total counters.
            m = re.match(
                r'\|\s+([a-f0-9]+)\s+\|\s+(.+)\s+\|\s+(.+)\s+\|\s+(\d+)s\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|',
                line)
            if m:
                groups = m.groups()
                if session:
                    # A new session row starts: store the previous session first.
                    session['counters'] = session_counters
                    sessions.append(session)
                session = {
                    'id': groups[0],
                    'ip6': groups[1],
                    'ip4': groups[2],
                    'expiry': int(groups[3]),
                }
                session_counters = {}
                session_counters['total'] = {
                    '4to6': {
                        'packets': int(groups[4]),
                        'bytes': int(groups[5]),
                    },
                    '6to4': {
                        'packets': int(groups[6]),
                        'bytes': int(groups[7]),
                    },
                }
                continue
            if not session:
                # Lines before the first session row (e.g. table headers) are ignored.
                continue
            # Sub-row of the current session: empty first cell, a name, then four counters.
            m = re.match(r'\|\s+\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
            if m:
                groups = m.groups()
                session_counters[groups[0]] = {
                    '4to6': {
                        'packets': int(groups[1]),
                        'bytes': int(groups[2]),
                    },
                    '6to4': {
                        'packets': int(groups[3]),
                        'bytes': int(groups[4]),
                    },
                }
        if session:
            # Store the last session, which no later session row flushed.
            session['counters'] = session_counters
            sessions.append(session)
        return sessions
2423
2424    def get_nat64_counters(self):
2425        cmd = 'nat64 counters'
2426        self.send_command(cmd)
2427        result = self._expect_command_output()
2428
2429        protocol_counters = {}
2430        error_counters = {}
2431        for line in result:
2432            m = re.match(r'\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
2433            if m:
2434                groups = m.groups()
2435                protocol_counters[groups[0]] = {
2436                    '4to6': {
2437                        'packets': int(groups[1]),
2438                        'bytes': int(groups[2]),
2439                    },
2440                    '6to4': {
2441                        'packets': int(groups[3]),
2442                        'bytes': int(groups[4]),
2443                    },
2444                }
2445                continue
2446            m = re.match(r'\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
2447            if m:
2448                groups = m.groups()
2449                error_counters[groups[0]] = {
2450                    '4to6': {
2451                        'packets': int(groups[1]),
2452                    },
2453                    '6to4': {
2454                        'packets': int(groups[2]),
2455                    },
2456                }
2457                continue
2458        return {'protocol': protocol_counters, 'errors': error_counters}
2459
2460    def get_prefixes(self):
2461        return self.get_netdata()['Prefixes']
2462
2463    def get_routes(self):
2464        return self.get_netdata()['Routes']
2465
2466    def get_services(self):
2467        netdata = self.netdata_show()
2468        services = []
2469        services_section = False
2470
2471        for line in netdata:
2472            if line.startswith('Services:'):
2473                services_section = True
2474            elif line.startswith('Contexts'):
2475                services_section = False
2476            elif services_section:
2477                services.append(line.strip().split(' '))
2478        return services
2479
2480    def netdata_show(self):
2481        self.send_command('netdata show')
2482        return self._expect_command_output()
2483
2484    def get_netdata(self):
2485        raw_netdata = self.netdata_show()
2486        netdata = {'Prefixes': [], 'Routes': [], 'Services': [], 'Contexts': [], 'Commissioning': []}
2487        key_list = ['Prefixes', 'Routes', 'Services', 'Contexts', 'Commissioning']
2488        key = None
2489
2490        for i in range(0, len(raw_netdata)):
2491            keys = list(filter(raw_netdata[i].startswith, key_list))
2492            if keys != []:
2493                key = keys[0]
2494            elif key is not None:
2495                netdata[key].append(raw_netdata[i])
2496
2497        return netdata
2498
2499    def add_route(self, prefix, stable=False, nat64=False, prf='med'):
2500        cmd = 'route add %s ' % prefix
2501        if stable:
2502            cmd += 's'
2503        if nat64:
2504            cmd += 'n'
2505        cmd += ' %s' % prf
2506        self.send_command(cmd)
2507        self._expect_done()
2508
2509    def remove_route(self, prefix):
2510        cmd = 'route remove %s' % prefix
2511        self.send_command(cmd)
2512        self._expect_done()
2513
2514    def register_netdata(self):
2515        self.send_command('netdata register')
2516        self._expect_done()
2517
2518    def netdata_publish_dnssrp_anycast(self, seqnum, version=0):
2519        self.send_command(f'netdata publish dnssrp anycast {seqnum} {version}')
2520        self._expect_done()
2521
2522    def netdata_publish_dnssrp_unicast(self, address, port, version=0):
2523        self.send_command(f'netdata publish dnssrp unicast {address} {port} {version}')
2524        self._expect_done()
2525
2526    def netdata_publish_dnssrp_unicast_mleid(self, port, version=0):
2527        self.send_command(f'netdata publish dnssrp unicast {port} {version}')
2528        self._expect_done()
2529
2530    def netdata_unpublish_dnssrp(self):
2531        self.send_command('netdata unpublish dnssrp')
2532        self._expect_done()
2533
2534    def netdata_publish_prefix(self, prefix, flags='paosr', prf='med'):
2535        self.send_command(f'netdata publish prefix {prefix} {flags} {prf}')
2536        self._expect_done()
2537
2538    def netdata_publish_route(self, prefix, flags='s', prf='med'):
2539        self.send_command(f'netdata publish route {prefix} {flags} {prf}')
2540        self._expect_done()
2541
2542    def netdata_publish_replace(self, old_prefix, prefix, flags='s', prf='med'):
2543        self.send_command(f'netdata publish replace {old_prefix} {prefix} {flags} {prf}')
2544        self._expect_done()
2545
2546    def netdata_unpublish_prefix(self, prefix):
2547        self.send_command(f'netdata unpublish {prefix}')
2548        self._expect_done()
2549
2550    def send_network_diag_get(self, addr, tlv_types):
2551        self.send_command('networkdiagnostic get %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
2552
2553        if isinstance(self.simulator, simulator.VirtualTime):
2554            self.simulator.go(8)
2555            timeout = 1
2556        else:
2557            timeout = 8
2558
2559        self._expect_done(timeout=timeout)
2560
2561    def send_network_diag_reset(self, addr, tlv_types):
2562        self.send_command('networkdiagnostic reset %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
2563
2564        if isinstance(self.simulator, simulator.VirtualTime):
2565            self.simulator.go(8)
2566            timeout = 1
2567        else:
2568            timeout = 8
2569
2570        self._expect_done(timeout=timeout)
2571
2572    def energy_scan(self, mask, count, period, scan_duration, ipaddr):
2573        cmd = 'commissioner energy %d %d %d %d %s' % (
2574            mask,
2575            count,
2576            period,
2577            scan_duration,
2578            ipaddr,
2579        )
2580        self.send_command(cmd)
2581
2582        if isinstance(self.simulator, simulator.VirtualTime):
2583            self.simulator.go(8)
2584            timeout = 1
2585        else:
2586            timeout = 8
2587
2588        self._expect('Energy:', timeout=timeout)
2589
2590    def panid_query(self, panid, mask, ipaddr):
2591        cmd = 'commissioner panid %d %d %s' % (panid, mask, ipaddr)
2592        self.send_command(cmd)
2593
2594        if isinstance(self.simulator, simulator.VirtualTime):
2595            self.simulator.go(8)
2596            timeout = 1
2597        else:
2598            timeout = 8
2599
2600        self._expect('Conflict:', timeout=timeout)
2601
2602    def scan(self, result=1, timeout=10):
2603        self.send_command('scan')
2604
2605        self.simulator.go(timeout)
2606
2607        if result == 1:
2608            networks = []
2609            for line in self._expect_command_output()[2:]:
2610                _, panid, extaddr, channel, dbm, lqi, _ = map(str.strip, line.split('|'))
2611                panid = int(panid, 16)
2612                channel, dbm, lqi = map(int, (channel, dbm, lqi))
2613
2614                networks.append({
2615                    'panid': panid,
2616                    'extaddr': extaddr,
2617                    'channel': channel,
2618                    'dbm': dbm,
2619                    'lqi': lqi,
2620                })
2621            return networks
2622
2623    def scan_energy(self, timeout=10):
2624        self.send_command('scan energy')
2625        self.simulator.go(timeout)
2626        rssi_list = []
2627        for line in self._expect_command_output()[2:]:
2628            _, channel, rssi, _ = line.split('|')
2629            rssi_list.append({
2630                'channel': int(channel.strip()),
2631                'rssi': int(rssi.strip()),
2632            })
2633        return rssi_list
2634
    def ping(self, ipaddr, num_responses=1, size=8, timeout=5, count=1, interval=1, hoplimit=64, interface=None):
        """
        Send a CLI `ping` and wait for the replies.

        The command sent is
        `ping [-I <interface>] <ipaddr> <size> <count> <interval> <hoplimit> <timeout>`.
        The loop advances the simulator by 1 per iteration and reads the output until
        at least `num_responses` distinct `from <source>:` matches were seen and `Done`
        was matched (`Done` is treated as already seen when `node_type` is 'ncp-sim').

        Returns:
            True when the loop condition was satisfied; False when an expect attempt
            timed out after the simulator clock reached the deadline
            (`now()` at send time + `timeout` + 3).
        """
        args = f'{ipaddr} {size} {count} {interval} {hoplimit} {timeout}'
        if interface is not None:
            args = f'-I {interface} {args}'
        cmd = f'ping {args}'

        self.send_command(cmd)

        # Extra simulator time granted on top of `timeout` before giving up.
        wait_allowance = 3
        end = self.simulator.now() + timeout + wait_allowance

        # Keyed by the matched source, so repeated replies from one source count once.
        responders = {}

        result = True
        # ncp-sim doesn't print Done
        done = (self.node_type == 'ncp-sim')
        while len(responders) < num_responses or not done:
            self.simulator.go(1)
            try:
                # Index 0: a reply line, index 1: the final Done.
                i = self._expect([r'from (\S+):', r'Done'], timeout=0.1)
            except (pexpect.TIMEOUT, socket.timeout):
                # Nothing matched this round: keep polling until the deadline has passed.
                if self.simulator.now() < end:
                    continue
                result = False
                if isinstance(self.simulator, simulator.VirtualTime):
                    self.simulator.sync_devices()
                break
            else:
                if i == 0:
                    responders[self.pexpect.match.groups()[0]] = 1
                elif i == 1:
                    done = True
        return result
2668
2669    def reset(self):
2670        self._reset('reset')
2671
2672    def factory_reset(self):
2673        self._reset('factoryreset')
2674
2675    def _reset(self, cmd):
2676        self.send_command(cmd, expect_command_echo=False)
2677        time.sleep(self.RESET_DELAY)
2678        # Send a "version" command and drain the CLI output after reset
2679        self.send_command('version', expect_command_echo=False)
2680        while True:
2681            try:
2682                self._expect(r"[^\n]+\n", timeout=0.1)
2683                continue
2684            except pexpect.TIMEOUT:
2685                break
2686
2687        if self.is_otbr:
2688            self.set_log_level(5)
2689
2690    def set_router_selection_jitter(self, jitter):
2691        cmd = 'routerselectionjitter %d' % jitter
2692        self.send_command(cmd)
2693        self._expect_done()
2694
2695    def set_active_dataset(
2696        self,
2697        timestamp=None,
2698        channel=None,
2699        channel_mask=None,
2700        extended_panid=None,
2701        mesh_local_prefix=None,
2702        network_key=None,
2703        network_name=None,
2704        panid=None,
2705        pskc=None,
2706        security_policy=[],
2707        updateExisting=False,
2708    ):
2709
2710        if updateExisting:
2711            self.send_command('dataset init active', go=False)
2712        else:
2713            self.send_command('dataset clear', go=False)
2714        self._expect_done()
2715
2716        if timestamp is not None:
2717            cmd = 'dataset activetimestamp %d' % timestamp
2718            self.send_command(cmd, go=False)
2719            self._expect_done()
2720
2721        if channel is not None:
2722            cmd = 'dataset channel %d' % channel
2723            self.send_command(cmd, go=False)
2724            self._expect_done()
2725
2726        if channel_mask is not None:
2727            cmd = 'dataset channelmask %d' % channel_mask
2728            self.send_command(cmd, go=False)
2729            self._expect_done()
2730
2731        if extended_panid is not None:
2732            cmd = 'dataset extpanid %s' % extended_panid
2733            self.send_command(cmd, go=False)
2734            self._expect_done()
2735
2736        if mesh_local_prefix is not None:
2737            cmd = 'dataset meshlocalprefix %s' % mesh_local_prefix
2738            self.send_command(cmd, go=False)
2739            self._expect_done()
2740
2741        if network_key is not None:
2742            cmd = 'dataset networkkey %s' % network_key
2743            self.send_command(cmd, go=False)
2744            self._expect_done()
2745
2746        if network_name is not None:
2747            cmd = 'dataset networkname %s' % network_name
2748            self.send_command(cmd, go=False)
2749            self._expect_done()
2750
2751        if panid is not None:
2752            cmd = 'dataset panid %d' % panid
2753            self.send_command(cmd, go=False)
2754            self._expect_done()
2755
2756        if pskc is not None:
2757            cmd = 'dataset pskc %s' % pskc
2758            self.send_command(cmd, go=False)
2759            self._expect_done()
2760
2761        if security_policy is not None:
2762            if len(security_policy) >= 2:
2763                cmd = 'dataset securitypolicy %s %s' % (
2764                    str(security_policy[0]),
2765                    security_policy[1],
2766                )
2767            if len(security_policy) >= 3:
2768                cmd += ' %s' % (str(security_policy[2]))
2769            self.send_command(cmd, go=False)
2770            self._expect_done()
2771
2772        self.send_command('dataset commit active', go=False)
2773        self._expect_done()
2774
2775    def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None, delay=None):
2776        self.send_command('dataset clear')
2777        self._expect_done()
2778
2779        cmd = 'dataset pendingtimestamp %d' % pendingtimestamp
2780        self.send_command(cmd)
2781        self._expect_done()
2782
2783        cmd = 'dataset activetimestamp %d' % activetimestamp
2784        self.send_command(cmd)
2785        self._expect_done()
2786
2787        if panid is not None:
2788            cmd = 'dataset panid %d' % panid
2789            self.send_command(cmd)
2790            self._expect_done()
2791
2792        if channel is not None:
2793            cmd = 'dataset channel %d' % channel
2794            self.send_command(cmd)
2795            self._expect_done()
2796
2797        if delay is not None:
2798            cmd = 'dataset delay %d' % delay
2799            self.send_command(cmd)
2800            self._expect_done()
2801
2802        # Set the meshlocal prefix in config.py
2803        self.send_command('dataset meshlocalprefix %s' % config.MESH_LOCAL_PREFIX.split('/')[0])
2804        self._expect_done()
2805
2806        self.send_command('dataset commit pending')
2807        self._expect_done()
2808
2809    def start_dataset_updater(self, panid=None, channel=None, security_policy=None, delay=None):
2810        self.send_command('dataset clear')
2811        self._expect_done()
2812
2813        if panid is not None:
2814            cmd = 'dataset panid %d' % panid
2815            self.send_command(cmd)
2816            self._expect_done()
2817
2818        if channel is not None:
2819            cmd = 'dataset channel %d' % channel
2820            self.send_command(cmd)
2821            self._expect_done()
2822
2823        if security_policy is not None:
2824            cmd = 'dataset securitypolicy %d %s ' % (security_policy[0], security_policy[1])
2825            if (len(security_policy) >= 3):
2826                cmd += '%d ' % (security_policy[2])
2827            self.send_command(cmd)
2828            self._expect_done()
2829
2830        if delay is not None:
2831            cmd = 'dataset delay %d ' % delay
2832            self.send_command(cmd)
2833            self._expect_done()
2834
2835        self.send_command('dataset updater start')
2836        self._expect_done()
2837
2838    def announce_begin(self, mask, count, period, ipaddr):
2839        cmd = 'commissioner announce %d %d %d %s' % (
2840            mask,
2841            count,
2842            period,
2843            ipaddr,
2844        )
2845        self.send_command(cmd)
2846        self._expect_done()
2847
2848    def send_mgmt_active_set(
2849        self,
2850        active_timestamp=None,
2851        channel=None,
2852        channel_mask=None,
2853        extended_panid=None,
2854        panid=None,
2855        network_key=None,
2856        mesh_local=None,
2857        network_name=None,
2858        security_policy=None,
2859        binary=None,
2860    ):
2861        cmd = 'dataset mgmtsetcommand active '
2862
2863        if active_timestamp is not None:
2864            cmd += 'activetimestamp %d ' % active_timestamp
2865
2866        if channel is not None:
2867            cmd += 'channel %d ' % channel
2868
2869        if channel_mask is not None:
2870            cmd += 'channelmask %d ' % channel_mask
2871
2872        if extended_panid is not None:
2873            cmd += 'extpanid %s ' % extended_panid
2874
2875        if panid is not None:
2876            cmd += 'panid %d ' % panid
2877
2878        if network_key is not None:
2879            cmd += 'networkkey %s ' % network_key
2880
2881        if mesh_local is not None:
2882            cmd += 'localprefix %s ' % mesh_local
2883
2884        if network_name is not None:
2885            cmd += 'networkname %s ' % self._escape_escapable(network_name)
2886
2887        if security_policy is not None:
2888            cmd += 'securitypolicy %d %s ' % (security_policy[0], security_policy[1])
2889            if (len(security_policy) >= 3):
2890                cmd += '%d ' % (security_policy[2])
2891
2892        if binary is not None:
2893            cmd += '-x %s ' % binary
2894
2895        self.send_command(cmd)
2896        self._expect_done()
2897
2898    def send_mgmt_active_get(self, addr='', tlvs=[]):
2899        cmd = 'dataset mgmtgetcommand active'
2900
2901        if addr != '':
2902            cmd += ' address '
2903            cmd += addr
2904
2905        if len(tlvs) != 0:
2906            tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
2907            cmd += ' -x '
2908            cmd += tlv_str
2909
2910        self.send_command(cmd)
2911        self._expect_done()
2912
2913    def send_mgmt_pending_get(self, addr='', tlvs=[]):
2914        cmd = 'dataset mgmtgetcommand pending'
2915
2916        if addr != '':
2917            cmd += ' address '
2918            cmd += addr
2919
2920        if len(tlvs) != 0:
2921            tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
2922            cmd += ' -x '
2923            cmd += tlv_str
2924
2925        self.send_command(cmd)
2926        self._expect_done()
2927
2928    def send_mgmt_pending_set(
2929        self,
2930        pending_timestamp=None,
2931        active_timestamp=None,
2932        delay_timer=None,
2933        channel=None,
2934        panid=None,
2935        network_key=None,
2936        mesh_local=None,
2937        network_name=None,
2938    ):
2939        cmd = 'dataset mgmtsetcommand pending '
2940        if pending_timestamp is not None:
2941            cmd += 'pendingtimestamp %d ' % pending_timestamp
2942
2943        if active_timestamp is not None:
2944            cmd += 'activetimestamp %d ' % active_timestamp
2945
2946        if delay_timer is not None:
2947            cmd += 'delaytimer %d ' % delay_timer
2948
2949        if channel is not None:
2950            cmd += 'channel %d ' % channel
2951
2952        if panid is not None:
2953            cmd += 'panid %d ' % panid
2954
2955        if network_key is not None:
2956            cmd += 'networkkey %s ' % network_key
2957
2958        if mesh_local is not None:
2959            cmd += 'localprefix %s ' % mesh_local
2960
2961        if network_name is not None:
2962            cmd += 'networkname %s ' % self._escape_escapable(network_name)
2963
2964        self.send_command(cmd)
2965        self._expect_done()
2966
2967    def coap_cancel(self):
2968        """
2969        Cancel a CoAP subscription.
2970        """
2971        cmd = 'coap cancel'
2972        self.send_command(cmd)
2973        self._expect_done()
2974
2975    def coap_delete(self, ipaddr, uri, con=False, payload=None):
2976        """
2977        Send a DELETE request via CoAP.
2978        """
2979        return self._coap_rq('delete', ipaddr, uri, con, payload)
2980
2981    def coap_get(self, ipaddr, uri, con=False, payload=None):
2982        """
2983        Send a GET request via CoAP.
2984        """
2985        return self._coap_rq('get', ipaddr, uri, con, payload)
2986
2987    def coap_get_block(self, ipaddr, uri, size=16, count=0):
2988        """
2989        Send a GET request via CoAP.
2990        """
2991        return self._coap_rq_block('get', ipaddr, uri, size, count)
2992
2993    def coap_observe(self, ipaddr, uri, con=False, payload=None):
2994        """
2995        Send a GET request via CoAP with Observe set.
2996        """
2997        return self._coap_rq('observe', ipaddr, uri, con, payload)
2998
2999    def coap_post(self, ipaddr, uri, con=False, payload=None):
3000        """
3001        Send a POST request via CoAP.
3002        """
3003        return self._coap_rq('post', ipaddr, uri, con, payload)
3004
3005    def coap_post_block(self, ipaddr, uri, size=16, count=0):
3006        """
3007        Send a POST request via CoAP.
3008        """
3009        return self._coap_rq_block('post', ipaddr, uri, size, count)
3010
3011    def coap_put(self, ipaddr, uri, con=False, payload=None):
3012        """
3013        Send a PUT request via CoAP.
3014        """
3015        return self._coap_rq('put', ipaddr, uri, con, payload)
3016
3017    def coap_put_block(self, ipaddr, uri, size=16, count=0):
3018        """
3019        Send a PUT request via CoAP.
3020        """
3021        return self._coap_rq_block('put', ipaddr, uri, size, count)
3022
3023    def _coap_rq(self, method, ipaddr, uri, con=False, payload=None):
3024        """
3025        Issue a GET/POST/PUT/DELETE/GET OBSERVE request.
3026        """
3027        cmd = 'coap %s %s %s' % (method, ipaddr, uri)
3028        if con:
3029            cmd += ' con'
3030        else:
3031            cmd += ' non'
3032
3033        if payload is not None:
3034            cmd += ' %s' % payload
3035
3036        self.send_command(cmd)
3037        return self.coap_wait_response()
3038
3039    def _coap_rq_block(self, method, ipaddr, uri, size=16, count=0):
3040        """
3041        Issue a GET/POST/PUT/DELETE/GET OBSERVE BLOCK request.
3042        """
3043        cmd = 'coap %s %s %s' % (method, ipaddr, uri)
3044
3045        cmd += ' block-%d' % size
3046
3047        if count != 0:
3048            cmd += ' %d' % count
3049
3050        self.send_command(cmd)
3051        return self.coap_wait_response()
3052
3053    def coap_wait_response(self):
3054        """
3055        Wait for a CoAP response, and return it.
3056        """
3057        if isinstance(self.simulator, simulator.VirtualTime):
3058            self.simulator.go(5)
3059            timeout = 1
3060        else:
3061            timeout = 5
3062
3063        self._expect(r'coap response from ([\da-f:]+)(?: OBS=(\d+))?'
3064                     r'(?: with payload: ([\da-f]+))?\b',
3065                     timeout=timeout)
3066        (source, observe, payload) = self.pexpect.match.groups()
3067        source = source.decode('UTF-8')
3068
3069        if observe is not None:
3070            observe = int(observe, base=10)
3071
3072        if payload is not None:
3073            try:
3074                payload = binascii.a2b_hex(payload).decode('UTF-8')
3075            except UnicodeDecodeError:
3076                pass
3077
3078        # Return the values received
3079        return dict(source=source, observe=observe, payload=payload)
3080
3081    def coap_wait_request(self):
3082        """
3083        Wait for a CoAP request to be made.
3084        """
3085        if isinstance(self.simulator, simulator.VirtualTime):
3086            self.simulator.go(5)
3087            timeout = 1
3088        else:
3089            timeout = 5
3090
3091        self._expect(r'coap request from ([\da-f:]+)(?: OBS=(\d+))?'
3092                     r'(?: with payload: ([\da-f]+))?\b',
3093                     timeout=timeout)
3094        (source, observe, payload) = self.pexpect.match.groups()
3095        source = source.decode('UTF-8')
3096
3097        if observe is not None:
3098            observe = int(observe, base=10)
3099
3100        if payload is not None:
3101            payload = binascii.a2b_hex(payload).decode('UTF-8')
3102
3103        # Return the values received
3104        return dict(source=source, observe=observe, payload=payload)
3105
3106    def coap_wait_subscribe(self):
3107        """
3108        Wait for a CoAP client to be subscribed.
3109        """
3110        if isinstance(self.simulator, simulator.VirtualTime):
3111            self.simulator.go(5)
3112            timeout = 1
3113        else:
3114            timeout = 5
3115
3116        self._expect(r'Subscribing client\b', timeout=timeout)
3117
3118    def coap_wait_ack(self):
3119        """
3120        Wait for a CoAP notification ACK.
3121        """
3122        if isinstance(self.simulator, simulator.VirtualTime):
3123            self.simulator.go(5)
3124            timeout = 1
3125        else:
3126            timeout = 5
3127
3128        self._expect(r'Received ACK in reply to notification from ([\da-f:]+)\b', timeout=timeout)
3129        (source,) = self.pexpect.match.groups()
3130        source = source.decode('UTF-8')
3131
3132        return source
3133
3134    def coap_set_resource_path(self, path):
3135        """
3136        Set the path for the CoAP resource.
3137        """
3138        cmd = 'coap resource %s' % path
3139        self.send_command(cmd)
3140        self._expect_done()
3141
3142    def coap_set_resource_path_block(self, path, count=0):
3143        """
3144        Set the path for the CoAP resource and how many blocks can be received from this resource.
3145        """
3146        cmd = 'coap resource %s %d' % (path, count)
3147        self.send_command(cmd)
3148        self._expect('Done')
3149
3150    def coap_set_content(self, content):
3151        """
3152        Set the content of the CoAP resource.
3153        """
3154        cmd = 'coap set %s' % content
3155        self.send_command(cmd)
3156        self._expect_done()
3157
3158    def coap_start(self):
3159        """
3160        Start the CoAP service.
3161        """
3162        cmd = 'coap start'
3163        self.send_command(cmd)
3164        self._expect_done()
3165
3166    def coap_stop(self):
3167        """
3168        Stop the CoAP service.
3169        """
3170        cmd = 'coap stop'
3171        self.send_command(cmd)
3172
3173        if isinstance(self.simulator, simulator.VirtualTime):
3174            self.simulator.go(5)
3175            timeout = 1
3176        else:
3177            timeout = 5
3178
3179        self._expect_done(timeout=timeout)
3180
3181    def coaps_start_psk(self, psk, pskIdentity):
3182        cmd = 'coaps psk %s %s' % (psk, pskIdentity)
3183        self.send_command(cmd)
3184        self._expect_done()
3185
3186        cmd = 'coaps start'
3187        self.send_command(cmd)
3188        self._expect_done()
3189
3190    def coaps_start_x509(self):
3191        cmd = 'coaps x509'
3192        self.send_command(cmd)
3193        self._expect_done()
3194
3195        cmd = 'coaps start'
3196        self.send_command(cmd)
3197        self._expect_done()
3198
3199    def coaps_set_resource_path(self, path):
3200        cmd = 'coaps resource %s' % path
3201        self.send_command(cmd)
3202        self._expect_done()
3203
3204    def coaps_stop(self):
3205        cmd = 'coaps stop'
3206        self.send_command(cmd)
3207
3208        if isinstance(self.simulator, simulator.VirtualTime):
3209            self.simulator.go(5)
3210            timeout = 1
3211        else:
3212            timeout = 5
3213
3214        self._expect_done(timeout=timeout)
3215
3216    def coaps_connect(self, ipaddr):
3217        cmd = 'coaps connect %s' % ipaddr
3218        self.send_command(cmd)
3219
3220        if isinstance(self.simulator, simulator.VirtualTime):
3221            self.simulator.go(5)
3222            timeout = 1
3223        else:
3224            timeout = 5
3225
3226        self._expect('coaps connected', timeout=timeout)
3227
3228    def coaps_disconnect(self):
3229        cmd = 'coaps disconnect'
3230        self.send_command(cmd)
3231        self._expect_done()
3232        self.simulator.go(5)
3233
3234    def coaps_get(self):
3235        cmd = 'coaps get test'
3236        self.send_command(cmd)
3237
3238        if isinstance(self.simulator, simulator.VirtualTime):
3239            self.simulator.go(5)
3240            timeout = 1
3241        else:
3242            timeout = 5
3243
3244        self._expect('coaps response', timeout=timeout)
3245
3246    def commissioner_mgmtget(self, tlvs_binary=None):
3247        cmd = 'commissioner mgmtget'
3248        if tlvs_binary is not None:
3249            cmd += ' -x %s' % tlvs_binary
3250        self.send_command(cmd)
3251        self._expect_done()
3252
3253    def commissioner_mgmtset(self, tlvs_binary):
3254        cmd = 'commissioner mgmtset -x %s' % tlvs_binary
3255        self.send_command(cmd)
3256        self._expect_done()
3257
3258    def bytes_to_hex_str(self, src):
3259        return ''.join(format(x, '02x') for x in src)
3260
3261    def commissioner_mgmtset_with_tlvs(self, tlvs):
3262        payload = bytearray()
3263        for tlv in tlvs:
3264            payload += tlv.to_hex()
3265        self.commissioner_mgmtset(self.bytes_to_hex_str(payload))
3266
3267    def udp_start(self, local_ipaddr, local_port, bind_unspecified=False):
3268        cmd = 'udp open'
3269        self.send_command(cmd)
3270        self._expect_done()
3271
3272        cmd = 'udp bind %s %s %s' % ("-u" if bind_unspecified else "", local_ipaddr, local_port)
3273        self.send_command(cmd)
3274        self._expect_done()
3275
3276    def udp_stop(self):
3277        cmd = 'udp close'
3278        self.send_command(cmd)
3279        self._expect_done()
3280
3281    def udp_send(self, bytes, ipaddr, port, success=True):
3282        cmd = 'udp send %s %d -s %d ' % (ipaddr, port, bytes)
3283        self.send_command(cmd)
3284        if success:
3285            self._expect_done()
3286        else:
3287            self._expect('Error')
3288
3289    def udp_check_rx(self, bytes_should_rx):
3290        self._expect('%d bytes' % bytes_should_rx)
3291
3292    def set_routereligible(self, enable: bool):
3293        cmd = f'routereligible {"enable" if enable else "disable"}'
3294        self.send_command(cmd)
3295        self._expect_done()
3296
3297    def router_list(self):
3298        cmd = 'router list'
3299        self.send_command(cmd)
3300        self._expect([r'(\d+)((\s\d+)*)'])
3301
3302        g = self.pexpect.match.groups()
3303        router_list = g[0].decode('utf8') + ' ' + g[1].decode('utf8')
3304        router_list = [int(x) for x in router_list.split()]
3305        self._expect_done()
3306        return router_list
3307
3308    def router_table(self):
3309        cmd = 'router table'
3310        self.send_command(cmd)
3311
3312        self._expect(r'(.*)Done')
3313        g = self.pexpect.match.groups()
3314        output = g[0].decode('utf8')
3315        lines = output.strip().split('\n')
3316        lines = [l.strip() for l in lines]
3317        router_table = {}
3318        for i, line in enumerate(lines):
3319            if not line.startswith('|') or not line.endswith('|'):
3320                if i not in (0, 2):
3321                    # should not happen
3322                    print("unexpected line %d: %s" % (i, line))
3323
3324                continue
3325
3326            line = line[1:][:-1]
3327            line = [x.strip() for x in line.split('|')]
3328            if len(line) < 9:
3329                print("unexpected line %d: %s" % (i, line))
3330                continue
3331
3332            try:
3333                int(line[0])
3334            except ValueError:
3335                if i != 1:
3336                    print("unexpected line %d: %s" % (i, line))
3337                continue
3338
3339            id = int(line[0])
3340            rloc16 = int(line[1], 16)
3341            nexthop = int(line[2])
3342            pathcost = int(line[3])
3343            lqin = int(line[4])
3344            lqout = int(line[5])
3345            age = int(line[6])
3346            emac = str(line[7])
3347            link = int(line[8])
3348
3349            router_table[id] = {
3350                'rloc16': rloc16,
3351                'nexthop': nexthop,
3352                'pathcost': pathcost,
3353                'lqin': lqin,
3354                'lqout': lqout,
3355                'age': age,
3356                'emac': emac,
3357                'link': link,
3358            }
3359
3360        return router_table
3361
3362    def link_metrics_request_single_probe(self, dst_addr: str, linkmetrics_flags: str, mode: str = ''):
3363        cmd = 'linkmetrics request %s %s single %s' % (mode, dst_addr, linkmetrics_flags)
3364        self.send_command(cmd)
3365        self.simulator.go(5)
3366        return self._parse_linkmetrics_query_result(self._expect_command_output())
3367
3368    def link_metrics_request_forward_tracking_series(self, dst_addr: str, series_id: int, mode: str = ''):
3369        cmd = 'linkmetrics request %s %s forward %d' % (mode, dst_addr, series_id)
3370        self.send_command(cmd)
3371        self.simulator.go(5)
3372        return self._parse_linkmetrics_query_result(self._expect_command_output())
3373
3374    def _parse_linkmetrics_query_result(self, lines):
3375        """Parse link metrics query result"""
3376
3377        # Example of command output:
3378        # ['Received Link Metrics Report from: fe80:0:0:0:146e:a00:0:1',
3379        #  '- PDU Counter: 1 (Count/Summation)',
3380        #  '- LQI: 0 (Exponential Moving Average)',
3381        #  '- Margin: 80 (dB) (Exponential Moving Average)',
3382        #  '- RSSI: -20 (dBm) (Exponential Moving Average)']
3383        #
3384        # Or 'Link Metrics Report, status: {status}'
3385
3386        result = {}
3387        for line in lines:
3388            if line.startswith('- '):
3389                k, v = line[2:].split(': ')
3390                result[k] = v.split(' ')[0]
3391            elif line.startswith('Link Metrics Report, status: '):
3392                result['Status'] = line[29:]
3393        return result
3394
3395    def link_metrics_config_req_enhanced_ack_based_probing(self,
3396                                                           dst_addr: str,
3397                                                           enable: bool,
3398                                                           metrics_flags: str,
3399                                                           ext_flags='',
3400                                                           mode: str = ''):
3401        cmd = "linkmetrics config %s %s enhanced-ack" % (mode, dst_addr)
3402        if enable:
3403            cmd = cmd + (" register %s %s" % (metrics_flags, ext_flags))
3404        else:
3405            cmd = cmd + " clear"
3406        self.send_command(cmd)
3407        self._expect_done()
3408
3409    def link_metrics_config_req_forward_tracking_series(self,
3410                                                        dst_addr: str,
3411                                                        series_id: int,
3412                                                        series_flags: str,
3413                                                        metrics_flags: str,
3414                                                        mode: str = ''):
3415        cmd = "linkmetrics config %s %s forward %d %s %s" % (mode, dst_addr, series_id, series_flags, metrics_flags)
3416        self.send_command(cmd)
3417        self._expect_done()
3418
3419    def link_metrics_send_link_probe(self, dst_addr: str, series_id: int, length: int):
3420        cmd = "linkmetrics probe %s %d %d" % (dst_addr, series_id, length)
3421        self.send_command(cmd)
3422        self._expect_done()
3423
3424    def link_metrics_mgr_set_enabled(self, enable: bool):
3425        op_str = "enable" if enable else "disable"
3426        cmd = f'linkmetricsmgr {op_str}'
3427        self.send_command(cmd)
3428        self._expect_done()
3429
3430    def send_address_notification(self, dst: str, target: str, mliid: str):
3431        cmd = f'fake /a/an {dst} {target} {mliid}'
3432        self.send_command(cmd)
3433        self._expect_done()
3434
3435    def send_proactive_backbone_notification(self, target: str, mliid: str, ltt: int):
3436        cmd = f'fake /b/ba {target} {mliid} {ltt}'
3437        self.send_command(cmd)
3438        self._expect_done()
3439
3440    def dns_get_config(self):
3441        """
3442        Returns the DNS config as a list of property dictionary (string key and string value).
3443
3444        Example output:
3445        {
3446            'Server': '[fd00:0:0:0:0:0:0:1]:1234'
3447            'ResponseTimeout': '5000 ms'
3448            'MaxTxAttempts': '2'
3449            'RecursionDesired': 'no'
3450        }
3451        """
3452        cmd = f'dns config'
3453        self.send_command(cmd)
3454        output = self._expect_command_output()
3455        config = {}
3456        for line in output:
3457            k, v = line.split(': ')
3458            config[k] = v
3459        return config
3460
3461    def dns_set_config(self, config):
3462        cmd = f'dns config {config}'
3463        self.send_command(cmd)
3464        self._expect_done()
3465
3466    def dns_resolve(self, hostname, server=None, port=53):
3467        cmd = f'dns resolve {hostname}'
3468        if server is not None:
3469            cmd += f' {server} {port}'
3470
3471        self.send_command(cmd)
3472        self.simulator.go(10)
3473        output = self._expect_command_output()
3474        dns_resp = output[0]
3475        # example output: "DNS response for host1.default.service.arpa. - fd00:db8:0:0:fd3d:d471:1e8c:b60 TTL:7190 "
3476        #                 " fd00:db8:0:0:0:ff:fe00:9000 TTL:7190"
3477        addrs = dns_resp.strip().split(' - ')[1].split(' ')
3478        ip = [item.strip() for item in addrs[::2]]
3479        ttl = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
3480
3481        return list(zip(ip, ttl))
3482
3483    def _parse_dns_service_info(self, output):
3484        # Example of `output`
3485        #   Port:22222, Priority:2, Weight:2, TTL:7155
3486        #   Host:host2.default.service.arpa.
3487        #   HostAddress:0:0:0:0:0:0:0:0 TTL:0
3488        #   TXT:[a=00, b=02bb] TTL:7155
3489
3490        m = re.match(
3491            r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:\[(.*?)\] TTL:(\d+)',
3492            '\r'.join(output))
3493        if not m:
3494            return {}
3495        port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups()
3496        return {
3497            'port': int(port),
3498            'priority': int(priority),
3499            'weight': int(weight),
3500            'host': hostname,
3501            'address': address,
3502            'txt_data': txt_data,
3503            'srv_ttl': int(srv_ttl),
3504            'txt_ttl': int(txt_ttl),
3505            'aaaa_ttl': int(aaaa_ttl),
3506        }
3507
3508    def dns_resolve_service(self, instance, service, server=None, port=53):
3509        """
3510        Resolves the service instance and returns the instance information as a dict.
3511
3512        Example return value:
3513            {
3514                'port': 12345,
3515                'priority': 0,
3516                'weight': 0,
3517                'host': 'ins1._ipps._tcp.default.service.arpa.',
3518                'address': '2001::1',
3519                'txt_data': 'a=00, b=02bb',
3520                'srv_ttl': 7100,
3521                'txt_ttl': 7100,
3522                'aaaa_ttl': 7100,
3523            }
3524        """
3525        instance = self._escape_escapable(instance)
3526        cmd = f'dns service {instance} {service}'
3527        if server is not None:
3528            cmd += f' {server} {port}'
3529
3530        self.send_command(cmd)
3531        self.simulator.go(10)
3532        output = self._expect_command_output()
3533        info = self._parse_dns_service_info(output)
3534        if not info:
3535            raise Exception('dns resolve service failed: %s.%s' % (instance, service))
3536        return info
3537
3538    @staticmethod
3539    def __parse_hex_string(hexstr: str) -> bytes:
3540        assert (len(hexstr) % 2 == 0)
3541        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
3542
3543    def dns_browse(self, service_name, server=None, port=53):
3544        """
3545        Browse the service and returns the instances.
3546
3547        Example return value:
3548            {
3549                'ins1': {
3550                    'port': 12345,
3551                    'priority': 1,
3552                    'weight': 1,
3553                    'host': 'ins1._ipps._tcp.default.service.arpa.',
3554                    'address': '2001::1',
3555                    'txt_data': 'a=00, b=11cf',
3556                    'srv_ttl': 7100,
3557                    'txt_ttl': 7100,
3558                    'aaaa_ttl': 7100,
3559                },
3560                'ins2': {
3561                    'port': 12345,
3562                    'priority': 2,
3563                    'weight': 2,
3564                    'host': 'ins2._ipps._tcp.default.service.arpa.',
3565                    'address': '2001::2',
3566                    'txt_data': 'a=01, b=23dd',
3567                    'srv_ttl': 7100,
3568                    'txt_ttl': 7100,
3569                    'aaaa_ttl': 7100,
3570                }
3571            }
3572        """
3573        cmd = f'dns browse {service_name}'
3574        if server is not None:
3575            cmd += f' {server} {port}'
3576
3577        self.send_command(cmd)
3578        self.simulator.go(10)
3579        output = self._expect_command_output()
3580
3581        # Example output:
3582        # DNS browse response for _ipps._tcp.default.service.arpa.
3583        # ins2
3584        #     Port:22222, Priority:2, Weight:2, TTL:7175
3585        #     Host:host2.default.service.arpa.
3586        #     HostAddress:fd00:db8:0:0:3205:28dd:5b87:6a63 TTL:7175
3587        #     TXT:[a=00, b=11cf] TTL:7175
3588        # ins1
3589        #     Port:11111, Priority:1, Weight:1, TTL:7170
3590        #     Host:host1.default.service.arpa.
3591        #     HostAddress:fd00:db8:0:0:39f4:d9:eb4f:778 TTL:7170
3592        #     TXT:[a=01, b=23dd] TTL:7170
3593        # Done
3594
3595        result = {}
3596        index = 1  # skip first line
3597        while index < len(output):
3598            ins = output[index].strip()
3599            result[ins] = self._parse_dns_service_info(output[index + 1:index + 6])
3600            index = index + (5 if result[ins] else 1)
3601        return result
3602
3603    def set_mliid(self, mliid: str):
3604        cmd = f'mliid {mliid}'
3605        self.send_command(cmd)
3606        self._expect_command_output()
3607
3608    def history_netinfo(self, num_entries=0):
3609        """
3610        Get the `netinfo` history list, parse each entry and return
3611        a list of dictionary (string key and string value) entries.
3612
3613        Example of return value:
3614        [
3615            {
3616                'age': '00:00:00.000 ago',
3617                'role': 'disabled',
3618                'mode': 'rdn',
3619                'rloc16': '0x7400',
3620                'partition-id': '1318093703'
3621            },
3622            {
3623                'age': '00:00:02.588 ago',
3624                'role': 'leader',
3625                'mode': 'rdn',
3626                'rloc16': '0x7400',
3627                'partition-id': '1318093703'
3628            }
3629        ]
3630        """
3631        cmd = f'history netinfo list {num_entries}'
3632        self.send_command(cmd)
3633        output = self._expect_command_output()
3634        netinfos = []
3635        for entry in output:
3636            netinfo = {}
3637            age, info = entry.split(' -> ')
3638            netinfo['age'] = age
3639            for item in info.split(' '):
3640                k, v = item.split(':')
3641                netinfo[k] = v
3642            netinfos.append(netinfo)
3643        return netinfos
3644
3645    def history_rx(self, num_entries=0):
3646        """
3647        Get the IPv6 RX history list, parse each entry and return
3648        a list of dictionary (string key and string value) entries.
3649
3650        Example of return value:
3651        [
3652            {
3653                'age': '00:00:01.999',
3654                'type': 'ICMP6(EchoReqst)',
3655                'len': '16',
3656                'sec': 'yes',
3657                'prio': 'norm',
3658                'rss': '-20',
3659                'from': '0xac00',
3660                'radio': '15.4',
3661                'src': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
3662                'dst': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
3663            }
3664        ]
3665        """
3666        cmd = f'history rx list {num_entries}'
3667        self.send_command(cmd)
3668        return self._parse_history_rx_tx_ouput(self._expect_command_output())
3669
3670    def history_tx(self, num_entries=0):
3671        """
3672        Get the IPv6 TX history list, parse each entry and return
3673        a list of dictionary (string key and string value) entries.
3674
3675        Example of return value:
3676        [
3677            {
3678                'age': '00:00:01.999',
3679                'type': 'ICMP6(EchoReply)',
3680                'len': '16',
3681                'sec': 'yes',
3682                'prio': 'norm',
3683                'to': '0xac00',
3684                'tx-success': 'yes',
3685                'radio': '15.4',
3686                'src': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
3687                'dst': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
3688
3689            }
3690        ]
3691        """
3692        cmd = f'history tx list {num_entries}'
3693        self.send_command(cmd)
3694        return self._parse_history_rx_tx_ouput(self._expect_command_output())
3695
3696    def _parse_history_rx_tx_ouput(self, lines):
3697        rxtx_list = []
3698        for line in lines:
3699            if line.strip().startswith('type:'):
3700                for item in line.strip().split(' '):
3701                    k, v = item.split(':')
3702                    entry[k] = v
3703            elif line.strip().startswith('src:'):
3704                entry['src'] = line[4:]
3705            elif line.strip().startswith('dst:'):
3706                entry['dst'] = line[4:]
3707                rxtx_list.append(entry)
3708            else:
3709                entry = {}
3710                entry['age'] = line
3711
3712        return rxtx_list
3713
3714    def set_router_id_range(self, min_router_id: int, max_router_id: int):
3715        cmd = f'routeridrange {min_router_id} {max_router_id}'
3716        self.send_command(cmd)
3717        self._expect_command_output()
3718
3719    def get_router_id_range(self):
3720        cmd = 'routeridrange'
3721        self.send_command(cmd)
3722        line = self._expect_command_output()[0]
3723        return [int(item) for item in line.split()]
3724
3725    def get_channel_monitor_info(self) -> Dict:
3726        """
3727        Returns:
3728            Dict of channel monitor info, e.g.
3729                {'enabled': '1',
3730                 'interval': '41000',
3731                 'threshold': '-75',
3732                 'window': '960',
3733                 'count': '985',
3734                 'occupancies': {
3735                    '11': '0.00%',
3736                    '12': '3.50%',
3737                    '13': '9.89%',
3738                    '14': '15.36%',
3739                    '15': '20.02%',
3740                    '16': '21.95%',
3741                    '17': '32.71%',
3742                    '18': '35.76%',
3743                    '19': '37.97%',
3744                    '20': '43.68%',
3745                    '21': '48.95%',
3746                    '22': '54.05%',
3747                    '23': '58.65%',
3748                    '24': '68.26%',
3749                    '25': '66.73%',
3750                    '26': '73.12%'
3751                    }
3752                }
3753        """
3754        config = {}
3755        self.send_command('channel monitor')
3756
3757        for line in self._expect_results(r'\S+'):
3758            if re.match(r'.*:\s.*', line):
3759                key, val = line.split(':')
3760                config.update({key: val.strip()})
3761            elif re.match(r'.*:', line):  # occupancy
3762                occ_key, val = line.split(':')
3763                val = {}
3764                config.update({occ_key: val})
3765            elif 'busy' in line:
3766                # channel occupancies
3767                key = line.split()[1]
3768                val = line.split()[3]
3769                config[occ_key].update({key: val})
3770        return config
3771
3772    def set_channel_manager_auto_enable(self, enable: bool):
3773        self.send_command(f'channel manager auto {int(enable)}')
3774        self._expect_done()
3775
3776    def set_channel_manager_autocsl_enable(self, enable: bool):
3777        self.send_command(f'channel manager autocsl {int(enable)}')
3778        self._expect_done()
3779
3780    def set_channel_manager_supported(self, channel_mask: int):
3781        self.send_command(f'channel manager supported {int(channel_mask)}')
3782        self._expect_done()
3783
3784    def set_channel_manager_favored(self, channel_mask: int):
3785        self.send_command(f'channel manager favored {int(channel_mask)}')
3786        self._expect_done()
3787
3788    def set_channel_manager_interval(self, interval: int):
3789        self.send_command(f'channel manager interval {interval}')
3790        self._expect_done()
3791
3792    def set_channel_manager_cca_threshold(self, hex_value: str):
3793        self.send_command(f'channel manager threshold {hex_value}')
3794        self._expect_done()
3795
3796    def get_channel_manager_config(self):
3797        self.send_command('channel manager')
3798        return self._expect_key_value_pairs(r'\S+')
3799
3800
class Node(NodeImpl, OtCli):
    """Node type that combines `NodeImpl` with `OtCli`; it adds no members of its own."""
    pass
3803
3804
class LinuxHost():
    """Helpers for a node that is driven as a Linux host through shell commands.

    Every method runs its commands through `self.bash()`, which is not defined
    here and has to be supplied by the class this one is combined with. Its
    result is used as a sequence of output lines. `ping()` and `add_ipmaddr()`
    also forward to `super()`, so they expect a class later in the MRO to
    implement those methods.
    """
    # Matches one reply line in the output of `ping`.
    PING_RESPONSE_PATTERN = re.compile(r'\d+ bytes from .*:.*')
    # Name of the backbone (ethernet) interface most commands operate on.
    ETH_DEV = config.BACKBONE_IFNAME

    def enable_ether(self):
        """Enable the ethernet interface.
        """

        self.bash(f'ip link set {self.ETH_DEV} up')

    def disable_ether(self):
        """Disable the ethernet interface.
        """

        self.bash(f'ip link set {self.ETH_DEV} down')

    def get_ether_addrs(self, ipv4=False, ipv6=True):
        """Return the addresses `ip addr list` reports for the ethernet interface.

        Args:
            ipv4: include `inet` (IPv4) addresses.
            ipv6: include `inet6` (IPv6) addresses.

        Returns:
            A list of address strings with any `/<prefix length>` suffix removed.
        """
        output = self.bash(f'ip addr list dev {self.ETH_DEV}')

        addrs = []
        for line in output:
            # line examples:
            # "inet6 fe80::42:c0ff:fea8:903/64 scope link"
            # "inet 192.168.9.1/24 brd 192.168.9.255 scope global eth0"
            line = line.strip().split()

            if not line or not line[0].startswith('inet'):
                continue
            if line[0] == 'inet' and not ipv4:
                continue
            if line[0] == 'inet6' and not ipv6:
                continue

            addr = line[1]
            if '/' in addr:
                addr = addr.split('/')[0]
            addrs.append(addr)

        logging.debug('%s: get_ether_addrs: %r', self, addrs)
        return addrs

    def get_ether_mac(self):
        """Return the MAC address from the `link/ether` line of `ip addr list`.

        Raises:
            AssertionError: carrying the command output, if no `link/ether` line is found.
        """
        output = self.bash(f'ip addr list dev {self.ETH_DEV}')
        for line in output:
            # link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
            line = line.strip().split()
            if line and line[0] == 'link/ether':
                return line[1]

        # Raised explicitly: `assert False` would be stripped under `python -O`
        # and the method would then silently return None.
        raise AssertionError(output)

    def add_ipmaddr_ether(self, ip: str):
        """Start the `mcast6.py` helper script in the background for `ip` on the ethernet interface."""
        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.ETH_DEV} {ip} &'
        self.bash(cmd)

    def ping_ether(self, ipaddr, num_responses=1, size=None, timeout=5, ttl=None, interface='eth0') -> int:
        """Ping `ipaddr` with `ping -6` and count the reply lines.

        Args:
            ipaddr: the address to ping.
            num_responses: value passed to `-c`.
            size: value passed to `-s`; the option is omitted when None.
            timeout: value passed to `-W`.
            ttl: value passed to `-t`; the option is omitted when None.
            interface: value passed to `-I`.

        Returns:
            The number of output lines matching `PING_RESPONSE_PATTERN`; 0 when
            the command raises `subprocess.CalledProcessError`.
        """

        cmd = f'ping -6 {ipaddr} -I {interface} -c {num_responses} -W {timeout}'
        if size is not None:
            cmd += f' -s {size}'

        if ttl is not None:
            cmd += f' -t {ttl}'

        resp_count = 0

        try:
            for line in self.bash(cmd):
                if self.PING_RESPONSE_PATTERN.match(line):
                    resp_count += 1
        except subprocess.CalledProcessError:
            # A failed ping is reported to the caller through the reply count.
            pass

        return resp_count

    def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
        """Get specific type of IPv6 address configured on thread device.

        Args:
            address_type: the config.ADDRESS_TYPE type of IPv6 address.

        Returns:
            For BACKBONE_GUA and BACKBONE_LINK_LOCAL: an IPv6 address string, or
            None when no address matches. For ONLINK_ULA and ONLINK_GUA: a list
            of IPv6 address strings.

        Raises:
            ValueError: for any other address type.
        """
        if address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
            return self._getBackboneGua()
        elif address_type == config.ADDRESS_TYPE.BACKBONE_LINK_LOCAL:
            return self._getInfraLinkLocalAddress()
        elif address_type == config.ADDRESS_TYPE.ONLINK_ULA:
            return self._getInfraUla()
        elif address_type == config.ADDRESS_TYPE.ONLINK_GUA:
            return self._getInfraGua()
        else:
            raise ValueError(f'unsupported address type: {address_type}')

    def _getBackboneGua(self) -> Optional[str]:
        """ Returns the first ethernet address matching `config.BACKBONE_PREFIX_REGEX_PATTERN`, or None.
        """
        for addr in self.get_ether_addrs():
            if re.match(config.BACKBONE_PREFIX_REGEX_PATTERN, addr, re.I):
                return addr

        return None

    def _getInfraUla(self) -> List[str]:
        """ Returns the ULA addresses autoconfigured on the infra link.
        """
        addrs = []
        for addr in self.get_ether_addrs():
            if re.match(config.ONLINK_PREFIX_REGEX_PATTERN, addr, re.I):
                addrs.append(addr)

        return addrs

    def _getInfraGua(self) -> List[str]:
        """ Returns the GUA addresses autoconfigured on the infra link.
        """

        # Compare against the part of the prefix that precedes '::/'.
        gua_prefix = config.ONLINK_GUA_PREFIX.split('::/')[0]
        return [addr for addr in self.get_ether_addrs() if addr.startswith(gua_prefix)]

    def _getInfraLinkLocalAddress(self) -> Optional[str]:
        """ Returns the link-local address autoconfigured on the infra link, which is started with "fe80".
        """
        for addr in self.get_ether_addrs():
            if re.match(config.LINK_LOCAL_REGEX_PATTERN, addr, re.I):
                return addr

        return None

    def ping(self, *args, **kwargs):
        """Ping over the ethernet interface when `backbone=True`, otherwise defer to the next `ping()` in the MRO.

        The `backbone` keyword is removed before the remaining arguments are forwarded.
        """
        backbone = kwargs.pop('backbone', False)
        if backbone:
            return self.ping_ether(*args, **kwargs)
        else:
            return super().ping(*args, **kwargs)

    def udp_send_host(self, ipaddr, port, data, hop_limit=None):
        """Run the `udp_send_host.py` helper script to send `data` to `ipaddr`:`port`.

        When `hop_limit` is None it defaults to 10 for a multicast destination
        and to 64 otherwise.
        """
        if hop_limit is None:
            if ipaddress.ip_address(ipaddr).is_multicast:
                hop_limit = 10
            else:
                hop_limit = 64
        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/udp_send_host.py {ipaddr} {port} "{data}" {hop_limit}'
        self.bash(cmd)

    def add_ipmaddr(self, *args, **kwargs):
        """Use `add_ipmaddr_ether()` when `backbone=True`, otherwise defer to the next `add_ipmaddr()` in the MRO.

        The `backbone` keyword is removed before the remaining arguments are forwarded.
        """
        backbone = kwargs.pop('backbone', False)
        if backbone:
            return self.add_ipmaddr_ether(*args, **kwargs)
        else:
            return super().add_ipmaddr(*args, **kwargs)

    def ip_neighbors_flush(self):
        """Flush and delete the IPv6 neighbor entries of the ethernet interface.

        The neighbor list is printed before and after the cleanup.
        """
        # clear neigh cache on linux
        self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')
        self.bash(f'ip -6 neigh flush nud all nud failed nud noarp dev {self.ETH_DEV}')
        self.bash('ip -6 neigh list nud all dev %s | cut -d " " -f1 | sudo xargs -I{} ip -6 neigh delete {} dev %s' %
                  (self.ETH_DEV, self.ETH_DEV))
        self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')

    def publish_mdns_service(self, instance_name, service_type, port, host_name, txt):
        """Publish an mDNS service on the Ethernet.

        :param instance_name: the service instance name.
        :param service_type: the service type in format of '<service_type>.<protocol>'.
        :param port: the port the service is at.
        :param host_name: the host name this service points to. The domain
                          should not be included.
        :param txt: a dictionary containing the key-value pairs of the TXT record.
        """
        txt_string = ' '.join([f'{key}={value}' for key, value in txt.items()])
        self.bash(f'avahi-publish -s {instance_name}  {service_type} {port} -H {host_name}.local {txt_string} &')

    def publish_mdns_host(self, hostname, addresses):
        """Publish an mDNS host on the Ethernet

        :param hostname: the host name to publish. The domain
                         should not be included.
        :param addresses: a list of strings representing the addresses to
                          be registered with the host.
        """
        for address in addresses:
            self.bash(f'avahi-publish -a {hostname}.local {address} &')

    def browse_mdns_services(self, name, timeout=2):
        """ Browse mDNS services on the ethernet.

        :param name: the service type name in format of '<service-name>.<protocol>'.
        :param timeout: timeout value in seconds before returning.
        :return: A list of service instance names.
        """

        # `dns-sd` keeps running until killed: let it collect results in the
        # background for `timeout` seconds, then read what it wrote.
        self.bash(f'dns-sd -Z {name} local. > /tmp/{name} 2>&1 &')
        time.sleep(timeout)
        self.bash('pkill dns-sd')

        instances = []
        for line in self.bash(f'cat /tmp/{name}', encoding='raw_unicode_escape'):
            elements = line.split()
            if len(elements) >= 3 and elements[0] == name and elements[1] == 'PTR':
                # The PTR target is '<instance>.<name>'; keep the instance part.
                instances.append(elements[2][:-len('.' + name)])
        return instances

    def discover_mdns_service(self, instance, name, host_name, timeout=2):
        """ Discover/resolve the mDNS service on ethernet.

        :param instance: the service instance name.
        :param name: the service name in format of '<service-name>.<protocol>'.
        :param host_name: the host name this service points to. The domain
                          should not be included. When None, the host name is
                          taken from the SRV record of the browse result.
        :param timeout: timeout value in seconds before returning.
        :return: a dict of service properties or None.

        The return value is a dict with the same key/values of srp_server_get_service
        except that we don't have a `deleted` field here.
        """
        host_name_file = self.bash('mktemp')[0].strip()
        service_data_file = self.bash('mktemp')[0].strip()

        self.bash(f'dns-sd -Z {name} local. > {service_data_file} 2>&1 &')
        time.sleep(timeout)

        full_service_name = f'{instance}.{name}'
        # When hostname is unspecified, extract hostname from browse result
        if host_name is None:
            for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
                elements = line.split()
                if len(elements) >= 6 and elements[0] == full_service_name and elements[1] == 'SRV':
                    host_name = elements[5].split('.')[0]
                    break

        assert (host_name is not None)
        self.bash(f'dns-sd -G v6 {host_name}.local. > {host_name_file} 2>&1 &')
        time.sleep(timeout)

        self.bash('pkill dns-sd')
        addresses = []
        service = {}

        logging.debug(self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'))
        logging.debug(self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'))

        # example output in the host file:
        # Timestamp     A/R Flags if Hostname                               Address                                     TTL
        # 9:38:09.274  Add     23 48 my-host.local.                         2001:0000:0000:0000:0000:0000:0000:0002%<0>  120
        #
        fullname = f'{host_name}.local.'
        for line in self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'):
            elements = line.split()
            if fullname not in elements:
                continue
            if 'Add' not in elements:
                continue
            # The address is the column right after the host name; drop the '%<scope>' suffix.
            addresses.append(elements[elements.index(fullname) + 1].split('%')[0])

        logging.debug(f'addresses of {host_name}: {addresses}')

        # example output of in the service file:
        # _ipps._tcp                                      PTR     my-service._ipps._tcp
        # my-service._ipps._tcp                           SRV     0 0 12345 my-host.local. ; Replace with unicast FQDN of target host
        # my-service._ipps._tcp                           TXT     ""
        #
        is_txt = False
        txt = ''
        for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
            elements = line.split()
            if len(elements) >= 2 and elements[0] == full_service_name and elements[1] == 'TXT':
                is_txt = True
            if is_txt:
                # A TXT record may continue over several lines; it is complete
                # once a line ends with a closing quote.
                txt += line.strip()
                if line.strip().endswith('"'):
                    is_txt = False
                    txt_dict = self.__parse_dns_sd_txt(txt)
                    logging.info(f'txt = {txt_dict}')
                    service['txt'] = txt_dict

            # Require a record type column so that a line holding only the
            # service name cannot raise IndexError below.
            if len(elements) < 2 or elements[0] != full_service_name:
                continue
            if elements[1] == 'SRV':
                service['fullname'] = elements[0]
                service['instance'] = instance
                service['name'] = name
                service['priority'] = int(elements[2])
                service['weight'] = int(elements[3])
                service['port'] = int(elements[4])
                service['host_fullname'] = elements[5]
                assert (service['host_fullname'] == f'{host_name}.local.')
                service['host'] = host_name
                service['addresses'] = addresses
        return service or None

    def start_radvd_service(self, prefix, slaac):
        """Write `/etc/radvd.conf` for interface eth0 and start the radvd service.

        Args:
            prefix: the prefix to advertise.
            slaac: whether `AdvAutonomous` is set to `on` (otherwise `off`).
        """
        self.bash("""cat >/etc/radvd.conf <<EOF
interface eth0
{
    AdvSendAdvert on;

    AdvReachableTime 200;
    AdvRetransTimer 200;
    AdvDefaultLifetime 1800;
    MinRtrAdvInterval 1200;
    MaxRtrAdvInterval 1800;
    AdvDefaultPreference low;

    prefix %s
    {
        AdvOnLink on;
        AdvAutonomous %s;
        AdvRouterAddr off;
        AdvPreferredLifetime 1800;
        AdvValidLifetime 1800;
    };
};
EOF
""" % (prefix, 'on' if slaac else 'off'))
        self.bash('service radvd start')
        self.bash('service radvd status')  # Make sure radvd service is running

    def start_pd_radvd_service(self, prefix):
        """Write `/etc/radvd.conf` for interface wpan0 with `AdvAutonomous on` and start the radvd service.

        Args:
            prefix: the prefix to advertise.
        """
        self.bash("""cat >/etc/radvd.conf <<EOF
interface wpan0
{
    AdvSendAdvert on;

    AdvReachableTime 20;
    AdvRetransTimer 20;
    AdvDefaultLifetime 180;
    MinRtrAdvInterval 120;
    MaxRtrAdvInterval 180;
    AdvDefaultPreference low;

    prefix %s
    {
        AdvOnLink on;
        AdvAutonomous on;
        AdvRouterAddr off;
        AdvPreferredLifetime 180;
        AdvValidLifetime 180;
    };
};
EOF
""" % (prefix,))
        self.bash('service radvd start')
        self.bash('service radvd status')  # Make sure radvd service is running

    def stop_radvd_service(self):
        """Stop radvd with `service radvd stop`."""
        self.bash('service radvd stop')

    def kill_radvd_service(self):
        """Kill radvd with `pkill radvd`."""
        self.bash('pkill radvd')

    def __parse_dns_sd_txt(self, line: str):
        """Parse the double-quoted `key=value` entries of a dns-sd TXT line into a dict.

        Quoted entries without '=' are skipped; an entry is split at its first '='.
        """
        # Example TXT entry:
        # "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
        txt = {}
        for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
            if '=' not in entry:
                continue

            k, v = entry.split('=', 1)
            txt[k] = v

        return txt
4167
4168
class OtbrNode(LinuxHost, NodeImpl, OtbrDocker):
    """Node type built from `LinuxHost`, `NodeImpl` and `OtbrDocker`.

    Host-side commands target the interface named by `TUN_DEV`.
    """
    TUN_DEV = config.THREAD_IFNAME
    is_otbr = True
    is_bbr = True  # OTBR is also BBR
    node_type = 'otbr-docker'

    def __repr__(self):
        return 'Otbr<{}>'.format(self.nodeid)

    def start(self):
        """Run `_setup_sysctl()`, set the log level to 5 and then start via the parent classes."""
        self._setup_sysctl()
        self.set_log_level(5)
        super().start()

    def add_ipaddr(self, addr):
        """Add `addr` with a /64 prefix length to the `TUN_DEV` interface using `ip -6 addr add`."""
        self.bash(f'ip -6 addr add {addr}/64 dev {self.TUN_DEV}')

    def add_ipmaddr_tun(self, ip: str):
        """Start the `mcast6.py` helper script in the background for `ip` on the `TUN_DEV` interface."""
        script = '/app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py'
        self.bash(f'python3 {script} {self.TUN_DEV} {ip} &')

    def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
        """Look up an address through `LinuxHost` first and fall back to the classes after it in the MRO.

        Any exception raised by the first lookup triggers the fallback.
        """
        try:
            return super().get_ip6_address(address_type)
        except Exception:
            # Skip LinuxHost and ask the next implementation in the MRO.
            return super(LinuxHost, self).get_ip6_address(address_type)
4196
4197
class HostNode(LinuxHost, OtbrDocker):
    """Node type built from `LinuxHost` and `OtbrDocker`; the otbr-agent service is stopped on creation."""
    is_host = True

    def __init__(self, nodeid, name=None, **kwargs):
        """Store `nodeid` and `name` (default 'Host<nodeid>'), initialise the parents and stop otbr-agent."""
        self.nodeid = nodeid
        if name:
            self.name = name
        else:
            self.name = 'Host%d' % nodeid
        super().__init__(nodeid, **kwargs)
        self.bash('service otbr-agent stop')

    def start(self, start_radvd=True, prefix=config.DOMAIN_PREFIX, slaac=False):
        """Run `_setup_sysctl()`, then start radvd for `prefix` or stop it, depending on `start_radvd`."""
        self._setup_sysctl()
        if not start_radvd:
            self.stop_radvd_service()
            return
        self.start_radvd_service(prefix, slaac)

    def stop(self):
        """Stop the radvd service."""
        self.stop_radvd_service()

    def get_addrs(self) -> List[str]:
        """Return the addresses of the ethernet interface as given by `get_ether_addrs()`."""
        return self.get_ether_addrs()

    def __repr__(self):
        return 'Host<{}>'.format(self.nodeid)

    def get_matched_ula_addresses(self, prefix):
        """Get the IPv6 addresses that matches given prefix.
        """

        candidates = self.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)
        return [candidate for candidate in candidates if IPv6Address(candidate) in IPv6Network(prefix)]
4233
4234
# When this module is executed directly, hand control to unittest's command-line runner.
if __name__ == '__main__':
    unittest.main()
4237