# Copyright (c) 2023, Bjarki Arge Andreasen
# SPDX-License-Identifier: Apache-2.0

import socket
import threading
import select
import time
import copy


class TEUDPReceiveSession():
    """Per-peer bookkeeping for a stream of pseudo-random test packets.

    Every received packet is checked against a fixed linear congruential
    generator sequence and counted as either received (valid) or dropped
    (invalid). The session is considered finished once no packet has arrived
    for ``timeout`` seconds.
    """

    def __init__(self, address, timeout = 1):
        """Create a session for ``address`` that expires after ``timeout`` seconds of silence."""
        self.address = address
        self.last_packet_received_at = time.monotonic()
        self.timeout = timeout
        self.packets_received = 0
        self.packets_dropped = 0

    def get_address(self):
        """Return the peer address this session was created for."""
        return self.address

    def on_packet_received(self, data):
        """Validate ``data``, update the counters and refresh the idle timer."""
        if self._validate_packet_(data):
            self.packets_received += 1
        else:
            self.packets_dropped += 1

        self.last_packet_received_at = time.monotonic()

    def update(self):
        """Return ``(packets_received, packets_dropped)`` once the session has timed out.

        Returns ``None`` while the session is still active.
        """
        if (time.monotonic() - self.last_packet_received_at) > self.timeout:
            return (self.packets_received, self.packets_dropped)
        return None

    def _validate_packet_(self, data: bytes) -> bool:
        """Return True if every byte of ``data`` matches the expected PRNG sequence.

        The generator is re-seeded with 1234 for each packet, and the low byte
        of each successive state is compared with the corresponding payload
        byte. An empty payload is accepted.
        """
        prng_state = 1234
        for b in data:
            prng_state = ((1103515245 * prng_state) + 12345) % (1 << 31)
            if prng_state & 0xFF != b:
                return False
        return True


class TEUDPReceive():
    """UDP receiver that tracks one ``TEUDPReceiveSession`` per peer address.

    A background thread listens on 0.0.0.0:7781. When a session times out, a
    two-byte report (valid count, invalid count) is sent back to the peer and
    the session is discarded.
    """

    def __init__(self):
        """Prepare the worker thread; call ``start()`` to begin receiving."""
        self.running = True
        self.thread = threading.Thread(target=self._target_)
        self.sessions = []

    def start(self):
        """Start the background receive thread."""
        self.thread.start()

    def stop(self):
        """Ask the receive thread to exit and wait up to one second for it."""
        self.running = False
        # Joining a thread that was never started raises RuntimeError, so only
        # wait when there is actually a running thread.
        if self.thread.is_alive():
            self.thread.join(1)

    def _target_(self):
        """Thread body: receive datagrams and expire idle sessions."""
        # The context manager closes the socket on every exit path, including
        # a failing bind().
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setblocking(False)
            sock.bind(('0.0.0.0', 7781))

            while self.running:
                try:
                    # Only wait for readability. A UDP socket is practically
                    # always writable, so listing it in the write set makes
                    # select() return immediately and turns this loop into a
                    # busy spin that ignores the 0.5 s timeout.
                    ready_to_read, _, _ = select.select([sock], [], [], 0.5)

                    if not ready_to_read:
                        self._update_sessions_(sock)
                        continue

                    data, address = sock.recvfrom(4096)

                    print(f'udp received {len(data)} bytes -> {address[0]}:{address[1]}')

                    session = self._get_session_by_address_(address)
                    session.on_packet_received(data)

                except Exception as e:
                    # Top-level boundary of the worker thread: report and stop.
                    print(e)
                    break

    def _get_session_by_address_(self, address) -> TEUDPReceiveSession:
        """Return the session for ``address``, creating it if it does not exist."""
        # Search for existing session
        for session in self.sessions:
            if session.get_address() == address:
                return session

        # Create and return new session
        print(f'Created session for {address[0]}:{address[1]}')
        self.sessions.append(TEUDPReceiveSession(address, 2))
        return self.sessions[-1]

    def _update_sessions_(self, sock):
        """Send the final report for every timed-out session and remove it."""
        # Iterate over a shallow copy because finished sessions are removed
        # from self.sessions inside the loop.
        sessions = copy.copy(self.sessions)

        for session in sessions:
            result = session.update()

            if result is None:
                continue

            response = self._encode_result_(result)

            print(f'Sending result {response} to address {session.get_address()}')
            sock.sendto(response, session.get_address())

            print(f'Removing session for address {session.get_address()}')
            self.sessions.remove(session)

    @staticmethod
    def _encode_result_(result) -> bytes:
        """Encode ``(received, dropped)`` as two bytes, saturating each at 255.

        ``bytes([...])`` raises ValueError for values above 255, which would
        otherwise terminate the receive thread on long-running sessions.
        """
        received, dropped = result
        return bytes([min(received, 255), min(dropped, 255)])