# Copyright (c) 2023 Nordic Semiconductor ASA # # SPDX-License-Identifier: Apache-2.0 import logging import os import sys import threading import time from argparse import ArgumentParser from zen_of_python import zen_of_python class FifoFile: def __init__(self, filename, mode): self.filename = filename self.mode = mode self.thread = None self.file = None self.logger = logging.getLogger(__name__) def _open(self): self.logger.info(f'Creating fifo file: {self.filename}') end_time = time.time() + 2 while not os.path.exists(self.filename): time.sleep(0.1) if time.time() > end_time: self.logger.error(f'Did not able create fifo file: {self.filename}') return self.file = open(self.filename, self.mode, buffering=0) self.logger.info(f'File created: {self.filename}') def open(self): self.thread = threading.Thread(target=self._open(), daemon=True) self.thread.start() def write(self, data): if self.file: self.file.write(data) def read(self): if self.file: return self.file.readline() def close(self): if self.file: self.file.close() self.thread.join(1) self.logger.info(f'Closed file: {self.filename}') def __enter__(self): self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def main(): logging.basicConfig(level='DEBUG') parser = ArgumentParser() parser.add_argument('file') args = parser.parse_args() read_path = args.file + '.in' write_path = args.file + '.out' logger = logging.getLogger(__name__) logger.info('Start') with FifoFile(write_path, 'wb') as wf, FifoFile(read_path, 'rb'): for line in zen_of_python: wf.write(f'{line}\n'.encode('utf-8')) time.sleep(1) # give a moment for external programs to collect all outputs return 0 if __name__ == '__main__': sys.exit(main())