1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import logging
30import queue
31import re
32import threading
33import time
34from abc import abstractmethod, ABC
35from typing import Any, Callable, Optional, Union, List, Pattern
36
37from .connectors import OtCliHandler
38from .errors import ExpectLineTimeoutError, CommandError
39from .utils import match_line
40
41
class OTCommandHandler(ABC):
    """Abstract base class describing the interface of an OT Command Handler."""

    @abstractmethod
    def execute_command(self, cmd: str, timeout: float) -> List[str]:
        """Execute the OT CLI command `cmd` within `timeout` seconds and return its output as a list of lines.

        Note: each line SHOULD NOT contain '\\r\\n' at the end. The last line of output should be 'Done' or
        'Error <code>: <msg>' following OT CLI conventions.
        """

    @abstractmethod
    def execute_platform_command(self, cmd: str, timeout: float) -> List[str]:
        """Execute the platform command `cmd` within `timeout` seconds and return its output as a list of lines.

        Note: each line of the command output MUST NOT contain '\\r\\n' at the end.
        """

    @abstractmethod
    def close(self):
        """Close the OT Command Handler."""

    @abstractmethod
    def wait(self, duration: float) -> List[str]:
        """Wait for `duration` seconds and return the OT CLI output produced during this period.

        Normally, OT CLI does not output when it's not executing any command. But OT CLI can also output
        asynchronously in some cases (e.g. `Join Success` when Joiner joins successfully).
        """

    @abstractmethod
    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
        """Register a callback that will be called for every line output by the OT CLI.

        This is useful for handling asynchronous command output while still being able to execute
        other commands.
        """

    def shell(self, cmd: str, timeout: float) -> List[str]:
        """Run a shell command; handlers that do not override this method always raise NotImplementedError."""
        handler_name = type(self).__name__
        raise NotImplementedError("shell command is not supported on %s" % handler_name)
