1#!/usr/bin/env python3
2#
3#  Copyright (c) 2020, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29import logging
30import subprocess
31import time
32from abc import abstractmethod, ABC
33from typing import Optional
34
35
36class OtCliHandler(ABC):
37    """This abstract class defines interfaces for a OT CLI Handler."""
38
39    @abstractmethod
40    def readline(self) -> Optional[str]:
41        """Method readline should return the next line read from OT CLI."""
42
43    @abstractmethod
44    def writeline(self, s: str) -> None:
45        """Method writeline should write a line to the OT CLI.
46
47        It should block until all characters are written to OT CLI.
48        """
49
50    @abstractmethod
51    def wait(self, duration: float) -> None:
52        """Method wait should wait for a given duration.
53
54        A normal implementation should just call `time.sleep(duration)`. This is intended for proceeding Virtual Time
55        Simulation instances.
56        """
57
58    @abstractmethod
59    def close(self) -> None:
60        """Method close should close the OT CLI Handler."""
61
62
63class Simulator(ABC):
64    """This abstract class defines interfaces for a Virtual Time Simulator."""
65
66    @abstractmethod
67    def go(self, duration: float):
68        """Proceed the simulator for a given duration (in seconds)."""
69        pass
70
71
72class OtCliPopen(OtCliHandler):
73    """Connector for OT CLI process (a Popen instance)."""
74
75    def __init__(self, proc: subprocess.Popen, nodeid: int, simulator: Simulator):
76        self.__otcli_proc = proc
77        self.__nodeid = nodeid
78        self.__simulator = simulator
79
80    def __repr__(self):
81        return 'OTCli<%d>' % self.__nodeid
82
83    def readline(self) -> Optional[str]:
84        assert self.__otcli_proc.stdout is not None
85        return self.__otcli_proc.stdout.readline().rstrip('\r\n')
86
87    def writeline(self, s: str):
88        assert self.__otcli_proc.stdin is not None
89        self.__otcli_proc.stdin.write(s + '\n')
90        self.__otcli_proc.stdin.flush()
91
92    def wait(self, duration: float):
93        if self.__simulator is not None:
94            # Virtual time simulation
95            self.__simulator.go(duration)
96        else:
97            # Real time simulation
98            time.sleep(duration)
99
100    def close(self):
101        assert self.__otcli_proc.stdin is not None
102        assert self.__otcli_proc.stdout is not None
103        self.__otcli_proc.stdin.close()
104        self.__otcli_proc.stdout.close()
105        self.__otcli_proc.wait()
106
107
108class OtCliSim(OtCliPopen):
109    """Connector for OT CLI Simulation instances."""
110
111    def __init__(self, executable: str, nodeid: int, simulator: Simulator):
112        logging.info('%s: executable=%s', self.__class__.__name__, executable)
113
114        proc = subprocess.Popen(args=[executable, str(nodeid)],
115                                executable=executable,
116                                stdin=subprocess.PIPE,
117                                stdout=subprocess.PIPE,
118                                encoding='utf-8',
119                                bufsize=1024)
120        super().__init__(proc, nodeid, simulator)
121
122
123class OtNcpSim(OtCliPopen):
124    """Connector for OT NCP Simulation instances."""
125
126    def __init__(self, executable: str, nodeid: int, simulator: Simulator):
127        logging.info('%s: executable=%s', self.__class__.__name__, executable)
128
129        proc = subprocess.Popen(args=f'spinel-cli.py -p "{executable}" -n {nodeid} 2>&1',
130                                stdin=subprocess.PIPE,
131                                stdout=subprocess.PIPE,
132                                encoding='utf-8',
133                                bufsize=1024,
134                                shell=True)
135        super().__init__(proc, nodeid, simulator)
136
137
138class OtCliSerial(OtCliHandler):
139    """Connector for OT CLI SOC devices via Serial."""
140
141    def __init__(self, dev: str, baudrate: int):
142        self.__dev = dev
143        self.__baudrate = baudrate
144
145        import serial
146        self.__serial = serial.Serial(self.__dev, self.__baudrate, timeout=0.1, exclusive=True)
147        self.__linebuffer = b''
148
149    def __repr__(self):
150        return self.__dev
151
152    def readline(self) -> Optional[str]:
153        while self.__serial.is_open:
154            line = self.__serial.readline()
155
156            if not line.endswith(b'\n'):
157                self.__linebuffer += line
158            else:
159                line = self.__linebuffer + line
160                self.__linebuffer = b''
161
162                return line.decode('utf-8', errors='ignore').rstrip('\r\n')
163
164        return None
165
166    def writeline(self, s: str):
167        self.__serial.write((s + '\n').encode('utf-8'))
168
169    def wait(self, duration: float):
170        time.sleep(duration)
171
172    def close(self):
173        self.__serial.close()
174