1# Copyright 2019 Oticon A/S 2# Copyright 2023 Nordic Semiconductor ASA 3# SPDX-License-Identifier: Apache-2.0 4 5import os 6import csv 7 8KEY_ALTERNATIVES = [ 9 ('start_time', 'Tx_Start_Time'), 10 ('center_freq', 'CenterFreq'), 11 ('phy_address', 'PhyAddress'), 12 ('packet_size', 'PacketSize'), 13 ('packet', 'Packet'), 14] 15 16class CSVFile: 17 def __init__(self, f): 18 if not hasattr(f, 'read'): 19 f = open(f, newline='') 20 self.file = f 21 self.reader = csv.reader(self.file, delimiter=',', lineterminator='\n') 22 try: 23 headers = self.reader.__next__() 24 except StopIteration: 25 print("File %s is fully empty"%self.file.name) 26 headers = []; 27 for (key, alt) in KEY_ALTERNATIVES: 28 if not key in headers and alt in headers: 29 headers[headers.index(alt)] = key 30 self.headers = headers 31 32 def __del__(self): 33 self.file.close() 34 35 def __enter__(self): 36 return self 37 38 def __exit__(self, exc_type, exc_val, exc_tb): 39 self.file.close() 40 return False 41 42 def __iter__(self): 43 return self 44 45 def __next__(self): 46 row = {} 47 lst = self.reader.__next__() 48 for idx, header in enumerate(self.headers): 49 try: 50 row[header] = lst[idx] 51 except IndexError: # The last line may be corrupted, so let's end if we find a corrupted one 52 print("Input file %s truncated mid line, ignoring line"%self.file.name) 53 return None 54 row['start_time'] = int(row['start_time'], 10) 55 return row 56 57def open_input(filename): 58 try: 59 mtime = os.path.getmtime(filename) 60 return (mtime, open(filename, mode='r', newline='')) 61 except OSError as e: 62 raise argparse.ArgumentTypeError( 63 "can't open '{}': {}".format(filename, e)) 64