#!/usr/bin/env python3
#
# Copyright (c) 2024 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

"""
Log Parser for Dictionary-based Logging

This uses the JSON database file to decode binary log data read live
from one of the supported input sources (a serial port, a file or stdin,
or a J-Link RTT channel) and prints the decoded log messages.
"""

import argparse
import contextlib
import logging
import os
import select
import sys
import time

import parserlib
import serial

try:
    # Pylink is an optional dependency for RTT reading, which requires its own installation.
    # Don't fail, unless the user tries to use RTT reading.
    import pylink
except ImportError:
    pylink = None

LOGGER_FORMAT = "%(message)s"
logger = logging.getLogger("parser")


class SerialReader:
    """Read raw log data from a serial port.

    Exposes ``fileno()`` so the main loop can wait on it with ``select``.
    """

    def __init__(self, serial_port, baudrate):
        self.serial_port = serial_port
        self.baudrate = baudrate
        self.serial = None

    @contextlib.contextmanager
    def open(self):
        """Open the serial port for the duration of the ``with`` block."""
        # Acquire the port *before* entering the try block: if opening fails,
        # there is nothing to close and the original exception must propagate
        # instead of being masked by an AttributeError on ``None.close()``.
        self.serial = serial.Serial(self.serial_port, self.baudrate)
        try:
            yield
        finally:
            self.serial.close()

    def fileno(self):
        """Return the file descriptor of the underlying serial port."""
        return self.serial.fileno()

    def read_non_blocking(self):
        """Return the bytes currently waiting in the serial input buffer."""
        size = self.serial.in_waiting
        return self.serial.read(size)


class FileReader:
    """Read raw log data from a file, or from stdin when no path is given."""

    def __init__(self, filepath):
        self.filepath = filepath
        self.file = None

    @contextlib.contextmanager
    def open(self):
        """Open the input file (or binary, unbuffered stdin) for the ``with`` block."""
        if self.filepath is not None:
            with open(self.filepath, 'rb') as f:
                self.file = f
                yield
        else:
            # Re-open stdin in binary mode with buffering disabled so that
            # reads return the raw bytes as soon as they are available.
            sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
            self.file = sys.stdin
            yield

    def fileno(self):
        """Return the file descriptor of the opened input."""
        return self.file.fileno()

    def read_non_blocking(self):
        """Read up to 1024 bytes from the input."""
        # Read available data using a reasonable buffer size (without buffer size, this blocks
        # forever, but with buffer size it returns even when less data than the buffer read was
        # available).
        return self.file.read(1024)


class JLinkRTTReader:
    """Class to read data from JLink's RTT

    This reader has no ``fileno()``, so the main loop polls it at a fixed
    interval instead of using ``select``.
    """

    def __init__(self, target_device, block_address, channel, speed, lib_path):
        self.target_device = target_device
        self.block_address = block_address
        self.speed = speed
        self.channel = channel
        self.jlink = self._create_jlink_connection(lib_path)

    @staticmethod
    def _create_jlink_connection(lib_path):
        """Create a ``pylink.JLink`` object, optionally using a specific J-Link library.

        Raises:
            ImportError: if the optional ``pylink`` module is not installed.
        """
        if pylink is None:
            raise ImportError(
                "pylink module is required for RTT reading. "
                "Please install it using 'pip install pylink-square'."
            )

        if lib_path is not None:
            lib = pylink.Library(lib_path, True)
            jlink = pylink.JLink(lib)
        else:
            jlink = pylink.JLink()

        return jlink

    @contextlib.contextmanager
    def open(self):
        """Connect to the target over SWD and start RTT for the ``with`` block."""
        try:
            self.jlink.open()
            self.jlink.set_tif(pylink.enums.JLinkInterfaces.SWD)

            # A speed of 0 means "not specified": let pylink use its default.
            if self.speed != 0:
                self.jlink.connect(self.target_device, self.speed)
            else:
                self.jlink.connect(self.target_device)

            self.jlink.rtt_start(self.block_address)

            # Wait for the JLINK RTT buffers to be initialized.
            up_down_initialized = False
            while not up_down_initialized:
                try:
                    _ = self.jlink.rtt_get_num_up_buffers()
                    _ = self.jlink.rtt_get_num_down_buffers()
                    up_down_initialized = True
                except pylink.errors.JLinkRTTException:
                    time.sleep(0.1)

            yield

        finally:
            self.close()

    def close(self):
        """Release the J-Link connection."""
        # JLink closes the connection through the __del__ method.
        del self.jlink

    def read_non_blocking(self):
        """Read up to 1024 bytes from the configured RTT channel."""
        return bytes(self.jlink.rtt_read(self.channel, 1024))


