1# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16import os
17import queue
18import time
19
20from serial.tools.miniterm import Console
21
22from .console_parser import ConsoleParser
23from .constants import CMD_STOP, TAG_CMD
24from .stoppable_thread import StoppableThread
25
26
class ConsoleReader(StoppableThread):
    """Background thread that turns console key presses into queued items.

    Each key obtained from the console is handed to the parser; anything the
    parser returns is put on either the command queue or the event queue.
    The loop keeps running for as long as the thread is alive.
    """

    def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
        # type: (Console, queue.Queue, queue.Queue, ConsoleParser, bool) -> None
        """Remember the collaborators used later by run() and _cancel()."""
        super(ConsoleReader, self).__init__()
        self.test_mode = test_mode
        self.parser = parser
        self.cmd_queue = cmd_queue
        self.event_queue = event_queue
        self.console = console

    def run(self):
        # type: () -> None
        """Read keys while the thread is alive and route the parsed results.

        The console is set up on entry and is always cleaned up on exit, even
        when the loop terminates because of an exception.
        """
        self.console.setup()
        try:
            while self.alive:
                try:
                    if os.name == 'nt':
                        # Windows workaround: console.cancel() does not seem to unblock
                        # getkey() in the Windows implementation, so poll until a key is
                        # pending and only then call getkey().
                        import msvcrt
                        while not msvcrt.kbhit() and self.alive:  # type: ignore
                            time.sleep(0.1)
                        if not self.alive:
                            break
                    elif self.test_mode:
                        # In testing mode stdin is attached to a PTY that never delivers
                        # input, and cancelling via fcntl.ioctl does not work for a PTY,
                        # so getkey() would hang. Idle here until stopped instead.
                        while self.alive:
                            time.sleep(0.1)
                        break
                    key = self.console.getkey()
                except KeyboardInterrupt:
                    # An interrupt is forwarded to the parser as the Ctrl-C character.
                    key = '\x03'

                if key is None:
                    continue
                parsed = self.parser.parse(key)
                if parsed is None:
                    continue

                (tag, cmd) = parsed
                # Every command except stop goes to the command queue. The stop command
                # should be executed last, so it is queued together with ordinary events.
                target = self.cmd_queue if (tag == TAG_CMD and cmd != CMD_STOP) else self.event_queue
                target.put(parsed)
        finally:
            self.console.cleanup()

    def _cancel(self):
        # type: () -> None
        """Push a NUL byte into the console's input via TIOCSTI (POSIX, non-test mode only)."""
        if os.name != 'posix' or self.test_mode:
            # Windows uses the polling workaround in run(); in testing mode stdin is a
            # PTY and the ioctl below would throw an exception.
            return

        # This mirrors how cancel() is implemented in pyserial 3.3 or newer.
        # Older pyserial (3.1+) implements cancellation via 'select', which does not
        # work when the console sends an escape sequence response, and even older
        # pyserial (<3.1) does not have the method at all.
        #
        # Note that TIOCSTI is not implemented in WSL / bash-on-Windows.
        # TODO: introduce some workaround to make it work there.
        import fcntl
        import termios
        fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
98