#!/usr/bin/env python3
#
# Copyright (c) 2024 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

"""
Log Parser for Dictionary-based Logging

This uses the JSON database file to decode the binary
log data taken live from an input source (serial port,
file/stdin or J-Link RTT) and print the log messages.
"""

import argparse
import contextlib
import logging
import os
import select
import sys
import time

import parserlib
import serial

try:
    # Pylink is an optional dependency for RTT reading, which requires its own installation.
    # Don't fail, unless the user tries to use RTT reading.
    import pylink
except ImportError:
    pylink = None

LOGGER_FORMAT = "%(message)s"
logger = logging.getLogger("parser")


class SerialReader:
    """Class to read raw log data from a serial port.

    Exposes ``fileno()`` so the main loop can wait on it with ``select``.
    """

    def __init__(self, serial_port, baudrate):
        self.serial_port = serial_port
        self.baudrate = baudrate
        self.serial = None

    @contextlib.contextmanager
    def open(self):
        """Open the serial port for the duration of the ``with`` block."""
        # Open outside the try block: if opening fails there is nothing to close, and the
        # original exception must not be masked by an AttributeError on ``None.close()``.
        self.serial = serial.Serial(self.serial_port, self.baudrate)
        try:
            yield
        finally:
            self.serial.close()

    def fileno(self):
        """Return the file descriptor of the open serial port (used by ``select``)."""
        return self.serial.fileno()

    def read_non_blocking(self):
        """Return the bytes currently waiting in the serial input buffer."""
        size = self.serial.in_waiting
        return self.serial.read(size)


class FileReader:
    """Class to read raw log data from a file, or from stdin when no path is given.

    Exposes ``fileno()`` so the main loop can wait on it with ``select``.
    """

    def __init__(self, filepath):
        self.filepath = filepath
        self.file = None

    @contextlib.contextmanager
    def open(self):
        """Open the input file (or rebind stdin as binary) for the ``with`` block."""
        if self.filepath is not None:
            # Unbuffered, like the stdin path below: a buffered reader's read(n) keeps
            # reading until it has n bytes or hits EOF, which would block on a FIFO that
            # currently holds less than a full buffer.
            with open(self.filepath, 'rb', buffering=0) as f:
                self.file = f
                yield
        else:
            # Re-open stdin as an unbuffered binary stream; the log data is not text.
            sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
            self.file = sys.stdin
            yield

    def fileno(self):
        """Return the file descriptor of the open input (used by ``select``)."""
        return self.file.fileno()

    def read_non_blocking(self):
        """Return up to 1024 bytes of available data; ``b''`` at end of file."""
        # Read available data using a reasonable buffer size (without buffer size, this blocks
        # forever, but with buffer size it returns even when less data than the buffer read was
        # available).
        return self.file.read(1024)


class JLinkRTTReader:
    """Class to read data from JLink's RTT.

    This reader has no ``fileno()``, so the main loop polls it at a fixed interval
    instead of using ``select``.
    """

    def __init__(self, target_device, block_address, channel, speed, lib_path):
        self.target_device = target_device
        self.block_address = block_address
        self.speed = speed
        self.channel = channel

        self.jlink = self._create_jlink_connection(lib_path)

    @staticmethod
    def _create_jlink_connection(lib_path):
        """Create a ``pylink.JLink`` object, optionally from an explicit library path.

        Raises:
            ImportError: if the optional ``pylink`` module is not installed.
        """
        if pylink is None:
            raise ImportError(
                "pylink module is required for RTT reading. "
                "Please install it using 'pip install pylink-square'."
            )

        if lib_path is not None:
            lib = pylink.Library(lib_path, True)
            jlink = pylink.JLink(lib)
        else:
            jlink = pylink.JLink()

        return jlink

    @contextlib.contextmanager
    def open(self):
        """Connect to the target over SWD and start RTT for the ``with`` block."""
        try:
            self.jlink.open()
            self.jlink.set_tif(pylink.enums.JLinkInterfaces.SWD)
            # A speed of 0 means "not specified": let pylink pick its default.
            if self.speed != 0:
                self.jlink.connect(self.target_device, self.speed)
            else:
                self.jlink.connect(self.target_device)

            self.jlink.rtt_start(self.block_address)

            # Wait for the JLINK RTT buffers to be initialized.
            up_down_initialized = False
            while not up_down_initialized:
                try:
                    _ = self.jlink.rtt_get_num_up_buffers()
                    _ = self.jlink.rtt_get_num_down_buffers()
                    up_down_initialized = True
                except pylink.errors.JLinkRTTException:
                    time.sleep(0.1)

            yield

        finally:
            self.close()

    def close(self):
        """Release the J-Link connection."""
        # JLink closes the connection through the __del__ method.
        del self.jlink

    def read_non_blocking(self):
        """Return up to 1024 bytes currently available on the RTT channel."""
        return bytes(self.jlink.rtt_read(self.channel, 1024))


