1# Python class for controlling wlantest 2# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi> 3# 4# This software may be distributed under the terms of the BSD license. 5# See README for more details. 6 7import re 8import os 9import posixpath 10import time 11import subprocess 12import logging 13import wpaspy 14 15logger = logging.getLogger() 16 17class Wlantest: 18 remote_host = None 19 setup_params = None 20 exe_thread = None 21 exe_res = [] 22 monitor_mod = None 23 setup_done = False 24 25 @classmethod 26 def stop_remote_wlantest(cls): 27 if cls.exe_thread is None: 28 # Local flow - no need for remote operations 29 return 30 31 cls.remote_host.execute(["killall", "-9", "wlantest"]) 32 cls.remote_host.thread_wait(cls.exe_thread, 5) 33 cls.exe_thread = None 34 cls.exe_res = [] 35 36 @classmethod 37 def reset_remote_wlantest(cls): 38 cls.stop_remote_wlantest() 39 cls.remote_host = None 40 cls.setup_params = None 41 cls.exe_thread = None 42 cls.exe_res = [] 43 cls.monitor_mod = None 44 cls.setup_done = False 45 46 @classmethod 47 def start_remote_wlantest(cls): 48 if cls.remote_host is None: 49 # Local flow - no need for remote operations 50 return 51 if cls.exe_thread is not None: 52 raise Exception("Cannot start wlantest twice") 53 54 log_dir = cls.setup_params['log_dir'] 55 ifaces = re.split('; | |, ', cls.remote_host.ifname) 56 ifname = ifaces[0] 57 exe = cls.setup_params["wlantest"] 58 tc_name = cls.setup_params["tc_name"] 59 base_log_name = tc_name + "_wlantest_" + \ 60 cls.remote_host.name + "_" + ifname 61 log_file = posixpath.join(log_dir, base_log_name + ".log") 62 pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng") 63 cmd = "{} -i {} -n {} -c -dtN -L {}".format(exe, ifname, 64 pcap_file, log_file) 65 cls.remote_host.add_log(log_file) 66 cls.remote_host.add_log(pcap_file) 67 cls.exe_thread = cls.remote_host.thread_run(cmd.split(), cls.exe_res) 68 # Give wlantest a chance to start working 69 time.sleep(1) 70 71 @classmethod 72 def register_remote_wlantest(cls, host, setup_params, monitor_mod): 73 if cls.remote_host is not None: 74 raise Exception("Cannot register remote wlantest twice") 75 cls.remote_host = host 76 cls.setup_params = setup_params 77 cls.monitor_mod = monitor_mod 78 status, buf = host.execute(["which", setup_params['wlantest']]) 79 if status != 0: 80 raise Exception(host.name + " - wlantest: " + buf) 81 status, buf = host.execute(["which", setup_params['wlantest_cli']]) 82 if status != 0: 83 raise Exception(host.name + " - wlantest_cli: " + buf) 84 85 @classmethod 86 def chan_from_wpa(cls, wpa, is_p2p=False): 87 if cls.monitor_mod is None: 88 return 89 m = cls.monitor_mod 90 return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)]) 91 92 @classmethod 93 def setup(cls, wpa, is_p2p=False): 94 if wpa: 95 cls.chan_from_wpa(wpa, is_p2p) 96 cls.start_remote_wlantest() 97 cls.setup_done = True 98 99 def __init__(self): 100 if not self.setup_done: 101 raise Exception("Cannot create Wlantest instance before setup()") 102 if os.path.isfile('../../wlantest/wlantest_cli'): 103 self.wlantest_cli = '../../wlantest/wlantest_cli' 104 else: 105 self.wlantest_cli = 'wlantest_cli' 106 107 def cli_cmd(self, params): 108 if self.remote_host is not None: 109 exe = self.setup_params["wlantest_cli"] 110 ret = self.remote_host.execute([exe] + params) 111 if ret[0] != 0: 112 raise Exception("wlantest_cli failed") 113 return ret[1] 114 else: 115 return subprocess.check_output([self.wlantest_cli] + params).decode() 116 117 def flush(self): 118 res = self.cli_cmd(["flush"]) 119 if "FAIL" in res: 120 raise Exception("wlantest_cli flush failed") 121 122 def relog(self): 123 res = self.cli_cmd(["relog"]) 124 if "FAIL" in res: 125 raise Exception("wlantest_cli relog failed") 126 127 def add_passphrase(self, passphrase): 128 res = self.cli_cmd(["add_passphrase", passphrase]) 129 if "FAIL" in res: 130 raise Exception("wlantest_cli add_passphrase failed") 131 132 def add_wepkey(self, key): 133 res = self.cli_cmd(["add_wepkey", key]) 134 if "FAIL" in res: 135 raise Exception("wlantest_cli add_key failed") 136 137 def info_bss(self, field, bssid): 138 res = self.cli_cmd(["info_bss", field, bssid]) 139 if "FAIL" in res: 140 raise Exception("Could not get BSS info from wlantest for " + bssid) 141 return res 142 143 def get_bss_counter(self, field, bssid): 144 try: 145 res = self.cli_cmd(["get_bss_counter", field, bssid]) 146 except Exception as e: 147 return 0 148 if "FAIL" in res: 149 return 0 150 return int(res) 151 152 def clear_bss_counters(self, bssid): 153 self.cli_cmd(["clear_bss_counters", bssid]) 154 155 def info_sta(self, field, bssid, addr): 156 res = self.cli_cmd(["info_sta", field, bssid, addr]) 157 if "FAIL" in res: 158 raise Exception("Could not get STA info from wlantest for " + addr) 159 return res 160 161 def get_sta_counter(self, field, bssid, addr): 162 res = self.cli_cmd(["get_sta_counter", field, bssid, addr]) 163 if "FAIL" in res: 164 raise Exception("wlantest_cli command failed") 165 return int(res) 166 167 def clear_sta_counters(self, bssid, addr): 168 res = self.cli_cmd(["clear_sta_counters", bssid, addr]) 169 if "FAIL" in res: 170 raise Exception("wlantest_cli command failed") 171 172 def tdls_clear(self, bssid, addr1, addr2): 173 self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2]) 174 175 def get_tdls_counter(self, field, bssid, addr1, addr2): 176 res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2]) 177 if "FAIL" in res: 178 raise Exception("wlantest_cli command failed") 179 return int(res) 180 181 def require_ap_pmf_mandatory(self, bssid): 182 res = self.info_bss("rsn_capab", bssid) 183 if "MFPR" not in res: 184 raise Exception("AP did not require PMF") 185 if "MFPC" not in res: 186 raise Exception("AP did not enable PMF") 187 res = self.info_bss("key_mgmt", bssid) 188 if "PSK-SHA256" not in res: 189 raise Exception("AP did not enable SHA256-based AKM for PMF") 190 191 def require_ap_pmf_optional(self, bssid): 192 res = self.info_bss("rsn_capab", bssid) 193 if "MFPR" in res: 194 raise Exception("AP required PMF") 195 if "MFPC" not in res: 196 raise Exception("AP did not enable PMF") 197 198 def require_ap_no_pmf(self, bssid): 199 res = self.info_bss("rsn_capab", bssid) 200 if "MFPR" in res: 201 raise Exception("AP required PMF") 202 if "MFPC" in res: 203 raise Exception("AP enabled PMF") 204 205 def require_sta_pmf_mandatory(self, bssid, addr): 206 res = self.info_sta("rsn_capab", bssid, addr) 207 if "MFPR" not in res: 208 raise Exception("STA did not require PMF") 209 if "MFPC" not in res: 210 raise Exception("STA did not enable PMF") 211 212 def require_sta_pmf(self, bssid, addr): 213 res = self.info_sta("rsn_capab", bssid, addr) 214 if "MFPC" not in res: 215 raise Exception("STA did not enable PMF") 216 217 def require_sta_no_pmf(self, bssid, addr): 218 res = self.info_sta("rsn_capab", bssid, addr) 219 if "MFPC" in res: 220 raise Exception("STA enabled PMF") 221 222 def require_sta_key_mgmt(self, bssid, addr, key_mgmt): 223 res = self.info_sta("key_mgmt", bssid, addr) 224 if key_mgmt not in res: 225 raise Exception("Unexpected STA key_mgmt") 226 227 def get_tx_tid(self, bssid, addr, tid): 228 res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)]) 229 if "FAIL" in res: 230 raise Exception("wlantest_cli command failed") 231 return int(res) 232 233 def get_rx_tid(self, bssid, addr, tid): 234 res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)]) 235 if "FAIL" in res: 236 raise Exception("wlantest_cli command failed") 237 return int(res) 238 239 def get_tid_counters(self, bssid, addr): 240 tx = {} 241 rx = {} 242 for tid in range(0, 17): 243 tx[tid] = self.get_tx_tid(bssid, addr, tid) 244 rx[tid] = self.get_rx_tid(bssid, addr, tid) 245 return [tx, rx] 246 247class WlantestCapture: 248 def __init__(self, ifname, output, netns=None): 249 self.cmd = None 250 self.ifname = ifname 251 if os.path.isfile('../../wlantest/wlantest'): 252 bin = '../../wlantest/wlantest' 253 else: 254 bin = 'wlantest' 255 logger.debug("wlantest[%s] starting" % ifname) 256 args = [bin, '-e', '-i', ifname, '-w', output] 257 if netns: 258 args = ['ip', 'netns', 'exec', netns] + args 259 self.cmd = subprocess.Popen(args, 260 stdout=subprocess.PIPE, 261 stderr=subprocess.PIPE) 262 263 def __del__(self): 264 if self.cmd: 265 self.close() 266 267 def close(self): 268 logger.debug("wlantest[%s] stopping" % self.ifname) 269 self.cmd.terminate() 270 res = self.cmd.communicate() 271 if len(res[0]) > 0: 272 logger.debug("wlantest[%s] stdout: %s" % (self.ifname, 273 res[0].decode().strip())) 274 if len(res[1]) > 0: 275 logger.debug("wlantest[%s] stderr: %s" % (self.ifname, 276 res[1].decode().strip())) 277 self.cmd = None 278