86
87
class OtCliCommandRunner(OTCommandHandler):
    """Command handler that talks to an OT CLI through an `OtCliHandler` connector.

    A daemon reader thread keeps reading lines from the connector, hands every line to the optional
    line-read callback, and queues the lines that are not logs for `execute_command` / `wait` to consume.
    """

    # Matches the line that terminates a command's output ("Error" alone is for spinel-cli.py).
    __PATTERN_COMMAND_DONE_OR_ERROR = re.compile(r'(Done|Error|Error \d+:.*|.*: command not found)$')

    # Regex used to filter logs: lines matching it are never queued as command output.
    __PATTERN_LOG_LINE = re.compile(r'((\[(NONE|CRIT|WARN|NOTE|INFO|DEBG)\])'
                                    r'|(-.*-+: )'  # e.g. -CLI-----:
                                    r'|(\[[DINWC\-]\] (?=[\w\-]{14}:)\w+-*:)'  # e.g. [I] Mac-----------:
                                    r')')

    # Import-time sanity checks of the log pattern.
    assert __PATTERN_LOG_LINE.match('[I] ChannelMonitor: debug log')
    assert __PATTERN_LOG_LINE.match('[I] Mac-----------: info log')
    assert __PATTERN_LOG_LINE.match('[N] MeshForwarder-: note    log')
    assert __PATTERN_LOG_LINE.match('[W] Notifier------: warn log')
    assert __PATTERN_LOG_LINE.match('[C] Mle-----------: critical log')
    assert __PATTERN_LOG_LINE.match('[-] Settings------: none log')
    assert not __PATTERN_LOG_LINE.match('[-] Settings-----: none log')  # not enough `-` after module name

    # Commands whose output is collected by polling `wait` instead of blocking on the line queue.
    __ASYNC_COMMANDS = {'scan', 'ping', 'discover'}

    def __init__(self, otcli: OtCliHandler, is_spinel_cli=False):
        """Store the connector and start the background reader thread.

        The command echo is only expected when `is_spinel_cli` is false.
        """
        self.__otcli: OtCliHandler = otcli
        self.__is_spinel_cli = is_spinel_cli
        self.__expect_command_echoback = not is_spinel_cli
        self.__line_read_callback = None

        self.__pending_lines = queue.Queue()
        self.__should_close = threading.Event()
        reader = threading.Thread(target=self.__otcli_read_routine, daemon=True)
        self.__otcli_reader = reader
        reader.start()

    def __repr__(self):
        """Use the representation of the underlying connector."""
        connector = self.__otcli
        return repr(connector)

    def execute_command(self, cmd, timeout=10) -> List[str]:
        """Write `cmd` to the CLI and return the queued output up to and including the Done/Error line.

        `reset` and `factoryreset` are special-cased: the handler waits 3 seconds, writes `extaddr`,
        waits 1 more second and returns an empty list. Raises ExpectLineTimeoutError when the expected
        line does not show up.
        """
        assert not self.__should_close.is_set(), "OT CLI is already closed."
        cli = self.__otcli
        cli.writeline(cmd)

        if cmd in ('reset', 'factoryreset'):
            self.wait(3)
            cli.writeline('extaddr')
            self.wait(1)
            return []

        if self.__expect_command_echoback:
            # Consume everything up to the echoed command before collecting the real output.
            self.__expect_line(timeout, cmd)

        is_async = cmd.split()[0] in OtCliCommandRunner.__ASYNC_COMMANDS
        terminator = OtCliCommandRunner.__PATTERN_COMMAND_DONE_OR_ERROR
        return self.__expect_line(timeout, terminator, asynchronous=is_async)

    def execute_platform_command(self, cmd, timeout=10) -> List[str]:
        """Platform commands are not available on this handler; always raises NotImplementedError."""
        raise NotImplementedError(f'Platform command is not supported on {self.__class__.__name__}')

    def wait(self, duration: float) -> List[str]:
        """Let the connector wait for `duration`, then return every line currently queued."""
        self.__otcli.wait(duration)

        drained = []
        while True:
            try:
                drained.append(self.__pending_lines.get_nowait())
            except queue.Empty:
                return drained

    def close(self):
        """Signal the reader thread to stop, close the connector and join the reader thread."""
        self.__should_close.set()
        self.__otcli.close()
        self.__otcli_reader.join()

    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
        """Set (or clear, with None) the callback invoked by the reader thread for every line read."""
        self.__line_read_callback = callback

    #
    # Private methods
    #

    def __expect_line(self, timeout: float, expect_line: Union[str, Pattern], asynchronous=False) -> List[str]:
        """Collect queued lines until one matches `expect_line` and return them (matching line included).

        In the default mode each queue read blocks for up to `timeout` seconds. In asynchronous mode the
        queue is drained once per `wait(1)` call and `timeout` is decremented by 1 per round. Raises
        ExpectLineTimeoutError when no matching line is found in time.
        """
        collected = []

        if asynchronous:
            remaining = timeout
            while remaining > 0:
                batch = self.wait(1)
                remaining -= 1

                for line in batch:
                    collected.append(line)
                    if match_line(line, expect_line):
                        return collected

            raise ExpectLineTimeoutError(expect_line)

        while True:
            try:
                line = self.__pending_lines.get(timeout=timeout)
            except queue.Empty:
                raise ExpectLineTimeoutError(expect_line)

            collected.append(line)
            if match_line(line, expect_line):
                return collected

    def __otcli_read_routine(self):
        """Reader thread body: read lines until closed or until the connector returns None."""
        is_log_line = OtCliCommandRunner.__PATTERN_LOG_LINE.match

        while not self.__should_close.is_set():
            try:
                line = self.__otcli.readline()
            except Exception:
                # A read error is only swallowed when the handler is being closed.
                if not self.__should_close.is_set():
                    raise
                return

            logging.debug('%s: %r', self.__otcli, line)

            if line is None:
                return

            # Drop the CLI prompt prefix.
            line = line[2:] if line.startswith('> ') else line

            callback = self.__line_read_callback
            if callback is not None:
                callback(line)

            logging.debug('%s: %s', self.__otcli, line)

            if is_log_line(line):
                continue
            self.__pending_lines.put(line)
226
227
class OtbrSshCommandRunner(OTCommandHandler):
    """Command handler that runs `ot-ctl` and platform commands on a remote host over SSH (paramiko)."""

    def __init__(self, host, port, username, password, sudo):
        """Open the SSH connection to `host`:`port`.

        When `sudo` is truthy, OT CLI and platform commands are prefixed with `sudo `.
        If authentication fails and no password was given, 'none' authentication is attempted on the
        existing transport; with a password the AuthenticationException is re-raised.
        """
        import paramiko

        self.__host = host
        self.__port = port
        self.__sudo = sudo
        self.__ssh = paramiko.SSHClient()
        self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        self.__line_read_callback = None

        try:
            self.__ssh.connect(host,
                               port=port,
                               username=username,
                               password=password,
                               allow_agent=False,
                               look_for_keys=False)
        except paramiko.ssh_exception.AuthenticationException:
            if not password:
                transport = self.__ssh.get_transport()
                assert transport is not None
                transport.auth_none(username)
            else:
                raise

    def __repr__(self):
        """Return `<host>:<port>`."""
        return f'{self.__host}:{self.__port}'

    def execute_command(self, cmd: str, timeout: float = 10) -> List[str]:
        """Run `ot-ctl <cmd>` on the remote host and return its output lines.

        `timeout` defaults to 10 seconds, matching the other handlers in this module. Every output
        line is passed to the line-read callback, if one is set. After `reset` / `factoryreset` the
        handler waits 3 seconds before returning. Raises CommandError when the command writes to stderr.
        """
        sh_cmd = f'ot-ctl {cmd}'
        if self.__sudo:
            sh_cmd = 'sudo ' + sh_cmd

        output = self.shell(sh_cmd, timeout=timeout)

        if self.__line_read_callback is not None:
            for line in output:
                self.__line_read_callback(line)

        if cmd in ('reset', 'factoryreset'):
            self.wait(3)

        return output

    def execute_platform_command(self, cmd, timeout=10) -> List[str]:
        """Run a platform command on the remote host (with `sudo ` prefix if configured)."""
        if self.__sudo:
            cmd = 'sudo ' + cmd

        return self.shell(cmd, timeout=timeout)

    def shell(self, cmd: str, timeout: float) -> List[str]:
        """Run `cmd` over SSH and return its stdout lines without trailing '\\r\\n'.

        Raises CommandError (carrying the stderr lines) when the command produced any stderr output.
        """
        # Pass the timeout through unchanged: truncating it with int() would turn a sub-second
        # timeout into 0, which paramiko treats as a non-blocking channel.
        _cmd_in, cmd_out, cmd_err = self.__ssh.exec_command(cmd, timeout=timeout, bufsize=1024)
        errput = [line.rstrip('\r\n') for line in cmd_err.readlines()]
        output = [line.rstrip('\r\n') for line in cmd_out.readlines()]

        if errput:
            raise CommandError(cmd, errput)

        return output

    def close(self):
        """Close the SSH connection."""
        self.__ssh.close()

    def wait(self, duration: float) -> List[str]:
        """Sleep for `duration` seconds; this handler never collects output while waiting."""
        time.sleep(duration)
        return []

    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
        """Set (or clear, with None) the callback invoked for every line returned by `execute_command`."""
        self.__line_read_callback = callback
