1import socket
2import asyncio
3from telnetlib3 import open_connection, TelnetReader
4
5ENCODING = "utf-8"
6
7reader: TelnetReader
8
9def find_free_port() -> int:
10    # Return open port number
11    s = socket.socket()
12    s.bind(('localhost', 0))
13    port = s.getsockname()[1]
14    s.close()
15    return port
16
17def telnet_connect(port: int) -> None:
18    global reader
19
20    loop = asyncio.get_event_loop()
21
22    # Coroutines with event loop required for robot tests
23    coro = open_connection('localhost', port)
24    reader, _ = loop.run_until_complete(coro)
25
26def telnet_read_until(until_string: str, timeout: int = 15) -> str:
27    global reader
28
29    loop = asyncio.get_event_loop()
30
31    # Wrap the readuntil coroutine with wait_for to add timeout
32    coro = asyncio.wait_for(reader.readuntil(until_string.encode(ENCODING)), timeout=timeout)
33    data = loop.run_until_complete(coro)
34
35    return data.decode(ENCODING)