# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import queue
import time

from serial.tools.miniterm import Console

from .console_parser import ConsoleParser
from .constants import CMD_STOP, TAG_CMD
from .stoppable_thread import StoppableThread


class ConsoleReader(StoppableThread):
    """Thread that reads keys from the console and forwards the parsed results.

    Each key obtained from the console is handed to the parser; whatever the
    parser returns is put on either the command queue or the event queue.
    The loop runs for as long as ``self.alive`` is true.
    """

    def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
        # type: (Console, queue.Queue, queue.Queue, ConsoleParser, bool) -> None
        """Store the collaborators used by :meth:`run` and :meth:`_cancel`.

        :param console: console object providing ``setup()``, ``getkey()``,
            ``cleanup()`` and (on POSIX) ``fd``
        :param event_queue: queue receiving every parsed result that is not a
            regular command (this includes the stop command)
        :param cmd_queue: queue receiving parsed commands other than stop
        :param parser: object whose ``parse(key)`` turns a key into a
            ``(tag, cmd)`` pair or ``None``
        :param test_mode: when true, the console is never read on non-Windows
            platforms and the POSIX cancellation in :meth:`_cancel` is skipped
        """
        super(ConsoleReader, self).__init__()
        self.test_mode = test_mode
        self.parser = parser
        self.cmd_queue = cmd_queue
        self.event_queue = event_queue
        self.console = console

    def run(self):
        # type: () -> None
        """Read keys until the thread is no longer alive, dispatching parsed results.

        The console is set up before the loop and always cleaned up afterwards,
        even if the loop exits through an exception.
        """
        self.console.setup()
        try:
            while self.alive:
                try:
                    if os.name == 'nt':
                        # Windows workaround: console.cancel() does not seem to
                        # unblock getkey() in the Windows implementation, so
                        # getkey() is only called once a key is known to be
                        # waiting.
                        import msvcrt
                        while True:
                            if msvcrt.kbhit() or not self.alive:  # type: ignore
                                break
                            time.sleep(0.1)
                        if not self.alive:
                            break
                    elif self.test_mode:
                        # In test mode stdin is connected to a PTY but is not
                        # used for any input. Cancelling via fcntl.ioctl does
                        # not work for a PTY and getkey() would hang, so it is
                        # never called: just idle until the thread is stopped.
                        while self.alive:
                            time.sleep(0.1)
                        break
                    key = self.console.getkey()
                except KeyboardInterrupt:
                    # Treat an interrupt as if Ctrl-C had been typed.
                    key = '\x03'

                if key is None:
                    continue
                parsed = self.parser.parse(key)
                if parsed is None:
                    continue

                tag, cmd = parsed
                # The stop command should be executed last, so it goes to the
                # event queue together with everything that is not a command.
                is_regular_cmd = tag == TAG_CMD and cmd != CMD_STOP
                target_queue = self.cmd_queue if is_regular_cmd else self.event_queue
                target_queue.put(parsed)
        finally:
            self.console.cleanup()

    def _cancel(self):
        # type: () -> None
        """Unblock a pending ``getkey()`` on POSIX by injecting a NUL byte.

        Does nothing on non-POSIX platforms or in test mode.
        """
        if os.name != 'posix' or self.test_mode:
            return

        # This mirrors how cancel() is implemented in pyserial 3.3 or newer.
        # Older pyserial (3.1+) implements cancellation via 'select', which
        # does not work when the console sends an escape sequence response,
        # and even older pyserial (<3.1) does not have the method at all.
        #
        # Windows uses a different (also hacky) fix, applied in run().
        #
        # Note that TIOCSTI is not implemented in WSL / bash-on-Windows.
        # TODO: introduce some workaround to make it work there.
        #
        # Note: this would throw an exception in test mode, when stdin is
        # connected to a PTY, hence the guard above.
        import fcntl
        import termios
        fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')