#!/usr/bin/env python3 # # Traditional watch command, but with higher resolution updates and a bit # different options/output format # # Example: # ./scripts/watch.py -s0.1 date # # Copyright (c) 2022, The littlefs authors. # SPDX-License-Identifier: BSD-3-Clause # import collections as co import errno import fcntl import io import os import pty import re import shutil import struct import subprocess as sp import sys import termios import time try: import inotify_simple except ModuleNotFoundError: inotify_simple = None def openio(path, mode='r', buffering=-1): # allow '-' for stdin/stdout if path == '-': if mode == 'r': return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering) else: return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering) else: return open(path, mode, buffering) def inotifywait(paths): # wait for interesting events inotify = inotify_simple.INotify() flags = (inotify_simple.flags.ATTRIB | inotify_simple.flags.CREATE | inotify_simple.flags.DELETE | inotify_simple.flags.DELETE_SELF | inotify_simple.flags.MODIFY | inotify_simple.flags.MOVED_FROM | inotify_simple.flags.MOVED_TO | inotify_simple.flags.MOVE_SELF) # recurse into directories for path in paths: if os.path.isdir(path): for dir, _, files in os.walk(path): inotify.add_watch(dir, flags) for f in files: inotify.add_watch(os.path.join(dir, f), flags) else: inotify.add_watch(path, flags) # wait for event inotify.read() class LinesIO: def __init__(self, maxlen=None): self.maxlen = maxlen self.lines = co.deque(maxlen=maxlen) self.tail = io.StringIO() # trigger automatic sizing if maxlen == 0: self.resize(0) def write(self, s): # note using split here ensures the trailing string has no newline lines = s.split('\n') if len(lines) > 1 and self.tail.getvalue(): self.tail.write(lines[0]) lines[0] = self.tail.getvalue() self.tail = io.StringIO() self.lines.extend(lines[:-1]) if lines[-1]: self.tail.write(lines[-1]) def resize(self, maxlen): self.maxlen = maxlen if maxlen == 0: maxlen = shutil.get_terminal_size((80, 5))[1] if maxlen != self.lines.maxlen: self.lines = co.deque(self.lines, maxlen=maxlen) canvas_lines = 1 def draw(self): # did terminal size change? if self.maxlen == 0: self.resize(0) # first thing first, give ourself a canvas while LinesIO.canvas_lines < len(self.lines): sys.stdout.write('\n') LinesIO.canvas_lines += 1 # clear the bottom of the canvas if we shrink shrink = LinesIO.canvas_lines - len(self.lines) if shrink > 0: for i in range(shrink): sys.stdout.write('\r') if shrink-1-i > 0: sys.stdout.write('\x1b[%dA' % (shrink-1-i)) sys.stdout.write('\x1b[K') if shrink-1-i > 0: sys.stdout.write('\x1b[%dB' % (shrink-1-i)) sys.stdout.write('\x1b[%dA' % shrink) LinesIO.canvas_lines = len(self.lines) for i, line in enumerate(self.lines): # move cursor, clear line, disable/reenable line wrapping sys.stdout.write('\r') if len(self.lines)-1-i > 0: sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-i)) sys.stdout.write('\x1b[K') sys.stdout.write('\x1b[?7l') sys.stdout.write(line) sys.stdout.write('\x1b[?7h') if len(self.lines)-1-i > 0: sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-i)) sys.stdout.flush() def main(command, *, lines=0, cat=False, sleep=None, keep_open=False, keep_open_paths=None, exit_on_error=False): returncode = 0 try: while True: # reset ring each run if cat: ring = sys.stdout else: ring = LinesIO(lines) try: # run the command under a pseudoterminal mpty, spty = pty.openpty() # forward terminal size w, h = shutil.get_terminal_size((80, 5)) if lines: h = lines fcntl.ioctl(spty, termios.TIOCSWINSZ, struct.pack('HHHH', h, w, 0, 0)) proc = sp.Popen(command, stdout=spty, stderr=spty, close_fds=False) os.close(spty) mpty = os.fdopen(mpty, 'r', 1) while True: try: line = mpty.readline() except OSError as e: if e.errno != errno.EIO: raise break if not line: break ring.write(line) if not cat: ring.draw() mpty.close() proc.wait() if exit_on_error and proc.returncode != 0: returncode = proc.returncode break except OSError as e: if e.errno != errno.ETXTBSY: raise pass # try to inotifywait if keep_open and inotify_simple is not None: if keep_open_paths: paths = set(keep_paths) else: # guess inotify paths from command paths = set() for p in command: for p in { p, re.sub('^-.', '', p), re.sub('^--[^=]+=', '', p)}: if p and os.path.exists(p): paths.add(p) ptime = time.time() inotifywait(paths) # sleep for a minimum amount of time, this helps issues around # rapidly updating files time.sleep(max(0, (sleep or 0.1) - (time.time()-ptime))) else: time.sleep(sleep or 0.1) except KeyboardInterrupt: pass if not cat: sys.stdout.write('\n') sys.exit(returncode) if __name__ == "__main__": import sys import argparse parser = argparse.ArgumentParser( description="Traditional watch command, but with higher resolution " "updates and a bit different options/output format.", allow_abbrev=False) parser.add_argument( 'command', nargs=argparse.REMAINDER, help="Command to run.") parser.add_argument( '-n', '--lines', nargs='?', type=lambda x: int(x, 0), const=0, help="Show this many lines of history. 0 uses the terminal height. " "Defaults to 0.") parser.add_argument( '-z', '--cat', action='store_true', help="Pipe directly to stdout.") parser.add_argument( '-s', '--sleep', type=float, help="Seconds to sleep between runs. Defaults to 0.1.") parser.add_argument( '-k', '--keep-open', action='store_true', help="Try to use inotify to wait for changes.") parser.add_argument( '-K', '--keep-open-path', dest='keep_open_paths', action='append', help="Use this path for inotify. Defaults to guessing.") parser.add_argument( '-e', '--exit-on-error', action='store_true', help="Exit on error.") sys.exit(main(**{k: v for k, v in vars(parser.parse_args()).items() if v is not None}))