1# Copyright (c) 2023, Bjarki Arge Andreasen
2# SPDX-License-Identifier: Apache-2.0
3
4import socket
5import threading
6import select
7import time
8import copy
9
10class TEUDPReceiveSession():
11    def __init__(self, address, timeout = 1):
12        self.address = address
13        self.last_packet_received_at = time.monotonic()
14        self.timeout = timeout
15        self.packets_received = 0
16        self.packets_dropped = 0
17
18    def get_address(self):
19        return self.address
20
21    def on_packet_received(self, data):
22        if self._validate_packet_(data):
23            self.packets_received += 1
24        else:
25            self.packets_dropped += 1
26
27        self.last_packet_received_at = time.monotonic()
28
29    def update(self):
30        if (time.monotonic() - self.last_packet_received_at) > self.timeout:
31            return (self.packets_received, self.packets_dropped)
32        return None
33
34    def _validate_packet_(self, data: bytes) -> bool:
35        prng_state = 1234
36        for b in data:
37            prng_state = ((1103515245 * prng_state) + 12345) % (1 << 31)
38            if prng_state & 0xFF != b:
39                return False
40        return True
41
42class TEUDPReceive():
43    def __init__(self):
44        self.running = True
45        self.thread = threading.Thread(target=self._target_)
46        self.sessions = []
47
48    def start(self):
49        self.thread.start()
50
51    def stop(self):
52        self.running = False
53        self.thread.join(1)
54
55    def _target_(self):
56        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
57        sock.setblocking(False)
58        sock.bind(('0.0.0.0', 7781))
59
60        while self.running:
61            try:
62                ready_to_read, _, _ = select.select([sock], [sock], [], 0.5)
63
64                if not ready_to_read:
65                    self._update_sessions_(sock)
66                    continue
67
68                data, address = sock.recvfrom(4096)
69
70                print(f'udp received {len(data)} bytes -> {address[0]}:{address[1]}')
71
72                session = self._get_session_by_address_(address)
73                session.on_packet_received(data)
74
75            except Exception as e:
76                print(e)
77                break
78
79        sock.close()
80
81    def _get_session_by_address_(self, address) -> TEUDPReceiveSession:
82        # Search for existing session
83        for session in self.sessions:
84            if session.get_address() == address:
85                return session
86
87        # Create and return new session
88        print(f'Created session for {address[0]}:{address[1]}')
89        self.sessions.append(TEUDPReceiveSession(address, 2))
90        return self.sessions[-1]
91
92    def _update_sessions_(self, sock):
93        sessions = copy.copy(self.sessions)
94
95        for session in sessions:
96            result = session.update()
97
98            if result is None:
99                continue
100
101            response = bytes([result[0], result[1]])
102
103            print(f'Sending result {response} to address {session.get_address()}')
104            sock.sendto(response, session.get_address())
105
106            print(f'Removing session for address {session.get_address()}')
107            self.sessions.remove(session)
108