#!/usr/bin/env python3
#
#  Copyright (c) 2020, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#
import logging
import queue
import re
import threading
import time
from abc import abstractmethod, ABC
from typing import Any, Callable, Optional, Union, List, Pattern

from .connectors import OtCliHandler
from .errors import ExpectLineTimeoutError, CommandError
from .utils import match_line


class OTCommandHandler(ABC):
    """This abstract class defines interfaces of a OT Command Handler."""

    @abstractmethod
    def execute_command(self, cmd: str, timeout: float) -> List[str]:
        """Method execute_command should execute the OT CLI command within a timeout (in seconds) and return the
        command output as a list of lines.

        Note: each line SHOULD NOT contain '\\r\\n' at the end. The last line of output should be 'Done' or
        'Error <code>: <msg>' following OT CLI conventions.
        """

    @abstractmethod
    def execute_platform_command(self, cmd: str, timeout: float) -> List[str]:
        """Method execute_platform_command should execute the platform command within a timeout (in seconds) and
        return the command output as a list of lines.

        Note: each line of the command output MUST NOT contain '\\r\\n' at the end.
        """

    @abstractmethod
    def close(self):
        """Method close should close the OT Command Handler."""

    @abstractmethod
    def wait(self, duration: float) -> List[str]:
        """Method wait should wait for a given duration and return the OT CLI output during this period.

        Normally, OT CLI does not output when it's not executing any command. But OT CLI can also output
        asynchronously in some cases (e.g. `Join Success` when Joiner joins successfully).
        """

    @abstractmethod
    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
        """Method set_line_read_callback should register a callback that will be called for every line
        output by the OT CLI.

        This is useful for handling asynchronous command output while still being able to execute
        other commands.
        """

    def shell(self, cmd: str, timeout: float) -> List[str]:
        """Run a shell command on the device; handlers that cannot do so raise NotImplementedError."""
        raise NotImplementedError("shell command is not supported on %s" % self.__class__.__name__)


