1#!/usr/bin/env python3 2# Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com> 3# SPDX-License-Identifier: Apache-2.0 4import serial 5import json 6from time import sleep 7import argparse 8 9j = """ 10[ 11{"name":"version","on_changed": false, "read_only": true, "message_size": 4}, 12{"name":"sensor_data","on_changed": true, "read_only": false, "message_size": 4}, 13{"name":"start_measurement","on_changed": false, "read_only": false, "message_size": 1} 14]""" 15 16channels = json.loads(j) 17parser = argparse.ArgumentParser(description='Read zbus events via serial.', allow_abbrev=False) 18parser.add_argument("port", type=str, help='The tty or COM port to be used') 19 20args = parser.parse_args() 21 22 23def fetch_sentence(ser): 24 channel_id = int.from_bytes(ser.read(), "little") 25 channel_name = channels[channel_id]['name'] 26 msg_size = channels[channel_id]['message_size'] 27 msg = ser.read(msg_size) 28 ser.read(1) # skip '*' 29 return (channel_name, msg_size, msg) 30 31 32def pub_start_measurement(ser, action: bool): 33 print( 34 f"Proxy PUB [{channels[2]['name']}] -> start measurement") 35 ser.write(b'$') 36 ser.write(b'\x02') # idx 37 ser.write(b'\x01') 38 ser.write(b'*') 39 ser.flush() 40 41 42ser = serial.Serial(args.port) 43pub_start_measurement(ser, True) 44while True: 45 d = ser.read() 46 if d == b'$': 47 channel_name, msg_size, msg = fetch_sentence(ser) 48 if channel_name == "sensor_data": 49 print( 50 f"Proxy NOTIFY: [{channel_name}] -> sensor value {int.from_bytes(msg, 'little')}") 51 sleep(1) 52 pub_start_measurement(ser, True) 53