# Testing utilities
# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import binascii
import os
import socket
import struct
import subprocess
import time
import remotehost
import logging
import re
logger = logging.getLogger()
import hostapd

def get_ifnames():
    """Return the interface names listed in /proc/net/dev.

    Lines without a ':' separator (such as the table header) are ignored.
    """
    ifnames = []
    with open("/proc/net/dev", "r") as f:
        for line in f:
            val = line.split(':', 1)
            if len(val) == 2:
                ifnames.append(val[0].strip(' '))
    return ifnames

class HwsimSkip(Exception):
    """Exception raised to report that a test case has to be skipped.

    The human readable explanation is stored in the "reason" attribute and
    is also what str() returns.
    """
    def __init__(self, reason):
        # Initialize the base class as well so that args is populated through
        # the regular Exception machinery.
        Exception.__init__(self, reason)
        self.reason = reason
    def __str__(self):
        return self.reason

def long_duration_test(func):
    """Decorator that tags func with a long_duration_test = True attribute."""
    func.long_duration_test = True
    return func

class fail_test(object):
    """Context manager that arms failure triggers on a device.

    On entry a "<_test_fail> <count>:<funcs> ..." request is sent to dev. On
    a clean exit the "<_get_fail>" response has to show every trigger as
    "0:<funcs>"; otherwise the trigger is cleared and an Exception is raised.

    Additional (count, funcs) pairs can be passed through *args as a flat
    sequence: count1, funcs1, count2, funcs2, ...
    """
    _test_fail = 'TEST_FAIL'
    _get_fail = 'GET_FAIL'

    def __init__(self, dev, count, funcs, *args):
        self._dev = dev
        self._funcs = [(count, funcs)]

        # Consume the remaining arguments two at a time; an odd number of
        # extra arguments ends in IndexError from pop().
        args = list(args)
        while args:
            count = args.pop(0)
            funcs = args.pop(0)
            self._funcs.append((count, funcs))
    def __enter__(self):
        patterns = ' '.join(['%d:%s' % (c, f) for c, f in self._funcs])
        cmd = '%s %s' % (self._test_fail, patterns)
        if "OK" not in self._dev.request(cmd):
            raise HwsimSkip("TEST_FAIL not supported")
    def __exit__(self, type, value, traceback):
        pending = self._dev.request(self._get_fail)
        if type is None:
            expected = ' '.join(['0:%s' % f for c, f in self._funcs])
            if pending != expected:
                # Ensure the failure cannot trigger in the future
                self._dev.request('%s 0:' % self._test_fail)
                raise Exception("Not all failures triggered (pending: %s)" % pending)
        else:
            # An exception is already propagating; only record the state.
            logger.info("Pending failures at time of exception: %s" % pending)

class alloc_fail(fail_test):
    """fail_test variant using the TEST_ALLOC_FAIL/GET_ALLOC_FAIL commands."""
    _test_fail = 'TEST_ALLOC_FAIL'
    _get_fail = 'GET_ALLOC_FAIL'

def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40,
                      timeout=0.05):
    """Poll dev.request(cmd) until the response starts with "0:".

    The request is repeated up to max_iter times with timeout seconds of
    sleep between attempts. Exception(note) is raised if the last attempt
    still does not report "0:".
    """
    for i in range(0, max_iter):
        if dev.request(cmd).startswith("0:"):
            break
        if i == max_iter - 1:
            raise Exception(note)
        time.sleep(timeout)

def require_under_vm():
    """Raise HwsimSkip unless the environment variable VM is set to "VM"."""
    if os.getenv('VM') != 'VM':
        raise HwsimSkip("Not running under VM")

def iface_is_in_bridge(bridge, ifname):
    """Return True if ifname is a port of the bridge named bridge.

    The check is based on the /sys/class/net/<ifname>/brport/bridge symlink;
    a missing path or a path that is not a symlink yields False.
    """
    fname = "/sys/class/net/"+ifname+"/brport/bridge"
    if not os.path.exists(fname):
        return False
    if not os.path.islink(fname):
        return False
    truebridge = os.path.basename(os.readlink(fname))
    return bridge == truebridge