def parse_args():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(allow_abbrev=False)

    parser.add_argument("dbfile", help="Dictionary Logging Database file")
    parser.add_argument("--debug", action="store_true", help="Print extra debugging information")
    parser.add_argument(
        "--polling-interval",
        type=float,
        default=0.1,
        help="Interval for polling input source, if it does not support 'select'",
    )

    # Create subparsers for different input modes
    subparsers = parser.add_subparsers(dest="mode", required=True, help="Input source mode")

    # Serial subparser
    serial_parser = subparsers.add_parser("serial", help="Read from serial port")
    serial_parser.add_argument("port", help="Serial port")
    serial_parser.add_argument("baudrate", type=int, help="Baudrate")

    # File subparser
    file_parser = subparsers.add_parser("file", help="Read from file")
    file_parser.add_argument(
        "filepath", nargs="?", default=None, help="Input file path, leave empty for stdin"
    )

    # RTT subparser
    jlink_rtt_parser = subparsers.add_parser("jlink-rtt", help="Read from RTT")
    jlink_rtt_parser.add_argument(
        "target_device", help="Device Name (see https://www.segger.com/supported-devices/jlink/)"
    )
    jlink_rtt_parser.add_argument(
        "--block-address", help="RTT block address in hex", type=lambda x: int(x, 16)
    )
    jlink_rtt_parser.add_argument("--channel", type=int, help="RTT channel number", default=0)
    # Integer default to match type=int; 0 means "use the default connection speed".
    jlink_rtt_parser.add_argument("--speed", type=int, help="Reading speed", default=0)
    jlink_rtt_parser.add_argument("--lib-path", help="Path to libjlinkarm.so library")

    return parser.parse_args()


def main():
    """Decode and print dictionary-based log data from the selected input source.

    Selects a reader according to the command line mode, then loops forever:
    waits for input, appends it to a buffer, hands the buffer to ``parserlib``
    and drops the bytes that ``parserlib`` reports as consumed.
    """
    args = parse_args()

    # Configure logging first so that every message, including the early
    # validation error below, goes through the same handler and format.
    logging.basicConfig(format=LOGGER_FORMAT)

    if args.dbfile is None or '.json' not in args.dbfile:
        logger.error("ERROR: invalid log database path: %s, exiting...", args.dbfile)
        sys.exit(1)

    if args.debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    log_parser = parserlib.get_log_parser(args.dbfile, logger)

    data = b''

    if args.mode == "serial":
        reader = SerialReader(args.port, args.baudrate)
    elif args.mode == "file":
        reader = FileReader(args.filepath)
    elif args.mode == "jlink-rtt":
        reader = JLinkRTTReader(
            args.target_device, args.block_address, args.channel, args.speed, args.lib_path
        )
    else:
        raise ValueError("Invalid mode selected. Use 'serial', 'file' or 'jlink-rtt'.")

    with reader.open():
        while True:
            if hasattr(reader, 'fileno'):
                # Block until the reader's file descriptor is readable.
                _, _, _ = select.select([reader], [], [])
            else:
                # Source cannot be used with select(): poll it periodically.
                time.sleep(args.polling_interval)
            data += reader.read_non_blocking()
            # Keep only the tail that was not consumed; it is retried once
            # more data has been appended.
            parsed_data_offset = parserlib.parser(data, log_parser, logger)
            data = data[parsed_data_offset:]


if __name__ == "__main__":
    main()