300
301
class OtbrAdbCommandRunner(OTCommandHandler):
    """Command handler that runs `ot-ctl` and platform commands through an ADB device connection."""

    try:
        from adb_shell.adb_device import AdbDevice
    except ImportError:
        # AdbDevice is only needed for the annotation below. Falling back to Any keeps this module
        # importable when adb_shell is not installed, like the lazily imported paramiko above.
        AdbDevice = Any

    def __init__(self, adb: AdbDevice):
        """Store the ADB device and connect to it (no RSA keys, 0.1 s auth timeout)."""
        self.__adb = adb
        self.__line_read_callback = None
        self.__adb.connect(rsa_keys=None, auth_timeout_s=0.1)

    def execute_command(self, cmd: str, timeout: float = 10) -> List[str]:
        """Run `ot-ctl <cmd>` on the device and return its output lines.

        `timeout` defaults to 10 seconds, matching `execute_platform_command`. Every output line is
        passed to the line-read callback, if one is set. After `reset` / `factoryreset` the handler
        waits 3 seconds before returning.
        """
        sh_cmd = f'ot-ctl {cmd}'

        output = self.shell(sh_cmd, timeout=timeout)

        if self.__line_read_callback is not None:
            for line in output:
                self.__line_read_callback(line)

        if cmd in ('reset', 'factoryreset'):
            self.wait(3)

        return output

    def execute_platform_command(self, cmd: str, timeout: float = 10) -> List[str]:
        """Run a platform command on the device and return its output lines."""
        return self.shell(cmd, timeout=timeout)

    def shell(self, cmd: str, timeout: float) -> List[str]:
        """Run `cmd` through ADB, using `timeout` for the transport, read and total timeouts."""
        raw_output = self.__adb.shell(cmd, transport_timeout_s=timeout, read_timeout_s=timeout, timeout_s=timeout)
        return raw_output.splitlines()

    def close(self):
        """Close the ADB connection."""
        self.__adb.close()

    def wait(self, duration: float) -> List[str]:
        """Sleep for `duration` seconds; this handler never collects output while waiting."""
        time.sleep(duration)
        return []

    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
        """Set (or clear, with None) the callback invoked for every line returned by `execute_command`."""
        self.__line_read_callback = callback
341
342
class OtbrAdbTcpCommandRunner(OtbrAdbCommandRunner):
    """ADB command handler that reaches the device over TCP."""

    def __init__(self, host: str, port: int):
        """Create an `AdbDeviceTcp` for `host`:`port` and hand it to the base ADB handler."""
        from adb_shell.adb_device import AdbDeviceTcp

        self.__host, self.__port = host, port

        device = AdbDeviceTcp(host, port, default_transport_timeout_s=9.0)
        super().__init__(device)

    def __repr__(self):
        """Return `<host>:<port>`."""
        return f'{self.__host}:{self.__port}'
356
357
class OtbrAdbUsbCommandRunner(OtbrAdbCommandRunner):
    """ADB command handler that reaches the device over USB."""

    def __init__(self, serial: str):
        """Create an `AdbDeviceUsb` for the device with `serial` and hand it to the base ADB handler."""
        from adb_shell.adb_device import AdbDeviceUsb

        self.__serial = serial

        device = AdbDeviceUsb(serial, port_path=None, default_transport_timeout_s=9.0)
        super().__init__(device)

    def __repr__(self):
        """Return `USB:<serial>`."""
        return f'USB:{self.__serial}'
370