1#!/usr/bin/env python3 2# 3# Copyright (c) 2020, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 
#
import logging
import subprocess
import time
from abc import abstractmethod, ABC
from typing import Optional


class OtCliHandler(ABC):
    """This abstract class defines interfaces for an OT CLI Handler."""

    @abstractmethod
    def readline(self) -> Optional[str]:
        """Method readline should return the next line read from OT CLI.

        Implementations in this module return the line without its trailing
        '\\r' / '\\n' characters, and return None when no further line can be
        read (end of stream, or the underlying device has been closed).
        """

    @abstractmethod
    def writeline(self, s: str) -> None:
        """Method writeline should write a line to the OT CLI.

        It should block until all characters are written to OT CLI.
        """

    @abstractmethod
    def wait(self, duration: float) -> None:
        """Method wait should wait for a given duration (in seconds).

        A normal implementation should just call `time.sleep(duration)`. The hook exists so that Virtual Time
        Simulation instances can advance the simulated clock instead of sleeping.
        """

    @abstractmethod
    def close(self) -> None:
        """Method close should close the OT CLI Handler."""


class Simulator(ABC):
    """This abstract class defines interfaces for a Virtual Time Simulator."""

    @abstractmethod
    def go(self, duration: float) -> None:
        """Proceed the simulator for a given duration (in seconds)."""


class OtCliPopen(OtCliHandler):
    """Connector for OT CLI process (a Popen instance)."""

    def __init__(self, proc: subprocess.Popen, nodeid: int, simulator: Optional[Simulator]):
        """Wrap an already started process.

        Args:
            proc: the process whose stdin/stdout carry the CLI text; both are
                used as text streams (str in, str out) by this class.
            nodeid: node id, only used for `repr()`.
            simulator: virtual time simulator driven by `wait()`, or None to
                make `wait()` sleep in real time.
        """
        self.__otcli_proc = proc
        self.__nodeid = nodeid
        self.__simulator = simulator

    def __repr__(self) -> str:
        return 'OTCli<%d>' % self.__nodeid

    def readline(self) -> Optional[str]:
        """Return the next output line without its line ending, or None at end of stream."""
        assert self.__otcli_proc.stdout is not None
        line = self.__otcli_proc.stdout.readline()
        if not line:
            # A text stream yields '' only at EOF (a blank line is '\n'), so
            # report end of stream explicitly instead of an endless run of
            # empty lines.
            return None
        return line.rstrip('\r\n')

    def writeline(self, s: str) -> None:
        """Write `s` followed by a newline to the process and flush it."""
        assert self.__otcli_proc.stdin is not None
        self.__otcli_proc.stdin.write(s + '\n')
        # The pipe is buffered; flush so the command reaches the process now.
        self.__otcli_proc.stdin.flush()

    def wait(self, duration: float) -> None:
        """Let `duration` seconds pass, in virtual time when a simulator was given."""
        if self.__simulator is not None:
            # Virtual time simulation
            self.__simulator.go(duration)
        else:
            # Real time simulation
            time.sleep(duration)

    def close(self) -> None:
        """Close both pipes and wait for the process to exit.

        Every step is attempted even if an earlier one raises (e.g. closing
        stdin of a process that already died), so the stdout pipe is not
        leaked and the process is always reaped. The first error raised still
        propagates to the caller.
        """
        assert self.__otcli_proc.stdin is not None
        assert self.__otcli_proc.stdout is not None
        try:
            self.__otcli_proc.stdin.close()
        finally:
            try:
                self.__otcli_proc.stdout.close()
            finally:
                self.__otcli_proc.wait()


class OtCliSim(OtCliPopen):
    """Connector for OT CLI Simulation instances."""

    def __init__(self, executable: str, nodeid: int, simulator: Optional[Simulator]):
        """Start `executable` with the node id as its only argument.

        Args:
            executable: path of the OT CLI simulation program.
            nodeid: node id passed to the program on its command line.
            simulator: forwarded to `OtCliPopen`.
        """
        logging.info('%s: executable=%s', self.__class__.__name__, executable)

        # stdin/stdout are UTF-8 text pipes; stderr is left untouched.
        proc = subprocess.Popen(args=[executable, str(nodeid)],
                                executable=executable,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                encoding='utf-8',
                                bufsize=1024)
        super().__init__(proc, nodeid, simulator)


class OtNcpSim(OtCliPopen):
    """Connector for OT NCP Simulation instances."""

    def __init__(self, executable: str, nodeid: int, simulator: Optional[Simulator]):
        """Start `spinel-cli.py` against `executable` for the given node id.

        Args:
            executable: value handed to `spinel-cli.py -p`.
            nodeid: value handed to `spinel-cli.py -n`.
            simulator: forwarded to `OtCliPopen`.
        """
        logging.info('%s: executable=%s', self.__class__.__name__, executable)

        # The command runs through the shell so that `2>&1` can merge stderr
        # into the stdout pipe read by `readline()`.
        # NOTE(review): `executable` is interpolated into the shell command
        # inside double quotes, so `"`, `$`, backticks and backslashes in it
        # are interpreted by the shell; only pass trusted paths here.
        proc = subprocess.Popen(args=f'spinel-cli.py -p "{executable}" -n {nodeid} 2>&1',
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                encoding='utf-8',
                                bufsize=1024,
                                shell=True)
        super().__init__(proc, nodeid, simulator)


class OtCliSerial(OtCliHandler):
    """Connector for OT CLI SOC devices via Serial."""

    def __init__(self, dev: str, baudrate: int):
        """Open the serial device.

        Args:
            dev: serial device name, also used as `repr()` of the connector.
            baudrate: baud rate passed to `serial.Serial`.
        """
        self.__dev = dev
        self.__baudrate = baudrate

        # Imported here rather than at module level, so the `serial` package
        # is only required when a serial connector is actually created.
        import serial
        self.__serial = serial.Serial(self.__dev, self.__baudrate, timeout=0.1, exclusive=True)
        # Bytes of a line whose terminating '\n' has not been received yet.
        self.__linebuffer = b''

    def __repr__(self) -> str:
        return self.__dev

    def readline(self) -> Optional[str]:
        """Return the next complete line without its line ending.

        Keeps reading while the port is open; returns None once the port is
        no longer open.
        """
        while self.__serial.is_open:
            line = self.__serial.readline()

            if not line.endswith(b'\n'):
                # The read ended (the port is opened with a 0.1 s timeout)
                # before a full line arrived: keep the fragment and retry.
                self.__linebuffer += line
            else:
                line = self.__linebuffer + line
                self.__linebuffer = b''

                # Undecodable bytes are dropped rather than raising.
                return line.decode('utf-8', errors='ignore').rstrip('\r\n')

        return None

    def writeline(self, s: str) -> None:
        """Write `s` followed by a newline to the serial port, UTF-8 encoded."""
        self.__serial.write((s + '\n').encode('utf-8'))

    def wait(self, duration: float) -> None:
        """Sleep for `duration` seconds of real time."""
        time.sleep(duration)

    def close(self) -> None:
        """Close the serial port."""
        self.__serial.close()