#!/usr/bin/env python3
#
# Traditional watch command, but with higher resolution updates and a bit
# different options/output format
#
# Example:
# ./scripts/watch.py -s0.1 date
#
# Copyright (c) 2022, The littlefs authors.
# SPDX-License-Identifier: BSD-3-Clause
#

import collections as co
import errno
import fcntl
import io
import os
import pty
import re
import shutil
import struct
import subprocess as sp
import sys
import termios
import time

try:
    import inotify_simple
except ModuleNotFoundError:
    # inotify support is optional, main() falls back to plain sleeping
    inotify_simple = None


def openio(path, mode='r', buffering=-1):
    """Open path, treating '-' as stdin (mode 'r') or stdout (other modes).

    The standard stream's descriptor is duplicated, so closing the returned
    file does not close sys.stdin/sys.stdout themselves.
    """
    # allow '-' for stdin/stdout
    if path == '-':
        if mode == 'r':
            return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering)
        else:
            return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering)
    else:
        return open(path, mode, buffering)


def inotifywait(paths):
    """Block until an inotify event arrives for any of paths.

    Directories are walked recursively, every directory and file found is
    watched. Requires the optional inotify_simple module.
    """
    # wait for interesting events
    inotify = inotify_simple.INotify()
    try:
        flags = (inotify_simple.flags.ATTRIB
            | inotify_simple.flags.CREATE
            | inotify_simple.flags.DELETE
            | inotify_simple.flags.DELETE_SELF
            | inotify_simple.flags.MODIFY
            | inotify_simple.flags.MOVED_FROM
            | inotify_simple.flags.MOVED_TO
            | inotify_simple.flags.MOVE_SELF)

        # recurse into directories
        for path in paths:
            if os.path.isdir(path):
                for dirpath, _, files in os.walk(path):
                    inotify.add_watch(dirpath, flags)
                    for f in files:
                        inotify.add_watch(os.path.join(dirpath, f), flags)
            else:
                inotify.add_watch(path, flags)

        # wait for event
        inotify.read()
    finally:
        # this is called once per watch cycle, so release the inotify
        # descriptor instead of leaking one per iteration
        inotify.close()


class LinesIO:
    """Write-only text sink that keeps the last maxlen complete lines and
    can redraw them in place on the terminal.

    maxlen=None keeps every line, maxlen=0 tracks the terminal height.
    """

    # number of terminal rows reserved for drawing so far, this is a class
    # attribute so it survives across LinesIO instances
    canvas_lines = 1

    def __init__(self, maxlen=None):
        self.maxlen = maxlen
        self.lines = co.deque(maxlen=maxlen)
        # text after the last newline, not yet a complete line
        self.tail = io.StringIO()

        # trigger automatic sizing
        if maxlen == 0:
            self.resize(0)

    def write(self, s):
        """Append s, moving completed lines into the ring."""
        # note using split here ensures the trailing string has no newline
        lines = s.split('\n')

        # a newline completes whatever partial line we were holding on to
        if len(lines) > 1 and self.tail.getvalue():
            self.tail.write(lines[0])
            lines[0] = self.tail.getvalue()
            self.tail = io.StringIO()

        self.lines.extend(lines[:-1])

        if lines[-1]:
            self.tail.write(lines[-1])

    def resize(self, maxlen):
        """Change the number of lines kept, 0 means the terminal height."""
        self.maxlen = maxlen
        if maxlen == 0:
            maxlen = shutil.get_terminal_size((80, 5))[1]
        if maxlen != self.lines.maxlen:
            # rebuilding the deque keeps the most recent lines
            self.lines = co.deque(self.lines, maxlen=maxlen)

    def draw(self):
        """Redraw the kept lines over the canvas at the bottom of stdout."""
        # did terminal size change?
        if self.maxlen == 0:
            self.resize(0)

        # first thing first, give ourself a canvas
        while LinesIO.canvas_lines < len(self.lines):
            sys.stdout.write('\n')
            LinesIO.canvas_lines += 1

        # clear the bottom of the canvas if we shrink
        shrink = LinesIO.canvas_lines - len(self.lines)
        if shrink > 0:
            for i in range(shrink):
                sys.stdout.write('\r')
                if shrink-1-i > 0:
                    sys.stdout.write('\x1b[%dA' % (shrink-1-i))
                sys.stdout.write('\x1b[K')
                if shrink-1-i > 0:
                    sys.stdout.write('\x1b[%dB' % (shrink-1-i))
            sys.stdout.write('\x1b[%dA' % shrink)
            LinesIO.canvas_lines = len(self.lines)

        for i, line in enumerate(self.lines):
            # move cursor, clear line, disable/reenable line wrapping
            sys.stdout.write('\r')
            if len(self.lines)-1-i > 0:
                sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-i))
            sys.stdout.write('\x1b[K')
            sys.stdout.write('\x1b[?7l')
            sys.stdout.write(line)
            sys.stdout.write('\x1b[?7h')
            if len(self.lines)-1-i > 0:
                sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-i))
        sys.stdout.flush()