def parse_args():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(allow_abbrev=False)

    parser.add_argument("dbfile", help="Dictionary Logging Database file")
    parser.add_argument("--debug", action="store_true", help="Print extra debugging information")
    parser.add_argument(
        "--polling-interval",
        type=float,
        default=0.1,
        help="Interval for polling input source, if it does not support 'select'",
    )

    # Create subparsers for different input modes
    subparsers = parser.add_subparsers(dest="mode", required=True, help="Input source mode")

    # Serial subparser
    serial_parser = subparsers.add_parser("serial", help="Read from serial port")
    serial_parser.add_argument("port", help="Serial port")
    serial_parser.add_argument("baudrate", type=int, help="Baudrate")

    # File subparser
    file_parser = subparsers.add_parser("file", help="Read from file")
    file_parser.add_argument(
        "filepath", nargs="?", default=None, help="Input file path, leave empty for stdin"
    )

    # RTT subparser
    jlink_rtt_parser = subparsers.add_parser("jlink-rtt", help="Read from RTT")
    jlink_rtt_parser.add_argument(
        "target_device", help="Device Name (see https://www.segger.com/supported-devices/jlink/)"
    )
    jlink_rtt_parser.add_argument(
        "--block-address", help="RTT block address in hex", type=lambda x: int(x, 16)
    )
    jlink_rtt_parser.add_argument("--channel", type=int, help="RTT channel number", default=0)
    jlink_rtt_parser.add_argument("--speed", type=int, help="Reading speed", default=0)
    jlink_rtt_parser.add_argument("--lib-path", help="Path to libjlinkarm.so library")

    return parser.parse_args()


def main():
    """Decode and print log messages read live from the selected input source."""
    args = parse_args()

    # Configure logging first so that every message, including the early error below,
    # goes through the same handler and format.
    logging.basicConfig(format=LOGGER_FORMAT)

    if args.dbfile is None or '.json' not in args.dbfile:
        logger.error("ERROR: invalid log database path: %s, exiting...", args.dbfile)
        sys.exit(1)

    if args.debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    log_parser = parserlib.get_log_parser(args.dbfile, logger)

    data = b''

    if args.mode == "serial":
        reader = SerialReader(args.port, args.baudrate)
    elif args.mode == "file":
        reader = FileReader(args.filepath)
    elif args.mode == "jlink-rtt":
        reader = JLinkRTTReader(
            args.target_device, args.block_address, args.channel, args.speed, args.lib_path
        )
    else:
        raise ValueError("Invalid mode selected. Use 'serial', 'file' or 'jlink-rtt'.")

    # Readers exposing fileno() can be waited on with select(); the others are polled.
    selectable = hasattr(reader, 'fileno')

    with reader.open():
        while True:
            if selectable:
                select.select([reader], [], [])
            else:
                time.sleep(args.polling_interval)

            chunk = reader.read_non_blocking()
            if selectable and not chunk:
                # select() reports regular files at EOF and closed pipes as always
                # readable, so an empty read would otherwise make this loop spin at
                # full CPU. Back off and keep following the input.
                time.sleep(args.polling_interval)

            data += chunk
            # The parser returns how many bytes it consumed; keep the unparsed tail.
            parsed_data_offset = parserlib.parser(data, log_parser, logger)
            data = data[parsed_data_offset:]


if __name__ == "__main__":
    main()