class OtCliCommandRunner(OTCommandHandler):
    """OT Command Handler that talks to an OT CLI through an `OtCliHandler` connector.

    A daemon thread continuously reads lines from the connector, drops log lines and queues
    the remaining lines, which `execute_command` and `wait` then consume.
    """

    __PATTERN_COMMAND_DONE_OR_ERROR = re.compile(
        r'(Done|Error|Error \d+:.*|.*: command not found)$')  # "Error" for spinel-cli.py

    # regex used to filter logs
    __PATTERN_LOG_LINE = re.compile(r'((\[(NONE|CRIT|WARN|NOTE|INFO|DEBG)\])'
                                    r'|(-.*-+: )'  # e.g. -CLI-----:
                                    r'|(\[[DINWC\-]\] (?=[\w\-]{14}:)\w+-*:)'  # e.g. [I] Mac-----------:
                                    r')')

    assert __PATTERN_LOG_LINE.match('[I] ChannelMonitor: debug log')
    assert __PATTERN_LOG_LINE.match('[I] Mac-----------: info log')
    assert __PATTERN_LOG_LINE.match('[N] MeshForwarder-: note log')
    assert __PATTERN_LOG_LINE.match('[W] Notifier------: warn log')
    assert __PATTERN_LOG_LINE.match('[C] Mle-----------: critical log')
    assert __PATTERN_LOG_LINE.match('[-] Settings------: none log')
    assert not __PATTERN_LOG_LINE.match('[-] Settings-----: none log')  # not enough `-` after module name

    # Commands whose output is collected by polling `wait` instead of blocking on the line queue.
    __ASYNC_COMMANDS = {'scan', 'ping', 'discover'}

    def __init__(self, otcli: OtCliHandler, is_spinel_cli: bool = False):
        """Wrap `otcli` and start the background reader thread.

        When `is_spinel_cli` is true, the command echo is not expected before the command output.
        """
        self.__otcli: OtCliHandler = otcli
        self.__is_spinel_cli = is_spinel_cli
        self.__expect_command_echoback = not self.__is_spinel_cli
        self.__line_read_callback = None

        self.__pending_lines = queue.Queue()
        self.__should_close = threading.Event()
        self.__otcli_reader = threading.Thread(target=self.__otcli_read_routine, daemon=True)
        self.__otcli_reader.start()

    def __repr__(self):
        return repr(self.__otcli)

    def execute_command(self, cmd: str, timeout: float = 10) -> List[str]:
        """Write `cmd` to the OT CLI and return its output lines.

        The returned list ends with the line matching 'Done', an error line or
        '<cmd>: command not found'. For 'reset' and 'factoryreset' an empty list is returned.

        Raises:
            ExpectLineTimeoutError: if the command echo or the terminating line does not arrive in time.
        """
        assert not self.__should_close.is_set(), "OT CLI is already closed."
        self.__otcli.writeline(cmd)

        if cmd in ('reset', 'factoryreset'):
            # NOTE(review): presumably the device restarts without printing 'Done', so no
            # response is awaited here; output produced in the meantime is discarded - confirm.
            self.wait(3)
            self.__otcli.writeline('extaddr')
            self.wait(1)
            return []

        if self.__expect_command_echoback:
            self.__expect_line(timeout, cmd)

        # A blank command has no first word; treat it as a regular (synchronous) command
        # instead of failing with IndexError.
        words = cmd.split()
        asynchronous = bool(words) and words[0] in OtCliCommandRunner.__ASYNC_COMMANDS

        return self.__expect_line(timeout,
                                  OtCliCommandRunner.__PATTERN_COMMAND_DONE_OR_ERROR,
                                  asynchronous=asynchronous)

    def execute_platform_command(self, cmd, timeout=10) -> List[str]:
        """Platform commands are not available through the OT CLI; always raises NotImplementedError."""
        raise NotImplementedError(f'Platform command is not supported on {self.__class__.__name__}')

    def wait(self, duration: float) -> List[str]:
        """Wait on the connector for `duration` and return all lines queued by the reader thread so far."""
        self.__otcli.wait(duration)

        output = []
        try:
            while True:
                output.append(self.__pending_lines.get_nowait())
        except queue.Empty:
            pass

        return output

    def close(self):
        """Signal the reader thread to stop, close the connector and join the reader thread."""
        self.__should_close.set()
        self.__otcli.close()
        self.__otcli_reader.join()

    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
        """Register `callback` to be called by the reader thread for every line read (None to disable)."""
        self.__line_read_callback = callback

    #
    # Private methods
    #

    def __expect_line(self, timeout: float, expect_line: Union[str, Pattern], asynchronous=False) -> List[str]:
        """Collect queued lines until one matches `expect_line` and return them (match included).

        In synchronous mode `timeout` bounds the whole wait: a single deadline is computed up
        front so that a steady stream of non-matching lines cannot extend the wait indefinitely.
        In asynchronous mode the queue is polled through `wait(1)`, one second per iteration.

        Raises:
            ExpectLineTimeoutError: if no matching line is seen in time.
        """
        output = []

        if not asynchronous:
            deadline = time.monotonic() + timeout
            while True:
                # Clamp at zero so lines that are already queued are still consumed after the deadline.
                remaining = max(0.0, deadline - time.monotonic())
                try:
                    line = self.__pending_lines.get(timeout=remaining)
                except queue.Empty:
                    raise ExpectLineTimeoutError(expect_line) from None

                output.append(line)

                if match_line(line, expect_line):
                    break
        else:
            done = False
            while not done and timeout > 0:
                lines = self.wait(1)
                timeout -= 1

                for line in lines:
                    output.append(line)

                    if match_line(line, expect_line):
                        done = True
                        break

            if not done:
                raise ExpectLineTimeoutError(expect_line)

        return output

    def __otcli_read_routine(self):
        """Reader thread body: read lines until closed, strip the prompt and queue non-log lines."""
        while not self.__should_close.is_set():
            try:
                line = self.__otcli.readline()
            except Exception:
                # Read errors are expected once close() has been requested; otherwise propagate.
                if self.__should_close.is_set():
                    break
                else:
                    raise

            logging.debug('%s: %r', self.__otcli, line)

            if line is None:
                break

            if line.startswith('> '):
                line = line[2:]

            # The callback sees every line, including log lines that are not queued below.
            if self.__line_read_callback is not None:
                self.__line_read_callback(line)

            logging.debug('%s: %s', self.__otcli, line)

            if not OtCliCommandRunner.__PATTERN_LOG_LINE.match(line):
                self.__pending_lines.put(line)


