1# Copyright (c) 2023 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
5from __future__ import annotations
6
7import io
8import logging
9import os
10import threading
11import time
12from pathlib import Path
13
14logger = logging.getLogger(__name__)
15
16
17class FifoHandler:
18    """
19    Class dedicated for handling communication over POSIX system FIFO (named
20    pipes).
21    """
22
23    def __init__(self, fifo_path: str | Path, timeout: float):
24        """
25        :param fifo_path: path to basic fifo name
26        :param timeout: timeout for establishing connection over FIFO
27        """
28        self._fifo_out_path = str(fifo_path) + '.out'
29        self._fifo_in_path = str(fifo_path) + '.in'
30        self._fifo_out_file: io.FileIO | None = None
31        self._fifo_in_file: io.FileIO | None = None
32        self._open_fifo_thread: threading.Thread | None = None
33        self._opening_monitor_thread: threading.Thread | None = None
34        self._fifo_opened: threading.Event = threading.Event()
35        self._stop_waiting_for_opening: threading.Event = threading.Event()
36        self._timeout = timeout
37
38    def initiate_connection(self) -> None:
39        """
40        Opening FIFO could be a blocking operation (it requires also opening
41        FIFO on the other side - by separate program/process). So, to avoid
42        blockage, execute opening FIFO in separate thread and additionally run
43        in second thread opening time monitor to alternatively unblock first
44        thread when timeout will expire.
45        """
46        self._stop_waiting_for_opening.clear()
47        self._make_fifo_file(self._fifo_out_path)
48        self._make_fifo_file(self._fifo_in_path)
49        if self._open_fifo_thread is None:
50            self._open_fifo_thread = threading.Thread(target=self._open_fifo, daemon=True)
51            self._open_fifo_thread.start()
52        if self._opening_monitor_thread is None:
53            self._opening_monitor_thread = threading.Thread(target=self._opening_monitor, daemon=True)
54            self._opening_monitor_thread.start()
55
56    @staticmethod
57    def _make_fifo_file(filename: str) -> None:
58        if not os.path.exists(filename):
59            os.mkfifo(filename)
60
61    def _open_fifo(self) -> None:
62        self._fifo_out_file = open(self._fifo_out_path, 'rb', buffering=0)
63        self._fifo_in_file = open(self._fifo_in_path, 'wb', buffering=0)
64        if not self._stop_waiting_for_opening.is_set():
65            self._fifo_opened.set()
66
67    def _opening_monitor(self) -> None:
68        """
69        Monitor opening FIFO operation - if timeout was expired (or disconnect
70        was called in the meantime), then interrupt opening FIFO in other
71        thread.
72        """
73        timeout_time: float = time.time() + self._timeout
74        while time.time() < timeout_time and not self._stop_waiting_for_opening.is_set():
75            if self._fifo_opened.is_set():
76                return
77            time.sleep(0.1)
78        self._stop_waiting_for_opening.set()
79        self._unblock_open_fifo_operation()
80
81    def _unblock_open_fifo_operation(self) -> None:
82        """
83        This is workaround for unblocking opening FIFO operation - imitate
84        opening FIFO "on the other side".
85        """
86        if os.path.exists(self._fifo_out_path):
87            open(self._fifo_out_path, 'wb', buffering=0)
88        if os.path.exists(self._fifo_in_path):
89            open(self._fifo_in_path, 'rb', buffering=0)
90
91    def disconnect(self) -> None:
92        self._stop_waiting_for_opening.set()
93        if self._open_fifo_thread and self._open_fifo_thread.is_alive():
94            self._open_fifo_thread.join(timeout=1)
95        self._open_fifo_thread = None
96        if self._opening_monitor_thread and self._opening_monitor_thread.is_alive():
97            self._opening_monitor_thread.join(timeout=1)
98        self._opening_monitor_thread = None
99        self._fifo_opened.clear()
100
101        if self._fifo_out_file:
102            self._fifo_out_file.close()
103        if self._fifo_in_file:
104            self._fifo_in_file.close()
105
106        if os.path.exists(self._fifo_out_path):
107            os.unlink(self._fifo_out_path)
108        if os.path.exists(self._fifo_in_path):
109            os.unlink(self._fifo_in_path)
110
111    @property
112    def is_open(self) -> bool:
113        try:
114            return bool(
115                self._fifo_opened.is_set()
116                and self._fifo_in_file is not None and self._fifo_out_file is not None
117                and self._fifo_in_file.fileno() and self._fifo_out_file.fileno()
118            )
119        except ValueError:
120            return False
121
122    def read(self, __size: int = -1) -> bytes:
123        return self._fifo_out_file.read(__size)  # type: ignore[union-attr]
124
125    def readline(self, __size: int | None = None) -> bytes:
126        return self._fifo_out_file.readline(__size)  # type: ignore[union-attr]
127
128    def write(self, __buffer: bytes) -> int:
129        return self._fifo_in_file.write(__buffer)  # type: ignore[union-attr]
130
131    def flush_write(self) -> None:
132        if self._fifo_in_file:
133            self._fifo_in_file.flush()
134
135    def flush_read(self) -> None:
136        if self._fifo_out_file:
137            self._fifo_out_file.flush()
138