#!/usr/bin/env python3
#
# Efficiently displays the last n lines of a file/pipe.
#
# Example:
# ./scripts/tailpipe.py trace -n5
#
# Copyright (c) 2022, The littlefs authors.
# SPDX-License-Identifier: BSD-3-Clause
#

import collections as co
import io
import os
import select
import shutil
import sys
import threading as th
import time


def openio(path, mode='r', buffering=-1):
    """Open path like open(), treating '-' as stdin/stdout.

    When path is '-', a duplicate of the stdin file descriptor is wrapped
    for any mode containing 'r', and a duplicate of the stdout file
    descriptor otherwise. Because the descriptor is duplicated, closing
    the returned file does not close the process's own stdin/stdout.
    """
    # allow '-' for stdin/stdout
    if path == '-':
        # match any read mode ('r', 'rb', 'rt', ...), an exact comparison
        # against 'r' would hand out stdout for e.g. 'rb'
        if 'r' in mode:
            return os.fdopen(os.dup(sys.stdin.fileno()), mode, buffering)
        else:
            return os.fdopen(os.dup(sys.stdout.fileno()), mode, buffering)
    else:
        return open(path, mode, buffering)


class LinesIO:
    """Write-only file-like object that keeps the last maxlen lines.

    Complete lines are stored (without their newline) in the deque
    self.lines, any trailing text without a newline is held in self.tail
    until a later write completes it. draw() renders the stored lines to
    sys.stdout using ANSI escape sequences.

    maxlen=None keeps every line, maxlen=0 follows the terminal height.
    """

    def __init__(self, maxlen=None):
        self.maxlen = maxlen
        self.lines = co.deque(maxlen=maxlen)
        self.tail = io.StringIO()

        # trigger automatic sizing
        if maxlen == 0:
            self.resize(0)

    def write(self, s):
        """Append the string s, splitting it into lines on '\\n'."""
        # note using split here ensures the trailing string has no newline
        lines = s.split('\n')

        # s finishes a line, so prepend whatever partial line we buffered
        if len(lines) > 1 and self.tail.getvalue():
            self.tail.write(lines[0])
            lines[0] = self.tail.getvalue()
            self.tail = io.StringIO()

        self.lines.extend(lines[:-1])

        # buffer any unterminated remainder for the next write
        if lines[-1]:
            self.tail.write(lines[-1])

    def resize(self, maxlen):
        """Change how many lines are kept, 0 means the terminal height."""
        self.maxlen = maxlen
        if maxlen == 0:
            # fall back to 80x5 if the terminal size can't be determined
            maxlen = shutil.get_terminal_size((80, 5))[1]
        if maxlen != self.lines.maxlen:
            # deques can't be resized in place, rebuild keeping the newest
            self.lines = co.deque(self.lines, maxlen=maxlen)

    # number of terminal rows currently claimed for drawing, stored on the
    # class so it is shared by every LinesIO instance
    canvas_lines = 1

    def draw(self):
        """Redraw the stored lines in place on sys.stdout."""
        # did terminal size change?
        if self.maxlen == 0:
            self.resize(0)

        # first thing first, give ourself a canvas
        while LinesIO.canvas_lines < len(self.lines):
            sys.stdout.write('\n')
            LinesIO.canvas_lines += 1

        # clear the bottom of the canvas if we shrink
        shrink = LinesIO.canvas_lines - len(self.lines)
        if shrink > 0:
            for i in range(shrink):
                sys.stdout.write('\r')
                if shrink-1-i > 0:
                    sys.stdout.write('\x1b[%dA' % (shrink-1-i))
                sys.stdout.write('\x1b[K')
                if shrink-1-i > 0:
                    sys.stdout.write('\x1b[%dB' % (shrink-1-i))
            sys.stdout.write('\x1b[%dA' % shrink)
            LinesIO.canvas_lines = len(self.lines)

        for i, line in enumerate(self.lines):
            # move cursor, clear line, disable/reenable line wrapping
            sys.stdout.write('\r')
            if len(self.lines)-1-i > 0:
                sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-i))
            sys.stdout.write('\x1b[K')
            sys.stdout.write('\x1b[?7l')
            sys.stdout.write(line)
            sys.stdout.write('\x1b[?7h')
            if len(self.lines)-1-i > 0:
                sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-i))
        sys.stdout.flush()


def main(path='-', *, lines=5, cat=False, sleep=None, keep_open=False):
    """Read path line by line and show the last lines on stdout.

    path      -- file to read, '-' reads stdin
    lines     -- lines of history to show, 0 uses the terminal height
    cat       -- write everything straight to stdout instead of drawing
    sleep     -- seconds between redraws (default 0.01), also used as the
                 delay before reopening with keep_open (default 0.1)
    keep_open -- reopen path on EOF instead of returning

    Exits with status -1 if path does not exist. A KeyboardInterrupt
    stops reading quietly.
    """
    if cat:
        ring = sys.stdout
    else:
        ring = LinesIO(lines)

    # if sleep print in background thread to avoid getting stuck in a read call
    event = th.Event()
    lock = th.Lock()
    if not cat:
        done = False
        def background():
            while not done:
                event.wait()
                event.clear()
                with lock:
                    ring.draw()
                time.sleep(sleep or 0.01)
        th.Thread(target=background, daemon=True).start()

    # set once anything has been written, so we know a final draw is needed
    wrote = False
    try:
        while True:
            with openio(path) as f:
                for line in f:
                    with lock:
                        ring.write(line)
                        wrote = True
                        event.set()

            if not keep_open:
                break
            # don't just flood open calls
            time.sleep(sleep or 0.1)
    except FileNotFoundError:
        print("error: file not found %r" % path)
        sys.exit(-1)
    except KeyboardInterrupt:
        pass

    if not cat:
        done = True
        lock.acquire() # avoids https://bugs.python.org/issue42717
        # the background thread only draws once per sleep interval and stops
        # as soon as done is set, so the last lines may never have been
        # rendered, draw the final state ourselves now that we own the lock
        if wrote:
            ring.draw()
        sys.stdout.write('\n')


if __name__ == "__main__":
    import sys
    import argparse
    parser = argparse.ArgumentParser(
        description="Efficiently displays the last n lines of a file/pipe.",
        allow_abbrev=False)
    parser.add_argument(
        'path',
        nargs='?',
        help="Path to read from.")
    parser.add_argument(
        '-n', '--lines',
        nargs='?',
        type=lambda x: int(x, 0),
        const=0,
        help="Show this many lines of history. 0 uses the terminal height. "
            "Defaults to 5.")
    parser.add_argument(
        '-z', '--cat',
        action='store_true',
        help="Pipe directly to stdout.")
    parser.add_argument(
        '-s', '--sleep',
        type=float,
        help="Seconds to sleep between reads. Defaults to 0.01.")
    parser.add_argument(
        '-k', '--keep-open',
        action='store_true',
        help="Reopen the pipe on EOF, useful when multiple "
            "processes are writing.")
    sys.exit(main(**{k: v
        for k, v in vars(parser.parse_intermixed_args()).items()
        if v is not None}))