1# Testing utilities 2# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi> 3# 4# This software may be distributed under the terms of the BSD license. 5# See README for more details. 6 7import binascii 8import os 9import socket 10import struct 11import subprocess 12import time 13import remotehost 14import logging 15logger = logging.getLogger() 16import hostapd 17 18def get_ifnames(): 19 ifnames = [] 20 with open("/proc/net/dev", "r") as f: 21 lines = f.readlines() 22 for l in lines: 23 val = l.split(':', 1) 24 if len(val) == 2: 25 ifnames.append(val[0].strip(' ')) 26 return ifnames 27 28class HwsimSkip(Exception): 29 def __init__(self, reason): 30 self.reason = reason 31 def __str__(self): 32 return self.reason 33 34def long_duration_test(func): 35 func.long_duration_test = True 36 return func 37 38class alloc_fail(object): 39 def __init__(self, dev, count, funcs): 40 self._dev = dev 41 self._count = count 42 self._funcs = funcs 43 def __enter__(self): 44 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs) 45 if "OK" not in self._dev.request(cmd): 46 raise HwsimSkip("TEST_ALLOC_FAIL not supported") 47 def __exit__(self, type, value, traceback): 48 if type is None: 49 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs: 50 raise Exception("Allocation failure did not trigger") 51 52class fail_test(object): 53 def __init__(self, dev, count, funcs): 54 self._dev = dev 55 self._count = count 56 self._funcs = funcs 57 def __enter__(self): 58 cmd = "TEST_FAIL %d:%s" % (self._count, self._funcs) 59 if "OK" not in self._dev.request(cmd): 60 raise HwsimSkip("TEST_FAIL not supported") 61 def __exit__(self, type, value, traceback): 62 if type is None: 63 if self._dev.request("GET_FAIL") != "0:%s" % self._funcs: 64 raise Exception("Test failure did not trigger") 65 66def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40, 67 timeout=0.05): 68 for i in range(0, max_iter): 69 if dev.request(cmd).startswith("0:"): 70 break 71 if i == max_iter - 1: 72 raise Exception(note) 73 time.sleep(timeout) 74 75def require_under_vm(): 76 with open('/proc/1/cmdline', 'r') as f: 77 cmd = f.read() 78 if "inside.sh" not in cmd: 79 raise HwsimSkip("Not running under VM") 80 81def iface_is_in_bridge(bridge, ifname): 82 fname = "/sys/class/net/"+ifname+"/brport/bridge" 83 if not os.path.exists(fname): 84 return False 85 if not os.path.islink(fname): 86 return False 87 truebridge = os.path.basename(os.readlink(fname)) 88 if bridge == truebridge: 89 return True 90 return False 91 92def skip_with_fips(dev, reason="Not supported in FIPS mode"): 93 res = dev.get_capability("fips") 94 if res and 'FIPS' in res: 95 raise HwsimSkip(reason) 96 97def check_ext_key_id_capa(dev): 98 res = dev.get_driver_status_field('capa.flags') 99 if (int(res, 0) & 0x8000000000000000) == 0: 100 raise HwsimSkip("Extended Key ID not supported") 101 102def skip_without_tkip(dev): 103 res = dev.get_capability("fips") 104 if "TKIP" not in dev.get_capability("pairwise") or \ 105 "TKIP" not in dev.get_capability("group"): 106 raise HwsimSkip("Cipher TKIP not supported") 107 108def check_wep_capa(dev): 109 if "WEP40" not in dev.get_capability("group"): 110 raise HwsimSkip("WEP not supported") 111 112def check_sae_capab(dev): 113 if "SAE" not in dev.get_capability("auth_alg"): 114 raise HwsimSkip("SAE not supported") 115 116def check_sae_pk_capab(dev): 117 capab = dev.get_capability("sae") 118 if capab is None or "PK" not in capab: 119 raise HwsimSkip("SAE-PK not supported") 120 121def check_erp_capa(dev): 122 capab = dev.get_capability("erp") 123 if not capab or 'ERP' not in capab: 124 raise HwsimSkip("ERP not supported in the build") 125 126def check_fils_capa(dev): 127 capa = dev.get_capability("fils") 128 if capa is None or "FILS" not in capa: 129 raise HwsimSkip("FILS not supported") 130 131def check_fils_sk_pfs_capa(dev): 132 capa = dev.get_capability("fils") 133 if capa is None or "FILS-SK-PFS" not in capa: 134 raise HwsimSkip("FILS-SK-PFS not supported") 135 136def check_tls_tod(dev): 137 tls = dev.request("GET tls_library") 138 if not tls.startswith("OpenSSL") and not tls.startswith("internal"): 139 raise HwsimSkip("TLS TOD-TOFU/STRICT not supported with this TLS library: " + tls) 140 141def vht_supported(): 142 cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE) 143 reg = cmd.stdout.read() 144 if "@ 80)" in reg or "@ 160)" in reg: 145 return True 146 return False 147 148# This function checks whether the provided dev, which may be either 149# WpaSupplicant or Hostapd supports CSA. 150def csa_supported(dev): 151 res = dev.get_driver_status() 152 if (int(res['capa.flags'], 0) & 0x80000000) == 0: 153 raise HwsimSkip("CSA not supported") 154 155def get_phy(ap, ifname=None): 156 phy = "phy3" 157 try: 158 hostname = ap['hostname'] 159 except: 160 hostname = None 161 host = remotehost.Host(hostname) 162 163 if ifname == None: 164 ifname = ap['ifname'] 165 status, buf = host.execute(["iw", "dev", ifname, "info"]) 166 if status != 0: 167 raise Exception("iw " + ifname + " info failed") 168 lines = buf.split("\n") 169 for line in lines: 170 if "wiphy" in line: 171 words = line.split() 172 phy = "phy" + words[1] 173 break 174 return phy 175 176def parse_ie(buf): 177 ret = {} 178 data = binascii.unhexlify(buf) 179 while len(data) >= 2: 180 ie, elen = struct.unpack('BB', data[0:2]) 181 data = data[2:] 182 if elen > len(data): 183 break 184 ret[ie] = data[0:elen] 185 data = data[elen:] 186 return ret 187 188def wait_regdom_changes(dev): 189 for i in range(10): 190 ev = dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1) 191 if ev is None: 192 break 193 194def clear_country(dev): 195 logger.info("Try to clear country") 196 id = dev[1].add_network() 197 dev[1].set_network(id, "mode", "2") 198 dev[1].set_network_quoted(id, "ssid", "country-clear") 199 dev[1].set_network(id, "key_mgmt", "NONE") 200 dev[1].set_network(id, "frequency", "2412") 201 dev[1].set_network(id, "scan_freq", "2412") 202 dev[1].select_network(id) 203 ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"]) 204 if ev: 205 dev[0].connect("country-clear", key_mgmt="NONE", scan_freq="2412") 206 dev[1].request("DISCONNECT") 207 dev[0].wait_disconnected() 208 dev[0].request("DISCONNECT") 209 dev[0].request("ABORT_SCAN") 210 time.sleep(1) 211 dev[0].dump_monitor() 212 dev[1].dump_monitor() 213 214def clear_regdom(hapd, dev, count=1): 215 disable_hapd(hapd) 216 clear_regdom_dev(dev, count) 217 218def disable_hapd(hapd): 219 if hapd: 220 hapd.request("DISABLE") 221 time.sleep(0.1) 222 223def clear_regdom_dev(dev, count=1): 224 for i in range(count): 225 dev[i].request("DISCONNECT") 226 for i in range(count): 227 dev[i].disconnect_and_stop_scan() 228 dev[0].cmd_execute(['iw', 'reg', 'set', '00']) 229 wait_regdom_changes(dev[0]) 230 country = dev[0].get_driver_status_field("country") 231 logger.info("Country code at the end: " + country) 232 if country != "00": 233 clear_country(dev) 234 for i in range(count): 235 dev[i].flush_scan_cache() 236 237def radiotap_build(): 238 radiotap_payload = struct.pack('BB', 0x08, 0) 239 radiotap_payload += struct.pack('BB', 0, 0) 240 radiotap_payload += struct.pack('BB', 0, 0) 241 radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload), 242 0xc002) 243 return radiotap_hdr + radiotap_payload 244 245def start_monitor(ifname, freq=2412): 246 subprocess.check_call(["iw", ifname, "set", "type", "monitor"]) 247 subprocess.call(["ip", "link", "set", "dev", ifname, "up"]) 248 subprocess.check_call(["iw", ifname, "set", "freq", str(freq)]) 249 250 ETH_P_ALL = 3 251 sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 252 socket.htons(ETH_P_ALL)) 253 sock.bind((ifname, 0)) 254 sock.settimeout(0.5) 255 return sock 256 257def stop_monitor(ifname): 258 subprocess.call(["ip", "link", "set", "dev", ifname, "down"]) 259 subprocess.call(["iw", ifname, "set", "type", "managed"]) 260 261def clear_scan_cache(apdev): 262 ifname = apdev['ifname'] 263 hostapd.cmd_execute(apdev, ['ifconfig', ifname, 'up']) 264 hostapd.cmd_execute(apdev, ['iw', ifname, 'scan', 'trigger', 'freq', '2412', 265 'flush']) 266 time.sleep(0.1) 267 hostapd.cmd_execute(apdev, ['ifconfig', ifname, 'down']) 268 269def set_world_reg(apdev0=None, apdev1=None, dev0=None): 270 if apdev0: 271 hostapd.cmd_execute(apdev0, ['iw', 'reg', 'set', '00']) 272 if apdev1: 273 hostapd.cmd_execute(apdev1, ['iw', 'reg', 'set', '00']) 274 if dev0: 275 dev0.cmd_execute(['iw', 'reg', 'set', '00']) 276 time.sleep(0.1) 277 278def sysctl_write(val): 279 subprocess.call(['sysctl', '-w', val], stdout=open('/dev/null', 'w')) 280 281def var_arg_call(fn, dev, apdev, params): 282 if fn.__code__.co_argcount > 2: 283 return fn(dev, apdev, params) 284 elif fn.__code__.co_argcount > 1: 285 return fn(dev, apdev) 286 return fn(dev) 287 288def cloned_wrapper(wrapper, fn): 289 # we need the name set right for selecting / printing etc. 290 wrapper.__name__ = fn.__name__ 291 wrapper.__doc__ = fn.__doc__ 292 # reparent to the right module for module filtering 293 wrapper.__module__ = fn.__module__ 294 return wrapper 295 296def disable_ipv6(fn): 297 def wrapper(dev, apdev, params): 298 require_under_vm() 299 try: 300 sysctl_write('net.ipv6.conf.all.disable_ipv6=1') 301 sysctl_write('net.ipv6.conf.default.disable_ipv6=1') 302 var_arg_call(fn, dev, apdev, params) 303 finally: 304 sysctl_write('net.ipv6.conf.all.disable_ipv6=0') 305 sysctl_write('net.ipv6.conf.default.disable_ipv6=0') 306 return cloned_wrapper(wrapper, fn) 307