def _watch_paths(command, keep_open_paths=None):
    """Return the set of paths to hand to inotifywait.

    Explicit keep_open_paths win, otherwise guess from command by keeping
    every argument that names an existing path, either as-is or after
    stripping a leading "-x" or "--option=" prefix.
    """
    if keep_open_paths:
        return set(keep_open_paths)

    # guess inotify paths from command
    paths = set()
    for arg in command:
        for candidate in {
                arg,
                re.sub('^-.', '', arg),
                re.sub('^--[^=]+=', '', arg)}:
            if candidate and os.path.exists(candidate):
                paths.add(candidate)
    return paths


def _run_command(command, ring, *, lines=0, cat=False):
    """Run command once under a pseudoterminal, feeding its output to ring.

    Returns the command's exit status. Both ends of the pty are closed on
    every path, including when the command fails to start.
    """
    # run the command under a pseudoterminal
    mpty, spty = pty.openpty()
    try:
        # forward terminal size
        w, h = shutil.get_terminal_size((80, 5))
        if lines:
            h = lines
        fcntl.ioctl(spty, termios.TIOCSWINSZ,
            struct.pack('HHHH', h, w, 0, 0))

        proc = sp.Popen(command,
            stdout=spty,
            stderr=spty,
            close_fds=False)
    except BaseException:
        # nothing is going to read from the master end
        os.close(mpty)
        raise
    finally:
        # the child holds its own copy of the slave end
        os.close(spty)

    with os.fdopen(mpty, 'r', 1) as master:
        while True:
            try:
                line = master.readline()
            except OSError as e:
                # EIO is treated as end of output, same as an empty read
                if e.errno != errno.EIO:
                    raise
                break
            if not line:
                break

            ring.write(line)
            if not cat:
                ring.draw()

    proc.wait()
    return proc.returncode


def main(command, *,
        lines=0,
        cat=False,
        sleep=None,
        keep_open=False,
        keep_open_paths=None,
        exit_on_error=False):
    """Repeatedly run command and show its output until interrupted.

    command          argv list passed to subprocess.Popen
    lines            lines of history to show, 0 uses the terminal height
    cat              write output straight to stdout instead of redrawing
    sleep            seconds between runs, defaults to 0.1
    keep_open        wait for inotify events between runs, if available
    keep_open_paths  paths to watch, defaults to guessing from command
    exit_on_error    stop after the first run with a nonzero exit status

    Never returns, exits via sys.exit with the failing exit status if
    exit_on_error stopped the loop, 0 otherwise.
    """
    returncode = 0
    try:
        while True:
            # reset ring each run
            if cat:
                ring = sys.stdout
            else:
                ring = LinesIO(lines)

            try:
                status = _run_command(command, ring, lines=lines, cat=cat)
                if exit_on_error and status != 0:
                    returncode = status
                    break
            except OSError as e:
                # ETXTBSY is tolerated, just try again on the next cycle
                if e.errno != errno.ETXTBSY:
                    raise

            # try to inotifywait
            if keep_open and inotify_simple is not None:
                paths = _watch_paths(command, keep_open_paths)
                ptime = time.time()
                inotifywait(paths)
                # sleep for a minimum amount of time, this helps issues around
                # rapidly updating files
                time.sleep(max(0, (sleep or 0.1) - (time.time()-ptime)))
            else:
                time.sleep(sleep or 0.1)
    except KeyboardInterrupt:
        pass

    if not cat:
        sys.stdout.write('\n')
    sys.exit(returncode)


if __name__ == "__main__":
    import sys
    import argparse
    parser = argparse.ArgumentParser(
        description="Traditional watch command, but with higher resolution "
            "updates and a bit different options/output format.",
        allow_abbrev=False)
    parser.add_argument(
        'command',
        nargs=argparse.REMAINDER,
        help="Command to run.")
    parser.add_argument(
        '-n', '--lines',
        nargs='?',
        type=lambda x: int(x, 0),
        const=0,
        help="Show this many lines of history. 0 uses the terminal height. "
            "Defaults to 0.")
    parser.add_argument(
        '-z', '--cat',
        action='store_true',
        help="Pipe directly to stdout.")
    parser.add_argument(
        '-s', '--sleep',
        type=float,
        help="Seconds to sleep between runs. Defaults to 0.1.")
    parser.add_argument(
        '-k', '--keep-open',
        action='store_true',
        help="Try to use inotify to wait for changes.")
    parser.add_argument(
        '-K', '--keep-open-path',
        dest='keep_open_paths',
        action='append',
        help="Use this path for inotify. Defaults to guessing.")
    parser.add_argument(
        '-e', '--exit-on-error',
        action='store_true',
        help="Exit on error.")
    # drop unset options so main's own defaults apply
    sys.exit(main(**{k: v
        for k, v in vars(parser.parse_args()).items()
        if v is not None}))