# Copyright (c) 2023, Bjarki Arge Andreasen # SPDX-License-Identifier: Apache-2.0 import socket import threading import select import time import copy class TEUDPReceiveSession(): def __init__(self, address, timeout = 1): self.address = address self.last_packet_received_at = time.monotonic() self.timeout = timeout self.packets_received = 0 self.packets_dropped = 0 def get_address(self): return self.address def on_packet_received(self, data): if self._validate_packet_(data): self.packets_received += 1 else: self.packets_dropped += 1 self.last_packet_received_at = time.monotonic() def update(self): if (time.monotonic() - self.last_packet_received_at) > self.timeout: return (self.packets_received, self.packets_dropped) return None def _validate_packet_(self, data: bytes) -> bool: prng_state = 1234 for b in data: prng_state = ((1103515245 * prng_state) + 12345) % (1 << 31) if prng_state & 0xFF != b: return False return True class TEUDPReceive(): def __init__(self): self.running = True self.thread = threading.Thread(target=self._target_) self.sessions = [] def start(self): self.thread.start() def stop(self): self.running = False self.thread.join(1) def _target_(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setblocking(False) sock.bind(('0.0.0.0', 7781)) while self.running: try: ready_to_read, _, _ = select.select([sock], [sock], [], 0.5) if not ready_to_read: self._update_sessions_(sock) continue data, address = sock.recvfrom(4096) print(f'udp received {len(data)} bytes -> {address[0]}:{address[1]}') session = self._get_session_by_address_(address) session.on_packet_received(data) except Exception as e: print(e) break sock.close() def _get_session_by_address_(self, address) -> TEUDPReceiveSession: # Search for existing session for session in self.sessions: if session.get_address() == address: return session # Create and return new session print(f'Created session for {address[0]}:{address[1]}') self.sessions.append(TEUDPReceiveSession(address, 2)) return self.sessions[-1] def _update_sessions_(self, sock): sessions = copy.copy(self.sessions) for session in sessions: result = session.update() if result is None: continue response = bytes([result[0], result[1]]) print(f'Sending result {response} to address {session.get_address()}') sock.sendto(response, session.get_address()) print(f'Removing session for address {session.get_address()}') self.sessions.remove(session)