#!/usr/bin/env python3
# Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
# SPDX-License-Identifier: Apache-2.0
"""Print zbus channel publications received over a serial port.

The script opens the serial port given on the command line and scans the
incoming byte stream for sentences framed as::

    b"$" <channel name bytes> b"," <1 size byte> <size payload bytes>

Each decoded sentence is printed as ``PUB [<name>] -> <payload>``.
"""
import argparse
import json

import serial

# Channel descriptions in JSON form. They are parsed into ``channels`` below;
# the decode loop itself does not consult them and relies on the size byte
# carried by every sentence instead.
j = """
[
  {"name":"version","on_changed": false, "read_only": true, "message_size": 4},
  {"name":"sensor_data","on_changed": true, "read_only": false, "message_size":8},
  {"name":"start_measurement","on_changed": false, "read_only": false, "message_size": 1},
  {"name":"finish","on_changed": false, "read_only": false, "message_size": 1}
]
"""

parser = argparse.ArgumentParser(description='Read zbus events via serial.', allow_abbrev=False)
parser.add_argument("port", type=str, help='The tty or COM port to be used')

args = parser.parse_args()

channels = json.loads(j)


def fetch_sentence(port=None):
    """Read the remainder of one sentence, after its ``$`` start marker.

    Bytes are consumed one at a time until a ``,`` is seen; everything before
    the comma is the channel name. The next byte is the payload length, and
    that many bytes are then read as the payload.

    Args:
        port: Object exposing a pyserial-style ``read(size=...)`` method.
            When omitted, the module-level ``ser`` opened by the main loop
            is used, so existing ``fetch_sentence()`` calls keep working.

    Returns:
        A ``(name, found_msg_size, found_msg)`` tuple: the channel name as
        ``bytes``, the announced payload size as ``int`` and the payload as
        ``bytes``.
    """
    if port is None:
        port = ser

    name = b""
    while True:
        byte = port.read(size=1)
        if byte == b",":
            # The comma terminates the channel name.
            break
        name += byte
    print(name)

    # A single byte carries the payload length; byte order is irrelevant for
    # one byte but is a required argument of int.from_bytes on older Pythons.
    found_msg_size = int.from_bytes(port.read(size=1), byteorder="little")
    found_msg = port.read(found_msg_size)
    return name, found_msg_size, found_msg


try:
    # The context manager guarantees the port is closed on every exit path.
    with serial.Serial(args.port) as ser:
        while True:
            d = ser.read()
            # Skip everything until a sentence start marker shows up.
            if d == b'$':
                channel_name, msg_size, msg = fetch_sentence()
                print(f"PUB [{channel_name}] -> {msg}")
except serial.serialutil.SerialException as e:
    print(e)
except KeyboardInterrupt:
    # Ctrl-C is the only way to leave the read loop; exit without a traceback.
    pass