# Copyright (c) 2023 Nordic Semiconductor ASA # # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations import io import logging import os import threading import time from pathlib import Path logger = logging.getLogger(__name__) class FifoHandler: """ Class dedicated for handling communication over POSIX system FIFO (named pipes). """ def __init__(self, fifo_path: str | Path, timeout: float): """ :param fifo_path: path to basic fifo name :param timeout: timeout for establishing connection over FIFO """ self._fifo_out_path = str(fifo_path) + '.out' self._fifo_in_path = str(fifo_path) + '.in' self._fifo_out_file: io.FileIO | None = None self._fifo_in_file: io.FileIO | None = None self._open_fifo_thread: threading.Thread | None = None self._opening_monitor_thread: threading.Thread | None = None self._fifo_opened: threading.Event = threading.Event() self._stop_waiting_for_opening: threading.Event = threading.Event() self._timeout = timeout def initiate_connection(self) -> None: """ Opening FIFO could be a blocking operation (it requires also opening FIFO on the other side - by separate program/process). So, to avoid blockage, execute opening FIFO in separate thread and additionally run in second thread opening time monitor to alternatively unblock first thread when timeout will expire. """ self._stop_waiting_for_opening.clear() self._make_fifo_file(self._fifo_out_path) self._make_fifo_file(self._fifo_in_path) if self._open_fifo_thread is None: self._open_fifo_thread = threading.Thread(target=self._open_fifo, daemon=True) self._open_fifo_thread.start() if self._opening_monitor_thread is None: self._opening_monitor_thread = threading.Thread(target=self._opening_monitor, daemon=True) self._opening_monitor_thread.start() @staticmethod def _make_fifo_file(filename: str) -> None: if not os.path.exists(filename): os.mkfifo(filename) def _open_fifo(self) -> None: self._fifo_out_file = open(self._fifo_out_path, 'rb', buffering=0) self._fifo_in_file = open(self._fifo_in_path, 'wb', buffering=0) if not self._stop_waiting_for_opening.is_set(): self._fifo_opened.set() def _opening_monitor(self) -> None: """ Monitor opening FIFO operation - if timeout was expired (or disconnect was called in the meantime), then interrupt opening FIFO in other thread. """ timeout_time: float = time.time() + self._timeout while time.time() < timeout_time and not self._stop_waiting_for_opening.is_set(): if self._fifo_opened.is_set(): return time.sleep(0.1) self._stop_waiting_for_opening.set() self._unblock_open_fifo_operation() def _unblock_open_fifo_operation(self) -> None: """ This is workaround for unblocking opening FIFO operation - imitate opening FIFO "on the other side". """ if os.path.exists(self._fifo_out_path): open(self._fifo_out_path, 'wb', buffering=0) if os.path.exists(self._fifo_in_path): open(self._fifo_in_path, 'rb', buffering=0) def disconnect(self) -> None: self._stop_waiting_for_opening.set() if self._open_fifo_thread and self._open_fifo_thread.is_alive(): self._open_fifo_thread.join(timeout=1) self._open_fifo_thread = None if self._opening_monitor_thread and self._opening_monitor_thread.is_alive(): self._opening_monitor_thread.join(timeout=1) self._opening_monitor_thread = None self._fifo_opened.clear() if self._fifo_out_file: self._fifo_out_file.close() if self._fifo_in_file: self._fifo_in_file.close() if os.path.exists(self._fifo_out_path): os.unlink(self._fifo_out_path) if os.path.exists(self._fifo_in_path): os.unlink(self._fifo_in_path) @property def is_open(self) -> bool: try: return bool( self._fifo_opened.is_set() and self._fifo_in_file is not None and self._fifo_out_file is not None and self._fifo_in_file.fileno() and self._fifo_out_file.fileno() ) except ValueError: return False def read(self, __size: int = -1) -> bytes: return self._fifo_out_file.read(__size) # type: ignore[union-attr] def readline(self, __size: int | None = None) -> bytes: return self._fifo_out_file.readline(__size) # type: ignore[union-attr] def write(self, __buffer: bytes) -> int: return self._fifo_in_file.write(__buffer) # type: ignore[union-attr] def flush_write(self) -> None: if self._fifo_in_file: self._fifo_in_file.flush() def flush_read(self) -> None: if self._fifo_out_file: self._fifo_out_file.flush()