1#!/usr/bin/env python3
2#
3# Copyright (c) 2024 Nordic Semiconductor ASA
4#
5# SPDX-License-Identifier: Apache-2.0
6
7"""
8Log Parser for Dictionary-based Logging
9
This uses the JSON database file to decode the binary
log data read live from a serial port, a file (or stdin),
or J-Link RTT, and prints the log messages.
13"""
14
15import argparse
16import contextlib
17import logging
18import os
19import select
20import sys
21import time
22
23import parserlib
24import serial
25
26try:
    # Pylink is an optional dependency for RTT reading, which requires its own installation.
28    # Don't fail, unless the user tries to use RTT reading.
29    import pylink
30except ImportError:
31    pylink = None
32
# Log records are emitted as the bare message text, without level or logger-name prefix.
LOGGER_FORMAT = "%(message)s"
# Module-wide logger; main() sets its level and hands it to the parserlib calls.
logger = logging.getLogger("parser")
35
36
class SerialReader:
    """Reader that pulls raw log bytes from a serial port.

    Args:
        serial_port: Name/path of the serial device, forwarded to ``serial.Serial``.
        baudrate: Baud rate, forwarded to ``serial.Serial``.
    """

    def __init__(self, serial_port, baudrate):
        self.serial_port = serial_port
        self.baudrate = baudrate
        # Populated by open(); stays None until the port has been opened.
        self.serial = None

    @contextlib.contextmanager
    def open(self):
        """Open the serial port for the duration of the ``with`` block.

        The port is closed when the block exits, whether normally or through
        an exception. Errors raised while opening the port propagate unchanged.
        """
        # Open *before* entering the try block: if opening fails there is
        # nothing to close, and closing a None handle in ``finally`` would
        # replace the real error with an unrelated AttributeError.
        self.serial = serial.Serial(self.serial_port, self.baudrate)
        try:
            yield
        finally:
            self.serial.close()

    def fileno(self):
        """Return the file descriptor of the open port.

        Having this method is what lets main() pass the reader to ``select``.
        """
        return self.serial.fileno()

    def read_non_blocking(self):
        """Read and return as many bytes as the port reports as waiting."""
        size = self.serial.in_waiting
        return self.serial.read(size)
59
60
class FileReader:
    """Reader that pulls raw log bytes from a file, or from stdin when no path is given.

    Args:
        filepath: Path of the input file, or None to read from standard input.
    """

    def __init__(self, filepath):
        self.filepath = filepath
        # Populated by open(); stays None until the input has been opened.
        self.file = None

    @contextlib.contextmanager
    def open(self):
        """Open the input for the duration of the ``with`` block.

        A file given by path is closed when the block exits. When reading
        from stdin, ``sys.stdin`` is replaced by an unbuffered binary stream
        over the same file descriptor.
        """
        if self.filepath is not None:
            # Unbuffered, same as the stdin branch. A buffered reader would
            # try to satisfy the full requested size (blocking on pipes and
            # devices) and could hold data in its own buffer that select()
            # on the descriptor cannot see.
            with open(self.filepath, 'rb', buffering=0) as f:
                self.file = f
                yield
        else:
            sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
            self.file = sys.stdin
            yield

    def fileno(self):
        """Return the file descriptor of the open input.

        Having this method is what lets main() pass the reader to ``select``.
        """
        return self.file.fileno()

    def read_non_blocking(self):
        """Return up to 1024 bytes of available data; ``b""`` at end of input.

        On the unbuffered stream a bounded read returns whatever is available
        instead of waiting for the whole requested size or for EOF.
        """
        # A raw read may yield None on a non-blocking descriptor with no data;
        # normalise that to empty bytes so callers can always concatenate.
        return self.file.read(1024) or b''
87
88
class JLinkRTTReader:
    """Reader that fetches raw log bytes from a target through J-Link RTT.

    Args:
        target_device: Device name handed to ``jlink.connect``.
        block_address: RTT control block address handed to ``jlink.rtt_start``.
        channel: RTT channel number that is read.
        speed: Connection speed; ``0`` means connect without an explicit speed.
        lib_path: Path to the J-Link library, or None to let pylink pick one.
    """

    def __init__(self, target_device, block_address, channel, speed, lib_path):
        self.target_device = target_device
        self.block_address = block_address
        self.speed = speed
        self.channel = channel

        self.jlink = self._create_jlink_connection(lib_path)

    @staticmethod
    def _create_jlink_connection(lib_path):
        """Build the pylink ``JLink`` object, optionally from an explicit library path.

        Raises:
            ImportError: If the optional pylink module is not installed.
        """
        if pylink is None:
            raise ImportError(
                "pylink module is required for RTT reading. "
                "Please install it using 'pip install pylink-square'."
            )

        if lib_path is None:
            return pylink.JLink()

        return pylink.JLink(pylink.Library(lib_path, True))

    @contextlib.contextmanager
    def open(self):
        """Connect to the target over SWD and start RTT for the ``with`` block.

        The reader is closed when the block exits, including when any of the
        connection steps fails.
        """
        try:
            self.jlink.open()
            self.jlink.set_tif(pylink.enums.JLinkInterfaces.SWD)

            # A speed of 0 means "do not pass a speed to connect()".
            if self.speed == 0:
                self.jlink.connect(self.target_device)
            else:
                self.jlink.connect(self.target_device, self.speed)

            self.jlink.rtt_start(self.block_address)

            # Keep retrying every 100 ms until both buffer-count queries stop
            # raising, i.e. the RTT buffers have been initialized.
            while True:
                try:
                    self.jlink.rtt_get_num_up_buffers()
                    self.jlink.rtt_get_num_down_buffers()
                except pylink.errors.JLinkRTTException:
                    time.sleep(0.1)
                else:
                    break

            yield

        finally:
            self.close()

    def close(self):
        """Drop the J-Link object held by this reader."""
        # JLink closes the connection through the __del__ method.
        del self.jlink

    def read_non_blocking(self):
        """Read up to 1024 bytes from the configured RTT channel and return them as ``bytes``."""
        received = self.jlink.rtt_read(self.channel, 1024)
        return bytes(received)
