1#!/usr/bin/env python3
2#
3# Efficiently displays the last n lines of a file/pipe.
4#
5# Example:
6# ./scripts/tailpipe.py trace -n5
7#
8# Copyright (c) 2022, The littlefs authors.
9# SPDX-License-Identifier: BSD-3-Clause
10#
11
12import collections as co
13import io
14import os
15import select
16import shutil
17import sys
18import threading as th
19import time
20
21
def openio(path, mode='r', buffering=-1):
    # Open path like the builtin open, except that '-' means stdin/stdout.
    #
    # path      - file to open, or '-' for one of the standard streams
    # mode      - passed to open/os.fdopen, any mode containing 'r' maps
    #             '-' to stdin, every other mode maps '-' to stdout
    # buffering - passed through to open/os.fdopen
    #
    # Returns a file object the caller is expected to close.

    # allow '-' for stdin/stdout
    if path == '-':
        # look for 'r' anywhere in the mode so 'rb', 'rt' and 'r+' read from
        # stdin too, an exact == 'r' comparison sent those to stdout
        stream = sys.stdin if 'r' in mode else sys.stdout
        # dup the descriptor so closing the returned file leaves the
        # process's own stdin/stdout open
        return os.fdopen(os.dup(stream.fileno()), mode, buffering)

    return open(path, mode, buffering)
31
class LinesIO:
    # Write-only, file-like buffer that keeps the most recent complete lines
    # and can repaint them in place on the terminal.
    #
    # maxlen - how many lines to keep, None keeps everything, 0 follows the
    #          terminal height
    # lines  - deque of complete lines, without their newlines
    # tail   - StringIO collecting the unterminated end of the last write
    def __init__(self, maxlen=None):
        self.maxlen = maxlen
        self.lines = co.deque(maxlen=maxlen)
        self.tail = io.StringIO()

        # a maxlen of 0 asks for automatic sizing, so size the deque now
        if maxlen == 0:
            self.resize(0)

    def write(self, s):
        # everything before the last '\n' is a complete line, whatever
        # follows it (possibly '') is an unterminated remainder
        *complete, remainder = s.split('\n')

        # the first complete line finishes any text left over from
        # earlier writes
        if complete:
            pending = self.tail.getvalue()
            if pending:
                complete[0] = pending + complete[0]
                self.tail = io.StringIO()

        self.lines.extend(complete)

        if remainder:
            self.tail.write(remainder)

    def resize(self, maxlen):
        # remember what was asked for, 0 is re-resolved on every call
        self.maxlen = maxlen
        limit = shutil.get_terminal_size((80, 5)).lines if maxlen == 0 else maxlen
        # only rebuild the deque when the limit really changed
        if self.lines.maxlen != limit:
            self.lines = co.deque(self.lines, maxlen=limit)

    # number of terminal rows claimed so far, shared by all instances
    canvas_lines = 1
    def draw(self):
        # pick up terminal size changes when sizing automatically
        if self.maxlen == 0:
            self.resize(0)

        put = sys.stdout.write
        count = len(self.lines)

        def at_row(above, *pieces):
            # emit pieces on the row `above` rows over the cursor, starting
            # from column 0, then return the cursor to its row
            put('\r')
            if above > 0:
                put('\x1b[%dA' % above)
            for piece in pieces:
                put(piece)
            if above > 0:
                put('\x1b[%dB' % above)

        # grow the canvas with newlines until every line has a row
        while LinesIO.canvas_lines < count:
            put('\n')
            LinesIO.canvas_lines += 1

        # with fewer lines than rows, erase the surplus rows at the bottom
        # and move the cursor up to the new bottom row
        surplus = LinesIO.canvas_lines - count
        if surplus > 0:
            for above in range(surplus-1, -1, -1):
                at_row(above, '\x1b[K')
            put('\x1b[%dA' % surplus)
            LinesIO.canvas_lines = count

        # repaint top to bottom, each row is erased and then written with
        # line wrapping turned off so long lines can't spill over
        for above, line in zip(range(count-1, -1, -1), self.lines):
            at_row(above, '\x1b[K', '\x1b[?7l', line, '\x1b[?7h')
        sys.stdout.flush()
99
100
def main(path='-', *, lines=5, cat=False, sleep=None, keep_open=False):
    # Show the last few lines of path, repainting as more input arrives.
    #
    # path      - file/pipe to read, '-' reads stdin
    # lines     - lines of history to keep, 0 uses the terminal height
    # cat       - write input straight to stdout instead of repainting
    # sleep     - seconds to pause after each repaint (0.01 if unset) and
    #             between reopen attempts (0.1 if unset)
    # keep_open - reopen path after EOF instead of finishing
    #
    # Exits with status -1 if path can't be found. A KeyboardInterrupt
    # ends reading quietly.
    if cat:
        ring = sys.stdout
    else:
        ring = LinesIO(lines)

    # if sleep print in background thread to avoid getting stuck in a read call
    event = th.Event()
    lock = th.Lock()
    if not cat:
        done = False
        def background():
            while not done:
                # sleep until the reader reports new data, then repaint
                event.wait()
                event.clear()
                with lock:
                    ring.draw()
                time.sleep(sleep or 0.01)
        # daemon thread so a blocked event.wait() can't keep us alive
        th.Thread(target=background, daemon=True).start()

    try:
        while True:
            with openio(path) as f:
                for line in f:
                    # the lock keeps the buffer from changing mid-draw
                    with lock:
                        ring.write(line)
                        event.set()

            if not keep_open:
                break
            # don't just flood open calls
            time.sleep(sleep or 0.1)
    except FileNotFoundError:
        # diagnostics belong on stderr, stdout is where the data goes
        print("error: file not found %r" % path, file=sys.stderr)
        sys.exit(-1)
    except KeyboardInterrupt:
        pass

    if not cat:
        done = True
        # deliberately never released, this keeps the daemon thread away
        # from stdout during interpreter shutdown
        lock.acquire() # avoids https://bugs.python.org/issue42717
        # the background thread may have been sleeping when the last lines
        # came in, so paint the final state ourselves, skipping this when
        # nothing was buffered leaves the terminal untouched
        if ring.lines:
            ring.draw()
        sys.stdout.write('\n')
143
144
if __name__ == "__main__":
    import argparse
    import sys

    # command line front end, every option maps onto a keyword of main
    parser = argparse.ArgumentParser(
        allow_abbrev=False,
        description="Efficiently displays the last n lines of a file/pipe.")
    parser.add_argument('path', nargs='?', help="Path to read from.")
    # a bare -n with no value parses as 0
    parser.add_argument(
        '-n', '--lines',
        type=lambda x: int(x, 0),
        nargs='?',
        const=0,
        help="Show this many lines of history. 0 uses the terminal height. "
            "Defaults to 5.")
    parser.add_argument(
        '-z', '--cat',
        help="Pipe directly to stdout.",
        action='store_true')
    parser.add_argument(
        '-s', '--sleep',
        help="Seconds to sleep between reads. Defaults to 0.01.",
        type=float)
    parser.add_argument(
        '-k', '--keep-open',
        help="Reopen the pipe on EOF, useful when multiple "
            "processes are writing.",
        action='store_true')

    # options that were not given parse as None, leave those out so the
    # defaults in main's signature apply
    parsed = parser.parse_intermixed_args()
    overrides = {}
    for name, value in vars(parsed).items():
        if value is not None:
            overrides[name] = value
    sys.exit(main(**overrides))
178