1# Copyright 2022 Oticon A/S 2# SPDX-License-Identifier: Apache-2.0 3 4# BabbleSim library functions and constants targeting the 2G4 phy 5 6# Note: The following is a copy of the functionality in the BSim C-libraries 7# If/when adding to this library try to stay as close to the C library as possible 8# to make it easier to understand both worlds 9# If this file grows a lot it may be a good idea to use the existing C libraries 10# with a python wrapper; For now that is overkill though 11 12import os 13import platform 14import pwd 15import stat 16 17### Constants ### 18 19## BSim base message headers ## 20# The device wants to wait 21PB_MSG_WAIT = 0x01 22# The device is disconnecting from the phy or viceversa: 23PB_MSG_DISCONNECT = 0xFFFF 24# The device is disconnecting from the phy, and is requesting the phy to end the simulation ASAP 25PB_MSG_TERMINATE = 0xFFFE 26# The requested time tick has just finished 27PB_MSG_WAIT_END = 0x81 28 29## 2G4 Message headers from device to phy ## 30# The device will transmit 31P2G4_MSG_TX = 0x02 32# The device wants to attempt to receive 33P2G4_MSG_RX = 0x11 34# Continue receiving (the device likes the address and headers of the packet) 35P2G4_MSG_RXCONT = 0x12 36# Stop reception (the device does not likes the address or headers of the packet) => The phy will stop this reception 37P2G4_MSG_RXSTOP = 0x13 38# Do an RSSI measurement 39P2G4_MSG_RSSIMEAS = 0x14 40# Device is successfully providing a new p2G4_abort_t 41P2G4_MSG_RERESP_ABORTREEVAL = 0x15 42 43## 2G4 Message headers from phy to device ## 44# Tx completed (fully or not) 45P2G4_MSG_TX_END = 0x100 46# Poking the device to see if it wants to change its abort time 47P2G4_MSG_ABORTREEVAL = 0x101 48# Matching address found while receiving 49P2G4_MSG_RX_ADDRESSFOUND = 0x102 50# Rx completed (successfully or not) 51P2G4_MSG_RX_END = 0x103 52# RSSI measurement completed 53P2G4_MSG_RSSI_END = 0x104 54 55TIME_NEVER = 0xFFFFFFFFFFFFFFFF # UINT64_MAX 56 57# 2G4 modulations 58P2G4_MOD_BLE = 0x10 # Standard 1Mbps BLE modulation 59P2G4_MOD_BLE2M = 0x20 # Standard 2Mbps BLE 60P2G4_MOD_PROP2M = 0x21 # Proprietary 2Mbps 61P2G4_MOD_PROP3M = 0x31 # Proprietary 3Mbps 62P2G4_MOD_PROP4M = 0x41 # Proprietary 4Mbps 63 64### Functions ### 65 66# Create folder if it does not already exist 67def create_folder(path): 68 if os.access(path, os.F_OK): 69 return 70 os.makedirs(path, stat.S_IRWXG | stat.S_IRWXU, exist_ok=True) 71 if not os.access(path, os.F_OK): 72 raise Exception("Failed to create folder %s" % path) 73 74# Create 75def create_com_folder(sim_id): 76 pw_name = pwd.getpwuid(os.geteuid())[0] 77 com_path = "/tmp/bs_%s/%s" % (pw_name, sim_id) 78 create_folder(com_path) 79 return com_path 80 81# Get process start time from /proc - note that this only works on Linux 82def get_process_start_time(pid): 83 filename = "/proc/%s/stat" % pid 84 with open(filename, "r") as file: 85 line = file.readline() 86 start_time = line.split(" ")[21] # see man 5 proc 87 return int(start_time) 88 89def lock_file_fill(lock_path, pid): 90 with open(lock_path, "w") as file: 91 file.write(str(pid) + "\n") 92 if platform.system() == "Linux": 93 starttime = get_process_start_time(pid) 94 file.write(str(starttime) + "\n") 95 96def test_and_create_lock_file(com_path, device_nbr, trace): 97 lock_path = "%s/%s.d%i.lock" % (com_path, "2G4", device_nbr) 98 my_pid = os.getpid() 99 if not os.access(lock_path, os.F_OK): 100 # The file does not exist. So unless somebody is racing us, we are safe to go 101 lock_file_fill(lock_path, my_pid) 102 return lock_path 103 104 corrupt_file = False 105 other_dead = False 106 his_starttime = 0 107 108 with open(lock_path, "r") as file: 109 try: 110 his_pid = int(file.readline()) 111 except: 112 corrupt_file = True 113 if not corrupt_file and platform.system() == "Linux": 114 try: 115 his_starttime = int(file.readline()) 116 except: 117 corrupt_file = True 118 119 if corrupt_file: # We are provably racing the other process, we stop 120 raise Exception("Found previous lock owned by unknown process, we may be racing each other => aborting\n") 121 122 try: 123 os.kill(his_pid, 0) 124 except: 125 other_dead = True 126 127 if other_dead == False: # a process with the pid exists 128 if platform.system() == "Linux": 129 # To be sure the pid was not reused, let's check that the process start time matches 130 other_start_time = get_process_start_time(his_pid) 131 if (his_starttime == other_start_time): # it is the same process 132 raise Exception("Found a previous, still RUNNING process w pid %s with the same sim_id and device port which would interfere with this one, aborting" % his_pid) 133 else: 134 other_dead = True 135 else: 136 raise Exception("Found a previous, still RUNNING process w pid %s with the same sim_id and device port which would interfere with this one, aborting" % his_pid) 137 138 trace.trace(3, "Found previous lock owned by DEAD process (pid was %s), will attempt to take over" % his_pid) 139 lock_file_fill(lock_path, my_pid) 140 141 return lock_path 142 143def create_fifo_if_not_there(path): 144 if os.access(path, os.F_OK): 145 return 146 try: 147 os.mkfifo(path, stat.S_IRWXG | stat.S_IRWXU) 148 except: 149 pass 150 if not os.access(path, os.F_OK): 151 raise Exception("Failed to create fifo %s" % path) 152 153# See BLUETOOTH CORE SPECIFICATION Version 5.3, Vol 6, Part B, section 1.4.1 154chIdxToCenterFreq = [ 155 2404, 2406, 2408, 2410, 2412, 2414, 2416, 2418, 2420, 2422, 2424, 156 2428, 2430, 2432, 2434, 2436, 2438, 2440, 2442, 2444, 2446, 2448, 2450, 2452, 2454, 2456, 2458, 2460, 2462, 2464, 2466, 2468, 2470, 2472, 2474, 2476, 2478, 157 2402, 158 2426, 159 2480 160] 161 162# Convert from BLE channel index to the corresponding 2G4 BSim phy frequency value 163def ch_idx_to_2G4_freq(ch_idx): 164 freq = chIdxToCenterFreq[ch_idx] 165 # 2G4 frequency is offset relative to 2400 in 16 bits with fixed point notation (8.8 bits) 166 return ((freq - 2400) << 8) 167 168