# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

import logging
import os
import sys
import threading
import time
from argparse import ArgumentParser

from zen_of_python import zen_of_python


class FifoFile:
    """Unbuffered file wrapper that is opened from a background thread.

    The file is expected to be created by another party: the opener waits up
    to two seconds for ``filename`` to appear and then opens it with ``mode``.
    If the path is a named pipe, the built-in ``open()`` may block until the
    peer opens the other end (POSIX FIFO semantics), which is why the open is
    performed off the calling thread.

    Instances can be used as context managers; entering starts the opener
    thread and leaving closes the file.
    """

    def __init__(self, filename, mode):
        """Remember the path and open mode; nothing is opened yet."""
        self.filename = filename
        self.mode = mode
        self.thread = None  # opener thread, set by open()
        self.file = None  # underlying file object, set once _open() succeeds
        self.logger = logging.getLogger(__name__)

    def _open(self):
        """Wait (at most ~2 s) for the path to exist, then open it unbuffered.

        On timeout an error is logged and ``self.file`` stays ``None``.
        """
        self.logger.info(f'Creating fifo file: {self.filename}')
        end_time = time.time() + 2
        while not os.path.exists(self.filename):
            time.sleep(0.1)
            if time.time() > end_time:
                self.logger.error(f'Did not able create fifo file: {self.filename}')
                return
        self.file = open(self.filename, self.mode, buffering=0)
        self.logger.info(f'File created: {self.filename}')

    def _wait_until_opened(self):
        """Block until the opener thread (if any) has finished."""
        opener = self.thread
        if opener is not None and opener is not threading.current_thread():
            opener.join()

    def open(self):
        """Start opening the file in a daemon thread and return immediately."""
        # Pass the bound method itself; calling it here would run the open
        # synchronously and hand ``None`` to the thread as its target.
        self.thread = threading.Thread(target=self._open, daemon=True)
        self.thread.start()

    def write(self, data):
        """Write ``data`` to the file; a no-op if the file could not be opened.

        Waits for the pending open to finish first so data is not dropped
        just because the opener thread has not completed yet.
        """
        self._wait_until_opened()
        if self.file:
            self.file.write(data)

    def read(self):
        """Return one line from the file, or ``None`` if it was never opened.

        Waits for the pending open to finish first.
        """
        self._wait_until_opened()
        if self.file:
            return self.file.readline()
        return None

    def close(self):
        """Give the opener thread up to one second to finish, then close the file.

        Safe to call even when ``open()`` was never called.
        """
        # Join before looking at ``self.file`` so a file that gets opened
        # during shutdown is still closed.
        if self.thread is not None:
            self.thread.join(1)
        if self.file:
            self.file.close()
        self.logger.info(f'Closed file: {self.filename}')

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


def main():
    """Write every line of ``zen_of_python`` to ``<file>.out``.

    ``<file>.in`` is opened for reading as well but is not read from.

    Returns:
        0, used as the process exit code.
    """
    logging.basicConfig(level='DEBUG')
    parser = ArgumentParser()
    parser.add_argument('file')
    args = parser.parse_args()
    read_path = args.file + '.in'
    write_path = args.file + '.out'
    logger = logging.getLogger(__name__)
    logger.info('Start')

    with FifoFile(write_path, 'wb') as wf, FifoFile(read_path, 'rb'):
        for line in zen_of_python:
            wf.write(f'{line}\n'.encode('utf-8'))
        time.sleep(1)  # give a moment for external programs to collect all outputs
    return 0


if __name__ == '__main__':
    sys.exit(main())