class OtbrSshCommandRunner(OTCommandHandler):
    """OT Command Handler that runs `ot-ctl` and platform commands on a host over SSH."""

    def __init__(self, host, port, username, password, sudo):
        """Connect to `host`:`port` over SSH.

        If password authentication fails and no password was given, 'none' authentication is
        attempted. When `sudo` is true, commands are prefixed with 'sudo '.
        The SSH client is closed again if connecting or authenticating fails.
        """
        import paramiko

        self.__host = host
        self.__port = port
        self.__sudo = sudo
        self.__ssh = paramiko.SSHClient()
        # NOTE: unknown host keys are accepted automatically (AutoAddPolicy).
        self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        self.__line_read_callback = None

        try:
            try:
                self.__ssh.connect(host,
                                   port=port,
                                   username=username,
                                   password=password,
                                   allow_agent=False,
                                   look_for_keys=False)
            except paramiko.ssh_exception.AuthenticationException:
                if password:
                    raise

                transport = self.__ssh.get_transport()
                assert transport is not None
                transport.auth_none(username)
        except Exception:
            # Do not leave a half-open client behind when the constructor fails.
            self.__ssh.close()
            raise

    def __repr__(self):
        return f'{self.__host}:{self.__port}'

    def execute_command(self, cmd: str, timeout: float = 10) -> List[str]:
        """Run `ot-ctl <cmd>` on the host, feed each output line to the line-read callback and return the lines."""
        sh_cmd = f'ot-ctl {cmd}'
        if self.__sudo:
            sh_cmd = 'sudo ' + sh_cmd

        output = self.shell(sh_cmd, timeout=timeout)

        if self.__line_read_callback is not None:
            for line in output:
                self.__line_read_callback(line)

        if cmd in ('reset', 'factoryreset'):
            self.wait(3)

        return output

    def execute_platform_command(self, cmd, timeout=10) -> List[str]:
        """Run `cmd` on the host (with 'sudo ' prepended when configured) and return its output lines."""
        if self.__sudo:
            cmd = 'sudo ' + cmd

        return self.shell(cmd, timeout=timeout)

    def shell(self, cmd: str, timeout: float) -> List[str]:
        """Execute `cmd` over SSH and return its stdout lines without line endings.

        Raises:
            CommandError: if the command wrote anything to stderr.
        """
        # Pass the timeout through unchanged: truncating with int() turns a sub-second timeout
        # into 0, which switches the channel to non-blocking mode.
        cmd_in, cmd_out, cmd_err = self.__ssh.exec_command(cmd, timeout=timeout, bufsize=1024)
        errput = [l.rstrip('\r\n') for l in cmd_err.readlines()]
        output = [l.rstrip('\r\n') for l in cmd_out.readlines()]

        if errput:
            raise CommandError(cmd, errput)

        return output

    def close(self):
        """Close the SSH connection."""
        self.__ssh.close()

    def wait(self, duration: float) -> List[str]:
        """Sleep for `duration` seconds; no output is collected, so an empty list is returned."""
        time.sleep(duration)
        return []

    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
        """Register `callback` to be called for every output line of `execute_command` (None to disable)."""
        self.__line_read_callback = callback


class OtbrAdbCommandRunner(OTCommandHandler):
    """OT Command Handler that runs `ot-ctl` and platform commands on a device through ADB."""

    from adb_shell.adb_device import AdbDevice

    def __init__(self, adb: AdbDevice):
        """Store the ADB device and connect to it."""
        self.__adb = adb
        self.__line_read_callback = None
        self.__adb.connect(rsa_keys=None, auth_timeout_s=0.1)

    def execute_command(self, cmd: str, timeout: float = 10) -> List[str]:
        """Run `ot-ctl <cmd>` on the device, feed each output line to the line-read callback and return the lines."""
        sh_cmd = f'ot-ctl {cmd}'

        output = self.shell(sh_cmd, timeout=timeout)

        if self.__line_read_callback is not None:
            for line in output:
                self.__line_read_callback(line)

        if cmd in ('reset', 'factoryreset'):
            self.wait(3)

        return output

    def execute_platform_command(self, cmd: str, timeout: float = 10) -> List[str]:
        """Run `cmd` in the device shell and return its output lines."""
        return self.shell(cmd, timeout=timeout)

    def shell(self, cmd: str, timeout: float) -> List[str]:
        """Execute `cmd` through ADB, using `timeout` for all ADB timeouts, and return the output split into lines."""
        return self.__adb.shell(cmd, transport_timeout_s=timeout, read_timeout_s=timeout,
                                timeout_s=timeout).splitlines()

    def close(self):
        """Close the ADB connection."""
        self.__adb.close()

    def wait(self, duration: float) -> List[str]:
        """Sleep for `duration` seconds; no output is collected, so an empty list is returned."""
        time.sleep(duration)
        return []

    def set_line_read_callback(self, callback: Optional[Callable[[str], Any]]):
        """Register `callback` to be called for every output line of `execute_command` (None to disable)."""
        self.__line_read_callback = callback


class OtbrAdbTcpCommandRunner(OtbrAdbCommandRunner):
    """ADB command runner that reaches the device over TCP."""

    def __init__(self, host: str, port: int):
        """Create a TCP ADB device for `host`:`port` and connect to it."""
        from adb_shell.adb_device import AdbDeviceTcp

        self.__host = host
        self.__port = port

        adb = AdbDeviceTcp(host, port, default_transport_timeout_s=9.0)
        super().__init__(adb)

    def __repr__(self):
        return f'{self.__host}:{self.__port}'


class OtbrAdbUsbCommandRunner(OtbrAdbCommandRunner):
    """ADB command runner that reaches the device over USB."""

    def __init__(self, serial: str):
        """Create a USB ADB device for the given `serial` and connect to it."""
        from adb_shell.adb_device import AdbDeviceUsb

        self.__serial = serial

        adb = AdbDeviceUsb(serial, port_path=None, default_transport_timeout_s=9.0)
        super().__init__(adb)

    def __repr__(self):
        return f'USB:{self.__serial}'