149
150
def _hex_int(value):
    """Convert a hexadecimal string (with or without ``0x`` prefix) to an int."""
    try:
        return int(value, 16)
    except ValueError:
        # Give argparse a readable message instead of "invalid <lambda> value".
        raise argparse.ArgumentTypeError(f"invalid hexadecimal value: {value!r}") from None


def parse_args(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default), argparse reads the arguments from ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` with the common options (``dbfile``,
        ``debug``, ``polling_interval``), the selected ``mode`` and the
        arguments of that mode's subparser.
    """
    parser = argparse.ArgumentParser(allow_abbrev=False)

    parser.add_argument("dbfile", help="Dictionary Logging Database file")
    parser.add_argument("--debug", action="store_true", help="Print extra debugging information")
    parser.add_argument(
        "--polling-interval",
        type=float,
        default=0.1,
        help="Interval for polling input source, if it does not support 'select'",
    )

    # Create subparsers for different input modes
    subparsers = parser.add_subparsers(dest="mode", required=True, help="Input source mode")

    # Serial subparser
    serial_parser = subparsers.add_parser("serial", help="Read from serial port")
    serial_parser.add_argument("port", help="Serial port")
    serial_parser.add_argument("baudrate", type=int, help="Baudrate")

    # File subparser
    file_parser = subparsers.add_parser("file", help="Read from file")
    file_parser.add_argument(
        "filepath", nargs="?", default=None, help="Input file path, leave empty for stdin"
    )

    # RTT subparser
    jlink_rtt_parser = subparsers.add_parser("jlink-rtt", help="Read from RTT")
    jlink_rtt_parser.add_argument(
        "target_device", help="Device Name (see https://www.segger.com/supported-devices/jlink/)"
    )
    jlink_rtt_parser.add_argument(
        "--block-address", help="RTT block address in hex", type=_hex_int
    )
    jlink_rtt_parser.add_argument("--channel", type=int, help="RTT channel number", default=0)
    # Integer default, matching type=int (0 means "no explicit speed").
    jlink_rtt_parser.add_argument("--speed", type=int, help="Reading speed", default=0)
    jlink_rtt_parser.add_argument("--lib-path", help="Path to libjlinkarm.so library")

    return parser.parse_args(argv)
191
192
def main():
    """Decode dictionary-based log data live from the selected input source.

    Parses the command line, loads the log database, builds the reader for
    the chosen mode (serial port, file/stdin or J-Link RTT) and then loops
    forever: wait for input, read what is available, parse as much of the
    accumulated data as possible and keep the unparsed remainder.

    Exits with status 1 when the database path does not look like a JSON file.
    """
    args = parse_args()

    # Configure logging before the first message is emitted so the error
    # below goes through the same handler and format as everything else.
    logging.basicConfig(format=LOGGER_FORMAT)

    if args.dbfile is None or '.json' not in args.dbfile:
        logger.error("ERROR: invalid log database path: %s, exiting...", args.dbfile)
        sys.exit(1)

    if args.debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    log_parser = parserlib.get_log_parser(args.dbfile, logger)

    if args.mode == "serial":
        reader = SerialReader(args.port, args.baudrate)
    elif args.mode == "file":
        reader = FileReader(args.filepath)
    elif args.mode == "jlink-rtt":
        reader = JLinkRTTReader(
            args.target_device, args.block_address, args.channel, args.speed, args.lib_path
        )
    else:
        raise ValueError("Invalid mode selected. Use 'serial', 'file' or 'jlink-rtt'.")

    # Bytes received but not yet consumed by the parser.
    data = b''

    # Readers exposing fileno() can be waited on with select(); the others
    # are polled at a fixed interval.
    use_select = hasattr(reader, 'fileno')

    with reader.open():
        while True:
            if use_select:
                select.select([reader], [], [])
            else:
                time.sleep(args.polling_interval)

            chunk = reader.read_non_blocking()
            if not chunk:
                if use_select:
                    # select() reported the source as ready but nothing was
                    # read (e.g. a regular file at EOF). Back off instead of
                    # spinning at full CPU; new data is still picked up later.
                    time.sleep(args.polling_interval)
                # Nothing new arrived, so re-parsing the same bytes is pointless.
                continue

            data += chunk
            parsed_data_offset = parserlib.parser(data, log_parser, logger)
            # Keep only the tail the parser has not consumed yet.
            data = data[parsed_data_offset:]
232
233
# Run the parser only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
236