def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Raise HwsimSkip(reason) if the "fips" capability of dev lists FIPS."""
    res = dev.get_capability("fips")
    if res and 'FIPS' in res:
        raise HwsimSkip(reason)

def check_ext_key_id_capa(dev):
    """Raise HwsimSkip unless bit 0x8000000000000000 is set in capa.flags."""
    res = dev.get_driver_status_field('capa.flags')
    # Base 0 lets int() accept the prefixed (e.g., 0x...) form of the value
    if (int(res, 0) & 0x8000000000000000) == 0:
        raise HwsimSkip("Extended Key ID not supported")

def skip_without_tkip(dev):
    """Raise HwsimSkip unless both pairwise and group ciphers list TKIP."""
    if "TKIP" not in dev.get_capability("pairwise") or \
       "TKIP" not in dev.get_capability("group"):
        raise HwsimSkip("Cipher TKIP not supported")

def check_wep_capa(dev):
    """Raise HwsimSkip unless the "group" capability lists WEP40."""
    if "WEP40" not in dev.get_capability("group"):
        raise HwsimSkip("WEP not supported")

def check_sae_capab(dev):
    """Raise HwsimSkip unless the "auth_alg" capability lists SAE."""
    if "SAE" not in dev.get_capability("auth_alg"):
        raise HwsimSkip("SAE not supported")

def check_sae_pk_capab(dev):
    """Raise HwsimSkip unless the "sae" capability lists PK."""
    capab = dev.get_capability("sae")
    if capab is None or "PK" not in capab:
        raise HwsimSkip("SAE-PK not supported")

def check_erp_capa(dev):
    """Raise HwsimSkip unless the "erp" capability lists ERP."""
    capab = dev.get_capability("erp")
    if not capab or 'ERP' not in capab:
        raise HwsimSkip("ERP not supported in the build")

def check_fils_capa(dev):
    """Raise HwsimSkip unless the "fils" capability lists FILS."""
    capa = dev.get_capability("fils")
    if capa is None or "FILS" not in capa:
        raise HwsimSkip("FILS not supported")

def check_fils_sk_pfs_capa(dev):
    """Raise HwsimSkip unless the "fils" capability lists FILS-SK-PFS."""
    capa = dev.get_capability("fils")
    if capa is None or "FILS-SK-PFS" not in capa:
        raise HwsimSkip("FILS-SK-PFS not supported")

def check_imsi_privacy_support(dev):
    """Raise HwsimSkip unless the reported TLS library starts with OpenSSL."""
    tls = dev.request("GET tls_library")
    if tls.startswith("OpenSSL"):
        return
    raise HwsimSkip("IMSI privacy not supported with this TLS library: " + tls)

def check_tls_tod(dev):
    """Raise HwsimSkip unless the TLS library is OpenSSL or internal."""
    tls = dev.request("GET tls_library")
    if not tls.startswith("OpenSSL") and not tls.startswith("internal"):
        raise HwsimSkip("TLS TOD-TOFU/STRICT not supported with this TLS library: " + tls)

def _iw_reg_get():
    """Run "iw reg get" and return its standard output as text."""
    proc = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
    # communicate() drains and closes the pipe and waits for the child, so no
    # open descriptor or unreaped process is left behind.
    out, _ = proc.communicate()
    return out.decode()

def vht_supported():
    """Return True if "iw reg get" output contains "@ 80)" or "@ 160)".

    NOTE(review): presumably these markers are the maximum bandwidth of a
    regulatory rule - confirm against the iw output format.
    """
    reg = _iw_reg_get()
    return "@ 80)" in reg or "@ 160)" in reg

def eht_320mhz_supported():
    """Return True if "iw reg get" output contains "@ 320)"."""
    return "@ 320)" in _iw_reg_get()

def he_6ghz_supported(freq=5975):
    """Return True if a rule in "iw reg get" output covers freq.

    Each output line is searched for a "(<start> - <end>" range and freq is
    compared against the two integers (inclusive at both ends).
    NOTE(review): freq is presumably in MHz like the iw ranges - confirm.
    """
    for rule in _iw_reg_get().splitlines():
        m = re.search(r"\s*\(\d+\s*-\s*\d+", rule)
        if not m:
            continue
        freqs = re.findall(r"\d+", m.group(0))
        if int(freqs[0]) <= freq <= int(freqs[1]):
            return True

    return False

# This function checks whether the provided dev, which may be either
# WpaSupplicant or Hostapd, supports CSA.
def csa_supported(dev):
    """Raise HwsimSkip unless bit 0x80000000 is set in capa.flags of dev.

    dev only needs to provide get_driver_status() returning a mapping with a
    'capa.flags' entry.
    """
    res = dev.get_driver_status()
    # Base 0 lets int() accept the prefixed (e.g., 0x...) form of the value
    if (int(res['capa.flags'], 0) & 0x80000000) == 0:
        raise HwsimSkip("CSA not supported")

def get_phy(ap, ifname=None):
    """Return the "phy<N>" name of an interface based on "iw dev ... info".

    ap is indexed for 'hostname' (optional; selects the host on which the
    command is run) and for 'ifname' when ifname is not given. "phy3" is
    returned if the output has no line containing "wiphy". An Exception is
    raised if the iw command reports a non-zero status.
    """
    phy = "phy3"
    try:
        hostname = ap['hostname']
    except (KeyError, TypeError):
        # No hostname entry (or ap is not subscriptable)
        hostname = None
    host = remotehost.Host(hostname)

    if ifname is None:
        ifname = ap['ifname']
    status, buf = host.execute(["iw", "dev", ifname, "info"])
    if status != 0:
        raise Exception("iw " + ifname + " info failed")
    for line in buf.split("\n"):
        if "wiphy" in line:
            words = line.split()
            phy = "phy" + words[1]
            break
    return phy

def parse_ie(buf):
    """Parse a hex string of (id, length, payload) elements into a dict.

    The returned dict maps the one octet element id to the payload bytes. If
    an id occurs more than once, the last occurrence wins. Parsing stops at
    the first element whose length field exceeds the remaining data.
    """
    ret = {}
    data = binascii.unhexlify(buf)
    while len(data) >= 2:
        ie, elen = struct.unpack('BB', data[0:2])
        data = data[2:]
        if elen > len(data):
            # Truncated element; keep what was parsed so far
            break
        ret[ie] = data[0:elen]
        data = data[elen:]
    return ret

def wait_regdom_changes(dev):
    """Consume CTRL-EVENT-REGDOM-CHANGE events until none arrive in 0.1 s.

    At most ten events are consumed.
    """
    for i in range(10):
        ev = dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1)
        if ev is None:
            break

def clear_country(dev):
    """Try to clear the country code using a temporary open network.

    dev[1] is configured with mode=2 on 2412 MHz with the SSID
    "country-clear". If it reports CTRL-EVENT-CONNECTED, dev[0] connects to
    it and both devices are disconnected again.
    """
    logger.info("Try to clear country")
    net_id = dev[1].add_network()
    dev[1].set_network(net_id, "mode", "2")
    dev[1].set_network_quoted(net_id, "ssid", "country-clear")
    dev[1].set_network(net_id, "key_mgmt", "NONE")
    dev[1].set_network(net_id, "frequency", "2412")
    dev[1].set_network(net_id, "scan_freq", "2412")
    dev[1].select_network(net_id)
    ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
    if ev:
        dev[0].connect("country-clear", key_mgmt="NONE", scan_freq="2412")
        dev[1].request("DISCONNECT")
        dev[0].wait_disconnected()
        dev[0].request("DISCONNECT")
        dev[0].request("ABORT_SCAN")
        time.sleep(1)
        dev[0].dump_monitor()
        dev[1].dump_monitor()

def clear_regdom(hapd, dev, count=1):
    """Disable hapd (if given) and reset the regulatory domain on dev."""
    disable_hapd(hapd)
    clear_regdom_dev(dev, count)

def disable_hapd(hapd):
    """Send DISABLE to hapd and wait 0.1 s; do nothing if hapd is false."""
    if hapd:
        hapd.request("DISABLE")
        time.sleep(0.1)

def clear_regdom_dev(dev, count=1):
    """Disconnect the first count devices and set the regulatory domain to 00.

    If dev[0] still reports a country other than "00" afterwards,
    clear_country() is tried. The scan cache of the first count devices is
    flushed at the end.
    """
    for i in range(count):
        dev[i].request("DISCONNECT")
    for i in range(count):
        dev[i].disconnect_and_stop_scan()
    dev[0].cmd_execute(['iw', 'reg', 'set', '00'])
    wait_regdom_changes(dev[0])
    country = dev[0].get_driver_status_field("country")
    logger.info("Country code at the end: " + country)
    if country != "00":
        clear_country(dev)
    for i in range(count):
        dev[i].flush_scan_cache()

def radiotap_build():
    """Return a fixed 14 octet header as bytes.

    The result is an 8 octet '<BBHL' header (0, 0, total length, 0xc002)
    followed by three two-octet fields (0x08 0x00, 0x00 0x00, 0x00 0x00).
    NOTE(review): presumably version/pad/length/present bitmap of a radiotap
    header followed by the fields selected in the bitmap - confirm.
    """
    radiotap_payload = struct.pack('BB', 0x08, 0)
    radiotap_payload += struct.pack('BB', 0, 0)
    radiotap_payload += struct.pack('BB', 0, 0)
    radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
                               0xc002)
    return radiotap_hdr + radiotap_payload

def start_monitor(ifname, freq=2412):
    """Switch ifname to monitor mode on freq and return a raw socket for it.

    The returned AF_PACKET/SOCK_RAW socket is bound to ifname and has a 0.5 s
    timeout. Failures of the two "iw" commands raise CalledProcessError; the
    result of "ip link set ... up" is not checked.
    """
    subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
    subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
    subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])

    ETH_P_ALL = 3
    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                         socket.htons(ETH_P_ALL))
    try:
        sock.bind((ifname, 0))
        sock.settimeout(0.5)
    except Exception:
        # Do not leak the descriptor if the socket cannot be set up
        sock.close()
        raise
    return sock

def stop_monitor(ifname):
    """Set ifname down and switch it back to managed mode (best effort)."""
    subprocess.call(["ip", "link", "set", "dev", ifname, "down"])
    subprocess.call(["iw", ifname, "set", "type", "managed"])

def clear_scan_cache(apdev):
    """Trigger a flushing scan on 2412 MHz on the interface of apdev.

    The interface (apdev['ifname']) is set up for the scan and set down
    again 0.1 s after the scan has been triggered.
    """
    ifname = apdev['ifname']
    hostapd.cmd_execute(apdev, ['ifconfig', ifname, 'up'])
    hostapd.cmd_execute(apdev, ['iw', ifname, 'scan', 'trigger', 'freq', '2412',
                                'flush'])
    time.sleep(0.1)
    hostapd.cmd_execute(apdev, ['ifconfig', ifname, 'down'])

def set_world_reg(apdev0=None, apdev1=None, dev0=None):
    """Run "iw reg set 00" through each given device and wait 0.1 s."""
    if apdev0:
        hostapd.cmd_execute(apdev0, ['iw', 'reg', 'set', '00'])
    if apdev1:
        hostapd.cmd_execute(apdev1, ['iw', 'reg', 'set', '00'])
    if dev0:
        dev0.cmd_execute(['iw', 'reg', 'set', '00'])
    time.sleep(0.1)

def sysctl_write(val):
    """Run "sysctl -w val" with its standard output discarded."""
    # Close the /dev/null handle once the command has completed
    with open('/dev/null', 'w') as devnull:
        subprocess.call(['sysctl', '-w', val], stdout=devnull)

def var_arg_call(fn, dev, apdev, params):
    """Call fn with as many of (dev, apdev, params) as it declares.

    The number of positional parameters of fn selects between
    fn(dev, apdev, params), fn(dev, apdev), and fn(dev). The return value of
    fn is passed through.
    """
    argcount = fn.__code__.co_argcount
    if argcount > 2:
        return fn(dev, apdev, params)
    if argcount > 1:
        return fn(dev, apdev)
    return fn(dev)

def cloned_wrapper(wrapper, fn):
    """Copy __name__, __doc__, and __module__ from fn to wrapper.

    Returns wrapper.
    """
    # we need the name set right for selecting / printing etc.
    wrapper.__name__ = fn.__name__
    wrapper.__doc__ = fn.__doc__
    # reparent to the right module for module filtering
    wrapper.__module__ = fn.__module__
    return wrapper

def disable_ipv6(fn):
    """Decorator that runs a test case with IPv6 disabled through sysctl.

    The wrapper first calls require_under_vm(), sets disable_ipv6=1 for the
    "all" and "default" configurations, runs fn, and always sets both values
    back to 0 afterwards. The return value of fn is not propagated.
    """
    def wrapper(dev, apdev, params):
        require_under_vm()
        try:
            sysctl_write('net.ipv6.conf.all.disable_ipv6=1')
            sysctl_write('net.ipv6.conf.default.disable_ipv6=1')
            var_arg_call(fn, dev, apdev, params)
        finally:
            sysctl_write('net.ipv6.conf.all.disable_ipv6=0')
            sysctl_write('net.ipv6.conf.default.disable_ipv6=0')
    return cloned_wrapper(wrapper, fn)