"""Blocking telnet helpers built on telnetlib3 for synchronous callers.

The telnetlib3 API is coroutine based; the functions here drive those
coroutines to completion on an asyncio event loop so they can be called
from plain synchronous code (the original comment mentions robot tests).
"""

import socket
import asyncio
from telnetlib3 import open_connection, TelnetReader

ENCODING = "utf-8"

# Reader of the connection opened by telnet_connect(); unset until then.
reader: TelnetReader


def _get_event_loop() -> asyncio.AbstractEventLoop:
    """Return the current event loop, creating and installing one if needed.

    Newer Python versions raise RuntimeError from asyncio.get_event_loop()
    when no loop has been set for the thread. Installing the new loop with
    set_event_loop() makes later calls return the same loop, which matters
    because the reader created in telnet_connect() is tied to the loop that
    opened the connection.
    """
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop


def find_free_port() -> int:
    """Return a port number that was free on localhost at the time of the call.

    The socket is bound to port 0 so the OS picks an unused port, then it is
    closed again. Nothing reserves the port afterwards, so another process
    may grab it before the caller does.
    """
    # The context manager closes the socket even if bind()/getsockname() raise.
    with socket.socket() as sock:
        sock.bind(('localhost', 0))
        return sock.getsockname()[1]


def telnet_connect(port: int) -> None:
    """Open a telnet connection to localhost:port and store its reader.

    The reader half of the connection is saved in the module-level
    ``reader`` for use by telnet_read_until(); the writer half is discarded.
    """
    global reader

    # Coroutines with event loop required for robot tests
    loop = _get_event_loop()
    reader, _ = loop.run_until_complete(open_connection('localhost', port))


def telnet_read_until(until_string: str, timeout: float = 15) -> str:
    """Read from the stored connection until ``until_string`` is seen.

    telnet_connect() must have been called first so that ``reader`` is set.

    Args:
        until_string: Text to wait for; encoded with ENCODING before being
            passed to the reader.
        timeout: Maximum number of seconds to wait.

    Returns:
        The data returned by the reader, decoded with ENCODING.

    Raises:
        asyncio.TimeoutError: If the read does not finish within ``timeout``.
    """
    loop = _get_event_loop()

    # Wrap the readuntil coroutine with wait_for to add timeout
    separator = until_string.encode(ENCODING)
    pending_read = asyncio.wait_for(reader.readuntil(separator), timeout=timeout)
    data = loop.run_until_complete(pending_read)

    return data.decode(ENCODING)