1# Copyright (c) 2023 Nordic Semiconductor ASA 2# 3# SPDX-License-Identifier: Apache-2.0 4 5from __future__ import annotations 6 7import io 8import logging 9import os 10import threading 11import time 12from pathlib import Path 13 14logger = logging.getLogger(__name__) 15 16 17class FifoHandler: 18 """ 19 Class dedicated for handling communication over POSIX system FIFO (named 20 pipes). 21 """ 22 23 def __init__(self, fifo_path: str | Path, timeout: float): 24 """ 25 :param fifo_path: path to basic fifo name 26 :param timeout: timeout for establishing connection over FIFO 27 """ 28 self._fifo_out_path = str(fifo_path) + '.out' 29 self._fifo_in_path = str(fifo_path) + '.in' 30 self._fifo_out_file: io.FileIO | None = None 31 self._fifo_in_file: io.FileIO | None = None 32 self._open_fifo_thread: threading.Thread | None = None 33 self._opening_monitor_thread: threading.Thread | None = None 34 self._fifo_opened: threading.Event = threading.Event() 35 self._stop_waiting_for_opening: threading.Event = threading.Event() 36 self._timeout = timeout 37 38 def initiate_connection(self) -> None: 39 """ 40 Opening FIFO could be a blocking operation (it requires also opening 41 FIFO on the other side - by separate program/process). So, to avoid 42 blockage, execute opening FIFO in separate thread and additionally run 43 in second thread opening time monitor to alternatively unblock first 44 thread when timeout will expire. 45 """ 46 self._stop_waiting_for_opening.clear() 47 self._make_fifo_file(self._fifo_out_path) 48 self._make_fifo_file(self._fifo_in_path) 49 if self._open_fifo_thread is None: 50 self._open_fifo_thread = threading.Thread(target=self._open_fifo, daemon=True) 51 self._open_fifo_thread.start() 52 if self._opening_monitor_thread is None: 53 self._opening_monitor_thread = threading.Thread(target=self._opening_monitor, daemon=True) 54 self._opening_monitor_thread.start() 55 56 @staticmethod 57 def _make_fifo_file(filename: str) -> None: 58 if not os.path.exists(filename): 59 os.mkfifo(filename) 60 61 def _open_fifo(self) -> None: 62 self._fifo_out_file = open(self._fifo_out_path, 'rb', buffering=0) 63 self._fifo_in_file = open(self._fifo_in_path, 'wb', buffering=0) 64 if not self._stop_waiting_for_opening.is_set(): 65 self._fifo_opened.set() 66 67 def _opening_monitor(self) -> None: 68 """ 69 Monitor opening FIFO operation - if timeout was expired (or disconnect 70 was called in the meantime), then interrupt opening FIFO in other 71 thread. 72 """ 73 timeout_time: float = time.time() + self._timeout 74 while time.time() < timeout_time and not self._stop_waiting_for_opening.is_set(): 75 if self._fifo_opened.is_set(): 76 return 77 time.sleep(0.1) 78 self._stop_waiting_for_opening.set() 79 self._unblock_open_fifo_operation() 80 81 def _unblock_open_fifo_operation(self) -> None: 82 """ 83 This is workaround for unblocking opening FIFO operation - imitate 84 opening FIFO "on the other side". 85 """ 86 if os.path.exists(self._fifo_out_path): 87 open(self._fifo_out_path, 'wb', buffering=0) 88 if os.path.exists(self._fifo_in_path): 89 open(self._fifo_in_path, 'rb', buffering=0) 90 91 def disconnect(self) -> None: 92 self._stop_waiting_for_opening.set() 93 if self._open_fifo_thread and self._open_fifo_thread.is_alive(): 94 self._open_fifo_thread.join(timeout=1) 95 self._open_fifo_thread = None 96 if self._opening_monitor_thread and self._opening_monitor_thread.is_alive(): 97 self._opening_monitor_thread.join(timeout=1) 98 self._opening_monitor_thread = None 99 self._fifo_opened.clear() 100 101 if self._fifo_out_file: 102 self._fifo_out_file.close() 103 if self._fifo_in_file: 104 self._fifo_in_file.close() 105 106 if os.path.exists(self._fifo_out_path): 107 os.unlink(self._fifo_out_path) 108 if os.path.exists(self._fifo_in_path): 109 os.unlink(self._fifo_in_path) 110 111 @property 112 def is_open(self) -> bool: 113 try: 114 return bool( 115 self._fifo_opened.is_set() 116 and self._fifo_in_file is not None and self._fifo_out_file is not None 117 and self._fifo_in_file.fileno() and self._fifo_out_file.fileno() 118 ) 119 except ValueError: 120 return False 121 122 def read(self, __size: int = -1) -> bytes: 123 return self._fifo_out_file.read(__size) # type: ignore[union-attr] 124 125 def readline(self, __size: int | None = None) -> bytes: 126 return self._fifo_out_file.readline(__size) # type: ignore[union-attr] 127 128 def write(self, __buffer: bytes) -> int: 129 return self._fifo_in_file.write(__buffer) # type: ignore[union-attr] 130 131 def flush_write(self) -> None: 132 if self._fifo_in_file: 133 self._fifo_in_file.flush() 134 135 def flush_read(self) -> None: 136 if self._fifo_out_file: 137 self._fifo_out_file.flush() 138