1#!/usr/bin/env python3
2#
3# Traditional watch command, but with higher resolution updates and a bit
4# different options/output format
5#
6# Example:
7# ./scripts/watch.py -s0.1 date
8#
9# Copyright (c) 2022, The littlefs authors.
10# SPDX-License-Identifier: BSD-3-Clause
11#
12
13import collections as co
14import errno
15import fcntl
16import io
17import os
18import pty
19import re
20import shutil
21import struct
22import subprocess as sp
23import sys
24import termios
25import time
26
27try:
28    import inotify_simple
29except ModuleNotFoundError:
30    inotify_simple = None
31
32
def openio(path, mode='r', buffering=-1):
    """Open path, treating '-' as stdin or stdout.

    path      - file path, or '-' for the process's stdin/stdout.
    mode      - mode string as accepted by open. With '-', any mode
                containing 'r' (eg 'r', 'rb', 'rt') selects stdin, anything
                else selects stdout.
    buffering - buffering policy, passed through to open/os.fdopen.

    Returns a file object. For '-' the file object wraps a duplicate of the
    underlying file descriptor, so closing it does not close the process's
    own stdin/stdout.
    """
    # allow '-' for stdin/stdout
    if path == '-':
        # check for 'r' anywhere in the mode, comparing against exactly 'r'
        # would send binary/text reads ('rb', 'rt') to stdout
        if 'r' in mode:
            return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering)
        else:
            return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering)
    else:
        return open(path, mode, buffering)
42
def inotifywait(paths):
    """Block until an inotify event fires on any of the given paths.

    paths - iterable of file or directory paths. Directories are walked
            recursively and a watch is added for every directory and every
            file found at the time of the call.

    Watches attribute changes, creation, deletion, modification, and moves.
    Requires the optional inotify_simple module to have been imported.
    """
    # wait for interesting events
    inotify = inotify_simple.INotify()
    try:
        flags = (inotify_simple.flags.ATTRIB
            | inotify_simple.flags.CREATE
            | inotify_simple.flags.DELETE
            | inotify_simple.flags.DELETE_SELF
            | inotify_simple.flags.MODIFY
            | inotify_simple.flags.MOVED_FROM
            | inotify_simple.flags.MOVED_TO
            | inotify_simple.flags.MOVE_SELF)

        # recurse into directories
        for path in paths:
            if os.path.isdir(path):
                for dirpath, _, files in os.walk(path):
                    inotify.add_watch(dirpath, flags)
                    for f in files:
                        inotify.add_watch(os.path.join(dirpath, f), flags)
            else:
                inotify.add_watch(path, flags)

        # wait for event
        inotify.read()
    finally:
        # this is called once per run, so release the inotify fd explicitly
        # instead of waiting on the garbage collector
        inotify.close()
67
class LinesIO:
    """Write-only buffer of the most recent complete lines, drawable in place.

    Text passed to write is split on newlines. Complete lines are kept in a
    bounded deque (self.lines), the unterminated remainder is held in
    self.tail until a later write completes it. draw repaints the kept lines
    on sys.stdout using ANSI cursor movement.

    maxlen - number of lines to keep, None for unbounded, 0 to follow the
             terminal height.
    """
    def __init__(self, maxlen=None):
        self.maxlen = maxlen
        self.lines = co.deque(maxlen=maxlen)
        self.tail = io.StringIO()

        # a maxlen of zero asks for automatic sizing
        if maxlen == 0:
            self.resize(0)

    def write(self, s):
        """Append text, moving any completed lines into self.lines."""
        # everything before the last newline is complete, whatever follows
        # it (possibly the empty string) is still pending
        *complete, partial = s.split('\n')

        # a newline finishes whatever partial line we were holding on to
        pending = self.tail.getvalue()
        if complete and pending:
            complete[0] = pending + complete[0]
            self.tail = io.StringIO()

        self.lines.extend(complete)

        if partial:
            self.tail.write(partial)

    def resize(self, maxlen):
        """Change the line limit, 0 means use the current terminal height."""
        self.maxlen = maxlen
        limit = maxlen
        if limit == 0:
            limit = shutil.get_terminal_size((80, 5)).lines
        # only rebuild the deque when the limit actually changed
        if limit != self.lines.maxlen:
            self.lines = co.deque(self.lines, maxlen=limit)

    # number of terminal rows claimed so far, stored on the class so it is
    # shared by every LinesIO instance
    canvas_lines = 1
    def draw(self):
        """Repaint the buffered lines on sys.stdout and flush."""
        # pick up terminal size changes when auto-sizing
        if self.maxlen == 0:
            self.resize(0)

        emit = sys.stdout.write
        total = len(self.lines)

        def paint(up, *pieces):
            # go to the start of the row `up` rows above the cursor, write
            # the pieces, then come back down
            emit('\r')
            if up > 0:
                emit('\x1b[%dA' % up)
            for piece in pieces:
                emit(piece)
            if up > 0:
                emit('\x1b[%dB' % up)

        # grow the canvas until there is a row for every line
        for _ in range(total - LinesIO.canvas_lines):
            emit('\n')
            LinesIO.canvas_lines += 1

        # too many rows? erase the surplus at the bottom and move up
        surplus = LinesIO.canvas_lines - total
        if surplus > 0:
            for up in reversed(range(surplus)):
                paint(up, '\x1b[K')
            emit('\x1b[%dA' % surplus)
            LinesIO.canvas_lines = total

        # redraw each row, clearing it first and turning line wrapping off
        # while the line is written
        for up, line in zip(range(total-1, -1, -1), self.lines):
            paint(up, '\x1b[K', '\x1b[?7l', line, '\x1b[?7h')
        sys.stdout.flush()
