1#!/usr/bin/env python3 2# Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com> 3# SPDX-License-Identifier: Apache-2.0 4import argparse 5import json 6from time import sleep 7 8import serial 9 10j = """ 11[ 12{"name":"version","on_changed": false, "read_only": true, "message_size": 4}, 13{"name":"sensor_data","on_changed": true, "read_only": false, "message_size": 4}, 14{"name":"start_measurement","on_changed": false, "read_only": false, "message_size": 1} 15]""" 16 17channels = json.loads(j) 18parser = argparse.ArgumentParser(description='Read zbus events via serial.', allow_abbrev=False) 19parser.add_argument("port", type=str, help='The tty or COM port to be used') 20 21args = parser.parse_args() 22 23 24def fetch_sentence(ser): 25 channel_id = int.from_bytes(ser.read(), "little") 26 channel_name = channels[channel_id]['name'] 27 msg_size = channels[channel_id]['message_size'] 28 msg = ser.read(msg_size) 29 ser.read(1) # skip '*' 30 return (channel_name, msg_size, msg) 31 32 33def pub_start_measurement(ser, action: bool): 34 print( 35 f"Proxy PUB [{channels[2]['name']}] -> start measurement") 36 ser.write(b'$') 37 ser.write(b'\x02') # idx 38 ser.write(b'\x01') 39 ser.write(b'*') 40 ser.flush() 41 42 43ser = serial.Serial(args.port) 44pub_start_measurement(ser, True) 45while True: 46 d = ser.read() 47 if d == b'$': 48 channel_name, msg_size, msg = fetch_sentence(ser) 49 if channel_name == "sensor_data": 50 print( 51 f"Proxy NOTIFY: [{channel_name}] -> sensor value {int.from_bytes(msg, 'little')}") 52 sleep(1) 53 pub_start_measurement(ser, True) 54