135
136
def main(command, *,
        lines=0,
        cat=False,
        sleep=None,
        keep_open=False,
        keep_open_paths=None,
        exit_on_error=False):
    """Run command over and over, showing its latest output.

    command         - argument list handed to subprocess.Popen. The command
                      runs with stdout and stderr attached to a
                      pseudoterminal.
    lines           - number of lines to show, passed to LinesIO. When
                      nonzero it also replaces the terminal height forwarded
                      to the pseudoterminal.
    cat             - write output straight to sys.stdout instead of
                      redrawing it in place.
    sleep           - seconds to wait between runs, None/0 means 0.1. With
                      keep_open this is the minimum time between runs.
    keep_open       - wait for an inotify event between runs instead of
                      sleeping. Only takes effect if inotify_simple could be
                      imported, otherwise we fall back to sleeping.
    keep_open_paths - paths to watch with inotify. If empty/None, paths are
                      guessed from the command's arguments.
    exit_on_error   - stop after the first run that returns nonzero and
                      exit with that return code.

    Does not return, always finishes with sys.exit. A KeyboardInterrupt
    ends the loop and exits with the current return code (0 unless
    exit_on_error recorded a failure).
    """
    returncode = 0
    try:
        while True:
            # reset ring each run
            if cat:
                ring = sys.stdout
            else:
                ring = LinesIO(lines)

            try:
                # run the command under a pseudoterminal
                mpty, spty = pty.openpty()
                try:
                    # forward terminal size
                    w, h = shutil.get_terminal_size((80, 5))
                    if lines:
                        h = lines
                    fcntl.ioctl(spty, termios.TIOCSWINSZ,
                        struct.pack('HHHH', h, w, 0, 0))

                    proc = sp.Popen(command,
                        stdout=spty,
                        stderr=spty,
                        close_fds=False)
                except BaseException:
                    # failed to launch, don't leak the master end, this
                    # matters because ETXTBSY is retried indefinitely
                    os.close(mpty)
                    raise
                finally:
                    # our copy of the slave end is no longer needed, the
                    # child (if any) has its own
                    os.close(spty)

                # the with block closes the master end even if reading or
                # drawing raises
                with os.fdopen(mpty, 'r', 1) as mfile:
                    while True:
                        try:
                            line = mfile.readline()
                        except OSError as e:
                            # EIO is treated as end of output
                            if e.errno != errno.EIO:
                                raise
                            break
                        if not line:
                            break

                        ring.write(line)
                        if not cat:
                            ring.draw()

                proc.wait()
                if exit_on_error and proc.returncode != 0:
                    returncode = proc.returncode
                    break
            except OSError as e:
                # ETXTBSY is not fatal, just try again on the next run
                if e.errno != errno.ETXTBSY:
                    raise

            # try to inotifywait
            if keep_open and inotify_simple is not None:
                if keep_open_paths:
                    paths = set(keep_open_paths)
                else:
                    # guess inotify paths from command, also trying each
                    # argument with a leading -x or --option= stripped
                    paths = set()
                    for arg in command:
                        for guess in {
                                arg,
                                re.sub('^-.', '', arg),
                                re.sub('^--[^=]+=', '', arg)}:
                            if guess and os.path.exists(guess):
                                paths.add(guess)
                ptime = time.time()
                inotifywait(paths)
                # sleep for a minimum amount of time, this helps issues around
                # rapidly updating files
                time.sleep(max(0, (sleep or 0.1) - (time.time()-ptime)))
            else:
                time.sleep(sleep or 0.1)
    except KeyboardInterrupt:
        pass

    # leave the cursor on a fresh line below the canvas
    if not cat:
        sys.stdout.write('\n')
    sys.exit(returncode)
222
223
if __name__ == "__main__":
    import sys
    import argparse

    # build the command line interface, prefix-abbreviated long options
    # are disabled
    parser = argparse.ArgumentParser(
        allow_abbrev=False,
        description="Traditional watch command, but with higher resolution "
            "updates and a bit different options/output format.")

    # everything after the options is the command to run
    parser.add_argument('command',
        nargs=argparse.REMAINDER,
        help="Command to run.")
    # accepts any integer base prefix (0x.., 0o.., 0b..), a bare -n means 0
    parser.add_argument('-n', '--lines',
        type=lambda x: int(x, 0),
        nargs='?',
        const=0,
        help="Show this many lines of history. 0 uses the terminal height. "
            "Defaults to 0.")
    parser.add_argument('-z', '--cat',
        action='store_true',
        help="Pipe directly to stdout.")
    parser.add_argument('-s', '--sleep',
        type=float,
        help="Seconds to sleep between runs. Defaults to 0.1.")
    parser.add_argument('-k', '--keep-open',
        action='store_true',
        help="Try to use inotify to wait for changes.")
    # may be given multiple times, each path is collected into a list
    parser.add_argument('-K', '--keep-open-path',
        action='append',
        dest='keep_open_paths',
        help="Use this path for inotify. Defaults to guessing.")
    parser.add_argument('-e', '--exit-on-error',
        action='store_true',
        help="Exit on error.")

    args = parser.parse_args()
    # leave out unset options so main's own defaults apply
    kwargs = {
        name: value
        for name, value in vars(args).items()
        if value is not None}
    sys.exit(main(**kwargs))
266