1# wpa_supplicant D-Bus interface tests 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi> 3# 4# This software may be distributed under the terms of the BSD license. 5# See README for more details. 6 7import binascii 8import logging 9logger = logging.getLogger() 10import subprocess 11import time 12import shutil 13import struct 14import sys 15from test_ap_hs20 import hs20_ap_params 16 17try: 18 if sys.version_info[0] > 2: 19 from gi.repository import GObject as gobject 20 else: 21 import gobject 22 import dbus 23 dbus_imported = True 24except ImportError: 25 dbus_imported = False 26 27import hostapd 28from wpasupplicant import WpaSupplicant 29from utils import * 30from p2p_utils import * 31from test_ap_tdls import connect_2sta_open 32from test_ap_eap import check_altsubject_match_support, check_eap_capa 33from test_nfc_p2p import set_ip_addr_info 34from test_wpas_mesh import check_mesh_support, add_open_mesh_network 35 36WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1" 37WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1" 38WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface" 39WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS" 40WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network" 41WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS" 42WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice" 43WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer" 44WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group" 45WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup" 46WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh" 47 48def prepare_dbus(dev): 49 if not dbus_imported: 50 logger.info("No dbus module available") 51 raise HwsimSkip("No dbus module available") 52 try: 53 from dbus.mainloop.glib import DBusGMainLoop 54 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) 55 bus = dbus.SystemBus() 56 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH) 57 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 58 path = wpas.GetInterface(dev.ifname) 59 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 60 return (bus, wpas_obj, path, if_obj) 61 except Exception as e: 62 raise HwsimSkip("Could not connect to D-Bus: %s" % e) 63 64class TestDbus(object): 65 def __init__(self, bus): 66 self.loop = gobject.MainLoop() 67 self.signals = [] 68 self.bus = bus 69 70 def __exit__(self, type, value, traceback): 71 for s in self.signals: 72 s.remove() 73 74 def add_signal(self, handler, interface, name, byte_arrays=False): 75 s = self.bus.add_signal_receiver(handler, dbus_interface=interface, 76 signal_name=name, 77 byte_arrays=byte_arrays) 78 self.signals.append(s) 79 80 def timeout(self, *args): 81 logger.debug("timeout") 82 self.loop.quit() 83 return False 84 85class alloc_fail_dbus(object): 86 def __init__(self, dev, count, funcs, operation="Operation", 87 expected="NoMemory"): 88 self._dev = dev 89 self._count = count 90 self._funcs = funcs 91 self._operation = operation 92 self._expected = expected 93 def __enter__(self): 94 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs) 95 if "OK" not in self._dev.request(cmd): 96 raise HwsimSkip("TEST_ALLOC_FAIL not supported") 97 def __exit__(self, type, value, traceback): 98 if type is None: 99 raise Exception("%s succeeded during out-of-memory" % self._operation) 100 if type == dbus.exceptions.DBusException and self._expected in str(value): 101 return True 102 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs: 103 raise Exception("%s did not trigger allocation failure" % self._operation) 104 return False 105 106def start_ap(ap, ssid="test-wps", 107 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"): 108 params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", 109 "wpa_passphrase": "12345678", "wpa": "2", 110 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", 111 "ap_pin": "12345670", "uuid": ap_uuid} 112 return hostapd.add_ap(ap, params) 113 114def test_dbus_getall(dev, apdev): 115 """D-Bus GetAll""" 116 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 117 118 dev[0].flush_scan_cache() 119 120 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE, 121 dbus_interface=dbus.PROPERTIES_IFACE) 122 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props)) 123 124 props = if_obj.GetAll(WPAS_DBUS_IFACE, 125 dbus_interface=dbus.PROPERTIES_IFACE) 126 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props))) 127 128 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS, 129 dbus_interface=dbus.PROPERTIES_IFACE) 130 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props))) 131 132 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 133 dbus_interface=dbus.PROPERTIES_IFACE) 134 if len(res) != 0: 135 raise Exception("Unexpected BSSs entry: " + str(res)) 136 137 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 138 dbus_interface=dbus.PROPERTIES_IFACE) 139 if len(res) != 0: 140 raise Exception("Unexpected Networks entry: " + str(res)) 141 142 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 143 bssid = apdev[0]['bssid'] 144 dev[0].scan_for_bss(bssid, freq=2412) 145 id = dev[0].add_network() 146 dev[0].set_network(id, "disabled", "0") 147 dev[0].set_network_quoted(id, "ssid", "test") 148 149 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 150 dbus_interface=dbus.PROPERTIES_IFACE) 151 if len(res) < 1: 152 raise Exception("Missing BSSs entry: " + str(res)) 153 if len(res) > 1: 154 raise Exception("Too manu BSSs entries: " + str(res)) 155 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 156 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE) 157 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props))) 158 bssid_str = '' 159 for item in props['BSSID']: 160 if len(bssid_str) > 0: 161 bssid_str += ':' 162 bssid_str += '%02x' % item 163 if bssid_str != bssid: 164 raise Exception("Unexpected BSSID in BSSs entry") 165 166 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 167 dbus_interface=dbus.PROPERTIES_IFACE) 168 if len(res) != 1: 169 raise Exception("Missing Networks entry: " + str(res)) 170 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 171 props = net_obj.GetAll(WPAS_DBUS_NETWORK, 172 dbus_interface=dbus.PROPERTIES_IFACE) 173 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props))) 174 ssid = props['Properties']['ssid'] 175 if ssid != '"test"': 176 raise Exception("Unexpected SSID in network entry") 177 178def test_dbus_getall_oom(dev, apdev): 179 """D-Bus GetAll wpa_config_get_all() OOM""" 180 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 181 182 id = dev[0].add_network() 183 dev[0].set_network(id, "disabled", "0") 184 dev[0].set_network_quoted(id, "ssid", "test") 185 186 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 187 dbus_interface=dbus.PROPERTIES_IFACE) 188 if len(res) != 1: 189 raise Exception("Missing Networks entry: " + str(res)) 190 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 191 for i in range(1, 50): 192 with alloc_fail(dev[0], i, "wpa_config_get_all"): 193 try: 194 props = net_obj.GetAll(WPAS_DBUS_NETWORK, 195 dbus_interface=dbus.PROPERTIES_IFACE) 196 except dbus.exceptions.DBusException as e: 197 pass 198 199def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False): 200 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop, 201 dbus_interface=dbus.PROPERTIES_IFACE, 202 byte_arrays=byte_arrays) 203 if expect is not None and val != expect: 204 raise Exception("Unexpected %s: %s (expected: %s)" % 205 (prop, str(val), str(expect))) 206 return val 207 208def dbus_set(dbus, wpas_obj, prop, val): 209 wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, 210 dbus_interface=dbus.PROPERTIES_IFACE) 211 212def test_dbus_properties(dev, apdev): 213 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties""" 214 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 215 216 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump") 217 dbus_set(dbus, wpas_obj, "DebugLevel", "debug") 218 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug") 219 for (val, err) in [(3, "Error.Failed: wrong property type"), 220 ("foo", "Error.Failed: wrong debug level value")]: 221 try: 222 dbus_set(dbus, wpas_obj, "DebugLevel", val) 223 raise Exception("Invalid DebugLevel value accepted: " + str(val)) 224 except dbus.exceptions.DBusException as e: 225 if err not in str(e): 226 raise Exception("Unexpected error message: " + str(e)) 227 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump") 228 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump") 229 230 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True) 231 dbus_set(dbus, wpas_obj, "DebugTimestamp", False) 232 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False) 233 try: 234 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo") 235 raise Exception("Invalid DebugTimestamp value accepted") 236 except dbus.exceptions.DBusException as e: 237 if "Error.Failed: wrong property type" not in str(e): 238 raise Exception("Unexpected error message: " + str(e)) 239 dbus_set(dbus, wpas_obj, "DebugTimestamp", True) 240 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True) 241 242 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True) 243 dbus_set(dbus, wpas_obj, "DebugShowKeys", False) 244 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False) 245 try: 246 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo") 247 raise Exception("Invalid DebugShowKeys value accepted") 248 except dbus.exceptions.DBusException as e: 249 if "Error.Failed: wrong property type" not in str(e): 250 raise Exception("Unexpected error message: " + str(e)) 251 dbus_set(dbus, wpas_obj, "DebugShowKeys", True) 252 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True) 253 254 res = dbus_get(dbus, wpas_obj, "Interfaces") 255 if len(res) != 1: 256 raise Exception("Unexpected Interfaces value: " + str(res)) 257 258 res = dbus_get(dbus, wpas_obj, "EapMethods") 259 if len(res) < 5 or "TTLS" not in res: 260 raise Exception("Unexpected EapMethods value: " + str(res)) 261 262 res = dbus_get(dbus, wpas_obj, "Capabilities") 263 if len(res) < 2 or "p2p" not in res: 264 raise Exception("Unexpected Capabilities value: " + str(res)) 265 266 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 267 val = binascii.unhexlify("010006020304050608") 268 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val)) 269 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 270 if val != res: 271 raise Exception("WFDIEs value changed") 272 try: 273 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00')) 274 raise Exception("Invalid WFDIEs value accepted") 275 except dbus.exceptions.DBusException as e: 276 if "InvalidArgs" not in str(e): 277 raise Exception("Unexpected error message: " + str(e)) 278 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'')) 279 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val)) 280 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'')) 281 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 282 if len(res) != 0: 283 raise Exception("WFDIEs not cleared properly") 284 285 res = dbus_get(dbus, wpas_obj, "EapMethods") 286 try: 287 dbus_set(dbus, wpas_obj, "EapMethods", res) 288 raise Exception("Invalid Set accepted") 289 except dbus.exceptions.DBusException as e: 290 if "InvalidArgs: Property is read-only" not in str(e): 291 raise Exception("Unexpected error message: " + str(e)) 292 293 try: 294 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True, 295 dbus_interface=dbus.PROPERTIES_IFACE) 296 raise Exception("Unknown method accepted") 297 except dbus.exceptions.DBusException as e: 298 if "UnknownMethod" not in str(e): 299 raise Exception("Unexpected error message: " + str(e)) 300 301 try: 302 wpas_obj.Get("foo", "DebugShowKeys", 303 dbus_interface=dbus.PROPERTIES_IFACE) 304 raise Exception("Invalid Get accepted") 305 except dbus.exceptions.DBusException as e: 306 if "InvalidArgs: No such property" not in str(e): 307 raise Exception("Unexpected error message: " + str(e)) 308 309 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH, 310 introspect=False) 311 try: 312 test_obj.Get(123, "DebugShowKeys", 313 dbus_interface=dbus.PROPERTIES_IFACE) 314 raise Exception("Invalid Get accepted") 315 except dbus.exceptions.DBusException as e: 316 if "InvalidArgs: Invalid arguments" not in str(e): 317 raise Exception("Unexpected error message: " + str(e)) 318 try: 319 test_obj.Get(WPAS_DBUS_SERVICE, 123, 320 dbus_interface=dbus.PROPERTIES_IFACE) 321 raise Exception("Invalid Get accepted") 322 except dbus.exceptions.DBusException as e: 323 if "InvalidArgs: Invalid arguments" not in str(e): 324 raise Exception("Unexpected error message: " + str(e)) 325 326 try: 327 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs", 328 dbus.ByteArray(b'', variant_level=2), 329 dbus_interface=dbus.PROPERTIES_IFACE) 330 raise Exception("Invalid Set accepted") 331 except dbus.exceptions.DBusException as e: 332 if "InvalidArgs: invalid message format" not in str(e): 333 raise Exception("Unexpected error message: " + str(e)) 334 335def test_dbus_set_global_properties(dev, apdev): 336 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties""" 337 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 338 339 dev[0].set("model_name", "") 340 props = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')] 341 342 for p in props: 343 res = if_obj.Get(WPAS_DBUS_IFACE, p[0], 344 dbus_interface=dbus.PROPERTIES_IFACE) 345 if res != p[1]: 346 raise Exception("Unexpected " + p[0] + " value: " + str(res)) 347 348 if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2], 349 dbus_interface=dbus.PROPERTIES_IFACE) 350 351 res = if_obj.Get(WPAS_DBUS_IFACE, p[0], 352 dbus_interface=dbus.PROPERTIES_IFACE) 353 if res != p[2]: 354 raise Exception("Unexpected " + p[0] + " value after set: " + str(res)) 355 dev[0].set("model_name", "") 356 357def test_dbus_invalid_method(dev, apdev): 358 """D-Bus invalid method""" 359 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 360 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 361 362 try: 363 wps.Foo() 364 raise Exception("Unknown method accepted") 365 except dbus.exceptions.DBusException as e: 366 if "UnknownMethod" not in str(e): 367 raise Exception("Unexpected error message: " + str(e)) 368 369 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False) 370 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS) 371 try: 372 test_wps.Start(123) 373 raise Exception("WPS.Start with incorrect signature accepted") 374 except dbus.exceptions.DBusException as e: 375 if "InvalidArgs: Invalid arg" not in str(e): 376 raise Exception("Unexpected error message: " + str(e)) 377 378def test_dbus_get_set_wps(dev, apdev): 379 """D-Bus Get/Set for WPS properties""" 380 try: 381 _test_dbus_get_set_wps(dev, apdev) 382 finally: 383 dev[0].request("SET wps_cred_processing 0") 384 dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps") 385 dev[0].set("device_name", "Device A") 386 dev[0].set("manufacturer", "") 387 dev[0].set("model_name", "") 388 dev[0].set("model_number", "") 389 dev[0].set("serial_number", "") 390 dev[0].set("device_type", "0-00000000-0") 391 392def _test_dbus_get_set_wps(dev, apdev): 393 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 394 395 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 396 dbus_interface=dbus.PROPERTIES_IFACE) 397 398 val = "display keypad virtual_display nfc_interface" 399 dev[0].request("SET config_methods " + val) 400 401 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 402 dbus_interface=dbus.PROPERTIES_IFACE) 403 if config != val: 404 raise Exception("Unexpected Get(ConfigMethods) result: " + config) 405 406 val2 = "push_button display" 407 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2, 408 dbus_interface=dbus.PROPERTIES_IFACE) 409 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 410 dbus_interface=dbus.PROPERTIES_IFACE) 411 if config != val2: 412 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config) 413 414 dev[0].request("SET config_methods " + val) 415 416 for i in range(3): 417 dev[0].request("SET wps_cred_processing " + str(i)) 418 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 419 dbus_interface=dbus.PROPERTIES_IFACE) 420 expected_val = False if i == 1 else True 421 if val != expected_val: 422 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val)) 423 424 tests = [("device_name", "DeviceName"), 425 ("manufacturer", "Manufacturer"), 426 ("model_name", "ModelName"), 427 ("model_number", "ModelNumber"), 428 ("serial_number", "SerialNumber")] 429 430 for f1, f2 in tests: 431 val2 = "test-value-test" 432 dev[0].set(f1, val2) 433 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2, 434 dbus_interface=dbus.PROPERTIES_IFACE) 435 if val != val2: 436 raise Exception("Get(%s) returned unexpected value" % f2) 437 val2 = "TEST-value" 438 if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2, 439 dbus_interface=dbus.PROPERTIES_IFACE) 440 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2, 441 dbus_interface=dbus.PROPERTIES_IFACE) 442 if val != val2: 443 raise Exception("Get(%s) returned unexpected value after Set" % f2) 444 445 dev[0].set("device_type", "5-0050F204-1") 446 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 447 dbus_interface=dbus.PROPERTIES_IFACE) 448 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01: 449 raise Exception("DeviceType mismatch") 450 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val, 451 dbus_interface=dbus.PROPERTIES_IFACE) 452 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 453 dbus_interface=dbus.PROPERTIES_IFACE) 454 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01: 455 raise Exception("DeviceType mismatch after Set") 456 457 val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08' 458 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2), 459 dbus_interface=dbus.PROPERTIES_IFACE) 460 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 461 dbus_interface=dbus.PROPERTIES_IFACE, 462 byte_arrays=True) 463 if val != val2: 464 raise Exception("DeviceType mismatch after Set (2)") 465 466 class TestDbusGetSet(TestDbus): 467 def __init__(self, bus): 468 TestDbus.__init__(self, bus) 469 self.signal_received = False 470 self.signal_received_deprecated = False 471 self.sets_done = False 472 473 def __enter__(self): 474 gobject.timeout_add(1, self.run_sets) 475 gobject.timeout_add(1000, self.timeout) 476 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS, 477 "PropertiesChanged") 478 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE, 479 "PropertiesChanged") 480 self.loop.run() 481 return self 482 483 def propertiesChanged(self, properties): 484 logger.debug("PropertiesChanged: " + str(properties)) 485 if "ProcessCredentials" in properties: 486 self.signal_received_deprecated = True 487 if self.sets_done and self.signal_received: 488 self.loop.quit() 489 490 def propertiesChanged2(self, interface_name, changed_properties, 491 invalidated_properties): 492 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 493 if interface_name != WPAS_DBUS_IFACE_WPS: 494 return 495 if "ProcessCredentials" in changed_properties: 496 self.signal_received = True 497 if self.sets_done and self.signal_received_deprecated: 498 self.loop.quit() 499 500 def run_sets(self, *args): 501 logger.debug("run_sets") 502 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 503 dbus.Boolean(1), 504 dbus_interface=dbus.PROPERTIES_IFACE) 505 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 506 dbus_interface=dbus.PROPERTIES_IFACE) != True: 507 raise Exception("Unexpected Get(ProcessCredentials) result after Set") 508 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 509 dbus.Boolean(0), 510 dbus_interface=dbus.PROPERTIES_IFACE) 511 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 512 dbus_interface=dbus.PROPERTIES_IFACE) != False: 513 raise Exception("Unexpected Get(ProcessCredentials) result after Set") 514 515 self.dbus_sets_done = True 516 return False 517 518 def success(self): 519 return self.signal_received and self.signal_received_deprecated 520 521 with TestDbusGetSet(bus) as t: 522 if not t.success(): 523 raise Exception("No signal received for ProcessCredentials change") 524 525def test_dbus_wps_invalid(dev, apdev): 526 """D-Bus invaldi WPS operation""" 527 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 528 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 529 530 failures = [{'Role': 'foo', 'Type': 'pbc'}, 531 {'Role': 123, 'Type': 'pbc'}, 532 {'Type': 'pbc'}, 533 {'Role': 'enrollee'}, 534 {'Role': 'registrar'}, 535 {'Role': 'enrollee', 'Type': 123}, 536 {'Role': 'enrollee', 'Type': 'foo'}, 537 {'Role': 'enrollee', 'Type': 'pbc', 538 'Bssid': '02:33:44:55:66:77'}, 539 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123}, 540 {'Role': 'enrollee', 'Type': 'pbc', 541 'Bssid': dbus.ByteArray(b'12345')}, 542 {'Role': 'enrollee', 'Type': 'pbc', 543 'P2PDeviceAddress': 12345}, 544 {'Role': 'enrollee', 'Type': 'pbc', 545 'P2PDeviceAddress': dbus.ByteArray(b'12345')}, 546 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'}] 547 for args in failures: 548 try: 549 wps.Start(args) 550 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args)) 551 except dbus.exceptions.DBusException as e: 552 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"): 553 raise Exception("Unexpected error message: " + str(e)) 554 555def test_dbus_wps_oom(dev, apdev): 556 """D-Bus WPS operation (OOM)""" 557 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 558 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 559 560 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"): 561 if_obj.Get(WPAS_DBUS_IFACE, "State", 562 dbus_interface=dbus.PROPERTIES_IFACE) 563 564 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 565 bssid = apdev[0]['bssid'] 566 dev[0].scan_for_bss(bssid, freq=2412) 567 568 time.sleep(0.05) 569 for i in range(1, 3): 570 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"): 571 if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 572 dbus_interface=dbus.PROPERTIES_IFACE) 573 574 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 575 dbus_interface=dbus.PROPERTIES_IFACE) 576 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 577 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"): 578 bss_obj.Get(WPAS_DBUS_BSS, "Rates", 579 dbus_interface=dbus.PROPERTIES_IFACE) 580 with alloc_fail(dev[0], 1, 581 "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"): 582 try: 583 bss_obj.Get(WPAS_DBUS_BSS, "Rates", 584 dbus_interface=dbus.PROPERTIES_IFACE) 585 except dbus.exceptions.DBusException as e: 586 pass 587 588 id = dev[0].add_network() 589 dev[0].set_network(id, "disabled", "0") 590 dev[0].set_network_quoted(id, "ssid", "test") 591 592 for i in range(1, 3): 593 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"): 594 if_obj.Get(WPAS_DBUS_IFACE, "Networks", 595 dbus_interface=dbus.PROPERTIES_IFACE) 596 597 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"): 598 dbus_get(dbus, wpas_obj, "Interfaces") 599 600 for i in range(1, 6): 601 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"): 602 dbus_get(dbus, wpas_obj, "EapMethods") 603 604 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set", 605 expected="Error.Failed: Failed to set property"): 606 val2 = "push_button display" 607 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2, 608 dbus_interface=dbus.PROPERTIES_IFACE) 609 610 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start", 611 "WPS.Start", 612 expected="UnknownError: WPS start failed"): 613 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'}) 614 615def test_dbus_wps_pbc(dev, apdev): 616 """D-Bus WPS/PBC operation and signals""" 617 try: 618 _test_dbus_wps_pbc(dev, apdev) 619 finally: 620 dev[0].request("SET wps_cred_processing 0") 621 622def _test_dbus_wps_pbc(dev, apdev): 623 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 624 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 625 626 hapd = start_ap(apdev[0]) 627 hapd.request("WPS_PBC") 628 bssid = apdev[0]['bssid'] 629 dev[0].flush_scan_cache() 630 dev[0].scan_for_bss(bssid, freq="2412") 631 dev[0].request("SET wps_cred_processing 2") 632 633 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 634 dbus_interface=dbus.PROPERTIES_IFACE) 635 if len(res) != 1: 636 raise Exception("Missing BSSs entry: " + str(res)) 637 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 638 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE) 639 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props))) 640 if 'WPS' not in props: 641 raise Exception("No WPS information in the BSS entry") 642 if 'Type' not in props['WPS']: 643 raise Exception("No Type field in the WPS dictionary") 644 if props['WPS']['Type'] != 'pbc': 645 raise Exception("Unexpected WPS Type: " + props['WPS']['Type']) 646 647 class TestDbusWps(TestDbus): 648 def __init__(self, bus, wps): 649 TestDbus.__init__(self, bus) 650 self.success_seen = False 651 self.credentials_received = False 652 self.wps = wps 653 654 def __enter__(self): 655 gobject.timeout_add(1, self.start_pbc) 656 gobject.timeout_add(15000, self.timeout) 657 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 658 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 659 "Credentials") 660 self.loop.run() 661 return self 662 663 def wpsEvent(self, name, args): 664 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 665 if name == "success": 666 self.success_seen = True 667 if self.credentials_received: 668 self.loop.quit() 669 670 def credentials(self, args): 671 logger.debug("credentials: " + str(args)) 672 self.credentials_received = True 673 if self.success_seen: 674 self.loop.quit() 675 676 def start_pbc(self, *args): 677 logger.debug("start_pbc") 678 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'}) 679 return False 680 681 def success(self): 682 return self.success_seen and self.credentials_received 683 684 with TestDbusWps(bus, wps) as t: 685 if not t.success(): 686 raise Exception("Failure in D-Bus operations") 687 688 dev[0].wait_connected(timeout=10) 689 dev[0].request("DISCONNECT") 690 hapd.disable() 691 dev[0].flush_scan_cache() 692 693def test_dbus_wps_pbc_overlap(dev, apdev): 694 """D-Bus WPS/PBC operation and signal for PBC overlap""" 695 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 696 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 697 698 hapd = start_ap(apdev[0]) 699 hapd2 = start_ap(apdev[1], ssid="test-wps2", 700 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f") 701 hapd.request("WPS_PBC") 702 hapd2.request("WPS_PBC") 703 bssid = apdev[0]['bssid'] 704 dev[0].scan_for_bss(bssid, freq="2412") 705 bssid2 = apdev[1]['bssid'] 706 dev[0].scan_for_bss(bssid2, freq="2412") 707 708 class TestDbusWps(TestDbus): 709 def __init__(self, bus, wps): 710 TestDbus.__init__(self, bus) 711 self.overlap_seen = False 712 self.wps = wps 713 714 def __enter__(self): 715 gobject.timeout_add(1, self.start_pbc) 716 gobject.timeout_add(15000, self.timeout) 717 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 718 self.loop.run() 719 return self 720 721 def wpsEvent(self, name, args): 722 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 723 if name == "pbc-overlap": 724 self.overlap_seen = True 725 self.loop.quit() 726 727 def start_pbc(self, *args): 728 logger.debug("start_pbc") 729 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'}) 730 return False 731 732 def success(self): 733 return self.overlap_seen 734 735 with TestDbusWps(bus, wps) as t: 736 if not t.success(): 737 raise Exception("Failure in D-Bus operations") 738 739 dev[0].request("WPS_CANCEL") 740 dev[0].request("DISCONNECT") 741 hapd.disable() 742 hapd2.disable() 743 dev[0].flush_scan_cache() 744 745def test_dbus_wps_pin(dev, apdev): 746 """D-Bus WPS/PIN operation and signals""" 747 try: 748 _test_dbus_wps_pin(dev, apdev) 749 finally: 750 dev[0].request("SET wps_cred_processing 0") 751 752def _test_dbus_wps_pin(dev, apdev): 753 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 754 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 755 756 hapd = start_ap(apdev[0]) 757 hapd.request("WPS_PIN any 12345670") 758 bssid = apdev[0]['bssid'] 759 dev[0].scan_for_bss(bssid, freq="2412") 760 dev[0].request("SET wps_cred_processing 2") 761 762 class TestDbusWps(TestDbus): 763 def __init__(self, bus): 764 TestDbus.__init__(self, bus) 765 self.success_seen = False 766 self.credentials_received = False 767 768 def __enter__(self): 769 gobject.timeout_add(1, self.start_pin) 770 gobject.timeout_add(15000, self.timeout) 771 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 772 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 773 "Credentials") 774 self.loop.run() 775 return self 776 777 def wpsEvent(self, name, args): 778 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 779 if name == "success": 780 self.success_seen = True 781 if self.credentials_received: 782 self.loop.quit() 783 784 def credentials(self, args): 785 logger.debug("credentials: " + str(args)) 786 self.credentials_received = True 787 if self.success_seen: 788 self.loop.quit() 789 790 def start_pin(self, *args): 791 logger.debug("start_pin") 792 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 793 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 794 'Bssid': bssid_ay}) 795 return False 796 797 def success(self): 798 return self.success_seen and self.credentials_received 799 800 with TestDbusWps(bus) as t: 801 if not t.success(): 802 raise Exception("Failure in D-Bus operations") 803 804 dev[0].wait_connected(timeout=10) 805 806def test_dbus_wps_pin2(dev, apdev): 807 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)""" 808 try: 809 _test_dbus_wps_pin2(dev, apdev) 810 finally: 811 dev[0].request("SET wps_cred_processing 0") 812 813def _test_dbus_wps_pin2(dev, apdev): 814 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 815 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 816 817 hapd = start_ap(apdev[0]) 818 bssid = apdev[0]['bssid'] 819 dev[0].scan_for_bss(bssid, freq="2412") 820 dev[0].request("SET wps_cred_processing 2") 821 822 class TestDbusWps(TestDbus): 823 def __init__(self, bus): 824 TestDbus.__init__(self, bus) 825 self.success_seen = False 826 self.failed = False 827 828 def __enter__(self): 829 gobject.timeout_add(1, self.start_pin) 830 gobject.timeout_add(15000, self.timeout) 831 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 832 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 833 "Credentials") 834 self.loop.run() 835 return self 836 837 def wpsEvent(self, name, args): 838 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 839 if name == "success": 840 self.success_seen = True 841 if self.credentials_received: 842 self.loop.quit() 843 844 def credentials(self, args): 845 logger.debug("credentials: " + str(args)) 846 self.credentials_received = True 847 if self.success_seen: 848 self.loop.quit() 849 850 def start_pin(self, *args): 851 logger.debug("start_pin") 852 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 853 res = wps.Start({'Role': 'enrollee', 'Type': 'pin', 854 'Bssid': bssid_ay}) 855 pin = res['Pin'] 856 h = hostapd.Hostapd(apdev[0]['ifname']) 857 h.request("WPS_PIN any " + pin) 858 return False 859 860 def success(self): 861 return self.success_seen and self.credentials_received 862 863 with TestDbusWps(bus) as t: 864 if not t.success(): 865 raise Exception("Failure in D-Bus operations") 866 867 dev[0].wait_connected(timeout=10) 868 869def test_dbus_wps_pin_m2d(dev, apdev): 870 """D-Bus WPS/PIN operation and signals with M2D""" 871 try: 872 _test_dbus_wps_pin_m2d(dev, apdev) 873 finally: 874 dev[0].request("SET wps_cred_processing 0") 875 876def _test_dbus_wps_pin_m2d(dev, apdev): 877 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 878 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 879 880 hapd = start_ap(apdev[0]) 881 bssid = apdev[0]['bssid'] 882 dev[0].scan_for_bss(bssid, freq="2412") 883 dev[0].request("SET wps_cred_processing 2") 884 885 class TestDbusWps(TestDbus): 886 def __init__(self, bus): 887 TestDbus.__init__(self, bus) 888 self.success_seen = False 889 self.credentials_received = False 890 891 def __enter__(self): 892 gobject.timeout_add(1, self.start_pin) 893 gobject.timeout_add(15000, self.timeout) 894 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 895 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 896 "Credentials") 897 self.loop.run() 898 return self 899 900 def wpsEvent(self, name, args): 901 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 902 if name == "success": 903 self.success_seen = True 904 if self.credentials_received: 905 self.loop.quit() 906 elif name == "m2d": 907 h = hostapd.Hostapd(apdev[0]['ifname']) 908 h.request("WPS_PIN any 12345670") 909 910 def credentials(self, args): 911 logger.debug("credentials: " + str(args)) 912 self.credentials_received = True 913 if self.success_seen: 914 self.loop.quit() 915 916 def start_pin(self, *args): 917 logger.debug("start_pin") 918 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 919 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 920 'Bssid': bssid_ay}) 921 return False 922 923 def success(self): 924 return self.success_seen and self.credentials_received 925 926 with TestDbusWps(bus) as t: 927 if not t.success(): 928 raise Exception("Failure in D-Bus operations") 929 930 dev[0].wait_connected(timeout=10) 931 932def test_dbus_wps_reg(dev, apdev): 933 """D-Bus WPS/Registrar operation and signals""" 934 try: 935 _test_dbus_wps_reg(dev, apdev) 936 finally: 937 dev[0].request("SET wps_cred_processing 0") 938 939def _test_dbus_wps_reg(dev, apdev): 940 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 941 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 942 943 hapd = start_ap(apdev[0]) 944 hapd.request("WPS_PIN any 12345670") 945 bssid = apdev[0]['bssid'] 946 dev[0].scan_for_bss(bssid, freq="2412") 947 dev[0].request("SET wps_cred_processing 2") 948 949 class TestDbusWps(TestDbus): 950 def __init__(self, bus): 951 TestDbus.__init__(self, bus) 952 self.credentials_received = False 953 954 def __enter__(self): 955 gobject.timeout_add(100, self.start_reg) 956 gobject.timeout_add(15000, self.timeout) 957 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 958 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 959 "Credentials") 960 self.loop.run() 961 return self 962 963 def wpsEvent(self, name, args): 964 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 965 966 def credentials(self, args): 967 logger.debug("credentials: " + str(args)) 968 self.credentials_received = True 969 self.loop.quit() 970 971 def start_reg(self, *args): 972 logger.debug("start_reg") 973 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 974 wps.Start({'Role': 'registrar', 'Type': 'pin', 975 'Pin': '12345670', 'Bssid': bssid_ay}) 976 return False 977 978 def success(self): 979 return self.credentials_received 980 981 with TestDbusWps(bus) as t: 982 if not t.success(): 983 raise Exception("Failure in D-Bus operations") 984 985 dev[0].wait_connected(timeout=10) 986 987def test_dbus_wps_cancel(dev, apdev): 988 """D-Bus WPS Cancel operation""" 989 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 990 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 991 992 hapd = start_ap(apdev[0]) 993 bssid = apdev[0]['bssid'] 994 995 wps.Cancel() 996 dev[0].scan_for_bss(bssid, freq="2412") 997 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 998 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 999 'Bssid': bssid_ay}) 1000 wps.Cancel() 1001 dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1) 1002 1003def test_dbus_scan_invalid(dev, apdev): 1004 """D-Bus invalid scan method""" 1005 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1006 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1007 1008 tests = [({}, "InvalidArgs"), 1009 ({'Type': 123}, "InvalidArgs"), 1010 ({'Type': 'foo'}, "InvalidArgs"), 1011 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"), 1012 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"), 1013 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"), 1014 ({'Type': 'active', 1015 'SSIDs': [dbus.ByteArray(b"1"), dbus.ByteArray(b"2"), 1016 dbus.ByteArray(b"3"), dbus.ByteArray(b"4"), 1017 dbus.ByteArray(b"5"), dbus.ByteArray(b"6"), 1018 dbus.ByteArray(b"7"), dbus.ByteArray(b"8"), 1019 dbus.ByteArray(b"9"), dbus.ByteArray(b"10"), 1020 dbus.ByteArray(b"11"), dbus.ByteArray(b"12"), 1021 dbus.ByteArray(b"13"), dbus.ByteArray(b"14"), 1022 dbus.ByteArray(b"15"), dbus.ByteArray(b"16"), 1023 dbus.ByteArray(b"17")]}, 1024 "InvalidArgs"), 1025 ({'Type': 'active', 1026 'SSIDs': [dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")]}, 1027 "InvalidArgs"), 1028 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"), 1029 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"), 1030 ({'Type': 'active', 'Channels': 2412}, "InvalidArgs"), 1031 ({'Type': 'active', 'Channels': [2412]}, "InvalidArgs"), 1032 ({'Type': 'active', 1033 'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]}, 1034 "InvalidArgs"), 1035 ({'Type': 'active', 1036 'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]}, 1037 "InvalidArgs"), 1038 ({'Type': 'active', 'AllowRoam': "yes"}, "InvalidArgs"), 1039 ({'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]}, 1040 "InvalidArgs"), 1041 ({'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]}, 1042 "InvalidArgs")] 1043 for (t, err) in tests: 1044 try: 1045 iface.Scan(t) 1046 raise Exception("Invalid Scan() arguments accepted: " + str(t)) 1047 except dbus.exceptions.DBusException as e: 1048 if err not in str(e): 1049 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e))) 1050 1051def test_dbus_scan_oom(dev, apdev): 1052 """D-Bus scan method and OOM""" 1053 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1054 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1055 1056 with alloc_fail_dbus(dev[0], 1, 1057 "wpa_scan_clone_params;wpas_dbus_handler_scan", 1058 "Scan", expected="ScanError: Scan request rejected"): 1059 iface.Scan({'Type': 'passive', 1060 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1061 1062 with alloc_fail_dbus(dev[0], 1, 1063 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan", 1064 "Scan"): 1065 iface.Scan({'Type': 'passive', 1066 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1067 1068 with alloc_fail_dbus(dev[0], 1, 1069 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan", 1070 "Scan"): 1071 iface.Scan({'Type': 'active', 1072 'IEs': [dbus.ByteArray(b"\xdd\x00")], 1073 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1074 1075 with alloc_fail_dbus(dev[0], 1, 1076 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan", 1077 "Scan"): 1078 iface.Scan({'Type': 'active', 1079 'SSIDs': [dbus.ByteArray(b"open"), 1080 dbus.ByteArray()], 1081 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1082 1083def test_dbus_scan(dev, apdev): 1084 """D-Bus scan and related signals""" 1085 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1086 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1087 1088 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 1089 1090 class TestDbusScan(TestDbus): 1091 def __init__(self, bus): 1092 TestDbus.__init__(self, bus) 1093 self.scan_completed = 0 1094 self.bss_added = False 1095 self.fail_reason = None 1096 1097 def __enter__(self): 1098 gobject.timeout_add(1, self.run_scan) 1099 gobject.timeout_add(15000, self.timeout) 1100 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone") 1101 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded") 1102 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved") 1103 self.loop.run() 1104 return self 1105 1106 def scanDone(self, success): 1107 logger.debug("scanDone: success=%s" % success) 1108 self.scan_completed += 1 1109 if self.scan_completed == 1: 1110 iface.Scan({'Type': 'passive', 1111 'AllowRoam': True, 1112 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1113 elif self.scan_completed == 2: 1114 iface.Scan({'Type': 'passive', 1115 'AllowRoam': False}) 1116 elif self.bss_added and self.scan_completed == 3: 1117 self.loop.quit() 1118 1119 def bssAdded(self, bss, properties): 1120 logger.debug("bssAdded: %s" % bss) 1121 logger.debug(str(properties)) 1122 if 'WPS' in properties: 1123 if 'Type' in properties['WPS']: 1124 self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS" 1125 self.loop.quit() 1126 self.bss_added = True 1127 if self.scan_completed == 3: 1128 self.loop.quit() 1129 1130 def bssRemoved(self, bss): 1131 logger.debug("bssRemoved: %s" % bss) 1132 1133 def run_scan(self, *args): 1134 logger.debug("run_scan") 1135 iface.Scan({'Type': 'active', 1136 'SSIDs': [dbus.ByteArray(b"open"), 1137 dbus.ByteArray()], 1138 'IEs': [dbus.ByteArray(b"\xdd\x00"), 1139 dbus.ByteArray()], 1140 'AllowRoam': False, 1141 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1142 return False 1143 1144 def success(self): 1145 return self.scan_completed == 3 and self.bss_added 1146 1147 with TestDbusScan(bus) as t: 1148 if t.fail_reason: 1149 raise Exception(t.fail_reason) 1150 if not t.success(): 1151 raise Exception("Expected signals not seen") 1152 1153 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 1154 dbus_interface=dbus.PROPERTIES_IFACE) 1155 if len(res) < 1: 1156 raise Exception("Scan result not in BSSs property") 1157 iface.FlushBSS(0) 1158 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 1159 dbus_interface=dbus.PROPERTIES_IFACE) 1160 if len(res) != 0: 1161 raise Exception("FlushBSS() did not remove scan results from BSSs property") 1162 iface.FlushBSS(1) 1163 1164def test_dbus_scan_rand(dev, apdev): 1165 """D-Bus MACAddressRandomizationMask property Get/Set""" 1166 try: 1167 run_dbus_scan_rand(dev, apdev) 1168 finally: 1169 dev[0].request("MAC_RAND_SCAN all enable=0") 1170 1171def run_dbus_scan_rand(dev, apdev): 1172 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1173 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1174 1175 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1176 dbus_interface=dbus.PROPERTIES_IFACE) 1177 if len(res) != 0: 1178 logger.info(str(res)) 1179 raise Exception("Unexpected initial MACAddressRandomizationMask value") 1180 1181 try: 1182 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", "foo", 1183 dbus_interface=dbus.PROPERTIES_IFACE) 1184 raise Exception("Invalid Set accepted") 1185 except dbus.exceptions.DBusException as e: 1186 if "InvalidArgs: invalid message format" not in str(e): 1187 raise Exception("Unexpected error message: " + str(e)) 1188 1189 try: 1190 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1191 {"foo": "bar"}, 1192 dbus_interface=dbus.PROPERTIES_IFACE) 1193 raise Exception("Invalid Set accepted") 1194 except dbus.exceptions.DBusException as e: 1195 if "wpas_dbus_setter_mac_address_randomization_mask: mask was not a byte array" not in str(e): 1196 raise Exception("Unexpected error message: " + str(e)) 1197 1198 try: 1199 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1200 {"foo": dbus.ByteArray(b'123456')}, 1201 dbus_interface=dbus.PROPERTIES_IFACE) 1202 raise Exception("Invalid Set accepted") 1203 except dbus.exceptions.DBusException as e: 1204 if 'wpas_dbus_setter_mac_address_randomization_mask: bad scan type "foo"' not in str(e): 1205 raise Exception("Unexpected error message: " + str(e)) 1206 1207 try: 1208 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1209 {"scan": dbus.ByteArray(b'12345')}, 1210 dbus_interface=dbus.PROPERTIES_IFACE) 1211 raise Exception("Invalid Set accepted") 1212 except dbus.exceptions.DBusException as e: 1213 if 'wpas_dbus_setter_mac_address_randomization_mask: malformed MAC mask given' not in str(e): 1214 raise Exception("Unexpected error message: " + str(e)) 1215 1216 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1217 {"scan": dbus.ByteArray(b'123456')}, 1218 dbus_interface=dbus.PROPERTIES_IFACE) 1219 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1220 dbus_interface=dbus.PROPERTIES_IFACE) 1221 if len(res) != 1: 1222 logger.info(str(res)) 1223 raise Exception("Unexpected MACAddressRandomizationMask value") 1224 1225 try: 1226 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1227 {"scan": dbus.ByteArray(b'123456'), 1228 "sched_scan": dbus.ByteArray(b'987654')}, 1229 dbus_interface=dbus.PROPERTIES_IFACE) 1230 except dbus.exceptions.DBusException as e: 1231 # sched_scan is unlikely to be supported 1232 pass 1233 1234 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1235 dbus.Dictionary({}, signature='sv'), 1236 dbus_interface=dbus.PROPERTIES_IFACE) 1237 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1238 dbus_interface=dbus.PROPERTIES_IFACE) 1239 if len(res) != 0: 1240 logger.info(str(res)) 1241 raise Exception("Unexpected MACAddressRandomizationMask value") 1242 1243def test_dbus_scan_busy(dev, apdev): 1244 """D-Bus scan trigger rejection when busy with previous scan""" 1245 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1246 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1247 1248 if "OK" not in dev[0].request("SCAN freq=2412-2462"): 1249 raise Exception("Failed to start scan") 1250 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) 1251 if ev is None: 1252 raise Exception("Scan start timed out") 1253 1254 try: 1255 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1256 raise Exception("Scan() accepted when busy") 1257 except dbus.exceptions.DBusException as e: 1258 if "ScanError: Scan request reject" not in str(e): 1259 raise Exception("Unexpected error message: " + str(e)) 1260 1261 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1262 if ev is None: 1263 raise Exception("Scan timed out") 1264 1265def test_dbus_scan_abort(dev, apdev): 1266 """D-Bus scan trigger and abort""" 1267 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1268 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1269 1270 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1271 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) 1272 if ev is None: 1273 raise Exception("Scan start timed out") 1274 1275 iface.AbortScan() 1276 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1277 if ev is None: 1278 raise Exception("Scan abort result timed out") 1279 dev[0].dump_monitor() 1280 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1281 iface.AbortScan() 1282 1283 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1284 if ev is None: 1285 raise Exception("Scan timed out") 1286 1287def test_dbus_connect(dev, apdev): 1288 """D-Bus AddNetwork and connect""" 1289 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1290 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1291 1292 ssid = "test-wpa2-psk" 1293 passphrase = 'qwertyuiop' 1294 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1295 hapd = hostapd.add_ap(apdev[0], params) 1296 1297 class TestDbusConnect(TestDbus): 1298 def __init__(self, bus): 1299 TestDbus.__init__(self, bus) 1300 self.network_added = False 1301 self.network_selected = False 1302 self.network_removed = False 1303 self.state = 0 1304 1305 def __enter__(self): 1306 gobject.timeout_add(1, self.run_connect) 1307 gobject.timeout_add(15000, self.timeout) 1308 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1309 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1310 "NetworkRemoved") 1311 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1312 "NetworkSelected") 1313 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1314 "PropertiesChanged") 1315 self.loop.run() 1316 return self 1317 1318 def networkAdded(self, network, properties): 1319 logger.debug("networkAdded: %s" % str(network)) 1320 logger.debug(str(properties)) 1321 self.network_added = True 1322 1323 def networkRemoved(self, network): 1324 logger.debug("networkRemoved: %s" % str(network)) 1325 self.network_removed = True 1326 1327 def networkSelected(self, network): 1328 logger.debug("networkSelected: %s" % str(network)) 1329 self.network_selected = True 1330 1331 def propertiesChanged(self, properties): 1332 logger.debug("propertiesChanged: %s" % str(properties)) 1333 if 'State' in properties and properties['State'] == "completed": 1334 if self.state == 0: 1335 self.state = 1 1336 iface.Disconnect() 1337 elif self.state == 2: 1338 self.state = 3 1339 iface.Disconnect() 1340 elif self.state == 4: 1341 self.state = 5 1342 iface.Reattach() 1343 elif self.state == 5: 1344 self.state = 6 1345 iface.Disconnect() 1346 elif self.state == 7: 1347 self.state = 8 1348 res = iface.SignalPoll() 1349 logger.debug("SignalPoll: " + str(res)) 1350 if 'frequency' not in res or res['frequency'] != 2412: 1351 self.state = -1 1352 logger.info("Unexpected SignalPoll result") 1353 iface.RemoveNetwork(self.netw) 1354 if 'State' in properties and properties['State'] == "disconnected": 1355 if self.state == 1: 1356 self.state = 2 1357 iface.SelectNetwork(self.netw) 1358 elif self.state == 3: 1359 self.state = 4 1360 iface.Reassociate() 1361 elif self.state == 6: 1362 self.state = 7 1363 iface.Reconnect() 1364 elif self.state == 8: 1365 self.state = 9 1366 self.loop.quit() 1367 1368 def run_connect(self, *args): 1369 logger.debug("run_connect") 1370 args = dbus.Dictionary({'ssid': ssid, 1371 'key_mgmt': 'WPA-PSK', 1372 'psk': passphrase, 1373 'scan_freq': 2412}, 1374 signature='sv') 1375 self.netw = iface.AddNetwork(args) 1376 iface.SelectNetwork(self.netw) 1377 return False 1378 1379 def success(self): 1380 if not self.network_added or \ 1381 not self.network_removed or \ 1382 not self.network_selected: 1383 return False 1384 return self.state == 9 1385 1386 with TestDbusConnect(bus) as t: 1387 if not t.success(): 1388 raise Exception("Expected signals not seen") 1389 1390def test_dbus_remove_connected(dev, apdev): 1391 """D-Bus RemoveAllNetworks while connected""" 1392 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1393 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1394 1395 ssid = "test-open" 1396 hapd = hostapd.add_ap(apdev[0], {"ssid": ssid}) 1397 1398 class TestDbusConnect(TestDbus): 1399 def __init__(self, bus): 1400 TestDbus.__init__(self, bus) 1401 self.network_added = False 1402 self.network_selected = False 1403 self.network_removed = False 1404 self.state = 0 1405 1406 def __enter__(self): 1407 gobject.timeout_add(1, self.run_connect) 1408 gobject.timeout_add(15000, self.timeout) 1409 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1410 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1411 "NetworkRemoved") 1412 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1413 "NetworkSelected") 1414 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1415 "PropertiesChanged") 1416 self.loop.run() 1417 return self 1418 1419 def networkAdded(self, network, properties): 1420 logger.debug("networkAdded: %s" % str(network)) 1421 logger.debug(str(properties)) 1422 self.network_added = True 1423 1424 def networkRemoved(self, network): 1425 logger.debug("networkRemoved: %s" % str(network)) 1426 self.network_removed = True 1427 1428 def networkSelected(self, network): 1429 logger.debug("networkSelected: %s" % str(network)) 1430 self.network_selected = True 1431 1432 def propertiesChanged(self, properties): 1433 logger.debug("propertiesChanged: %s" % str(properties)) 1434 if 'State' in properties and properties['State'] == "completed": 1435 if self.state == 0: 1436 self.state = 1 1437 iface.Disconnect() 1438 elif self.state == 2: 1439 self.state = 3 1440 iface.Disconnect() 1441 elif self.state == 4: 1442 self.state = 5 1443 iface.Reattach() 1444 elif self.state == 5: 1445 self.state = 6 1446 iface.Disconnect() 1447 elif self.state == 7: 1448 self.state = 8 1449 res = iface.SignalPoll() 1450 logger.debug("SignalPoll: " + str(res)) 1451 if 'frequency' not in res or res['frequency'] != 2412: 1452 self.state = -1 1453 logger.info("Unexpected SignalPoll result") 1454 iface.RemoveAllNetworks() 1455 if 'State' in properties and properties['State'] == "disconnected": 1456 if self.state == 1: 1457 self.state = 2 1458 iface.SelectNetwork(self.netw) 1459 elif self.state == 3: 1460 self.state = 4 1461 iface.Reassociate() 1462 elif self.state == 6: 1463 self.state = 7 1464 iface.Reconnect() 1465 elif self.state == 8: 1466 self.state = 9 1467 self.loop.quit() 1468 1469 def run_connect(self, *args): 1470 logger.debug("run_connect") 1471 args = dbus.Dictionary({'ssid': ssid, 1472 'key_mgmt': 'NONE', 1473 'scan_freq': 2412}, 1474 signature='sv') 1475 self.netw = iface.AddNetwork(args) 1476 iface.SelectNetwork(self.netw) 1477 return False 1478 1479 def success(self): 1480 if not self.network_added or \ 1481 not self.network_removed or \ 1482 not self.network_selected: 1483 return False 1484 return self.state == 9 1485 1486 with TestDbusConnect(bus) as t: 1487 if not t.success(): 1488 raise Exception("Expected signals not seen") 1489 1490def test_dbus_connect_psk_mem(dev, apdev): 1491 """D-Bus AddNetwork and connect with memory-only PSK""" 1492 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1493 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1494 1495 ssid = "test-wpa2-psk" 1496 passphrase = 'qwertyuiop' 1497 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1498 hapd = hostapd.add_ap(apdev[0], params) 1499 1500 class TestDbusConnect(TestDbus): 1501 def __init__(self, bus): 1502 TestDbus.__init__(self, bus) 1503 self.connected = False 1504 1505 def __enter__(self): 1506 gobject.timeout_add(1, self.run_connect) 1507 gobject.timeout_add(15000, self.timeout) 1508 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1509 "PropertiesChanged") 1510 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE, 1511 "NetworkRequest") 1512 self.loop.run() 1513 return self 1514 1515 def propertiesChanged(self, properties): 1516 logger.debug("propertiesChanged: %s" % str(properties)) 1517 if 'State' in properties and properties['State'] == "completed": 1518 self.connected = True 1519 self.loop.quit() 1520 1521 def networkRequest(self, path, field, txt): 1522 logger.debug("networkRequest: %s %s %s" % (path, field, txt)) 1523 if field == "PSK_PASSPHRASE": 1524 iface.NetworkReply(path, field, '"' + passphrase + '"') 1525 1526 def run_connect(self, *args): 1527 logger.debug("run_connect") 1528 args = dbus.Dictionary({'ssid': ssid, 1529 'key_mgmt': 'WPA-PSK', 1530 'mem_only_psk': 1, 1531 'scan_freq': 2412}, 1532 signature='sv') 1533 self.netw = iface.AddNetwork(args) 1534 iface.SelectNetwork(self.netw) 1535 return False 1536 1537 def success(self): 1538 return self.connected 1539 1540 with TestDbusConnect(bus) as t: 1541 if not t.success(): 1542 raise Exception("Expected signals not seen") 1543 1544def test_dbus_connect_oom(dev, apdev): 1545 """D-Bus AddNetwork and connect when out-of-memory""" 1546 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1547 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1548 1549 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"): 1550 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build") 1551 1552 ssid = "test-wpa2-psk" 1553 passphrase = 'qwertyuiop' 1554 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1555 hapd = hostapd.add_ap(apdev[0], params) 1556 1557 class TestDbusConnect(TestDbus): 1558 def __init__(self, bus): 1559 TestDbus.__init__(self, bus) 1560 self.network_added = False 1561 self.network_selected = False 1562 self.network_removed = False 1563 self.state = 0 1564 1565 def __enter__(self): 1566 gobject.timeout_add(1, self.run_connect) 1567 gobject.timeout_add(1500, self.timeout) 1568 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1569 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1570 "NetworkRemoved") 1571 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1572 "NetworkSelected") 1573 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1574 "PropertiesChanged") 1575 self.loop.run() 1576 return self 1577 1578 def networkAdded(self, network, properties): 1579 logger.debug("networkAdded: %s" % str(network)) 1580 logger.debug(str(properties)) 1581 self.network_added = True 1582 1583 def networkRemoved(self, network): 1584 logger.debug("networkRemoved: %s" % str(network)) 1585 self.network_removed = True 1586 1587 def networkSelected(self, network): 1588 logger.debug("networkSelected: %s" % str(network)) 1589 self.network_selected = True 1590 1591 def propertiesChanged(self, properties): 1592 logger.debug("propertiesChanged: %s" % str(properties)) 1593 if 'State' in properties and properties['State'] == "completed": 1594 if self.state == 0: 1595 self.state = 1 1596 iface.Disconnect() 1597 elif self.state == 2: 1598 self.state = 3 1599 iface.Disconnect() 1600 elif self.state == 4: 1601 self.state = 5 1602 iface.Reattach() 1603 elif self.state == 5: 1604 self.state = 6 1605 res = iface.SignalPoll() 1606 logger.debug("SignalPoll: " + str(res)) 1607 if 'frequency' not in res or res['frequency'] != 2412: 1608 self.state = -1 1609 logger.info("Unexpected SignalPoll result") 1610 iface.RemoveNetwork(self.netw) 1611 if 'State' in properties and properties['State'] == "disconnected": 1612 if self.state == 1: 1613 self.state = 2 1614 iface.SelectNetwork(self.netw) 1615 elif self.state == 3: 1616 self.state = 4 1617 iface.Reassociate() 1618 elif self.state == 6: 1619 self.state = 7 1620 self.loop.quit() 1621 1622 def run_connect(self, *args): 1623 logger.debug("run_connect") 1624 args = dbus.Dictionary({'ssid': ssid, 1625 'key_mgmt': 'WPA-PSK', 1626 'psk': passphrase, 1627 'scan_freq': 2412}, 1628 signature='sv') 1629 try: 1630 self.netw = iface.AddNetwork(args) 1631 except Exception as e: 1632 logger.info("Exception on AddNetwork: " + str(e)) 1633 self.loop.quit() 1634 return False 1635 try: 1636 iface.SelectNetwork(self.netw) 1637 except Exception as e: 1638 logger.info("Exception on SelectNetwork: " + str(e)) 1639 self.loop.quit() 1640 1641 return False 1642 1643 def success(self): 1644 if not self.network_added or \ 1645 not self.network_removed or \ 1646 not self.network_selected: 1647 return False 1648 return self.state == 7 1649 1650 count = 0 1651 for i in range(1, 1000): 1652 for j in range(3): 1653 dev[j].dump_monitor() 1654 dev[0].request("TEST_ALLOC_FAIL %d:main" % i) 1655 try: 1656 with TestDbusConnect(bus) as t: 1657 if not t.success(): 1658 logger.info("Iteration %d - Expected signals not seen" % i) 1659 else: 1660 logger.info("Iteration %d - success" % i) 1661 1662 state = dev[0].request('GET_ALLOC_FAIL') 1663 logger.info("GET_ALLOC_FAIL: " + state) 1664 dev[0].dump_monitor() 1665 dev[0].request("TEST_ALLOC_FAIL 0:") 1666 if i < 3: 1667 raise Exception("Connection succeeded during out-of-memory") 1668 if not state.startswith('0:'): 1669 count += 1 1670 if count == 5: 1671 break 1672 except: 1673 pass 1674 1675 # Force regulatory update to re-fetch hw capabilities for the following 1676 # test cases. 1677 try: 1678 dev[0].dump_monitor() 1679 subprocess.call(['iw', 'reg', 'set', 'US']) 1680 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 1681 finally: 1682 dev[0].dump_monitor() 1683 subprocess.call(['iw', 'reg', 'set', '00']) 1684 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 1685 1686def test_dbus_while_not_connected(dev, apdev): 1687 """D-Bus invalid operations while not connected""" 1688 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1689 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1690 1691 try: 1692 iface.Disconnect() 1693 raise Exception("Disconnect() accepted when not connected") 1694 except dbus.exceptions.DBusException as e: 1695 if "NotConnected" not in str(e): 1696 raise Exception("Unexpected error message for invalid Disconnect: " + str(e)) 1697 1698 try: 1699 iface.Reattach() 1700 raise Exception("Reattach() accepted when not connected") 1701 except dbus.exceptions.DBusException as e: 1702 if "NotConnected" not in str(e): 1703 raise Exception("Unexpected error message for invalid Reattach: " + str(e)) 1704 1705def test_dbus_connect_eap(dev, apdev): 1706 """D-Bus AddNetwork and connect to EAP network""" 1707 check_altsubject_match_support(dev[0]) 1708 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1709 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1710 1711 ssid = "ieee8021x-open" 1712 params = hostapd.radius_params() 1713 params["ssid"] = ssid 1714 params["ieee8021x"] = "1" 1715 hapd = hostapd.add_ap(apdev[0], params) 1716 1717 class TestDbusConnect(TestDbus): 1718 def __init__(self, bus): 1719 TestDbus.__init__(self, bus) 1720 self.certification_received = False 1721 self.eap_status = False 1722 self.state = 0 1723 1724 def __enter__(self): 1725 gobject.timeout_add(1, self.run_connect) 1726 gobject.timeout_add(15000, self.timeout) 1727 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1728 "PropertiesChanged") 1729 self.add_signal(self.certification, WPAS_DBUS_IFACE, 1730 "Certification", byte_arrays=True) 1731 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE, 1732 "NetworkRequest") 1733 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP") 1734 self.loop.run() 1735 return self 1736 1737 def propertiesChanged(self, properties): 1738 logger.debug("propertiesChanged: %s" % str(properties)) 1739 if 'State' in properties and properties['State'] == "completed": 1740 if self.state == 0: 1741 self.state = 1 1742 iface.EAPLogoff() 1743 logger.info("Set dNSName constraint") 1744 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw) 1745 args = dbus.Dictionary({'altsubject_match': 1746 self.server_dnsname}, 1747 signature='sv') 1748 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1749 dbus_interface=dbus.PROPERTIES_IFACE) 1750 elif self.state == 2: 1751 self.state = 3 1752 iface.Disconnect() 1753 logger.info("Set non-matching dNSName constraint") 1754 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw) 1755 args = dbus.Dictionary({'altsubject_match': 1756 self.server_dnsname + "FOO"}, 1757 signature='sv') 1758 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1759 dbus_interface=dbus.PROPERTIES_IFACE) 1760 if 'State' in properties and properties['State'] == "disconnected": 1761 if self.state == 1: 1762 self.state = 2 1763 iface.EAPLogon() 1764 iface.SelectNetwork(self.netw) 1765 if self.state == 3: 1766 self.state = 4 1767 iface.SelectNetwork(self.netw) 1768 1769 def certification(self, args): 1770 logger.debug("certification: %s" % str(args)) 1771 self.certification_received = True 1772 if args['depth'] == 0: 1773 # The test server certificate is supposed to have dNSName 1774 if len(args['altsubject']) < 1: 1775 raise Exception("Missing dNSName") 1776 dnsname = args['altsubject'][0] 1777 if not dnsname.startswith("DNS:"): 1778 raise Exception("Expected dNSName not found: " + dnsname) 1779 logger.info("altsubject: " + dnsname) 1780 self.server_dnsname = dnsname 1781 1782 def eap(self, status, parameter): 1783 logger.debug("EAP: status=%s parameter=%s" % (status, parameter)) 1784 if status == 'completion' and parameter == 'success': 1785 self.eap_status = True 1786 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch': 1787 self.state = 5 1788 self.loop.quit() 1789 1790 def networkRequest(self, path, field, txt): 1791 logger.debug("networkRequest: %s %s %s" % (path, field, txt)) 1792 if field == "PASSWORD": 1793 iface.NetworkReply(path, field, "password") 1794 1795 def run_connect(self, *args): 1796 logger.debug("run_connect") 1797 args = dbus.Dictionary({'ssid': ssid, 1798 'key_mgmt': 'IEEE8021X', 1799 'eapol_flags': 0, 1800 'eap': 'TTLS', 1801 'anonymous_identity': 'ttls', 1802 'identity': 'pap user', 1803 'ca_cert': 'auth_serv/ca.pem', 1804 'phase2': 'auth=PAP', 1805 'scan_freq': 2412}, 1806 signature='sv') 1807 self.netw = iface.AddNetwork(args) 1808 iface.SelectNetwork(self.netw) 1809 return False 1810 1811 def success(self): 1812 if not self.eap_status or not self.certification_received: 1813 return False 1814 return self.state == 5 1815 1816 with TestDbusConnect(bus) as t: 1817 if not t.success(): 1818 raise Exception("Expected signals not seen") 1819 1820def test_dbus_network(dev, apdev): 1821 """D-Bus AddNetwork/RemoveNetwork parameters and error cases""" 1822 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1823 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1824 1825 args = dbus.Dictionary({'ssid': "foo", 1826 'key_mgmt': 'WPA-PSK', 1827 'psk': "12345678", 1828 'identity': dbus.ByteArray([1, 2]), 1829 'priority': dbus.Int32(0), 1830 'scan_freq': dbus.UInt32(2412)}, 1831 signature='sv') 1832 netw = iface.AddNetwork(args) 1833 id = int(dev[0].list_networks()[0]['id']) 1834 val = dev[0].get_network(id, "scan_freq") 1835 if val != "2412": 1836 raise Exception("Invalid scan_freq value: " + str(val)) 1837 iface.RemoveNetwork(netw) 1838 1839 args = dbus.Dictionary({'ssid': "foo", 1840 'key_mgmt': 'NONE', 1841 'scan_freq': "2412 2432", 1842 'freq_list': "2412 2417 2432"}, 1843 signature='sv') 1844 netw = iface.AddNetwork(args) 1845 id = int(dev[0].list_networks()[0]['id']) 1846 val = dev[0].get_network(id, "scan_freq") 1847 if val != "2412 2432": 1848 raise Exception("Invalid scan_freq value (2): " + str(val)) 1849 val = dev[0].get_network(id, "freq_list") 1850 if val != "2412 2417 2432": 1851 raise Exception("Invalid freq_list value: " + str(val)) 1852 iface.RemoveNetwork(netw) 1853 try: 1854 iface.RemoveNetwork(netw) 1855 raise Exception("Invalid RemoveNetwork() accepted") 1856 except dbus.exceptions.DBusException as e: 1857 if "NetworkUnknown" not in str(e): 1858 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e)) 1859 try: 1860 iface.SelectNetwork(netw) 1861 raise Exception("Invalid SelectNetwork() accepted") 1862 except dbus.exceptions.DBusException as e: 1863 if "NetworkUnknown" not in str(e): 1864 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e)) 1865 1866 args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE', 1867 'identity': "testuser", 'scan_freq': '2412'}, 1868 signature='sv') 1869 netw1 = iface.AddNetwork(args) 1870 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1871 signature='sv') 1872 netw2 = iface.AddNetwork(args) 1873 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks", 1874 dbus_interface=dbus.PROPERTIES_IFACE) 1875 if len(res) != 2: 1876 raise Exception("Unexpected number of networks") 1877 1878 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1) 1879 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1880 dbus_interface=dbus.PROPERTIES_IFACE) 1881 if res != False: 1882 raise Exception("Added network was unexpectedly enabled by default") 1883 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True), 1884 dbus_interface=dbus.PROPERTIES_IFACE) 1885 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1886 dbus_interface=dbus.PROPERTIES_IFACE) 1887 if res != True: 1888 raise Exception("Set(Enabled,True) did not seem to change property value") 1889 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False), 1890 dbus_interface=dbus.PROPERTIES_IFACE) 1891 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1892 dbus_interface=dbus.PROPERTIES_IFACE) 1893 if res != False: 1894 raise Exception("Set(Enabled,False) did not seem to change property value") 1895 try: 1896 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1), 1897 dbus_interface=dbus.PROPERTIES_IFACE) 1898 raise Exception("Invalid Set(Enabled,1) accepted") 1899 except dbus.exceptions.DBusException as e: 1900 if "Error.Failed: wrong property type" not in str(e): 1901 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e)) 1902 1903 args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv') 1904 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1905 dbus_interface=dbus.PROPERTIES_IFACE) 1906 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties", 1907 dbus_interface=dbus.PROPERTIES_IFACE) 1908 if res['ssid'] != '"foo1new"': 1909 raise Exception("Set(Properties) failed to update ssid") 1910 if res['identity'] != '"testuser"': 1911 raise Exception("Set(Properties) unexpectedly changed unrelated parameter") 1912 1913 iface.RemoveAllNetworks() 1914 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks", 1915 dbus_interface=dbus.PROPERTIES_IFACE) 1916 if len(res) != 0: 1917 raise Exception("Unexpected number of networks") 1918 iface.RemoveAllNetworks() 1919 1920 tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'), 1921 dbus.Dictionary({'identity': dbus.ByteArray()}, 1922 signature='sv'), 1923 dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv')] 1924 for args in tests: 1925 try: 1926 iface.AddNetwork(args) 1927 raise Exception("Invalid AddNetwork args accepted: " + str(args)) 1928 except dbus.exceptions.DBusException as e: 1929 if "InvalidArgs" not in str(e): 1930 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e)) 1931 1932def test_dbus_network_oom(dev, apdev): 1933 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases""" 1934 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1935 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1936 1937 args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE', 1938 'identity': "testuser", 'scan_freq': '2412'}, 1939 signature='sv') 1940 netw1 = iface.AddNetwork(args) 1941 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1) 1942 1943 with alloc_fail_dbus(dev[0], 1, 1944 "wpa_config_get_all;wpas_dbus_getter_network_properties", 1945 "Get"): 1946 net_obj.Get(WPAS_DBUS_NETWORK, "Properties", 1947 dbus_interface=dbus.PROPERTIES_IFACE) 1948 1949 iface.RemoveAllNetworks() 1950 1951 with alloc_fail_dbus(dev[0], 1, 1952 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network", 1953 "RemoveNetwork", "InvalidArgs"): 1954 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234")) 1955 1956 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"): 1957 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1958 signature='sv') 1959 try: 1960 netw = iface.AddNetwork(args) 1961 # Currently, AddNetwork() succeeds even if os_strdup() for path 1962 # fails, so remove the network if that occurs. 1963 iface.RemoveNetwork(netw) 1964 except dbus.exceptions.DBusException as e: 1965 pass 1966 1967 for i in range(1, 3): 1968 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"): 1969 try: 1970 netw = iface.AddNetwork(args) 1971 # Currently, AddNetwork() succeeds even if network registration 1972 # fails, so remove the network if that occurs. 1973 iface.RemoveNetwork(netw) 1974 except dbus.exceptions.DBusException as e: 1975 pass 1976 1977 with alloc_fail_dbus(dev[0], 1, 1978 "=wpa_config_add_network;wpas_dbus_handler_add_network", 1979 "AddNetwork", 1980 "UnknownError: wpa_supplicant could not add a network"): 1981 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1982 signature='sv') 1983 netw = iface.AddNetwork(args) 1984 1985 tests = [(1, 1986 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network', 1987 dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, 1988 signature='sv')), 1989 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1990 dbus.Dictionary({'ssid': 'foo'}, signature='sv')), 1991 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1992 dbus.Dictionary({'eap': 'foo'}, signature='sv')), 1993 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1994 dbus.Dictionary({'priority': dbus.UInt32(1)}, 1995 signature='sv')), 1996 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1997 dbus.Dictionary({'priority': dbus.Int32(1)}, 1998 signature='sv')), 1999 (1, '=set_network_properties;wpas_dbus_handler_add_network', 2000 dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, 2001 signature='sv'))] 2002 for (count, funcs, args) in tests: 2003 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"): 2004 netw = iface.AddNetwork(args) 2005 2006 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 2007 dbus_interface=dbus.PROPERTIES_IFACE)) > 0: 2008 raise Exception("Unexpected network block added") 2009 if len(dev[0].list_networks()) > 0: 2010 raise Exception("Unexpected network block visible") 2011 2012def test_dbus_interface(dev, apdev): 2013 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases""" 2014 try: 2015 _test_dbus_interface(dev, apdev) 2016 finally: 2017 # Need to force P2P channel list update since the 'lo' interface 2018 # with driver=none ends up configuring default dualband channels. 2019 dev[0].request("SET country US") 2020 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2021 if ev is None: 2022 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], 2023 timeout=1) 2024 dev[0].request("SET country 00") 2025 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2026 if ev is None: 2027 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], 2028 timeout=1) 2029 subprocess.call(['iw', 'reg', 'set', '00']) 2030 2031def _test_dbus_interface(dev, apdev): 2032 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2033 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 2034 2035 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none', 'Type': 'sta', 2036 'Address': '02:03:11:22:33:44'}, 2037 signature='sv') 2038 path = wpas.CreateInterface(params) 2039 logger.debug("New interface path: " + str(path)) 2040 path2 = wpas.GetInterface("lo") 2041 if path != path2: 2042 raise Exception("Interface object mismatch") 2043 2044 params = dbus.Dictionary({'Ifname': 'lo', 2045 'Driver': 'none', 2046 'ConfigFile': 'foo', 2047 'BridgeIfname': 'foo',}, 2048 signature='sv') 2049 try: 2050 wpas.CreateInterface(params) 2051 raise Exception("Invalid CreateInterface() accepted") 2052 except dbus.exceptions.DBusException as e: 2053 if "InterfaceExists" not in str(e): 2054 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2055 2056 wpas.RemoveInterface(path) 2057 try: 2058 wpas.RemoveInterface(path) 2059 raise Exception("Invalid RemoveInterface() accepted") 2060 except dbus.exceptions.DBusException as e: 2061 if "InterfaceUnknown" not in str(e): 2062 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) 2063 2064 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none', 2065 'Foo': 123}, 2066 signature='sv') 2067 try: 2068 wpas.CreateInterface(params) 2069 raise Exception("Invalid CreateInterface() accepted") 2070 except dbus.exceptions.DBusException as e: 2071 if "InvalidArgs" not in str(e): 2072 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2073 2074 params = dbus.Dictionary({'Driver': 'none'}, signature='sv') 2075 try: 2076 wpas.CreateInterface(params) 2077 raise Exception("Invalid CreateInterface() accepted") 2078 except dbus.exceptions.DBusException as e: 2079 if "InvalidArgs" not in str(e): 2080 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2081 2082 try: 2083 wpas.GetInterface("lo") 2084 raise Exception("Invalid GetInterface() accepted") 2085 except dbus.exceptions.DBusException as e: 2086 if "InterfaceUnknown" not in str(e): 2087 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) 2088 2089 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none', 2090 'Type': 'foo'}, signature='sv') 2091 try: 2092 wpas.CreateInterface(params) 2093 raise Exception("Invalid CreateInterface() accepted") 2094 except dbus.exceptions.DBusException as e: 2095 if "InvalidArgs" not in str(e): 2096 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2097 2098 try: 2099 wpas.GetInterface("lo") 2100 raise Exception("Invalid GetInterface() accepted") 2101 except dbus.exceptions.DBusException as e: 2102 if "InterfaceUnknown" not in str(e): 2103 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) 2104 2105 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none', 2106 'Address': 'foo'}, signature='sv') 2107 try: 2108 wpas.CreateInterface(params) 2109 raise Exception("Invalid CreateInterface() accepted") 2110 except dbus.exceptions.DBusException as e: 2111 if "InvalidArgs" not in str(e): 2112 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2113 2114 try: 2115 wpas.GetInterface("lo") 2116 raise Exception("Invalid GetInterface() accepted") 2117 except dbus.exceptions.DBusException as e: 2118 if "InterfaceUnknown" not in str(e): 2119 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) 2120 2121def test_dbus_interface_oom(dev, apdev): 2122 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases""" 2123 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2124 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 2125 2126 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"): 2127 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'}, 2128 signature='sv') 2129 wpas.CreateInterface(params) 2130 2131 for i in range(1, 1000): 2132 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i) 2133 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'}, 2134 signature='sv') 2135 try: 2136 npath = wpas.CreateInterface(params) 2137 wpas.RemoveInterface(npath) 2138 logger.info("CreateInterface succeeds after %d allocation failures" % i) 2139 state = dev[0].request('GET_ALLOC_FAIL') 2140 logger.info("GET_ALLOC_FAIL: " + state) 2141 dev[0].dump_monitor() 2142 dev[0].request("TEST_ALLOC_FAIL 0:") 2143 if i < 5: 2144 raise Exception("CreateInterface succeeded during out-of-memory") 2145 if not state.startswith('0:'): 2146 break 2147 except dbus.exceptions.DBusException as e: 2148 pass 2149 2150 for arg in ['Driver', 'Ifname', 'ConfigFile', 'BridgeIfname']: 2151 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface", 2152 "CreateInterface"): 2153 params = dbus.Dictionary({arg: 'foo'}, signature='sv') 2154 wpas.CreateInterface(params) 2155 2156def test_dbus_blob(dev, apdev): 2157 """D-Bus AddNetwork/RemoveNetwork parameters and error cases""" 2158 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2159 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2160 2161 blob = dbus.ByteArray(b"\x01\x02\x03") 2162 iface.AddBlob('blob1', blob) 2163 try: 2164 iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04")) 2165 raise Exception("Invalid AddBlob() accepted") 2166 except dbus.exceptions.DBusException as e: 2167 if "BlobExists" not in str(e): 2168 raise Exception("Unexpected error message for invalid AddBlob: " + str(e)) 2169 res = iface.GetBlob('blob1') 2170 if len(res) != len(blob): 2171 raise Exception("Unexpected blob data length") 2172 for i in range(len(res)): 2173 if res[i] != dbus.Byte(blob[i]): 2174 raise Exception("Unexpected blob data") 2175 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs", 2176 dbus_interface=dbus.PROPERTIES_IFACE) 2177 if 'blob1' not in res: 2178 raise Exception("Added blob missing from Blobs property") 2179 iface.RemoveBlob('blob1') 2180 try: 2181 iface.RemoveBlob('blob1') 2182 raise Exception("Invalid RemoveBlob() accepted") 2183 except dbus.exceptions.DBusException as e: 2184 if "BlobUnknown" not in str(e): 2185 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e)) 2186 try: 2187 iface.GetBlob('blob1') 2188 raise Exception("Invalid GetBlob() accepted") 2189 except dbus.exceptions.DBusException as e: 2190 if "BlobUnknown" not in str(e): 2191 raise Exception("Unexpected error message for invalid GetBlob: " + str(e)) 2192 2193 class TestDbusBlob(TestDbus): 2194 def __init__(self, bus): 2195 TestDbus.__init__(self, bus) 2196 self.blob_added = False 2197 self.blob_removed = False 2198 2199 def __enter__(self): 2200 gobject.timeout_add(1, self.run_blob) 2201 gobject.timeout_add(15000, self.timeout) 2202 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded") 2203 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved") 2204 self.loop.run() 2205 return self 2206 2207 def blobAdded(self, blobName): 2208 logger.debug("blobAdded: %s" % blobName) 2209 if blobName == 'blob2': 2210 self.blob_added = True 2211 2212 def blobRemoved(self, blobName): 2213 logger.debug("blobRemoved: %s" % blobName) 2214 if blobName == 'blob2': 2215 self.blob_removed = True 2216 self.loop.quit() 2217 2218 def run_blob(self, *args): 2219 logger.debug("run_blob") 2220 iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04")) 2221 iface.RemoveBlob('blob2') 2222 return False 2223 2224 def success(self): 2225 return self.blob_added and self.blob_removed 2226 2227 with TestDbusBlob(bus) as t: 2228 if not t.success(): 2229 raise Exception("Expected signals not seen") 2230 2231def test_dbus_blob_oom(dev, apdev): 2232 """D-Bus AddNetwork/RemoveNetwork OOM error cases""" 2233 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2234 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2235 2236 for i in range(1, 4): 2237 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob", 2238 "AddBlob"): 2239 iface.AddBlob('blob_no_mem', dbus.ByteArray(b"\x01\x02\x03\x04")) 2240 2241def test_dbus_autoscan(dev, apdev): 2242 """D-Bus Autoscan()""" 2243 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2244 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2245 2246 iface.AutoScan("foo") 2247 iface.AutoScan("periodic:1") 2248 iface.AutoScan("") 2249 dev[0].request("AUTOSCAN ") 2250 2251def test_dbus_autoscan_oom(dev, apdev): 2252 """D-Bus Autoscan() OOM""" 2253 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2254 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2255 2256 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"): 2257 iface.AutoScan("foo") 2258 dev[0].request("AUTOSCAN ") 2259 2260def test_dbus_tdls_invalid(dev, apdev): 2261 """D-Bus invalid TDLS operations""" 2262 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2263 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2264 2265 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2266 connect_2sta_open(dev, hapd) 2267 addr1 = dev[1].p2p_interface_addr() 2268 2269 try: 2270 iface.TDLSDiscover("foo") 2271 raise Exception("Invalid TDLSDiscover() accepted") 2272 except dbus.exceptions.DBusException as e: 2273 if "InvalidArgs" not in str(e): 2274 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e)) 2275 2276 try: 2277 iface.TDLSStatus("foo") 2278 raise Exception("Invalid TDLSStatus() accepted") 2279 except dbus.exceptions.DBusException as e: 2280 if "InvalidArgs" not in str(e): 2281 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e)) 2282 2283 res = iface.TDLSStatus(addr1) 2284 if res != "peer does not exist": 2285 raise Exception("Unexpected TDLSStatus response") 2286 2287 try: 2288 iface.TDLSSetup("foo") 2289 raise Exception("Invalid TDLSSetup() accepted") 2290 except dbus.exceptions.DBusException as e: 2291 if "InvalidArgs" not in str(e): 2292 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e)) 2293 2294 try: 2295 iface.TDLSTeardown("foo") 2296 raise Exception("Invalid TDLSTeardown() accepted") 2297 except dbus.exceptions.DBusException as e: 2298 if "InvalidArgs" not in str(e): 2299 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e)) 2300 2301 try: 2302 iface.TDLSTeardown("00:11:22:33:44:55") 2303 raise Exception("TDLSTeardown accepted for unknown peer") 2304 except dbus.exceptions.DBusException as e: 2305 if "UnknownError: error performing TDLS teardown" not in str(e): 2306 raise Exception("Unexpected error message: " + str(e)) 2307 2308 try: 2309 iface.TDLSChannelSwitch({}) 2310 raise Exception("Invalid TDLSChannelSwitch() accepted") 2311 except dbus.exceptions.DBusException as e: 2312 if "InvalidArgs" not in str(e): 2313 raise Exception("Unexpected error message for invalid TDLSChannelSwitch: " + str(e)) 2314 2315 try: 2316 iface.TDLSCancelChannelSwitch("foo") 2317 raise Exception("Invalid TDLSCancelChannelSwitch() accepted") 2318 except dbus.exceptions.DBusException as e: 2319 if "InvalidArgs" not in str(e): 2320 raise Exception("Unexpected error message for invalid TDLSCancelChannelSwitch: " + str(e)) 2321 2322def test_dbus_tdls_oom(dev, apdev): 2323 """D-Bus TDLS operations during OOM""" 2324 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2325 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2326 2327 with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup", 2328 "UnknownError: error performing TDLS setup"): 2329 iface.TDLSSetup("00:11:22:33:44:55") 2330 2331def test_dbus_tdls(dev, apdev): 2332 """D-Bus TDLS""" 2333 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2334 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2335 2336 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2337 connect_2sta_open(dev, hapd) 2338 2339 addr1 = dev[1].p2p_interface_addr() 2340 2341 class TestDbusTdls(TestDbus): 2342 def __init__(self, bus): 2343 TestDbus.__init__(self, bus) 2344 self.tdls_setup = False 2345 self.tdls_teardown = False 2346 2347 def __enter__(self): 2348 gobject.timeout_add(1, self.run_tdls) 2349 gobject.timeout_add(15000, self.timeout) 2350 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 2351 "PropertiesChanged") 2352 self.loop.run() 2353 return self 2354 2355 def propertiesChanged(self, properties): 2356 logger.debug("propertiesChanged: %s" % str(properties)) 2357 2358 def run_tdls(self, *args): 2359 logger.debug("run_tdls") 2360 iface.TDLSDiscover(addr1) 2361 gobject.timeout_add(100, self.run_tdls2) 2362 return False 2363 2364 def run_tdls2(self, *args): 2365 logger.debug("run_tdls2") 2366 iface.TDLSSetup(addr1) 2367 gobject.timeout_add(500, self.run_tdls3) 2368 return False 2369 2370 def run_tdls3(self, *args): 2371 logger.debug("run_tdls3") 2372 res = iface.TDLSStatus(addr1) 2373 if res == "connected": 2374 self.tdls_setup = True 2375 else: 2376 logger.info("Unexpected TDLSStatus: " + res) 2377 iface.TDLSTeardown(addr1) 2378 gobject.timeout_add(200, self.run_tdls4) 2379 return False 2380 2381 def run_tdls4(self, *args): 2382 logger.debug("run_tdls4") 2383 res = iface.TDLSStatus(addr1) 2384 if res == "peer does not exist": 2385 self.tdls_teardown = True 2386 else: 2387 logger.info("Unexpected TDLSStatus: " + res) 2388 self.loop.quit() 2389 return False 2390 2391 def success(self): 2392 return self.tdls_setup and self.tdls_teardown 2393 2394 with TestDbusTdls(bus) as t: 2395 if not t.success(): 2396 raise Exception("Expected signals not seen") 2397 2398def test_dbus_tdls_channel_switch(dev, apdev): 2399 """D-Bus TDLS channel switch configuration""" 2400 flags = int(dev[0].get_driver_status_field('capa.flags'), 16) 2401 if flags & 0x800000000 == 0: 2402 raise HwsimSkip("Driver does not support TDLS channel switching") 2403 2404 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2405 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2406 2407 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2408 connect_2sta_open(dev, hapd) 2409 2410 addr1 = dev[1].p2p_interface_addr() 2411 2412 class TestDbusTdls(TestDbus): 2413 def __init__(self, bus): 2414 TestDbus.__init__(self, bus) 2415 self.tdls_setup = False 2416 self.tdls_done = False 2417 2418 def __enter__(self): 2419 gobject.timeout_add(1, self.run_tdls) 2420 gobject.timeout_add(15000, self.timeout) 2421 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 2422 "PropertiesChanged") 2423 self.loop.run() 2424 return self 2425 2426 def propertiesChanged(self, properties): 2427 logger.debug("propertiesChanged: %s" % str(properties)) 2428 2429 def run_tdls(self, *args): 2430 logger.debug("run_tdls") 2431 iface.TDLSDiscover(addr1) 2432 gobject.timeout_add(100, self.run_tdls2) 2433 return False 2434 2435 def run_tdls2(self, *args): 2436 logger.debug("run_tdls2") 2437 iface.TDLSSetup(addr1) 2438 gobject.timeout_add(500, self.run_tdls3) 2439 return False 2440 2441 def run_tdls3(self, *args): 2442 logger.debug("run_tdls3") 2443 res = iface.TDLSStatus(addr1) 2444 if res == "connected": 2445 self.tdls_setup = True 2446 else: 2447 logger.info("Unexpected TDLSStatus: " + res) 2448 2449 # Unknown dict entry 2450 args = dbus.Dictionary({'Foobar': dbus.Byte(1)}, 2451 signature='sv') 2452 try: 2453 iface.TDLSChannelSwitch(args) 2454 except Exception as e: 2455 if "InvalidArgs" not in str(e): 2456 raise Exception("Unexpected exception") 2457 2458 # Missing OperClass 2459 args = dbus.Dictionary({}, signature='sv') 2460 try: 2461 iface.TDLSChannelSwitch(args) 2462 except Exception as e: 2463 if "InvalidArgs" not in str(e): 2464 raise Exception("Unexpected exception") 2465 2466 # Missing Frequency 2467 args = dbus.Dictionary({'OperClass': dbus.Byte(1)}, 2468 signature='sv') 2469 try: 2470 iface.TDLSChannelSwitch(args) 2471 except Exception as e: 2472 if "InvalidArgs" not in str(e): 2473 raise Exception("Unexpected exception") 2474 2475 # Missing PeerAddress 2476 args = dbus.Dictionary({'OperClass': dbus.Byte(1), 2477 'Frequency': dbus.UInt32(2417)}, 2478 signature='sv') 2479 try: 2480 iface.TDLSChannelSwitch(args) 2481 except Exception as e: 2482 if "InvalidArgs" not in str(e): 2483 raise Exception("Unexpected exception") 2484 2485 # Valid parameters 2486 args = dbus.Dictionary({'OperClass': dbus.Byte(1), 2487 'Frequency': dbus.UInt32(2417), 2488 'PeerAddress': addr1, 2489 'SecChannelOffset': dbus.UInt32(0), 2490 'CenterFrequency1': dbus.UInt32(0), 2491 'CenterFrequency2': dbus.UInt32(0), 2492 'Bandwidth': dbus.UInt32(20), 2493 'HT': dbus.Boolean(False), 2494 'VHT': dbus.Boolean(False)}, 2495 signature='sv') 2496 iface.TDLSChannelSwitch(args) 2497 2498 gobject.timeout_add(200, self.run_tdls4) 2499 return False 2500 2501 def run_tdls4(self, *args): 2502 logger.debug("run_tdls4") 2503 iface.TDLSCancelChannelSwitch(addr1) 2504 self.tdls_done = True 2505 self.loop.quit() 2506 return False 2507 2508 def success(self): 2509 return self.tdls_setup and self.tdls_done 2510 2511 with TestDbusTdls(bus) as t: 2512 if not t.success(): 2513 raise Exception("Expected signals not seen") 2514 2515def test_dbus_pkcs11(dev, apdev): 2516 """D-Bus SetPKCS11EngineAndModulePath()""" 2517 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2518 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2519 2520 try: 2521 iface.SetPKCS11EngineAndModulePath("foo", "bar") 2522 except dbus.exceptions.DBusException as e: 2523 if "Error.Failed: Reinit of the EAPOL" not in str(e): 2524 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e)) 2525 2526 try: 2527 iface.SetPKCS11EngineAndModulePath("foo", "") 2528 except dbus.exceptions.DBusException as e: 2529 if "Error.Failed: Reinit of the EAPOL" not in str(e): 2530 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e)) 2531 2532 iface.SetPKCS11EngineAndModulePath("", "bar") 2533 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath", 2534 dbus_interface=dbus.PROPERTIES_IFACE) 2535 if res != "": 2536 raise Exception("Unexpected PKCS11EnginePath value: " + res) 2537 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath", 2538 dbus_interface=dbus.PROPERTIES_IFACE) 2539 if res != "bar": 2540 raise Exception("Unexpected PKCS11ModulePath value: " + res) 2541 2542 iface.SetPKCS11EngineAndModulePath("", "") 2543 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath", 2544 dbus_interface=dbus.PROPERTIES_IFACE) 2545 if res != "": 2546 raise Exception("Unexpected PKCS11EnginePath value: " + res) 2547 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath", 2548 dbus_interface=dbus.PROPERTIES_IFACE) 2549 if res != "": 2550 raise Exception("Unexpected PKCS11ModulePath value: " + res) 2551 2552def test_dbus_apscan(dev, apdev): 2553 """D-Bus Get/Set ApScan""" 2554 try: 2555 _test_dbus_apscan(dev, apdev) 2556 finally: 2557 dev[0].request("AP_SCAN 1") 2558 2559def _test_dbus_apscan(dev, apdev): 2560 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2561 2562 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan", 2563 dbus_interface=dbus.PROPERTIES_IFACE) 2564 if res != 1: 2565 raise Exception("Unexpected initial ApScan value: %d" % res) 2566 2567 for i in range(3): 2568 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i), 2569 dbus_interface=dbus.PROPERTIES_IFACE) 2570 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan", 2571 dbus_interface=dbus.PROPERTIES_IFACE) 2572 if res != i: 2573 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i)) 2574 2575 try: 2576 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1), 2577 dbus_interface=dbus.PROPERTIES_IFACE) 2578 raise Exception("Invalid Set(ApScan,-1) accepted") 2579 except dbus.exceptions.DBusException as e: 2580 if "Error.Failed: wrong property type" not in str(e): 2581 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e)) 2582 2583 try: 2584 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123), 2585 dbus_interface=dbus.PROPERTIES_IFACE) 2586 raise Exception("Invalid Set(ApScan,123) accepted") 2587 except dbus.exceptions.DBusException as e: 2588 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e): 2589 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e)) 2590 2591 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1), 2592 dbus_interface=dbus.PROPERTIES_IFACE) 2593 2594def test_dbus_pmf(dev, apdev): 2595 """D-Bus Get/Set Pmf""" 2596 try: 2597 _test_dbus_pmf(dev, apdev) 2598 finally: 2599 dev[0].request("SET pmf 0") 2600 2601def _test_dbus_pmf(dev, apdev): 2602 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2603 2604 dev[0].set("pmf", "0") 2605 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf", 2606 dbus_interface=dbus.PROPERTIES_IFACE) 2607 if res != "0": 2608 raise Exception("Unexpected initial Pmf value: %s" % res) 2609 2610 for i in range(3): 2611 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", str(i), 2612 dbus_interface=dbus.PROPERTIES_IFACE) 2613 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf", 2614 dbus_interface=dbus.PROPERTIES_IFACE) 2615 if res != str(i): 2616 raise Exception("Unexpected Pmf value %s (expected %d)" % (res, i)) 2617 2618 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", "1", 2619 dbus_interface=dbus.PROPERTIES_IFACE) 2620 2621def test_dbus_fastreauth(dev, apdev): 2622 """D-Bus Get/Set FastReauth""" 2623 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2624 2625 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth", 2626 dbus_interface=dbus.PROPERTIES_IFACE) 2627 if res != True: 2628 raise Exception("Unexpected initial FastReauth value: " + str(res)) 2629 2630 for i in [False, True]: 2631 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i), 2632 dbus_interface=dbus.PROPERTIES_IFACE) 2633 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth", 2634 dbus_interface=dbus.PROPERTIES_IFACE) 2635 if res != i: 2636 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i)) 2637 2638 try: 2639 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1), 2640 dbus_interface=dbus.PROPERTIES_IFACE) 2641 raise Exception("Invalid Set(FastReauth,-1) accepted") 2642 except dbus.exceptions.DBusException as e: 2643 if "Error.Failed: wrong property type" not in str(e): 2644 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e)) 2645 2646 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True), 2647 dbus_interface=dbus.PROPERTIES_IFACE) 2648 2649def test_dbus_bss_expire(dev, apdev): 2650 """D-Bus Get/Set BSSExpireAge and BSSExpireCount""" 2651 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2652 2653 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179), 2654 dbus_interface=dbus.PROPERTIES_IFACE) 2655 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge", 2656 dbus_interface=dbus.PROPERTIES_IFACE) 2657 if res != 179: 2658 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i)) 2659 2660 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3), 2661 dbus_interface=dbus.PROPERTIES_IFACE) 2662 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount", 2663 dbus_interface=dbus.PROPERTIES_IFACE) 2664 if res != 3: 2665 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i)) 2666 2667 try: 2668 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1), 2669 dbus_interface=dbus.PROPERTIES_IFACE) 2670 raise Exception("Invalid Set(BSSExpireAge,-1) accepted") 2671 except dbus.exceptions.DBusException as e: 2672 if "Error.Failed: wrong property type" not in str(e): 2673 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e)) 2674 2675 try: 2676 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9), 2677 dbus_interface=dbus.PROPERTIES_IFACE) 2678 raise Exception("Invalid Set(BSSExpireAge,9) accepted") 2679 except dbus.exceptions.DBusException as e: 2680 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e): 2681 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e)) 2682 2683 try: 2684 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1), 2685 dbus_interface=dbus.PROPERTIES_IFACE) 2686 raise Exception("Invalid Set(BSSExpireCount,-1) accepted") 2687 except dbus.exceptions.DBusException as e: 2688 if "Error.Failed: wrong property type" not in str(e): 2689 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e)) 2690 2691 try: 2692 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0), 2693 dbus_interface=dbus.PROPERTIES_IFACE) 2694 raise Exception("Invalid Set(BSSExpireCount,0) accepted") 2695 except dbus.exceptions.DBusException as e: 2696 if "Error.Failed: BSSExpireCount must be > 0" not in str(e): 2697 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e)) 2698 2699 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180), 2700 dbus_interface=dbus.PROPERTIES_IFACE) 2701 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2), 2702 dbus_interface=dbus.PROPERTIES_IFACE) 2703 2704def test_dbus_country(dev, apdev): 2705 """D-Bus Get/Set Country""" 2706 try: 2707 _test_dbus_country(dev, apdev) 2708 finally: 2709 dev[0].request("SET country 00") 2710 subprocess.call(['iw', 'reg', 'set', '00']) 2711 2712def _test_dbus_country(dev, apdev): 2713 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2714 2715 # work around issues with possible pending regdom event from the end of 2716 # the previous test case 2717 time.sleep(0.2) 2718 dev[0].dump_monitor() 2719 2720 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI", 2721 dbus_interface=dbus.PROPERTIES_IFACE) 2722 res = if_obj.Get(WPAS_DBUS_IFACE, "Country", 2723 dbus_interface=dbus.PROPERTIES_IFACE) 2724 if res != "FI": 2725 raise Exception("Unexpected Country value %s (expected FI)" % res) 2726 2727 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"]) 2728 if ev is None: 2729 # For now, work around separate P2P Device interface event delivery 2730 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2731 if ev is None: 2732 raise Exception("regdom change event not seen") 2733 if "init=USER type=COUNTRY alpha2=FI" not in ev: 2734 raise Exception("Unexpected event contents: " + ev) 2735 2736 try: 2737 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1), 2738 dbus_interface=dbus.PROPERTIES_IFACE) 2739 raise Exception("Invalid Set(Country,-1) accepted") 2740 except dbus.exceptions.DBusException as e: 2741 if "Error.Failed: wrong property type" not in str(e): 2742 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e)) 2743 2744 try: 2745 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F", 2746 dbus_interface=dbus.PROPERTIES_IFACE) 2747 raise Exception("Invalid Set(Country,F) accepted") 2748 except dbus.exceptions.DBusException as e: 2749 if "Error.Failed: invalid country code" not in str(e): 2750 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e)) 2751 2752 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00", 2753 dbus_interface=dbus.PROPERTIES_IFACE) 2754 2755 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"]) 2756 if ev is None: 2757 # For now, work around separate P2P Device interface event delivery 2758 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2759 if ev is None: 2760 raise Exception("regdom change event not seen") 2761 # init=CORE was previously used due to invalid db.txt data for 00. For 2762 # now, allow both it and the new init=USER after fixed db.txt. 2763 if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev: 2764 raise Exception("Unexpected event contents: " + ev) 2765 2766def test_dbus_scan_interval(dev, apdev): 2767 """D-Bus Get/Set ScanInterval""" 2768 try: 2769 _test_dbus_scan_interval(dev, apdev) 2770 finally: 2771 dev[0].request("SCAN_INTERVAL 5") 2772 2773def _test_dbus_scan_interval(dev, apdev): 2774 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2775 2776 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3), 2777 dbus_interface=dbus.PROPERTIES_IFACE) 2778 res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval", 2779 dbus_interface=dbus.PROPERTIES_IFACE) 2780 if res != 3: 2781 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i)) 2782 2783 try: 2784 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100), 2785 dbus_interface=dbus.PROPERTIES_IFACE) 2786 raise Exception("Invalid Set(ScanInterval,100) accepted") 2787 except dbus.exceptions.DBusException as e: 2788 if "Error.Failed: wrong property type" not in str(e): 2789 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e)) 2790 2791 try: 2792 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1), 2793 dbus_interface=dbus.PROPERTIES_IFACE) 2794 raise Exception("Invalid Set(ScanInterval,-1) accepted") 2795 except dbus.exceptions.DBusException as e: 2796 if "Error.Failed: scan_interval must be >= 0" not in str(e): 2797 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e)) 2798 2799 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5), 2800 dbus_interface=dbus.PROPERTIES_IFACE) 2801 2802def test_dbus_probe_req_reporting(dev, apdev): 2803 """D-Bus Probe Request reporting""" 2804 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2805 2806 dev[1].p2p_find(social=True) 2807 2808 class TestDbusProbe(TestDbus): 2809 def __init__(self, bus): 2810 TestDbus.__init__(self, bus) 2811 self.reported = False 2812 2813 def __enter__(self): 2814 gobject.timeout_add(1, self.run_test) 2815 gobject.timeout_add(15000, self.timeout) 2816 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 2817 "GroupStarted") 2818 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest", 2819 byte_arrays=True) 2820 self.loop.run() 2821 return self 2822 2823 def groupStarted(self, properties): 2824 logger.debug("groupStarted: " + str(properties)) 2825 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 2826 properties['interface_object']) 2827 self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE) 2828 self.iface.SubscribeProbeReq() 2829 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2830 2831 def probeRequest(self, args): 2832 logger.debug("probeRequest: args=%s" % str(args)) 2833 self.reported = True 2834 self.loop.quit() 2835 2836 def run_test(self, *args): 2837 logger.debug("run_test") 2838 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2839 params = dbus.Dictionary({'frequency': 2412}) 2840 p2p.GroupAdd(params) 2841 return False 2842 2843 def success(self): 2844 return self.reported 2845 2846 with TestDbusProbe(bus) as t: 2847 if not t.success(): 2848 raise Exception("Expected signals not seen") 2849 t.iface.UnsubscribeProbeReq() 2850 try: 2851 t.iface.UnsubscribeProbeReq() 2852 raise Exception("Invalid UnsubscribeProbeReq() accepted") 2853 except dbus.exceptions.DBusException as e: 2854 if "NoSubscription" not in str(e): 2855 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e)) 2856 t.group_p2p.Disconnect() 2857 2858 with TestDbusProbe(bus) as t: 2859 if not t.success(): 2860 raise Exception("Expected signals not seen") 2861 # On purpose, leave ProbeReq subscription in place to test automatic 2862 # cleanup. 2863 2864 dev[1].p2p_stop_find() 2865 2866def test_dbus_probe_req_reporting_oom(dev, apdev): 2867 """D-Bus Probe Request reporting (OOM)""" 2868 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2869 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2870 2871 # Need to make sure this process has not already subscribed to avoid false 2872 # failures due to the operation succeeding due to os_strdup() not even 2873 # getting called. 2874 try: 2875 iface.UnsubscribeProbeReq() 2876 was_subscribed = True 2877 except dbus.exceptions.DBusException as e: 2878 was_subscribed = False 2879 pass 2880 2881 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq", 2882 "SubscribeProbeReq"): 2883 iface.SubscribeProbeReq() 2884 2885 if was_subscribed: 2886 # On purpose, leave ProbeReq subscription in place to test automatic 2887 # cleanup. 2888 iface.SubscribeProbeReq() 2889 2890def test_dbus_p2p_invalid(dev, apdev): 2891 """D-Bus invalid P2P operations""" 2892 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2893 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2894 2895 try: 2896 p2p.RejectPeer(path + "/Peers/00112233445566") 2897 raise Exception("Invalid RejectPeer accepted") 2898 except dbus.exceptions.DBusException as e: 2899 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e): 2900 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 2901 2902 try: 2903 p2p.RejectPeer("/foo") 2904 raise Exception("Invalid RejectPeer accepted") 2905 except dbus.exceptions.DBusException as e: 2906 if "InvalidArgs" not in str(e): 2907 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 2908 2909 tests = [{}, 2910 {'peer': 'foo'}, 2911 {'foo': "bar"}, 2912 {'iface': "abc"}, 2913 {'iface': 123}] 2914 for t in tests: 2915 try: 2916 p2p.RemoveClient(t) 2917 raise Exception("Invalid RemoveClient accepted") 2918 except dbus.exceptions.DBusException as e: 2919 if "InvalidArgs" not in str(e): 2920 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e)) 2921 2922 tests = [{'DiscoveryType': 'foo'}, 2923 {'RequestedDeviceTypes': 'foo'}, 2924 {'RequestedDeviceTypes': ['foo']}, 2925 {'RequestedDeviceTypes': ['1', '2', '3', '4', '5', '6', '7', '8', 2926 '9', '10', '11', '12', '13', '14', '15', 2927 '16', '17']}, 2928 {'RequestedDeviceTypes': dbus.Array([], signature="s")}, 2929 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")}, 2930 {'RequestedDeviceTypes': dbus.Array([], signature="i")}, 2931 {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'), 2932 dbus.ByteArray(b'1234567')]}, 2933 {'Foo': dbus.Int16(1)}, 2934 {'Foo': dbus.UInt16(1)}, 2935 {'Foo': dbus.Int64(1)}, 2936 {'Foo': dbus.UInt64(1)}, 2937 {'Foo': dbus.Double(1.23)}, 2938 {'Foo': dbus.Signature('s')}, 2939 {'Foo': 'bar'}] 2940 for t in tests: 2941 try: 2942 p2p.Find(dbus.Dictionary(t)) 2943 raise Exception("Invalid Find accepted") 2944 except dbus.exceptions.DBusException as e: 2945 if "InvalidArgs" not in str(e): 2946 raise Exception("Unexpected error message for invalid Find(): " + str(e)) 2947 2948 for p in ["/foo", 2949 "/fi/w1/wpa_supplicant1/Interfaces/1234", 2950 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"]: 2951 try: 2952 p2p.RemovePersistentGroup(dbus.ObjectPath(p)) 2953 raise Exception("Invalid RemovePersistentGroup accepted") 2954 except dbus.exceptions.DBusException as e: 2955 if "InvalidArgs" not in str(e): 2956 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e)) 2957 2958 try: 2959 dev[0].request("P2P_SET disabled 1") 2960 p2p.Listen(5) 2961 raise Exception("Invalid Listen accepted") 2962 except dbus.exceptions.DBusException as e: 2963 if "UnknownError: Could not start P2P listen" not in str(e): 2964 raise Exception("Unexpected error message for invalid Listen: " + str(e)) 2965 finally: 2966 dev[0].request("P2P_SET disabled 0") 2967 2968 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False) 2969 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2970 try: 2971 test_p2p.Listen("foo") 2972 raise Exception("Invalid Listen accepted") 2973 except dbus.exceptions.DBusException as e: 2974 if "InvalidArgs" not in str(e): 2975 raise Exception("Unexpected error message for invalid Listen: " + str(e)) 2976 2977 try: 2978 dev[0].request("P2P_SET disabled 1") 2979 p2p.ExtendedListen(dbus.Dictionary({})) 2980 raise Exception("Invalid ExtendedListen accepted") 2981 except dbus.exceptions.DBusException as e: 2982 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e): 2983 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e)) 2984 finally: 2985 dev[0].request("P2P_SET disabled 0") 2986 2987 try: 2988 dev[0].request("P2P_SET disabled 1") 2989 args = {'duration1': 30000, 'interval1': 102400, 2990 'duration2': 20000, 'interval2': 102400} 2991 p2p.PresenceRequest(args) 2992 raise Exception("Invalid PresenceRequest accepted") 2993 except dbus.exceptions.DBusException as e: 2994 if "UnknownError: Failed to invoke presence request" not in str(e): 2995 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e)) 2996 finally: 2997 dev[0].request("P2P_SET disabled 0") 2998 2999 try: 3000 params = dbus.Dictionary({'frequency': dbus.Int32(-1)}) 3001 p2p.GroupAdd(params) 3002 raise Exception("Invalid GroupAdd accepted") 3003 except dbus.exceptions.DBusException as e: 3004 if "InvalidArgs" not in str(e): 3005 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e)) 3006 3007 try: 3008 params = dbus.Dictionary({'persistent_group_object': 3009 dbus.ObjectPath(path), 3010 'frequency': 2412}) 3011 p2p.GroupAdd(params) 3012 raise Exception("Invalid GroupAdd accepted") 3013 except dbus.exceptions.DBusException as e: 3014 if "InvalidArgs" not in str(e): 3015 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e)) 3016 3017 try: 3018 p2p.Disconnect() 3019 raise Exception("Invalid Disconnect accepted") 3020 except dbus.exceptions.DBusException as e: 3021 if "UnknownError: failed to disconnect" not in str(e): 3022 raise Exception("Unexpected error message for invalid Disconnect: " + str(e)) 3023 3024 try: 3025 dev[0].request("P2P_SET disabled 1") 3026 p2p.Flush() 3027 raise Exception("Invalid Flush accepted") 3028 except dbus.exceptions.DBusException as e: 3029 if "Error.Failed: P2P is not available for this interface" not in str(e): 3030 raise Exception("Unexpected error message for invalid Flush: " + str(e)) 3031 finally: 3032 dev[0].request("P2P_SET disabled 0") 3033 3034 try: 3035 dev[0].request("P2P_SET disabled 1") 3036 args = {'peer': path, 3037 'join': True, 3038 'wps_method': 'pbc', 3039 'frequency': 2412} 3040 pin = p2p.Connect(args) 3041 raise Exception("Invalid Connect accepted") 3042 except dbus.exceptions.DBusException as e: 3043 if "Error.Failed: P2P is not available for this interface" not in str(e): 3044 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3045 finally: 3046 dev[0].request("P2P_SET disabled 0") 3047 3048 tests = [{'frequency': dbus.Int32(-1)}, 3049 {'wps_method': 'pbc'}, 3050 {'wps_method': 'foo'}] 3051 for args in tests: 3052 try: 3053 pin = p2p.Connect(args) 3054 raise Exception("Invalid Connect accepted") 3055 except dbus.exceptions.DBusException as e: 3056 if "InvalidArgs" not in str(e): 3057 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3058 3059 try: 3060 dev[0].request("P2P_SET disabled 1") 3061 args = {'peer': path} 3062 pin = p2p.Invite(args) 3063 raise Exception("Invalid Invite accepted") 3064 except dbus.exceptions.DBusException as e: 3065 if "Error.Failed: P2P is not available for this interface" not in str(e): 3066 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 3067 finally: 3068 dev[0].request("P2P_SET disabled 0") 3069 3070 try: 3071 args = {'foo': 'bar'} 3072 pin = p2p.Invite(args) 3073 raise Exception("Invalid Invite accepted") 3074 except dbus.exceptions.DBusException as e: 3075 if "InvalidArgs" not in str(e): 3076 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3077 3078 tests = [(path, 'display', "InvalidArgs"), 3079 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3080 'display', 3081 "UnknownError: Failed to send provision discovery request"), 3082 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3083 'keypad', 3084 "UnknownError: Failed to send provision discovery request"), 3085 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3086 'pbc', 3087 "UnknownError: Failed to send provision discovery request"), 3088 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3089 'pushbutton', 3090 "UnknownError: Failed to send provision discovery request"), 3091 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3092 'foo', "InvalidArgs")] 3093 for (p, method, err) in tests: 3094 try: 3095 p2p.ProvisionDiscoveryRequest(p, method) 3096 raise Exception("Invalid ProvisionDiscoveryRequest accepted") 3097 except dbus.exceptions.DBusException as e: 3098 if err not in str(e): 3099 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e)) 3100 3101 try: 3102 dev[0].request("P2P_SET disabled 1") 3103 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers", 3104 dbus_interface=dbus.PROPERTIES_IFACE) 3105 raise Exception("Invalid Get(Peers) accepted") 3106 except dbus.exceptions.DBusException as e: 3107 if "Error.Failed: P2P is not available for this interface" not in str(e): 3108 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e)) 3109 finally: 3110 dev[0].request("P2P_SET disabled 0") 3111 3112def test_dbus_p2p_oom(dev, apdev): 3113 """D-Bus P2P operations and OOM""" 3114 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3115 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3116 3117 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array", 3118 "Find", "InvalidArgs"): 3119 p2p.Find(dbus.Dictionary({'Foo': ['bar']})) 3120 3121 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array", 3122 "Find", "InvalidArgs"): 3123 p2p.Find(dbus.Dictionary({'Foo': ['bar']})) 3124 3125 with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array", 3126 "Find", "InvalidArgs"): 3127 p2p.Find(dbus.Dictionary({'Foo': ['1', '2', '3', '4', '5', '6', '7', 3128 '8', '9']})) 3129 3130 with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray", 3131 "Find", "InvalidArgs"): 3132 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3133 3134 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray", 3135 "Find", "InvalidArgs"): 3136 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3137 3138 with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray", 3139 "Find", "InvalidArgs"): 3140 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123'), 3141 dbus.ByteArray(b'123'), 3142 dbus.ByteArray(b'123'), 3143 dbus.ByteArray(b'123'), 3144 dbus.ByteArray(b'123'), 3145 dbus.ByteArray(b'123'), 3146 dbus.ByteArray(b'123'), 3147 dbus.ByteArray(b'123'), 3148 dbus.ByteArray(b'123'), 3149 dbus.ByteArray(b'123'), 3150 dbus.ByteArray(b'123')]})) 3151 3152 with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray", 3153 "Find", "InvalidArgs"): 3154 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3155 3156 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find", 3157 "Find", "InvalidArgs"): 3158 p2p.Find(dbus.Dictionary({'Foo': path})) 3159 3160 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array", 3161 "AddService", "InvalidArgs"): 3162 args = {'service_type': 'bonjour', 3163 'response': dbus.ByteArray(500*b'b')} 3164 p2p.AddService(args) 3165 3166 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array", 3167 "AddService", "InvalidArgs"): 3168 p2p.AddService(args) 3169 3170def test_dbus_p2p_discovery(dev, apdev): 3171 """D-Bus P2P discovery""" 3172 try: 3173 run_dbus_p2p_discovery(dev, apdev) 3174 finally: 3175 dev[1].request("VENDOR_ELEM_REMOVE 1 *") 3176 3177def run_dbus_p2p_discovery(dev, apdev): 3178 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3179 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3180 3181 addr0 = dev[0].p2p_dev_addr() 3182 3183 dev[1].request("SET sec_device_type 1-0050F204-2") 3184 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344") 3185 dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566") 3186 dev[1].p2p_listen() 3187 addr1 = dev[1].p2p_dev_addr() 3188 a1 = binascii.unhexlify(addr1.replace(':', '')) 3189 3190 wfd_devinfo = "00001c440028" 3191 dev[2].request("SET wifi_display 1") 3192 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo) 3193 wfd = binascii.unhexlify('000006' + wfd_devinfo) 3194 dev[2].p2p_listen() 3195 addr2 = dev[2].p2p_dev_addr() 3196 a2 = binascii.unhexlify(addr2.replace(':', '')) 3197 3198 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 3199 dbus_interface=dbus.PROPERTIES_IFACE) 3200 if 'Peers' not in res: 3201 raise Exception("GetAll result missing Peers") 3202 if len(res['Peers']) != 0: 3203 raise Exception("Unexpected peer(s) in the list") 3204 3205 args = {'DiscoveryType': 'social', 3206 'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')], 3207 'Timeout': dbus.Int32(1)} 3208 p2p.Find(dbus.Dictionary(args)) 3209 p2p.StopFind() 3210 3211 class TestDbusP2p(TestDbus): 3212 def __init__(self, bus): 3213 TestDbus.__init__(self, bus) 3214 self.found = False 3215 self.found2 = False 3216 self.found_prop = False 3217 self.lost = False 3218 self.find_stopped = False 3219 3220 def __enter__(self): 3221 gobject.timeout_add(1, self.run_test) 3222 gobject.timeout_add(15000, self.timeout) 3223 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3224 "DeviceFound") 3225 self.add_signal(self.deviceFoundProperties, 3226 WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties") 3227 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE, 3228 "DeviceLost") 3229 self.add_signal(self.provisionDiscoveryResponseEnterPin, 3230 WPAS_DBUS_IFACE_P2PDEVICE, 3231 "ProvisionDiscoveryResponseEnterPin") 3232 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE, 3233 "FindStopped") 3234 self.loop.run() 3235 return self 3236 3237 def deviceFound(self, path): 3238 logger.debug("deviceFound: path=%s" % path) 3239 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers", 3240 dbus_interface=dbus.PROPERTIES_IFACE) 3241 if len(res) < 1: 3242 raise Exception("Unexpected number of peers") 3243 if path not in res: 3244 raise Exception("Mismatch in peer object path") 3245 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3246 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3247 dbus_interface=dbus.PROPERTIES_IFACE, 3248 byte_arrays=True) 3249 logger.debug("peer properties: " + str(res)) 3250 3251 if res['DeviceAddress'] == a1: 3252 if 'SecondaryDeviceTypes' not in res: 3253 raise Exception("Missing SecondaryDeviceTypes") 3254 sec = res['SecondaryDeviceTypes'] 3255 if len(sec) < 1: 3256 raise Exception("Secondary device type missing") 3257 if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec: 3258 raise Exception("Secondary device type mismatch") 3259 3260 if 'VendorExtension' not in res: 3261 raise Exception("Missing VendorExtension") 3262 vendor = res['VendorExtension'] 3263 if len(vendor) < 1: 3264 raise Exception("Vendor extension missing") 3265 if b"\x11\x22\x33\x44" not in vendor: 3266 raise Exception("Secondary device type mismatch") 3267 3268 if 'VSIE' not in res: 3269 raise Exception("Missing VSIE") 3270 vendor = res['VSIE'] 3271 if len(vendor) < 1: 3272 raise Exception("VSIE missing") 3273 if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66": 3274 raise Exception("VSIE mismatch") 3275 3276 self.found = True 3277 elif res['DeviceAddress'] == a2: 3278 if 'IEs' not in res: 3279 raise Exception("IEs missing") 3280 if res['IEs'] != wfd: 3281 raise Exception("IEs mismatch") 3282 self.found2 = True 3283 else: 3284 raise Exception("Unexpected peer device address") 3285 3286 if self.found and self.found2: 3287 p2p.StopFind() 3288 p2p.RejectPeer(path) 3289 p2p.ProvisionDiscoveryRequest(path, 'display') 3290 3291 def deviceLost(self, path): 3292 logger.debug("deviceLost: path=%s" % path) 3293 if not self.found or not self.found2: 3294 # This may happen if a previous test case ended up scheduling 3295 # deviceLost event and that event did not get delivered before 3296 # starting the next test execution. 3297 logger.debug("Ignore deviceLost before the deviceFound events") 3298 return 3299 self.lost = True 3300 try: 3301 p2p.RejectPeer(path) 3302 raise Exception("Invalid RejectPeer accepted") 3303 except dbus.exceptions.DBusException as e: 3304 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e): 3305 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 3306 self.loop.quit() 3307 3308 def deviceFoundProperties(self, path, properties): 3309 logger.debug("deviceFoundProperties: path=%s" % path) 3310 logger.debug("peer properties: " + str(properties)) 3311 if properties['DeviceAddress'] == a1: 3312 self.found_prop = True 3313 3314 def provisionDiscoveryResponseEnterPin(self, peer_object): 3315 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object) 3316 p2p.Flush() 3317 3318 def findStopped(self): 3319 logger.debug("findStopped") 3320 self.find_stopped = True 3321 3322 def run_test(self, *args): 3323 logger.debug("run_test") 3324 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social', 3325 'Timeout': dbus.Int32(10)})) 3326 return False 3327 3328 def success(self): 3329 return self.found and self.lost and self.found2 and self.find_stopped 3330 3331 with TestDbusP2p(bus) as t: 3332 if not t.success(): 3333 raise Exception("Expected signals not seen") 3334 3335 dev[1].request("VENDOR_ELEM_REMOVE 1 *") 3336 dev[1].p2p_stop_find() 3337 3338 p2p.Listen(1) 3339 dev[2].p2p_stop_find() 3340 dev[2].request("P2P_FLUSH") 3341 if not dev[2].discover_peer(addr0): 3342 raise Exception("Peer not found") 3343 p2p.StopFind() 3344 dev[2].p2p_stop_find() 3345 3346 try: 3347 p2p.ExtendedListen(dbus.Dictionary({'foo': 100})) 3348 raise Exception("Invalid ExtendedListen accepted") 3349 except dbus.exceptions.DBusException as e: 3350 if "InvalidArgs" not in str(e): 3351 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e)) 3352 3353 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000})) 3354 p2p.ExtendedListen(dbus.Dictionary({})) 3355 dev[0].global_request("P2P_EXT_LISTEN") 3356 3357def test_dbus_p2p_discovery_freq(dev, apdev): 3358 """D-Bus P2P discovery on a specific non-social channel""" 3359 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3360 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3361 3362 addr1 = dev[1].p2p_dev_addr() 3363 autogo(dev[1], freq=2422) 3364 3365 class TestDbusP2p(TestDbus): 3366 def __init__(self, bus): 3367 TestDbus.__init__(self, bus) 3368 self.found = False 3369 3370 def __enter__(self): 3371 gobject.timeout_add(1, self.run_test) 3372 gobject.timeout_add(5000, self.timeout) 3373 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3374 "DeviceFound") 3375 self.loop.run() 3376 return self 3377 3378 def deviceFound(self, path): 3379 logger.debug("deviceFound: path=%s" % path) 3380 self.found = True 3381 self.loop.quit() 3382 3383 def run_test(self, *args): 3384 logger.debug("run_test") 3385 p2p.Find(dbus.Dictionary({'freq': 2422})) 3386 return False 3387 3388 def success(self): 3389 return self.found 3390 3391 with TestDbusP2p(bus) as t: 3392 if not t.success(): 3393 raise Exception("Expected signals not seen") 3394 3395 dev[1].remove_group() 3396 p2p.StopFind() 3397 3398def test_dbus_p2p_service_discovery(dev, apdev): 3399 """D-Bus P2P service discovery""" 3400 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3401 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3402 3403 addr0 = dev[0].p2p_dev_addr() 3404 addr1 = dev[1].p2p_dev_addr() 3405 3406 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01')) 3407 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027')) 3408 3409 tests = [{'service_type': 'bonjour', 3410 'query': bonjour_query, 3411 'response': bonjour_response}, 3412 {'service_type': 'upnp', 3413 'version': 0x10, 3414 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice', 3415 'query': bonjour_query, 3416 'response': bonjour_response}] 3417 for args in tests: 3418 p2p.AddService(args) 3419 p2p.FlushService() 3420 3421 args = {'service_type': 'bonjour', 3422 'query': bonjour_query, 3423 'response': bonjour_response} 3424 p2p.AddService(args) 3425 3426 try: 3427 p2p.DeleteService(args) 3428 raise Exception("Invalid DeleteService() accepted") 3429 except dbus.exceptions.DBusException as e: 3430 if "InvalidArgs" not in str(e): 3431 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3432 3433 args = {'service_type': 'bonjour', 3434 'query': bonjour_query} 3435 p2p.DeleteService(args) 3436 try: 3437 p2p.DeleteService(args) 3438 raise Exception("Invalid DeleteService() accepted") 3439 except dbus.exceptions.DBusException as e: 3440 if "InvalidArgs" not in str(e): 3441 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3442 3443 args = {'service_type': 'upnp', 3444 'version': 0x10, 3445 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'} 3446 p2p.AddService(args) 3447 p2p.DeleteService(args) 3448 try: 3449 p2p.DeleteService(args) 3450 raise Exception("Invalid DeleteService() accepted") 3451 except dbus.exceptions.DBusException as e: 3452 if "InvalidArgs" not in str(e): 3453 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3454 3455 tests = [{'service_type': 'foo'}, 3456 {'service_type': 'foo', 'query': bonjour_query}, 3457 {'service_type': 'upnp'}, 3458 {'service_type': 'upnp', 'version': 0x10}, 3459 {'service_type': 'upnp', 3460 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3461 {'version': 0x10, 3462 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3463 {'service_type': 'upnp', 'foo': 'bar'}, 3464 {'service_type': 'bonjour'}, 3465 {'service_type': 'bonjour', 'query': 'foo'}, 3466 {'service_type': 'bonjour', 'foo': 'bar'}] 3467 for args in tests: 3468 try: 3469 p2p.DeleteService(args) 3470 raise Exception("Invalid DeleteService() accepted") 3471 except dbus.exceptions.DBusException as e: 3472 if "InvalidArgs" not in str(e): 3473 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3474 3475 tests = [{'service_type': 'foo'}, 3476 {'service_type': 'upnp'}, 3477 {'service_type': 'upnp', 'version': 0x10}, 3478 {'service_type': 'upnp', 3479 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3480 {'version': 0x10, 3481 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3482 {'service_type': 'upnp', 'foo': 'bar'}, 3483 {'service_type': 'bonjour'}, 3484 {'service_type': 'bonjour', 'query': 'foo'}, 3485 {'service_type': 'bonjour', 'response': 'foo'}, 3486 {'service_type': 'bonjour', 'query': bonjour_query}, 3487 {'service_type': 'bonjour', 'response': bonjour_response}, 3488 {'service_type': 'bonjour', 'query': dbus.ByteArray(500*b'a')}, 3489 {'service_type': 'bonjour', 'foo': 'bar'}] 3490 for args in tests: 3491 try: 3492 p2p.AddService(args) 3493 raise Exception("Invalid AddService() accepted") 3494 except dbus.exceptions.DBusException as e: 3495 if "InvalidArgs" not in str(e): 3496 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3497 3498 args = {'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")} 3499 ref = p2p.ServiceDiscoveryRequest(args) 3500 p2p.ServiceDiscoveryCancelRequest(ref) 3501 try: 3502 p2p.ServiceDiscoveryCancelRequest(ref) 3503 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted") 3504 except dbus.exceptions.DBusException as e: 3505 if "InvalidArgs" not in str(e): 3506 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3507 try: 3508 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0)) 3509 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted") 3510 except dbus.exceptions.DBusException as e: 3511 if "InvalidArgs" not in str(e): 3512 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3513 3514 tests= [{'service_type': 'upnp', 3515 'version': 0x10, 3516 'service': 'ssdp:foo'}, 3517 {'service_type': 'upnp', 3518 'version': 0x10, 3519 'service': 'ssdp:bar', 3520 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}] 3521 for args in tests: 3522 ref = p2p.ServiceDiscoveryRequest(args) 3523 p2p.ServiceDiscoveryCancelRequest(ref) 3524 3525 tests = [{'service_type': 'foo'}, 3526 {'foo': 'bar'}, 3527 {'tlv': 'foo'}, 3528 {}, 3529 {'version': 0}, 3530 {'service_type': 'upnp', 3531 'service': 'ssdp:foo'}, 3532 {'service_type': 'upnp', 3533 'version': 0x10}, 3534 {'service_type': 'upnp', 3535 'version': 0x10, 3536 'service': 'ssdp:foo', 3537 'peer_object': dbus.ObjectPath(path + "/Peers")}, 3538 {'service_type': 'upnp', 3539 'version': 0x10, 3540 'service': 'ssdp:foo', 3541 'peer_object': path + "/Peers"}, 3542 {'service_type': 'upnp', 3543 'version': 0x10, 3544 'service': 'ssdp:foo', 3545 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566")}] 3546 for args in tests: 3547 try: 3548 p2p.ServiceDiscoveryRequest(args) 3549 raise Exception("Invalid ServiceDiscoveryRequest accepted") 3550 except dbus.exceptions.DBusException as e: 3551 if "InvalidArgs" not in str(e): 3552 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e)) 3553 3554 tests = [{'foo': 'bar'}, 3555 {'tlvs': dbus.ByteArray(b"\x02\x00\x00\x01"), 3556 'bar': 'foo'}] 3557 for args in tests: 3558 try: 3559 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv')) 3560 raise Exception("Invalid ServiceDiscoveryResponse accepted") 3561 except dbus.exceptions.DBusException as e: 3562 if "InvalidArgs" not in str(e): 3563 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e)) 3564 3565def test_dbus_p2p_service_discovery_query(dev, apdev): 3566 """D-Bus P2P service discovery query""" 3567 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3568 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3569 3570 addr0 = dev[0].p2p_dev_addr() 3571 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027") 3572 dev[1].p2p_listen() 3573 addr1 = dev[1].p2p_dev_addr() 3574 3575 class TestDbusP2p(TestDbus): 3576 def __init__(self, bus): 3577 TestDbus.__init__(self, bus) 3578 self.done = False 3579 3580 def __enter__(self): 3581 gobject.timeout_add(1, self.run_test) 3582 gobject.timeout_add(15000, self.timeout) 3583 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3584 "DeviceFound") 3585 self.add_signal(self.serviceDiscoveryResponse, 3586 WPAS_DBUS_IFACE_P2PDEVICE, 3587 "ServiceDiscoveryResponse", byte_arrays=True) 3588 self.loop.run() 3589 return self 3590 3591 def deviceFound(self, path): 3592 logger.debug("deviceFound: path=%s" % path) 3593 args = {'peer_object': path, 3594 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")} 3595 p2p.ServiceDiscoveryRequest(args) 3596 3597 def serviceDiscoveryResponse(self, sd_request): 3598 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request)) 3599 self.done = True 3600 self.loop.quit() 3601 3602 def run_test(self, *args): 3603 logger.debug("run_test") 3604 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social', 3605 'Timeout': dbus.Int32(10)})) 3606 return False 3607 3608 def success(self): 3609 return self.done 3610 3611 with TestDbusP2p(bus) as t: 3612 if not t.success(): 3613 raise Exception("Expected signals not seen") 3614 3615 dev[1].p2p_stop_find() 3616 3617def test_dbus_p2p_service_discovery_external(dev, apdev): 3618 """D-Bus P2P service discovery with external response""" 3619 try: 3620 _test_dbus_p2p_service_discovery_external(dev, apdev) 3621 finally: 3622 dev[0].request("P2P_SERV_DISC_EXTERNAL 0") 3623 3624def _test_dbus_p2p_service_discovery_external(dev, apdev): 3625 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3626 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3627 3628 addr0 = dev[0].p2p_dev_addr() 3629 addr1 = dev[1].p2p_dev_addr() 3630 resp = "0300000101" 3631 3632 dev[1].request("P2P_FLUSH") 3633 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001") 3634 dev[1].p2p_find(social=True) 3635 3636 class TestDbusP2p(TestDbus): 3637 def __init__(self, bus): 3638 TestDbus.__init__(self, bus) 3639 self.sd = False 3640 3641 def __enter__(self): 3642 gobject.timeout_add(1, self.run_test) 3643 gobject.timeout_add(15000, self.timeout) 3644 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3645 "DeviceFound") 3646 self.add_signal(self.serviceDiscoveryRequest, 3647 WPAS_DBUS_IFACE_P2PDEVICE, 3648 "ServiceDiscoveryRequest") 3649 self.loop.run() 3650 return self 3651 3652 def deviceFound(self, path): 3653 logger.debug("deviceFound: path=%s" % path) 3654 3655 def serviceDiscoveryRequest(self, sd_request): 3656 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request)) 3657 self.sd = True 3658 args = {'peer_object': sd_request['peer_object'], 3659 'frequency': sd_request['frequency'], 3660 'dialog_token': sd_request['dialog_token'], 3661 'tlvs': dbus.ByteArray(binascii.unhexlify(resp))} 3662 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv')) 3663 self.loop.quit() 3664 3665 def run_test(self, *args): 3666 logger.debug("run_test") 3667 p2p.ServiceDiscoveryExternal(1) 3668 p2p.ServiceUpdate() 3669 p2p.Listen(15) 3670 return False 3671 3672 def success(self): 3673 return self.sd 3674 3675 with TestDbusP2p(bus) as t: 3676 if not t.success(): 3677 raise Exception("Expected signals not seen") 3678 3679 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5) 3680 if ev is None: 3681 raise Exception("Service discovery timed out") 3682 if addr0 not in ev: 3683 raise Exception("Unexpected address in SD Response: " + ev) 3684 if ev.split(' ')[4] != resp: 3685 raise Exception("Unexpected response data SD Response: " + ev) 3686 dev[1].p2p_stop_find() 3687 3688 p2p.StopFind() 3689 p2p.ServiceDiscoveryExternal(0) 3690 3691def test_dbus_p2p_autogo(dev, apdev): 3692 """D-Bus P2P autonomous GO""" 3693 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3694 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3695 3696 addr0 = dev[0].p2p_dev_addr() 3697 3698 class TestDbusP2p(TestDbus): 3699 def __init__(self, bus): 3700 TestDbus.__init__(self, bus) 3701 self.first = True 3702 self.waiting_end = False 3703 self.exceptions = False 3704 self.deauthorized = False 3705 self.done = False 3706 3707 def __enter__(self): 3708 gobject.timeout_add(1, self.run_test) 3709 gobject.timeout_add(15000, self.timeout) 3710 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3711 "DeviceFound") 3712 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 3713 "GroupStarted") 3714 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 3715 "GroupFinished") 3716 self.add_signal(self.persistentGroupAdded, 3717 WPAS_DBUS_IFACE_P2PDEVICE, 3718 "PersistentGroupAdded") 3719 self.add_signal(self.persistentGroupRemoved, 3720 WPAS_DBUS_IFACE_P2PDEVICE, 3721 "PersistentGroupRemoved") 3722 self.add_signal(self.provisionDiscoveryRequestDisplayPin, 3723 WPAS_DBUS_IFACE_P2PDEVICE, 3724 "ProvisionDiscoveryRequestDisplayPin") 3725 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 3726 "StaAuthorized") 3727 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 3728 "StaDeauthorized") 3729 self.loop.run() 3730 return self 3731 3732 def groupStarted(self, properties): 3733 logger.debug("groupStarted: " + str(properties)) 3734 self.group = properties['group_object'] 3735 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 3736 properties['interface_object']) 3737 role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role", 3738 dbus_interface=dbus.PROPERTIES_IFACE) 3739 if role != "GO": 3740 self.exceptions = True 3741 raise Exception("Unexpected role reported: " + role) 3742 group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group", 3743 dbus_interface=dbus.PROPERTIES_IFACE) 3744 if group != properties['group_object']: 3745 self.exceptions = True 3746 raise Exception("Unexpected Group reported: " + str(group)) 3747 go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO", 3748 dbus_interface=dbus.PROPERTIES_IFACE) 3749 if go != '/': 3750 self.exceptions = True 3751 raise Exception("Unexpected PeerGO value: " + str(go)) 3752 if self.first: 3753 self.first = False 3754 logger.info("Remove persistent group instance") 3755 group_p2p = dbus.Interface(self.g_if_obj, 3756 WPAS_DBUS_IFACE_P2PDEVICE) 3757 group_p2p.Disconnect() 3758 else: 3759 dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 join") 3760 3761 def groupFinished(self, properties): 3762 logger.debug("groupFinished: " + str(properties)) 3763 if self.waiting_end: 3764 logger.info("Remove persistent group") 3765 p2p.RemovePersistentGroup(self.persistent) 3766 else: 3767 logger.info("Re-start persistent group") 3768 params = dbus.Dictionary({'persistent_group_object': 3769 self.persistent, 3770 'frequency': 2412}) 3771 p2p.GroupAdd(params) 3772 3773 def persistentGroupAdded(self, path, properties): 3774 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 3775 self.persistent = path 3776 3777 def persistentGroupRemoved(self, path): 3778 logger.debug("persistentGroupRemoved: %s" % path) 3779 self.done = True 3780 self.loop.quit() 3781 3782 def deviceFound(self, path): 3783 logger.debug("deviceFound: path=%s" % path) 3784 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3785 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3786 dbus_interface=dbus.PROPERTIES_IFACE, 3787 byte_arrays=True) 3788 logger.debug('peer properties: ' + str(self.peer)) 3789 3790 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin): 3791 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin)) 3792 self.peer_path = peer_object 3793 peer = binascii.unhexlify(peer_object.split('/')[-1]) 3794 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 3795 3796 params = {'Role': 'registrar', 3797 'P2PDeviceAddress': self.peer['DeviceAddress'], 3798 'Bssid': self.peer['DeviceAddress'], 3799 'Type': 'pin'} 3800 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 3801 try: 3802 wps.Start(params) 3803 self.exceptions = True 3804 raise Exception("Invalid WPS.Start() accepted") 3805 except dbus.exceptions.DBusException as e: 3806 if "InvalidArgs" not in str(e): 3807 self.exceptions = True 3808 raise Exception("Unexpected error message: " + str(e)) 3809 params = {'Role': 'registrar', 3810 'P2PDeviceAddress': self.peer['DeviceAddress'], 3811 'Type': 'pin', 3812 'Pin': '12345670'} 3813 logger.info("Authorize peer to connect to the group") 3814 wps.Start(params) 3815 3816 def staAuthorized(self, name): 3817 logger.debug("staAuthorized: " + name) 3818 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path) 3819 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3820 dbus_interface=dbus.PROPERTIES_IFACE, 3821 byte_arrays=True) 3822 logger.debug("Peer properties: " + str(res)) 3823 if 'Groups' not in res or len(res['Groups']) != 1: 3824 self.exceptions = True 3825 raise Exception("Unexpected number of peer Groups entries") 3826 if res['Groups'][0] != self.group: 3827 self.exceptions = True 3828 raise Exception("Unexpected peer Groups[0] value") 3829 3830 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group) 3831 res = g_obj.GetAll(WPAS_DBUS_GROUP, 3832 dbus_interface=dbus.PROPERTIES_IFACE, 3833 byte_arrays=True) 3834 logger.debug("Group properties: " + str(res)) 3835 if 'Members' not in res or len(res['Members']) != 1: 3836 self.exceptions = True 3837 raise Exception("Unexpected number of group members") 3838 3839 ext = dbus.ByteArray(b"\x11\x22\x33\x44") 3840 # Earlier implementation of this interface was a bit strange. The 3841 # property is defined to have aay signature and that is what the 3842 # getter returned. However, the setter expected there to be a 3843 # dictionary with 'WPSVendorExtensions' as the key surrounding these 3844 # values.. The current implementations maintains support for that 3845 # for backwards compability reasons. Verify that encoding first. 3846 vals = dbus.Dictionary({'WPSVendorExtensions': [ext]}, 3847 signature='sv') 3848 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3849 dbus_interface=dbus.PROPERTIES_IFACE) 3850 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions', 3851 dbus_interface=dbus.PROPERTIES_IFACE, 3852 byte_arrays=True) 3853 if len(res) != 1: 3854 self.exceptions = True 3855 raise Exception("Unexpected number of vendor extensions") 3856 if res[0] != ext: 3857 self.exceptions = True 3858 raise Exception("Vendor extension value changed") 3859 3860 # And now verify that the more appropriate encoding is accepted as 3861 # well. 3862 res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff')) 3863 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 3864 dbus_interface=dbus.PROPERTIES_IFACE) 3865 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions', 3866 dbus_interface=dbus.PROPERTIES_IFACE, 3867 byte_arrays=True) 3868 if len(res) != 2: 3869 self.exceptions = True 3870 raise Exception("Unexpected number of vendor extensions") 3871 if res[0] != res2[0] or res[1] != res2[1]: 3872 self.exceptions = True 3873 raise Exception("Vendor extension value changed") 3874 3875 for i in range(10): 3876 res.append(dbus.ByteArray(b'\xaa\xbb')) 3877 try: 3878 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 3879 dbus_interface=dbus.PROPERTIES_IFACE) 3880 self.exceptions = True 3881 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3882 except dbus.exceptions.DBusException as e: 3883 if "Error.Failed" not in str(e): 3884 self.exceptions = True 3885 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3886 3887 vals = dbus.Dictionary({'Foo': [ext]}, signature='sv') 3888 try: 3889 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3890 dbus_interface=dbus.PROPERTIES_IFACE) 3891 self.exceptions = True 3892 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3893 except dbus.exceptions.DBusException as e: 3894 if "InvalidArgs" not in str(e): 3895 self.exceptions = True 3896 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3897 3898 vals = ["foo"] 3899 try: 3900 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3901 dbus_interface=dbus.PROPERTIES_IFACE) 3902 self.exceptions = True 3903 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3904 except dbus.exceptions.DBusException as e: 3905 if "Error.Failed" not in str(e): 3906 self.exceptions = True 3907 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3908 3909 vals = [["foo"]] 3910 try: 3911 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3912 dbus_interface=dbus.PROPERTIES_IFACE) 3913 self.exceptions = True 3914 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3915 except dbus.exceptions.DBusException as e: 3916 if "Error.Failed" not in str(e): 3917 self.exceptions = True 3918 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3919 3920 p2p.RemoveClient({'peer': self.peer_path}) 3921 3922 self.waiting_end = True 3923 3924 # wait for client to be fully connected 3925 dev[1].wait_connected() 3926 # so we can cleanly disconnect it now 3927 group_p2p = dbus.Interface(self.g_if_obj, 3928 WPAS_DBUS_IFACE_P2PDEVICE) 3929 group_p2p.Disconnect() 3930 3931 def staDeauthorized(self, name): 3932 logger.debug("staDeauthorized: " + name) 3933 self.deauthorized = True 3934 3935 def run_test(self, *args): 3936 logger.debug("run_test") 3937 params = dbus.Dictionary({'persistent': True, 3938 'frequency': 2412}) 3939 logger.info("Add a persistent group") 3940 p2p.GroupAdd(params) 3941 return False 3942 3943 def success(self): 3944 return self.done and self.deauthorized and not self.exceptions 3945 3946 with TestDbusP2p(bus) as t: 3947 if not t.success(): 3948 raise Exception("Expected signals not seen") 3949 3950 dev[1].wait_go_ending_session() 3951 3952def test_dbus_p2p_autogo_pbc(dev, apdev): 3953 """D-Bus P2P autonomous GO and PBC""" 3954 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3955 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3956 3957 addr0 = dev[0].p2p_dev_addr() 3958 3959 class TestDbusP2p(TestDbus): 3960 def __init__(self, bus): 3961 TestDbus.__init__(self, bus) 3962 self.first = True 3963 self.waiting_end = False 3964 self.done = False 3965 3966 def __enter__(self): 3967 gobject.timeout_add(1, self.run_test) 3968 gobject.timeout_add(15000, self.timeout) 3969 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3970 "DeviceFound") 3971 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 3972 "GroupStarted") 3973 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 3974 "GroupFinished") 3975 self.add_signal(self.provisionDiscoveryPBCRequest, 3976 WPAS_DBUS_IFACE_P2PDEVICE, 3977 "ProvisionDiscoveryPBCRequest") 3978 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 3979 "StaAuthorized") 3980 self.loop.run() 3981 return self 3982 3983 def groupStarted(self, properties): 3984 logger.debug("groupStarted: " + str(properties)) 3985 self.group = properties['group_object'] 3986 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 3987 properties['interface_object']) 3988 dev[1].global_request("P2P_CONNECT " + addr0 + " pbc join") 3989 3990 def groupFinished(self, properties): 3991 logger.debug("groupFinished: " + str(properties)) 3992 self.done = True 3993 self.loop.quit() 3994 3995 def deviceFound(self, path): 3996 logger.debug("deviceFound: path=%s" % path) 3997 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3998 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3999 dbus_interface=dbus.PROPERTIES_IFACE, 4000 byte_arrays=True) 4001 logger.debug('peer properties: ' + str(self.peer)) 4002 4003 def provisionDiscoveryPBCRequest(self, peer_object): 4004 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object) 4005 self.peer_path = peer_object 4006 peer = binascii.unhexlify(peer_object.split('/')[-1]) 4007 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 4008 params = {'Role': 'registrar', 4009 'P2PDeviceAddress': self.peer['DeviceAddress'], 4010 'Type': 'pbc'} 4011 logger.info("Authorize peer to connect to the group") 4012 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 4013 wps.Start(params) 4014 4015 def staAuthorized(self, name): 4016 logger.debug("staAuthorized: " + name) 4017 # wait for client to be fully connected 4018 dev[1].wait_connected() 4019 # so we can cleanly disconnect it now 4020 group_p2p = dbus.Interface(self.g_if_obj, 4021 WPAS_DBUS_IFACE_P2PDEVICE) 4022 group_p2p.Disconnect() 4023 4024 def run_test(self, *args): 4025 logger.debug("run_test") 4026 params = dbus.Dictionary({'frequency': 2412}) 4027 p2p.GroupAdd(params) 4028 return False 4029 4030 def success(self): 4031 return self.done 4032 4033 with TestDbusP2p(bus) as t: 4034 if not t.success(): 4035 raise Exception("Expected signals not seen") 4036 4037 dev[1].wait_go_ending_session() 4038 dev[1].flush_scan_cache() 4039 4040def test_dbus_p2p_autogo_legacy(dev, apdev): 4041 """D-Bus P2P autonomous GO and legacy STA""" 4042 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4043 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4044 4045 addr0 = dev[0].p2p_dev_addr() 4046 4047 class TestDbusP2p(TestDbus): 4048 def __init__(self, bus): 4049 TestDbus.__init__(self, bus) 4050 self.done = False 4051 4052 def __enter__(self): 4053 gobject.timeout_add(1, self.run_test) 4054 gobject.timeout_add(15000, self.timeout) 4055 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4056 "GroupStarted") 4057 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4058 "GroupFinished") 4059 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 4060 "StaAuthorized") 4061 self.loop.run() 4062 return self 4063 4064 def groupStarted(self, properties): 4065 logger.debug("groupStarted: " + str(properties)) 4066 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4067 properties['group_object']) 4068 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4069 dbus_interface=dbus.PROPERTIES_IFACE, 4070 byte_arrays=True) 4071 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])]) 4072 4073 pin = '12345670' 4074 params = {'Role': 'enrollee', 4075 'Type': 'pin', 4076 'Pin': pin} 4077 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4078 properties['interface_object']) 4079 wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS) 4080 wps.Start(params) 4081 dev[1].scan_for_bss(bssid, freq=2412) 4082 dev[1].request("WPS_PIN " + bssid + " " + pin) 4083 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4084 4085 def groupFinished(self, properties): 4086 logger.debug("groupFinished: " + str(properties)) 4087 self.done = True 4088 self.loop.quit() 4089 4090 def staAuthorized(self, name): 4091 logger.debug("staAuthorized: " + name) 4092 dev[1].request("DISCONNECT") 4093 self.group_p2p.Disconnect() 4094 4095 def run_test(self, *args): 4096 logger.debug("run_test") 4097 params = dbus.Dictionary({'frequency': 2412}) 4098 p2p.GroupAdd(params) 4099 return False 4100 4101 def success(self): 4102 return self.done 4103 4104 with TestDbusP2p(bus) as t: 4105 if not t.success(): 4106 raise Exception("Expected signals not seen") 4107 4108def test_dbus_p2p_join(dev, apdev): 4109 """D-Bus P2P join an autonomous GO""" 4110 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4111 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4112 4113 addr1 = dev[1].p2p_dev_addr() 4114 addr2 = dev[2].p2p_dev_addr() 4115 dev[1].p2p_start_go(freq=2412) 4116 dev[2].p2p_listen() 4117 4118 class TestDbusP2p(TestDbus): 4119 def __init__(self, bus): 4120 TestDbus.__init__(self, bus) 4121 self.done = False 4122 self.peer = None 4123 self.go = None 4124 4125 def __enter__(self): 4126 gobject.timeout_add(1, self.run_test) 4127 gobject.timeout_add(15000, self.timeout) 4128 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4129 "DeviceFound") 4130 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4131 "GroupStarted") 4132 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4133 "GroupFinished") 4134 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE, 4135 "InvitationResult") 4136 self.loop.run() 4137 return self 4138 4139 def deviceFound(self, path): 4140 logger.debug("deviceFound: path=%s" % path) 4141 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 4142 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 4143 dbus_interface=dbus.PROPERTIES_IFACE, 4144 byte_arrays=True) 4145 logger.debug('peer properties: ' + str(res)) 4146 if addr2.replace(':', '') in path: 4147 self.peer = path 4148 elif addr1.replace(':', '') in path: 4149 self.go = path 4150 if self.peer and self.go: 4151 logger.info("Join the group") 4152 p2p.StopFind() 4153 args = {'peer': self.go, 4154 'join': True, 4155 'wps_method': 'pin', 4156 'frequency': 2412} 4157 pin = p2p.Connect(args) 4158 4159 dev[1].group_request("WPS_PIN any " + pin) 4160 4161 def groupStarted(self, properties): 4162 logger.debug("groupStarted: " + str(properties)) 4163 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4164 properties['interface_object']) 4165 role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role", 4166 dbus_interface=dbus.PROPERTIES_IFACE) 4167 if role != "client": 4168 raise Exception("Unexpected role reported: " + role) 4169 group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group", 4170 dbus_interface=dbus.PROPERTIES_IFACE) 4171 if group != properties['group_object']: 4172 raise Exception("Unexpected Group reported: " + str(group)) 4173 go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO", 4174 dbus_interface=dbus.PROPERTIES_IFACE) 4175 if go != self.go: 4176 raise Exception("Unexpected PeerGO value: " + str(go)) 4177 4178 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4179 properties['group_object']) 4180 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4181 dbus_interface=dbus.PROPERTIES_IFACE, 4182 byte_arrays=True) 4183 logger.debug("Group properties: " + str(res)) 4184 4185 ext = dbus.ByteArray(b"\x11\x22\x33\x44") 4186 try: 4187 # Set(WPSVendorExtensions) not allowed for P2P Client 4188 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 4189 dbus_interface=dbus.PROPERTIES_IFACE) 4190 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 4191 except dbus.exceptions.DBusException as e: 4192 if "Error.Failed: Failed to set property" not in str(e): 4193 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 4194 4195 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4196 args = {'duration1': 30000, 'interval1': 102400, 4197 'duration2': 20000, 'interval2': 102400} 4198 group_p2p.PresenceRequest(args) 4199 4200 args = {'peer': self.peer} 4201 group_p2p.Invite(args) 4202 4203 def groupFinished(self, properties): 4204 logger.debug("groupFinished: " + str(properties)) 4205 self.done = True 4206 self.loop.quit() 4207 4208 def invitationResult(self, result): 4209 logger.debug("invitationResult: " + str(result)) 4210 if result['status'] != 1: 4211 raise Exception("Unexpected invitation result: " + str(result)) 4212 dev[1].remove_group() 4213 4214 def run_test(self, *args): 4215 logger.debug("run_test") 4216 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4217 return False 4218 4219 def success(self): 4220 return self.done 4221 4222 with TestDbusP2p(bus) as t: 4223 if not t.success(): 4224 raise Exception("Expected signals not seen") 4225 4226 dev[2].p2p_stop_find() 4227 4228def test_dbus_p2p_invitation_received(dev, apdev): 4229 """D-Bus P2P and InvitationReceived""" 4230 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4231 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4232 4233 form(dev[0], dev[1]) 4234 addr0 = dev[0].p2p_dev_addr() 4235 dev[0].p2p_listen() 4236 dev[0].global_request("SET persistent_reconnect 0") 4237 4238 if not dev[1].discover_peer(addr0, social=True): 4239 raise Exception("Peer " + addr0 + " not found") 4240 peer = dev[1].get_peer(addr0) 4241 4242 class TestDbusP2p(TestDbus): 4243 def __init__(self, bus): 4244 TestDbus.__init__(self, bus) 4245 self.done = False 4246 4247 def __enter__(self): 4248 gobject.timeout_add(1, self.run_test) 4249 gobject.timeout_add(15000, self.timeout) 4250 self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE, 4251 "InvitationReceived") 4252 self.loop.run() 4253 return self 4254 4255 def invitationReceived(self, result): 4256 logger.debug("invitationReceived: " + str(result)) 4257 self.done = True 4258 self.loop.quit() 4259 4260 def run_test(self, *args): 4261 logger.debug("run_test") 4262 cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0 4263 dev[1].global_request(cmd) 4264 return False 4265 4266 def success(self): 4267 return self.done 4268 4269 with TestDbusP2p(bus) as t: 4270 if not t.success(): 4271 raise Exception("Expected signals not seen") 4272 4273 dev[0].p2p_stop_find() 4274 dev[1].p2p_stop_find() 4275 4276def test_dbus_p2p_config(dev, apdev): 4277 """D-Bus Get/Set P2PDeviceConfig""" 4278 try: 4279 _test_dbus_p2p_config(dev, apdev) 4280 finally: 4281 dev[0].request("P2P_SET ssid_postfix ") 4282 4283def _test_dbus_p2p_config(dev, apdev): 4284 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4285 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4286 4287 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4288 dbus_interface=dbus.PROPERTIES_IFACE, 4289 byte_arrays=True) 4290 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res, 4291 dbus_interface=dbus.PROPERTIES_IFACE) 4292 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4293 dbus_interface=dbus.PROPERTIES_IFACE, 4294 byte_arrays=True) 4295 4296 if len(res) != len(res2): 4297 raise Exception("Different number of parameters") 4298 for k in res: 4299 if res[k] != res2[k]: 4300 raise Exception("Parameter %s value changes" % k) 4301 4302 changes = {'SsidPostfix': 'foo', 4303 'VendorExtension': [dbus.ByteArray(b'\x11\x22\x33\x44')], 4304 'SecondaryDeviceTypes': [dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88')]} 4305 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4306 dbus.Dictionary(changes, signature='sv'), 4307 dbus_interface=dbus.PROPERTIES_IFACE) 4308 4309 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4310 dbus_interface=dbus.PROPERTIES_IFACE, 4311 byte_arrays=True) 4312 logger.debug("P2PDeviceConfig: " + str(res2)) 4313 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1: 4314 raise Exception("VendorExtension does not match") 4315 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1: 4316 raise Exception("SecondaryDeviceType does not match") 4317 4318 changes = {'SsidPostfix': '', 4319 'VendorExtension': dbus.Array([], signature="ay"), 4320 'SecondaryDeviceTypes': dbus.Array([], signature="ay")} 4321 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4322 dbus.Dictionary(changes, signature='sv'), 4323 dbus_interface=dbus.PROPERTIES_IFACE) 4324 4325 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4326 dbus_interface=dbus.PROPERTIES_IFACE, 4327 byte_arrays=True) 4328 logger.debug("P2PDeviceConfig: " + str(res3)) 4329 if 'VendorExtension' in res3: 4330 raise Exception("VendorExtension not removed") 4331 if 'SecondaryDeviceTypes' in res3: 4332 raise Exception("SecondaryDeviceType not removed") 4333 4334 try: 4335 dev[0].request("P2P_SET disabled 1") 4336 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4337 dbus_interface=dbus.PROPERTIES_IFACE, 4338 byte_arrays=True) 4339 raise Exception("Invalid Get(P2PDeviceConfig) accepted") 4340 except dbus.exceptions.DBusException as e: 4341 if "Error.Failed: P2P is not available for this interface" not in str(e): 4342 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4343 finally: 4344 dev[0].request("P2P_SET disabled 0") 4345 4346 try: 4347 dev[0].request("P2P_SET disabled 1") 4348 changes = {'SsidPostfix': 'foo'} 4349 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4350 dbus.Dictionary(changes, signature='sv'), 4351 dbus_interface=dbus.PROPERTIES_IFACE) 4352 raise Exception("Invalid Set(P2PDeviceConfig) accepted") 4353 except dbus.exceptions.DBusException as e: 4354 if "Error.Failed: P2P is not available for this interface" not in str(e): 4355 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4356 finally: 4357 dev[0].request("P2P_SET disabled 0") 4358 4359 tests = [{'DeviceName': 123}, 4360 {'SsidPostfix': 123}, 4361 {'Foo': 'Bar'}] 4362 for changes in tests: 4363 try: 4364 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4365 dbus.Dictionary(changes, signature='sv'), 4366 dbus_interface=dbus.PROPERTIES_IFACE) 4367 raise Exception("Invalid Set(P2PDeviceConfig) accepted") 4368 except dbus.exceptions.DBusException as e: 4369 if "InvalidArgs" not in str(e): 4370 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4371 4372def test_dbus_p2p_persistent(dev, apdev): 4373 """D-Bus P2P persistent group""" 4374 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4375 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4376 4377 class TestDbusP2p(TestDbus): 4378 def __init__(self, bus): 4379 TestDbus.__init__(self, bus) 4380 4381 def __enter__(self): 4382 gobject.timeout_add(1, self.run_test) 4383 gobject.timeout_add(15000, self.timeout) 4384 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4385 "GroupStarted") 4386 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4387 "GroupFinished") 4388 self.add_signal(self.persistentGroupAdded, 4389 WPAS_DBUS_IFACE_P2PDEVICE, 4390 "PersistentGroupAdded") 4391 self.loop.run() 4392 return self 4393 4394 def groupStarted(self, properties): 4395 logger.debug("groupStarted: " + str(properties)) 4396 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4397 properties['interface_object']) 4398 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4399 group_p2p.Disconnect() 4400 4401 def groupFinished(self, properties): 4402 logger.debug("groupFinished: " + str(properties)) 4403 self.loop.quit() 4404 4405 def persistentGroupAdded(self, path, properties): 4406 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 4407 self.persistent = path 4408 4409 def run_test(self, *args): 4410 logger.debug("run_test") 4411 params = dbus.Dictionary({'persistent': True, 4412 'frequency': 2412}) 4413 logger.info("Add a persistent group") 4414 p2p.GroupAdd(params) 4415 return False 4416 4417 def success(self): 4418 return True 4419 4420 with TestDbusP2p(bus) as t: 4421 if not t.success(): 4422 raise Exception("Expected signals not seen") 4423 persistent = t.persistent 4424 4425 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent) 4426 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties", 4427 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True) 4428 logger.info("Persistent group Properties: " + str(res)) 4429 vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv') 4430 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals, 4431 dbus_interface=dbus.PROPERTIES_IFACE) 4432 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties", 4433 dbus_interface=dbus.PROPERTIES_IFACE) 4434 if len(res) != len(res2): 4435 raise Exception("Different number of parameters") 4436 for k in res: 4437 if k != 'ssid' and res[k] != res2[k]: 4438 raise Exception("Parameter %s value changes" % k) 4439 if res2['ssid'] != '"DIRECT-foo"': 4440 raise Exception("Unexpected ssid") 4441 4442 args = dbus.Dictionary({'ssid': 'DIRECT-testing', 4443 'psk': '1234567890'}, signature='sv') 4444 group = p2p.AddPersistentGroup(args) 4445 4446 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups", 4447 dbus_interface=dbus.PROPERTIES_IFACE) 4448 if len(groups) != 2: 4449 raise Exception("Unexpected number of persistent groups: " + str(groups)) 4450 4451 p2p.RemoveAllPersistentGroups() 4452 4453 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups", 4454 dbus_interface=dbus.PROPERTIES_IFACE) 4455 if len(groups) != 0: 4456 raise Exception("Unexpected number of persistent groups: " + str(groups)) 4457 4458 try: 4459 p2p.RemovePersistentGroup(persistent) 4460 raise Exception("Invalid RemovePersistentGroup accepted") 4461 except dbus.exceptions.DBusException as e: 4462 if "NetworkUnknown: There is no such persistent group" not in str(e): 4463 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e)) 4464 4465def test_dbus_p2p_reinvoke_persistent(dev, apdev): 4466 """D-Bus P2P reinvoke persistent group""" 4467 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4468 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4469 4470 addr0 = dev[0].p2p_dev_addr() 4471 4472 class TestDbusP2p(TestDbus): 4473 def __init__(self, bus): 4474 TestDbus.__init__(self, bus) 4475 self.first = True 4476 self.waiting_end = False 4477 self.done = False 4478 self.invited = False 4479 4480 def __enter__(self): 4481 gobject.timeout_add(1, self.run_test) 4482 gobject.timeout_add(15000, self.timeout) 4483 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4484 "DeviceFound") 4485 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4486 "GroupStarted") 4487 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4488 "GroupFinished") 4489 self.add_signal(self.persistentGroupAdded, 4490 WPAS_DBUS_IFACE_P2PDEVICE, 4491 "PersistentGroupAdded") 4492 self.add_signal(self.provisionDiscoveryRequestDisplayPin, 4493 WPAS_DBUS_IFACE_P2PDEVICE, 4494 "ProvisionDiscoveryRequestDisplayPin") 4495 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 4496 "StaAuthorized") 4497 self.loop.run() 4498 return self 4499 4500 def groupStarted(self, properties): 4501 logger.debug("groupStarted: " + str(properties)) 4502 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4503 properties['interface_object']) 4504 if not self.invited: 4505 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4506 properties['group_object']) 4507 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4508 dbus_interface=dbus.PROPERTIES_IFACE, 4509 byte_arrays=True) 4510 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])]) 4511 dev[1].scan_for_bss(bssid, freq=2412) 4512 dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 join") 4513 4514 def groupFinished(self, properties): 4515 logger.debug("groupFinished: " + str(properties)) 4516 if self.invited: 4517 self.done = True 4518 self.loop.quit() 4519 else: 4520 dev[1].global_request("SET persistent_reconnect 1") 4521 dev[1].p2p_listen() 4522 4523 args = {'persistent_group_object': dbus.ObjectPath(path), 4524 'peer': self.peer_path} 4525 try: 4526 pin = p2p.Invite(args) 4527 raise Exception("Invalid Invite accepted") 4528 except dbus.exceptions.DBusException as e: 4529 if "InvalidArgs" not in str(e): 4530 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4531 4532 args = {'persistent_group_object': self.persistent, 4533 'peer': self.peer_path} 4534 pin = p2p.Invite(args) 4535 self.invited = True 4536 4537 self.sta_group_ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], 4538 timeout=15) 4539 if self.sta_group_ev is None: 4540 raise Exception("P2P-GROUP-STARTED event not seen") 4541 4542 def persistentGroupAdded(self, path, properties): 4543 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 4544 self.persistent = path 4545 4546 def deviceFound(self, path): 4547 logger.debug("deviceFound: path=%s" % path) 4548 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 4549 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 4550 dbus_interface=dbus.PROPERTIES_IFACE, 4551 byte_arrays=True) 4552 4553 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin): 4554 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin)) 4555 self.peer_path = peer_object 4556 peer = binascii.unhexlify(peer_object.split('/')[-1]) 4557 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 4558 params = {'Role': 'registrar', 4559 'P2PDeviceAddress': self.peer['DeviceAddress'], 4560 'Bssid': self.peer['DeviceAddress'], 4561 'Type': 'pin', 4562 'Pin': '12345670'} 4563 logger.info("Authorize peer to connect to the group") 4564 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 4565 wps.Start(params) 4566 self.sta_group_ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], 4567 timeout=15) 4568 if self.sta_group_ev is None: 4569 raise Exception("P2P-GROUP-STARTED event not seen") 4570 4571 def staAuthorized(self, name): 4572 logger.debug("staAuthorized: " + name) 4573 dev[1].group_form_result(self.sta_group_ev) 4574 dev[1].remove_group() 4575 ev = dev[1].wait_global_event(["P2P-GROUP-REMOVED"], timeout=10) 4576 if ev is None: 4577 raise Exception("Group removal timed out") 4578 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4579 group_p2p.Disconnect() 4580 4581 def run_test(self, *args): 4582 logger.debug("run_test") 4583 params = dbus.Dictionary({'persistent': True, 4584 'frequency': 2412}) 4585 logger.info("Add a persistent group") 4586 p2p.GroupAdd(params) 4587 return False 4588 4589 def success(self): 4590 return self.done 4591 4592 with TestDbusP2p(bus) as t: 4593 if not t.success(): 4594 raise Exception("Expected signals not seen") 4595 4596def test_dbus_p2p_go_neg_rx(dev, apdev): 4597 """D-Bus P2P GO Negotiation receive""" 4598 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4599 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4600 addr0 = dev[0].p2p_dev_addr() 4601 4602 class TestDbusP2p(TestDbus): 4603 def __init__(self, bus): 4604 TestDbus.__init__(self, bus) 4605 self.done = False 4606 4607 def __enter__(self): 4608 gobject.timeout_add(1, self.run_test) 4609 gobject.timeout_add(15000, self.timeout) 4610 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4611 "DeviceFound") 4612 self.add_signal(self.goNegotiationRequest, 4613 WPAS_DBUS_IFACE_P2PDEVICE, 4614 "GONegotiationRequest", 4615 byte_arrays=True) 4616 self.add_signal(self.goNegotiationSuccess, 4617 WPAS_DBUS_IFACE_P2PDEVICE, 4618 "GONegotiationSuccess", 4619 byte_arrays=True) 4620 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4621 "GroupStarted") 4622 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4623 "GroupFinished") 4624 self.loop.run() 4625 return self 4626 4627 def deviceFound(self, path): 4628 logger.debug("deviceFound: path=%s" % path) 4629 4630 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0): 4631 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent)) 4632 if dev_passwd_id != 1: 4633 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id) 4634 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 4635 'go_intent': 15, 'persistent': False, 'frequency': 5175} 4636 try: 4637 p2p.Connect(args) 4638 raise Exception("Invalid Connect accepted") 4639 except dbus.exceptions.DBusException as e: 4640 if "ConnectChannelUnsupported" not in str(e): 4641 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 4642 4643 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 4644 'go_intent': 15, 'persistent': False} 4645 p2p.Connect(args) 4646 4647 def goNegotiationSuccess(self, properties): 4648 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4649 4650 def groupStarted(self, properties): 4651 logger.debug("groupStarted: " + str(properties)) 4652 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4653 properties['interface_object']) 4654 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4655 group_p2p.Disconnect() 4656 4657 def groupFinished(self, properties): 4658 logger.debug("groupFinished: " + str(properties)) 4659 self.done = True 4660 self.loop.quit() 4661 4662 def run_test(self, *args): 4663 logger.debug("run_test") 4664 p2p.Listen(10) 4665 if not dev[1].discover_peer(addr0): 4666 raise Exception("Peer not found") 4667 dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 enter") 4668 return False 4669 4670 def success(self): 4671 return self.done 4672 4673 with TestDbusP2p(bus) as t: 4674 if not t.success(): 4675 raise Exception("Expected signals not seen") 4676 4677def test_dbus_p2p_go_neg_auth(dev, apdev): 4678 """D-Bus P2P GO Negotiation authorized""" 4679 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4680 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4681 addr0 = dev[0].p2p_dev_addr() 4682 dev[1].p2p_listen() 4683 4684 class TestDbusP2p(TestDbus): 4685 def __init__(self, bus): 4686 TestDbus.__init__(self, bus) 4687 self.done = False 4688 self.peer_joined = False 4689 self.peer_disconnected = False 4690 4691 def __enter__(self): 4692 gobject.timeout_add(1, self.run_test) 4693 gobject.timeout_add(15000, self.timeout) 4694 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4695 "DeviceFound") 4696 self.add_signal(self.goNegotiationSuccess, 4697 WPAS_DBUS_IFACE_P2PDEVICE, 4698 "GONegotiationSuccess", 4699 byte_arrays=True) 4700 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4701 "GroupStarted") 4702 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4703 "GroupFinished") 4704 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 4705 "StaDeauthorized") 4706 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP, 4707 "PeerJoined") 4708 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP, 4709 "PeerDisconnected") 4710 self.loop.run() 4711 return self 4712 4713 def deviceFound(self, path): 4714 logger.debug("deviceFound: path=%s" % path) 4715 args = {'peer': path, 'wps_method': 'keypad', 4716 'go_intent': 15, 'authorize_only': True} 4717 try: 4718 p2p.Connect(args) 4719 raise Exception("Invalid Connect accepted") 4720 except dbus.exceptions.DBusException as e: 4721 if "InvalidArgs" not in str(e): 4722 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 4723 4724 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4725 'go_intent': 15, 'authorize_only': True} 4726 p2p.Connect(args) 4727 p2p.Listen(10) 4728 if not dev[1].discover_peer(addr0): 4729 raise Exception("Peer not found") 4730 dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0") 4731 ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4732 if ev is None: 4733 raise Exception("Group formation timed out") 4734 self.sta_group_ev = ev 4735 4736 def goNegotiationSuccess(self, properties): 4737 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4738 4739 def groupStarted(self, properties): 4740 logger.debug("groupStarted: " + str(properties)) 4741 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4742 properties['interface_object']) 4743 dev[1].group_form_result(self.sta_group_ev) 4744 dev[1].remove_group() 4745 4746 def staDeauthorized(self, name): 4747 logger.debug("staDeuthorized: " + name) 4748 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4749 group_p2p.Disconnect() 4750 4751 def peerJoined(self, peer): 4752 logger.debug("peerJoined: " + peer) 4753 self.peer_joined = True 4754 4755 def peerDisconnected(self, peer): 4756 logger.debug("peerDisconnected: " + peer) 4757 self.peer_disconnected = True 4758 4759 def groupFinished(self, properties): 4760 logger.debug("groupFinished: " + str(properties)) 4761 self.done = True 4762 self.loop.quit() 4763 4764 def run_test(self, *args): 4765 logger.debug("run_test") 4766 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4767 return False 4768 4769 def success(self): 4770 return self.done and self.peer_joined and self.peer_disconnected 4771 4772 with TestDbusP2p(bus) as t: 4773 if not t.success(): 4774 raise Exception("Expected signals not seen") 4775 4776def test_dbus_p2p_go_neg_init(dev, apdev): 4777 """D-Bus P2P GO Negotiation initiation""" 4778 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4779 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4780 addr0 = dev[0].p2p_dev_addr() 4781 dev[1].p2p_listen() 4782 4783 class TestDbusP2p(TestDbus): 4784 def __init__(self, bus): 4785 TestDbus.__init__(self, bus) 4786 self.done = False 4787 self.peer_group_added = False 4788 self.peer_group_removed = False 4789 4790 def __enter__(self): 4791 gobject.timeout_add(1, self.run_test) 4792 gobject.timeout_add(15000, self.timeout) 4793 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4794 "DeviceFound") 4795 self.add_signal(self.goNegotiationSuccess, 4796 WPAS_DBUS_IFACE_P2PDEVICE, 4797 "GONegotiationSuccess", 4798 byte_arrays=True) 4799 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4800 "GroupStarted") 4801 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4802 "GroupFinished") 4803 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4804 "PropertiesChanged") 4805 self.loop.run() 4806 return self 4807 4808 def deviceFound(self, path): 4809 logger.debug("deviceFound: path=%s" % path) 4810 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4811 'go_intent': 0} 4812 p2p.Connect(args) 4813 4814 ev = dev[1].wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 4815 if ev is None: 4816 raise Exception("Timeout while waiting for GO Neg Request") 4817 dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 4818 ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4819 if ev is None: 4820 raise Exception("Group formation timed out") 4821 self.sta_group_ev = ev 4822 4823 def goNegotiationSuccess(self, properties): 4824 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4825 4826 def groupStarted(self, properties): 4827 logger.debug("groupStarted: " + str(properties)) 4828 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4829 properties['interface_object']) 4830 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4831 group_p2p.Disconnect() 4832 dev[1].group_form_result(self.sta_group_ev) 4833 dev[1].remove_group() 4834 4835 def groupFinished(self, properties): 4836 logger.debug("groupFinished: " + str(properties)) 4837 self.done = True 4838 4839 def propertiesChanged(self, interface_name, changed_properties, 4840 invalidated_properties): 4841 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 4842 if interface_name != WPAS_DBUS_P2P_PEER: 4843 return 4844 if "Groups" not in changed_properties: 4845 return 4846 if len(changed_properties["Groups"]) > 0: 4847 self.peer_group_added = True 4848 if len(changed_properties["Groups"]) == 0: 4849 if not self.peer_group_added: 4850 # This is likely a leftover event from an earlier test case, 4851 # ignore it to allow this test case to go through its steps. 4852 logger.info("Ignore propertiesChanged indicating group removal before group has been added") 4853 return 4854 self.peer_group_removed = True 4855 self.loop.quit() 4856 4857 def run_test(self, *args): 4858 logger.debug("run_test") 4859 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4860 return False 4861 4862 def success(self): 4863 return self.done and self.peer_group_added and self.peer_group_removed 4864 4865 with TestDbusP2p(bus) as t: 4866 if not t.success(): 4867 raise Exception("Expected signals not seen") 4868 4869def test_dbus_p2p_group_termination_by_go(dev, apdev): 4870 """D-Bus P2P group removal on GO terminating the group""" 4871 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4872 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4873 addr0 = dev[0].p2p_dev_addr() 4874 dev[1].p2p_listen() 4875 4876 class TestDbusP2p(TestDbus): 4877 def __init__(self, bus): 4878 TestDbus.__init__(self, bus) 4879 self.done = False 4880 self.peer_group_added = False 4881 self.peer_group_removed = False 4882 4883 def __enter__(self): 4884 gobject.timeout_add(1, self.run_test) 4885 gobject.timeout_add(15000, self.timeout) 4886 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4887 "DeviceFound") 4888 self.add_signal(self.goNegotiationSuccess, 4889 WPAS_DBUS_IFACE_P2PDEVICE, 4890 "GONegotiationSuccess", 4891 byte_arrays=True) 4892 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4893 "GroupStarted") 4894 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4895 "GroupFinished") 4896 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4897 "PropertiesChanged") 4898 self.loop.run() 4899 return self 4900 4901 def deviceFound(self, path): 4902 logger.debug("deviceFound: path=%s" % path) 4903 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4904 'go_intent': 0} 4905 p2p.Connect(args) 4906 4907 ev = dev[1].wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 4908 if ev is None: 4909 raise Exception("Timeout while waiting for GO Neg Request") 4910 dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 4911 ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4912 if ev is None: 4913 raise Exception("Group formation timed out") 4914 self.sta_group_ev = ev 4915 4916 def goNegotiationSuccess(self, properties): 4917 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4918 4919 def groupStarted(self, properties): 4920 logger.debug("groupStarted: " + str(properties)) 4921 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4922 properties['interface_object']) 4923 dev[1].group_form_result(self.sta_group_ev) 4924 dev[1].remove_group() 4925 4926 def groupFinished(self, properties): 4927 logger.debug("groupFinished: " + str(properties)) 4928 self.done = True 4929 4930 def propertiesChanged(self, interface_name, changed_properties, 4931 invalidated_properties): 4932 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 4933 if interface_name != WPAS_DBUS_P2P_PEER: 4934 return 4935 if "Groups" not in changed_properties: 4936 return 4937 if len(changed_properties["Groups"]) > 0: 4938 self.peer_group_added = True 4939 if len(changed_properties["Groups"]) == 0 and self.peer_group_added: 4940 self.peer_group_removed = True 4941 self.loop.quit() 4942 4943 def run_test(self, *args): 4944 logger.debug("run_test") 4945 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4946 return False 4947 4948 def success(self): 4949 return self.done and self.peer_group_added and self.peer_group_removed 4950 4951 with TestDbusP2p(bus) as t: 4952 if not t.success(): 4953 raise Exception("Expected signals not seen") 4954 4955def test_dbus_p2p_group_idle_timeout(dev, apdev): 4956 """D-Bus P2P group removal on idle timeout""" 4957 try: 4958 dev[0].global_request("SET p2p_group_idle 1") 4959 _test_dbus_p2p_group_idle_timeout(dev, apdev) 4960 finally: 4961 dev[0].global_request("SET p2p_group_idle 0") 4962 4963def _test_dbus_p2p_group_idle_timeout(dev, apdev): 4964 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4965 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4966 addr0 = dev[0].p2p_dev_addr() 4967 dev[1].p2p_listen() 4968 4969 class TestDbusP2p(TestDbus): 4970 def __init__(self, bus): 4971 TestDbus.__init__(self, bus) 4972 self.done = False 4973 self.group_started = False 4974 self.peer_group_added = False 4975 self.peer_group_removed = False 4976 4977 def __enter__(self): 4978 gobject.timeout_add(1, self.run_test) 4979 gobject.timeout_add(15000, self.timeout) 4980 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4981 "DeviceFound") 4982 self.add_signal(self.goNegotiationSuccess, 4983 WPAS_DBUS_IFACE_P2PDEVICE, 4984 "GONegotiationSuccess", 4985 byte_arrays=True) 4986 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4987 "GroupStarted") 4988 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4989 "GroupFinished") 4990 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4991 "PropertiesChanged") 4992 self.loop.run() 4993 return self 4994 4995 def deviceFound(self, path): 4996 logger.debug("deviceFound: path=%s" % path) 4997 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4998 'go_intent': 0} 4999 p2p.Connect(args) 5000 5001 ev = dev[1].wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 5002 if ev is None: 5003 raise Exception("Timeout while waiting for GO Neg Request") 5004 dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 5005 ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 5006 if ev is None: 5007 raise Exception("Group formation timed out") 5008 self.sta_group_ev = ev 5009 5010 def goNegotiationSuccess(self, properties): 5011 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 5012 5013 def groupStarted(self, properties): 5014 logger.debug("groupStarted: " + str(properties)) 5015 self.group_started = True 5016 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 5017 properties['interface_object']) 5018 dev[1].group_form_result(self.sta_group_ev) 5019 ifaddr = dev[1].group_request("STA-FIRST").splitlines()[0] 5020 # Force disassociation with different reason code so that the 5021 # P2P Client using D-Bus does not get normal group termination event 5022 # from the GO. 5023 dev[1].group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0") 5024 dev[1].remove_group() 5025 5026 def groupFinished(self, properties): 5027 logger.debug("groupFinished: " + str(properties)) 5028 self.done = True 5029 5030 def propertiesChanged(self, interface_name, changed_properties, 5031 invalidated_properties): 5032 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 5033 if interface_name != WPAS_DBUS_P2P_PEER: 5034 return 5035 if not self.group_started: 5036 return 5037 if "Groups" not in changed_properties: 5038 return 5039 if len(changed_properties["Groups"]) > 0: 5040 self.peer_group_added = True 5041 if len(changed_properties["Groups"]) == 0: 5042 self.peer_group_removed = True 5043 self.loop.quit() 5044 5045 def run_test(self, *args): 5046 logger.debug("run_test") 5047 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5048 return False 5049 5050 def success(self): 5051 return self.done and self.peer_group_added and self.peer_group_removed 5052 5053 with TestDbusP2p(bus) as t: 5054 if not t.success(): 5055 raise Exception("Expected signals not seen") 5056 5057def test_dbus_p2p_wps_failure(dev, apdev): 5058 """D-Bus P2P WPS failure""" 5059 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5060 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5061 addr0 = dev[0].p2p_dev_addr() 5062 5063 class TestDbusP2p(TestDbus): 5064 def __init__(self, bus): 5065 TestDbus.__init__(self, bus) 5066 self.wps_failed = False 5067 self.formation_failure = False 5068 5069 def __enter__(self): 5070 gobject.timeout_add(1, self.run_test) 5071 gobject.timeout_add(15000, self.timeout) 5072 self.add_signal(self.goNegotiationRequest, 5073 WPAS_DBUS_IFACE_P2PDEVICE, 5074 "GONegotiationRequest", 5075 byte_arrays=True) 5076 self.add_signal(self.goNegotiationSuccess, 5077 WPAS_DBUS_IFACE_P2PDEVICE, 5078 "GONegotiationSuccess", 5079 byte_arrays=True) 5080 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5081 "GroupStarted") 5082 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE, 5083 "WpsFailed") 5084 self.add_signal(self.groupFormationFailure, 5085 WPAS_DBUS_IFACE_P2PDEVICE, 5086 "GroupFormationFailure") 5087 self.loop.run() 5088 return self 5089 5090 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0): 5091 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent)) 5092 if dev_passwd_id != 1: 5093 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id) 5094 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 5095 'go_intent': 15} 5096 p2p.Connect(args) 5097 5098 def goNegotiationSuccess(self, properties): 5099 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 5100 5101 def groupStarted(self, properties): 5102 logger.debug("groupStarted: " + str(properties)) 5103 raise Exception("Unexpected GroupStarted") 5104 5105 def wpsFailed(self, name, args): 5106 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args))) 5107 self.wps_failed = True 5108 if self.formation_failure: 5109 self.loop.quit() 5110 5111 def groupFormationFailure(self, reason): 5112 logger.debug("groupFormationFailure - reason=%s" % reason) 5113 self.formation_failure = True 5114 if self.wps_failed: 5115 self.loop.quit() 5116 5117 def run_test(self, *args): 5118 logger.debug("run_test") 5119 p2p.Listen(10) 5120 if not dev[1].discover_peer(addr0): 5121 raise Exception("Peer not found") 5122 dev[1].global_request("P2P_CONNECT " + addr0 + " 87654321 enter") 5123 return False 5124 5125 def success(self): 5126 return self.wps_failed and self.formation_failure 5127 5128 with TestDbusP2p(bus) as t: 5129 if not t.success(): 5130 raise Exception("Expected signals not seen") 5131 5132def test_dbus_p2p_two_groups(dev, apdev): 5133 """D-Bus P2P with two concurrent groups""" 5134 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5135 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5136 5137 dev[0].request("SET p2p_no_group_iface 0") 5138 addr0 = dev[0].p2p_dev_addr() 5139 addr1 = dev[1].p2p_dev_addr() 5140 addr2 = dev[2].p2p_dev_addr() 5141 dev[1].p2p_start_go(freq=2412) 5142 5143 class TestDbusP2p(TestDbus): 5144 def __init__(self, bus): 5145 TestDbus.__init__(self, bus) 5146 self.done = False 5147 self.peer = None 5148 self.go = None 5149 self.group1 = None 5150 self.group2 = None 5151 self.groups_removed = False 5152 5153 def __enter__(self): 5154 gobject.timeout_add(1, self.run_test) 5155 gobject.timeout_add(15000, self.timeout) 5156 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 5157 "PropertiesChanged", byte_arrays=True) 5158 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 5159 "DeviceFound") 5160 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5161 "GroupStarted") 5162 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 5163 "GroupFinished") 5164 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP, 5165 "PeerJoined") 5166 self.loop.run() 5167 return self 5168 5169 def propertiesChanged(self, interface_name, changed_properties, 5170 invalidated_properties): 5171 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 5172 5173 def deviceFound(self, path): 5174 logger.debug("deviceFound: path=%s" % path) 5175 if addr2.replace(':', '') in path: 5176 self.peer = path 5177 elif addr1.replace(':', '') in path: 5178 self.go = path 5179 if self.go and not self.group1: 5180 logger.info("Join the group") 5181 p2p.StopFind() 5182 pin = '12345670' 5183 dev[1].group_request("WPS_PIN any " + pin) 5184 args = {'peer': self.go, 5185 'join': True, 5186 'wps_method': 'pin', 5187 'pin': pin, 5188 'frequency': 2412} 5189 p2p.Connect(args) 5190 5191 def groupStarted(self, properties): 5192 logger.debug("groupStarted: " + str(properties)) 5193 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 5194 dbus_interface=dbus.PROPERTIES_IFACE) 5195 logger.debug("p2pdevice properties: " + str(prop)) 5196 5197 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 5198 properties['group_object']) 5199 res = g_obj.GetAll(WPAS_DBUS_GROUP, 5200 dbus_interface=dbus.PROPERTIES_IFACE, 5201 byte_arrays=True) 5202 logger.debug("Group properties: " + str(res)) 5203 5204 if not self.group1: 5205 self.group1 = properties['group_object'] 5206 self.group1iface = properties['interface_object'] 5207 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 5208 self.group1iface) 5209 5210 logger.info("Start autonomous GO") 5211 params = dbus.Dictionary({'frequency': 2412}) 5212 p2p.GroupAdd(params) 5213 elif not self.group2: 5214 self.group2 = properties['group_object'] 5215 self.group2iface = properties['interface_object'] 5216 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 5217 self.group2iface) 5218 self.g2_bssid = res['BSSID'] 5219 5220 if self.group1 and self.group2: 5221 logger.info("Authorize peer to join the group") 5222 a2 = binascii.unhexlify(addr2.replace(':', '')) 5223 params = {'Role': 'enrollee', 5224 'P2PDeviceAddress': dbus.ByteArray(a2), 5225 'Bssid': dbus.ByteArray(a2), 5226 'Type': 'pin', 5227 'Pin': '12345670'} 5228 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS) 5229 g_wps.Start(params) 5230 5231 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', self.g2_bssid)]) 5232 dev[2].scan_for_bss(bssid, freq=2412) 5233 dev[2].global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412") 5234 ev = dev[2].wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 5235 if ev is None: 5236 raise Exception("Group join timed out") 5237 dev[2].group_form_result(ev) 5238 5239 def groupFinished(self, properties): 5240 logger.debug("groupFinished: " + str(properties)) 5241 5242 if self.group1 == properties['group_object']: 5243 self.group1 = None 5244 elif self.group2 == properties['group_object']: 5245 self.group2 = None 5246 5247 if not self.group1 and not self.group2: 5248 self.done = True 5249 self.loop.quit() 5250 5251 def peerJoined(self, peer): 5252 logger.debug("peerJoined: " + peer) 5253 if self.groups_removed: 5254 return 5255 self.check_results() 5256 5257 dev[2].remove_group() 5258 5259 logger.info("Disconnect group2") 5260 group_p2p = dbus.Interface(self.g2_if_obj, 5261 WPAS_DBUS_IFACE_P2PDEVICE) 5262 group_p2p.Disconnect() 5263 5264 logger.info("Disconnect group1") 5265 group_p2p = dbus.Interface(self.g1_if_obj, 5266 WPAS_DBUS_IFACE_P2PDEVICE) 5267 group_p2p.Disconnect() 5268 self.groups_removed = True 5269 5270 def check_results(self): 5271 logger.info("Check results with two concurrent groups in operation") 5272 5273 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1) 5274 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP, 5275 dbus_interface=dbus.PROPERTIES_IFACE, 5276 byte_arrays=True) 5277 5278 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2) 5279 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP, 5280 dbus_interface=dbus.PROPERTIES_IFACE, 5281 byte_arrays=True) 5282 5283 logger.info("group1 = " + self.group1) 5284 logger.debug("Group properties: " + str(res1)) 5285 5286 logger.info("group2 = " + self.group2) 5287 logger.debug("Group properties: " + str(res2)) 5288 5289 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 5290 dbus_interface=dbus.PROPERTIES_IFACE) 5291 logger.debug("p2pdevice properties: " + str(prop)) 5292 5293 if res1['Role'] != 'client': 5294 raise Exception("Group1 role reported incorrectly: " + res1['Role']) 5295 if res2['Role'] != 'GO': 5296 raise Exception("Group2 role reported incorrectly: " + res2['Role']) 5297 if prop['Role'] != 'device': 5298 raise Exception("p2pdevice role reported incorrectly: " + prop['Role']) 5299 5300 if len(res2['Members']) != 1: 5301 raise Exception("Unexpected Members value for group 2") 5302 5303 def run_test(self, *args): 5304 logger.debug("run_test") 5305 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5306 return False 5307 5308 def success(self): 5309 return self.done 5310 5311 with TestDbusP2p(bus) as t: 5312 if not t.success(): 5313 raise Exception("Expected signals not seen") 5314 5315 dev[1].remove_group() 5316 5317def test_dbus_p2p_cancel(dev, apdev): 5318 """D-Bus P2P Cancel""" 5319 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5320 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5321 try: 5322 p2p.Cancel() 5323 raise Exception("Unexpected p2p.Cancel() success") 5324 except dbus.exceptions.DBusException as e: 5325 pass 5326 5327 addr0 = dev[0].p2p_dev_addr() 5328 dev[1].p2p_listen() 5329 5330 class TestDbusP2p(TestDbus): 5331 def __init__(self, bus): 5332 TestDbus.__init__(self, bus) 5333 self.done = False 5334 5335 def __enter__(self): 5336 gobject.timeout_add(1, self.run_test) 5337 gobject.timeout_add(15000, self.timeout) 5338 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 5339 "DeviceFound") 5340 self.loop.run() 5341 return self 5342 5343 def deviceFound(self, path): 5344 logger.debug("deviceFound: path=%s" % path) 5345 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 5346 'go_intent': 0} 5347 p2p.Connect(args) 5348 p2p.Cancel() 5349 self.done = True 5350 self.loop.quit() 5351 5352 def run_test(self, *args): 5353 logger.debug("run_test") 5354 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5355 return False 5356 5357 def success(self): 5358 return self.done 5359 5360 with TestDbusP2p(bus) as t: 5361 if not t.success(): 5362 raise Exception("Expected signals not seen") 5363 5364def test_dbus_p2p_ip_addr(dev, apdev): 5365 """D-Bus P2P and IP address parameters""" 5366 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5367 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5368 5369 vals = [("IpAddrGo", "192.168.43.1"), 5370 ("IpAddrMask", "255.255.255.0"), 5371 ("IpAddrStart", "192.168.43.100"), 5372 ("IpAddrEnd", "192.168.43.199")] 5373 for field, value in vals: 5374 if_obj.Set(WPAS_DBUS_IFACE, field, value, 5375 dbus_interface=dbus.PROPERTIES_IFACE) 5376 val = if_obj.Get(WPAS_DBUS_IFACE, field, 5377 dbus_interface=dbus.PROPERTIES_IFACE) 5378 if val != value: 5379 raise Exception("Unexpected %s value: %s" % (field, val)) 5380 5381 set_ip_addr_info(dev[1]) 5382 5383 dev[0].global_request("SET p2p_go_intent 0") 5384 5385 req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip() 5386 if "FAIL" in req: 5387 raise Exception("Failed to generate NFC connection handover request") 5388 sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip() 5389 if "FAIL" in sel: 5390 raise Exception("Failed to generate NFC connection handover select") 5391 dev[0].dump_monitor() 5392 dev[1].dump_monitor() 5393 res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel) 5394 if "FAIL" in res: 5395 raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)") 5396 res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel) 5397 if "FAIL" in res: 5398 raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)") 5399 5400 class TestDbusP2p(TestDbus): 5401 def __init__(self, bus): 5402 TestDbus.__init__(self, bus) 5403 self.done = False 5404 5405 def __enter__(self): 5406 gobject.timeout_add(1, self.run_test) 5407 gobject.timeout_add(15000, self.timeout) 5408 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5409 "GroupStarted") 5410 self.loop.run() 5411 return self 5412 5413 def groupStarted(self, properties): 5414 logger.debug("groupStarted: " + str(properties)) 5415 self.loop.quit() 5416 5417 if 'IpAddrGo' not in properties: 5418 logger.info("IpAddrGo missing from GroupStarted") 5419 ip_addr_go = properties['IpAddrGo'] 5420 addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3]) 5421 if addr != "192.168.42.1": 5422 logger.info("Unexpected IpAddrGo value: " + addr) 5423 self.done = True 5424 5425 def run_test(self, *args): 5426 logger.debug("run_test") 5427 return False 5428 5429 def success(self): 5430 return self.done 5431 5432 with TestDbusP2p(bus) as t: 5433 if not t.success(): 5434 raise Exception("Expected signals not seen") 5435 5436def test_dbus_introspect(dev, apdev): 5437 """D-Bus introspection""" 5438 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5439 5440 res = if_obj.Introspect(WPAS_DBUS_IFACE, 5441 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5442 logger.info("Initial Introspect: " + str(res)) 5443 if res is None or "Introspectable" not in res or "GroupStarted" not in res: 5444 raise Exception("Unexpected initial Introspect response: " + str(res)) 5445 if "FastReauth" not in res or "PassiveScan" not in res: 5446 raise Exception("Unexpected initial Introspect response: " + str(res)) 5447 5448 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"): 5449 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5450 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5451 logger.info("Introspect: " + str(res2)) 5452 if res2 is not None: 5453 raise Exception("Unexpected Introspect response") 5454 5455 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"): 5456 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5457 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5458 logger.info("Introspect: " + str(res2)) 5459 if res2 is None: 5460 raise Exception("No Introspect response") 5461 if len(res2) >= len(res): 5462 raise Exception("Unexpected Introspect response") 5463 5464 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"): 5465 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5466 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5467 logger.info("Introspect: " + str(res2)) 5468 if res2 is None: 5469 raise Exception("No Introspect response") 5470 if len(res2) >= len(res): 5471 raise Exception("Unexpected Introspect response") 5472 5473 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"): 5474 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5475 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5476 logger.info("Introspect: " + str(res2)) 5477 if res2 is None: 5478 raise Exception("No Introspect response") 5479 if len(res2) >= len(res): 5480 raise Exception("Unexpected Introspect response") 5481 5482def run_busctl(service, obj): 5483 if not shutil.which("busctl"): 5484 raise HwsimSkip("No busctl available") 5485 logger.info("busctl introspect %s %s" % (service, obj)) 5486 cmd = subprocess.Popen(['busctl', 'introspect', service, obj], 5487 stdout=subprocess.PIPE, 5488 stderr=subprocess.PIPE) 5489 out = cmd.communicate() 5490 cmd.wait() 5491 logger.info("busctl stdout:\n%s" % out[0].strip()) 5492 if len(out[1]) > 0: 5493 logger.info("busctl stderr: %s" % out[1].decode().strip()) 5494 if "Duplicate property" in out[1].decode(): 5495 raise Exception("Duplicate property") 5496 5497def test_dbus_introspect_busctl(dev, apdev): 5498 """D-Bus introspection with busctl""" 5499 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5500 ifaces = dbus_get(dbus, wpas_obj, "Interfaces") 5501 run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH) 5502 run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH + "/Interfaces") 5503 run_busctl(WPAS_DBUS_SERVICE, ifaces[0]) 5504 5505 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 5506 bssid = apdev[0]['bssid'] 5507 dev[0].scan_for_bss(bssid, freq=2412) 5508 id = dev[0].add_network() 5509 dev[0].set_network(id, "disabled", "0") 5510 dev[0].set_network_quoted(id, "ssid", "test") 5511 5512 run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/BSSs/0") 5513 run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/Networks/0") 5514 5515def test_dbus_ap(dev, apdev): 5516 """D-Bus AddNetwork for AP mode""" 5517 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5518 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5519 5520 ssid = "test-wpa2-psk" 5521 passphrase = 'qwertyuiop' 5522 5523 class TestDbusConnect(TestDbus): 5524 def __init__(self, bus): 5525 TestDbus.__init__(self, bus) 5526 self.started = False 5527 self.sta_added = False 5528 self.sta_removed = False 5529 self.authorized = False 5530 self.deauthorized = False 5531 self.stations = False 5532 5533 def __enter__(self): 5534 gobject.timeout_add(1, self.run_connect) 5535 gobject.timeout_add(15000, self.timeout) 5536 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 5537 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 5538 "NetworkSelected") 5539 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5540 "PropertiesChanged") 5541 self.add_signal(self.stationAdded, WPAS_DBUS_IFACE, "StationAdded") 5542 self.add_signal(self.stationRemoved, WPAS_DBUS_IFACE, 5543 "StationRemoved") 5544 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 5545 "StaAuthorized") 5546 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 5547 "StaDeauthorized") 5548 self.loop.run() 5549 return self 5550 5551 def networkAdded(self, network, properties): 5552 logger.debug("networkAdded: %s" % str(network)) 5553 logger.debug(str(properties)) 5554 5555 def networkSelected(self, network): 5556 logger.debug("networkSelected: %s" % str(network)) 5557 self.network_selected = True 5558 5559 def propertiesChanged(self, properties): 5560 logger.debug("propertiesChanged: %s" % str(properties)) 5561 if 'State' in properties and properties['State'] == "completed": 5562 self.started = True 5563 dev[1].connect(ssid, psk=passphrase, scan_freq="2412") 5564 5565 def stationAdded(self, station, properties): 5566 logger.debug("stationAdded: %s" % str(station)) 5567 logger.debug(str(properties)) 5568 self.sta_added = True 5569 res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations', 5570 dbus_interface=dbus.PROPERTIES_IFACE) 5571 logger.info("Stations: " + str(res)) 5572 if len(res) == 1: 5573 self.stations = True 5574 else: 5575 raise Exception("Missing Stations entry: " + str(res)) 5576 5577 def stationRemoved(self, station): 5578 logger.debug("stationRemoved: %s" % str(station)) 5579 self.sta_removed = True 5580 res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations', 5581 dbus_interface=dbus.PROPERTIES_IFACE) 5582 logger.info("Stations: " + str(res)) 5583 if len(res) != 0: 5584 self.stations = False 5585 raise Exception("Unexpected Stations entry: " + str(res)) 5586 self.loop.quit() 5587 5588 def staAuthorized(self, name): 5589 logger.debug("staAuthorized: " + name) 5590 self.authorized = True 5591 dev[1].request("DISCONNECT") 5592 5593 def staDeauthorized(self, name): 5594 logger.debug("staDeauthorized: " + name) 5595 self.deauthorized = True 5596 5597 def run_connect(self, *args): 5598 logger.debug("run_connect") 5599 args = dbus.Dictionary({'ssid': ssid, 5600 'key_mgmt': 'WPA-PSK', 5601 'psk': passphrase, 5602 'mode': 2, 5603 'frequency': 2412, 5604 'scan_freq': 2412}, 5605 signature='sv') 5606 self.netw = iface.AddNetwork(args) 5607 iface.SelectNetwork(self.netw) 5608 return False 5609 5610 def success(self): 5611 return self.started and self.sta_added and self.sta_removed and \ 5612 self.authorized and self.deauthorized 5613 5614 with TestDbusConnect(bus) as t: 5615 if not t.success(): 5616 raise Exception("Expected signals not seen") 5617 5618def test_dbus_ap_scan(dev, apdev): 5619 """D-Bus AddNetwork for AP mode and scan""" 5620 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5621 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5622 5623 ssid = "test-wpa2-psk" 5624 passphrase = 'qwertyuiop' 5625 5626 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 5627 bssid = hapd.own_addr() 5628 5629 class TestDbusConnect(TestDbus): 5630 def __init__(self, bus): 5631 TestDbus.__init__(self, bus) 5632 self.started = False 5633 self.scan_completed = False 5634 5635 def __enter__(self): 5636 gobject.timeout_add(1, self.run_connect) 5637 gobject.timeout_add(15000, self.timeout) 5638 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5639 "PropertiesChanged") 5640 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone") 5641 self.loop.run() 5642 return self 5643 5644 def propertiesChanged(self, properties): 5645 logger.debug("propertiesChanged: %s" % str(properties)) 5646 if 'State' in properties and properties['State'] == "completed": 5647 self.started = True 5648 logger.info("Try to scan in AP mode") 5649 iface.Scan({'Type': 'active', 5650 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 5651 logger.info("Scan() returned") 5652 5653 def scanDone(self, success): 5654 logger.debug("scanDone: success=%s" % success) 5655 if self.started: 5656 self.scan_completed = True 5657 self.loop.quit() 5658 5659 def run_connect(self, *args): 5660 logger.debug("run_connect") 5661 args = dbus.Dictionary({'ssid': ssid, 5662 'key_mgmt': 'WPA-PSK', 5663 'psk': passphrase, 5664 'mode': 2, 5665 'frequency': 2412, 5666 'scan_freq': 2412}, 5667 signature='sv') 5668 self.netw = iface.AddNetwork(args) 5669 iface.SelectNetwork(self.netw) 5670 return False 5671 5672 def success(self): 5673 return self.started and self.scan_completed 5674 5675 with TestDbusConnect(bus) as t: 5676 if not t.success(): 5677 raise Exception("Expected signals not seen") 5678 5679def test_dbus_connect_wpa_eap(dev, apdev): 5680 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP""" 5681 skip_without_tkip(dev[0]) 5682 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5683 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5684 5685 ssid = "test-wpa-eap" 5686 params = hostapd.wpa_eap_params(ssid=ssid) 5687 params["wpa"] = "3" 5688 params["rsn_pairwise"] = "CCMP" 5689 hapd = hostapd.add_ap(apdev[0], params) 5690 5691 class TestDbusConnect(TestDbus): 5692 def __init__(self, bus): 5693 TestDbus.__init__(self, bus) 5694 self.done = False 5695 5696 def __enter__(self): 5697 gobject.timeout_add(1, self.run_connect) 5698 gobject.timeout_add(15000, self.timeout) 5699 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5700 "PropertiesChanged") 5701 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP") 5702 self.loop.run() 5703 return self 5704 5705 def propertiesChanged(self, properties): 5706 logger.debug("propertiesChanged: %s" % str(properties)) 5707 if 'State' in properties and properties['State'] == "completed": 5708 self.done = True 5709 self.loop.quit() 5710 5711 def eap(self, status, parameter): 5712 logger.debug("EAP: status=%s parameter=%s" % (status, parameter)) 5713 5714 def run_connect(self, *args): 5715 logger.debug("run_connect") 5716 args = dbus.Dictionary({'ssid': ssid, 5717 'key_mgmt': 'WPA-EAP', 5718 'eap': 'PEAP', 5719 'identity': 'user', 5720 'password': 'password', 5721 'ca_cert': 'auth_serv/ca.pem', 5722 'phase2': 'auth=MSCHAPV2', 5723 'scan_freq': 2412}, 5724 signature='sv') 5725 self.netw = iface.AddNetwork(args) 5726 iface.SelectNetwork(self.netw) 5727 return False 5728 5729 def success(self): 5730 return self.done 5731 5732 with TestDbusConnect(bus) as t: 5733 if not t.success(): 5734 raise Exception("Expected signals not seen") 5735 5736def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev): 5737 """AP_SCAN 2 AP mode and D-Bus Scan()""" 5738 try: 5739 _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev) 5740 finally: 5741 dev[0].request("AP_SCAN 1") 5742 5743def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev): 5744 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5745 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5746 5747 if "OK" not in dev[0].request("AP_SCAN 2"): 5748 raise Exception("Failed to set AP_SCAN 2") 5749 5750 id = dev[0].add_network() 5751 dev[0].set_network(id, "mode", "2") 5752 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open") 5753 dev[0].set_network(id, "key_mgmt", "NONE") 5754 dev[0].set_network(id, "frequency", "2412") 5755 dev[0].set_network(id, "scan_freq", "2412") 5756 dev[0].set_network(id, "disabled", "0") 5757 dev[0].select_network(id) 5758 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) 5759 if ev is None: 5760 raise Exception("AP failed to start") 5761 5762 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"): 5763 iface.Scan({'Type': 'active', 5764 'AllowRoam': True, 5765 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 5766 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED", 5767 "AP-DISABLED"], timeout=5) 5768 if ev is None: 5769 raise Exception("CTRL-EVENT-SCAN-FAILED not seen") 5770 if "AP-DISABLED" in ev: 5771 raise Exception("Unexpected AP-DISABLED event") 5772 if "retry=1" in ev: 5773 # Wait for the retry to scan happen 5774 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED", 5775 "AP-DISABLED"], timeout=5) 5776 if ev is None: 5777 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry") 5778 if "AP-DISABLED" in ev: 5779 raise Exception("Unexpected AP-DISABLED event - retry") 5780 5781 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412") 5782 dev[1].request("DISCONNECT") 5783 dev[1].wait_disconnected() 5784 dev[0].request("DISCONNECT") 5785 dev[0].wait_disconnected() 5786 5787def test_dbus_expectdisconnect(dev, apdev): 5788 """D-Bus ExpectDisconnect""" 5789 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5790 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 5791 5792 params = {"ssid": "test-open"} 5793 hapd = hostapd.add_ap(apdev[0], params) 5794 dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412") 5795 5796 # This does not really verify the behavior other than by going through the 5797 # code path for additional coverage. 5798 wpas.ExpectDisconnect() 5799 dev[0].request("DISCONNECT") 5800 dev[0].wait_disconnected() 5801 5802def test_dbus_save_config(dev, apdev): 5803 """D-Bus SaveConfig""" 5804 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5805 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5806 try: 5807 iface.SaveConfig() 5808 raise Exception("SaveConfig() accepted unexpectedly") 5809 except dbus.exceptions.DBusException as e: 5810 if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"): 5811 raise Exception("Unexpected error message for SaveConfig(): " + str(e)) 5812 5813def test_dbus_vendor_elem(dev, apdev): 5814 """D-Bus vendor element operations""" 5815 try: 5816 _test_dbus_vendor_elem(dev, apdev) 5817 finally: 5818 dev[0].request("VENDOR_ELEM_REMOVE 1 *") 5819 5820def _test_dbus_vendor_elem(dev, apdev): 5821 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5822 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5823 5824 dev[0].request("VENDOR_ELEM_REMOVE 1 *") 5825 5826 try: 5827 ie = dbus.ByteArray(b"\x00\x00") 5828 iface.VendorElemAdd(-1, ie) 5829 raise Exception("Invalid VendorElemAdd() accepted") 5830 except dbus.exceptions.DBusException as e: 5831 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5832 raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e)) 5833 5834 try: 5835 ie = dbus.ByteArray(b'') 5836 iface.VendorElemAdd(1, ie) 5837 raise Exception("Invalid VendorElemAdd() accepted") 5838 except dbus.exceptions.DBusException as e: 5839 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e): 5840 raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e)) 5841 5842 try: 5843 ie = dbus.ByteArray(b"\x00\x01") 5844 iface.VendorElemAdd(1, ie) 5845 raise Exception("Invalid VendorElemAdd() accepted") 5846 except dbus.exceptions.DBusException as e: 5847 if "InvalidArgs" not in str(e) or "Parse error" not in str(e): 5848 raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e)) 5849 5850 try: 5851 iface.VendorElemGet(-1) 5852 raise Exception("Invalid VendorElemGet() accepted") 5853 except dbus.exceptions.DBusException as e: 5854 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5855 raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e)) 5856 5857 try: 5858 iface.VendorElemGet(1) 5859 raise Exception("Invalid VendorElemGet() accepted") 5860 except dbus.exceptions.DBusException as e: 5861 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e): 5862 raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e)) 5863 5864 try: 5865 ie = dbus.ByteArray(b"\x00\x00") 5866 iface.VendorElemRem(-1, ie) 5867 raise Exception("Invalid VendorElemRemove() accepted") 5868 except dbus.exceptions.DBusException as e: 5869 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5870 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5871 5872 try: 5873 ie = dbus.ByteArray(b'') 5874 iface.VendorElemRem(1, ie) 5875 raise Exception("Invalid VendorElemRemove() accepted") 5876 except dbus.exceptions.DBusException as e: 5877 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e): 5878 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5879 5880 iface.VendorElemRem(1, b"*") 5881 5882 ie = dbus.ByteArray(b"\x00\x01\x00") 5883 iface.VendorElemAdd(1, ie) 5884 5885 val = iface.VendorElemGet(1) 5886 if len(val) != len(ie): 5887 raise Exception("Unexpected VendorElemGet length") 5888 for i in range(len(val)): 5889 if val[i] != dbus.Byte(ie[i]): 5890 raise Exception("Unexpected VendorElemGet data") 5891 5892 ie2 = dbus.ByteArray(b"\xe0\x00") 5893 iface.VendorElemAdd(1, ie2) 5894 5895 ies = ie + ie2 5896 val = iface.VendorElemGet(1) 5897 if len(val) != len(ies): 5898 raise Exception("Unexpected VendorElemGet length[2]") 5899 for i in range(len(val)): 5900 if val[i] != dbus.Byte(ies[i]): 5901 raise Exception("Unexpected VendorElemGet data[2]") 5902 5903 try: 5904 test_ie = dbus.ByteArray(b"\x01\x01") 5905 iface.VendorElemRem(1, test_ie) 5906 raise Exception("Invalid VendorElemRemove() accepted") 5907 except dbus.exceptions.DBusException as e: 5908 if "InvalidArgs" not in str(e) or "Parse error" not in str(e): 5909 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5910 5911 iface.VendorElemRem(1, ie) 5912 val = iface.VendorElemGet(1) 5913 if len(val) != len(ie2): 5914 raise Exception("Unexpected VendorElemGet length[3]") 5915 5916 iface.VendorElemRem(1, b"*") 5917 try: 5918 iface.VendorElemGet(1) 5919 raise Exception("Invalid VendorElemGet() accepted after removal") 5920 except dbus.exceptions.DBusException as e: 5921 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e): 5922 raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e)) 5923 5924def test_dbus_assoc_reject(dev, apdev): 5925 """D-Bus AssocStatusCode""" 5926 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5927 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5928 5929 ssid = "test-open" 5930 params = {"ssid": ssid, 5931 "max_listen_interval": "1"} 5932 hapd = hostapd.add_ap(apdev[0], params) 5933 5934 class TestDbusConnect(TestDbus): 5935 def __init__(self, bus): 5936 TestDbus.__init__(self, bus) 5937 self.assoc_status_seen = False 5938 self.state = 0 5939 5940 def __enter__(self): 5941 gobject.timeout_add(1, self.run_connect) 5942 gobject.timeout_add(15000, self.timeout) 5943 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5944 "PropertiesChanged") 5945 self.loop.run() 5946 return self 5947 5948 def propertiesChanged(self, properties): 5949 logger.debug("propertiesChanged: %s" % str(properties)) 5950 if 'AssocStatusCode' in properties: 5951 status = properties['AssocStatusCode'] 5952 if status != 51: 5953 logger.info("Unexpected status code: " + str(status)) 5954 else: 5955 self.assoc_status_seen = True 5956 iface.Disconnect() 5957 self.loop.quit() 5958 5959 def run_connect(self, *args): 5960 args = dbus.Dictionary({'ssid': ssid, 5961 'key_mgmt': 'NONE', 5962 'scan_freq': 2412}, 5963 signature='sv') 5964 self.netw = iface.AddNetwork(args) 5965 iface.SelectNetwork(self.netw) 5966 return False 5967 5968 def success(self): 5969 return self.assoc_status_seen 5970 5971 with TestDbusConnect(bus) as t: 5972 if not t.success(): 5973 raise Exception("Expected signals not seen") 5974 5975def test_dbus_mesh(dev, apdev): 5976 """D-Bus mesh""" 5977 check_mesh_support(dev[0]) 5978 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5979 mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH) 5980 5981 add_open_mesh_network(dev[1]) 5982 addr1 = dev[1].own_addr() 5983 5984 class TestDbusMesh(TestDbus): 5985 def __init__(self, bus): 5986 TestDbus.__init__(self, bus) 5987 self.done = False 5988 5989 def __enter__(self): 5990 gobject.timeout_add(1, self.run_test) 5991 gobject.timeout_add(15000, self.timeout) 5992 self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH, 5993 "MeshGroupStarted") 5994 self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH, 5995 "MeshGroupRemoved") 5996 self.add_signal(self.meshPeerConnected, WPAS_DBUS_IFACE_MESH, 5997 "MeshPeerConnected") 5998 self.add_signal(self.meshPeerDisconnected, WPAS_DBUS_IFACE_MESH, 5999 "MeshPeerDisconnected") 6000 self.loop.run() 6001 return self 6002 6003 def meshGroupStarted(self, args): 6004 logger.debug("MeshGroupStarted: " + str(args)) 6005 6006 def meshGroupRemoved(self, args): 6007 logger.debug("MeshGroupRemoved: " + str(args)) 6008 self.done = True 6009 self.loop.quit() 6010 6011 def meshPeerConnected(self, args): 6012 logger.debug("MeshPeerConnected: " + str(args)) 6013 6014 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers', 6015 dbus_interface=dbus.PROPERTIES_IFACE, 6016 byte_arrays=True) 6017 logger.debug("MeshPeers: " + str(res)) 6018 if len(res) != 1: 6019 raise Exception("Unexpected number of MeshPeer values") 6020 if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''): 6021 raise Exception("Unexpected peer address") 6022 6023 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup', 6024 dbus_interface=dbus.PROPERTIES_IFACE, 6025 byte_arrays=True) 6026 logger.debug("MeshGroup: " + str(res)) 6027 if res != b"wpas-mesh-open": 6028 raise Exception("Unexpected MeshGroup") 6029 dev[1].mesh_group_remove() 6030 6031 def meshPeerDisconnected(self, args): 6032 logger.debug("MeshPeerDisconnected: " + str(args)) 6033 dev[0].mesh_group_remove() 6034 6035 def run_test(self, *args): 6036 logger.debug("run_test") 6037 add_open_mesh_network(dev[0]) 6038 return False 6039 6040 def success(self): 6041 return self.done 6042 6043 with TestDbusMesh(bus) as t: 6044 if not t.success(): 6045 raise Exception("Expected signals not seen") 6046 6047def test_dbus_roam(dev, apdev): 6048 """D-Bus Roam""" 6049 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 6050 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6051 6052 ssid = "test-wpa2-psk" 6053 passphrase = 'qwertyuiop' 6054 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 6055 hapd = hostapd.add_ap(apdev[0], params) 6056 hapd2 = hostapd.add_ap(apdev[1], params) 6057 bssid = apdev[0]['bssid'] 6058 dev[0].scan_for_bss(bssid, freq=2412) 6059 bssid2 = apdev[1]['bssid'] 6060 dev[0].scan_for_bss(bssid2, freq=2412) 6061 6062 class TestDbusConnect(TestDbus): 6063 def __init__(self, bus): 6064 TestDbus.__init__(self, bus) 6065 self.state = 0 6066 6067 def __enter__(self): 6068 gobject.timeout_add(1, self.run_connect) 6069 gobject.timeout_add(15000, self.timeout) 6070 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 6071 "PropertiesChanged") 6072 self.loop.run() 6073 return self 6074 6075 def propertiesChanged(self, properties): 6076 logger.debug("propertiesChanged: %s" % str(properties)) 6077 if 'State' in properties and properties['State'] == "completed": 6078 if self.state == 0: 6079 self.state = 1 6080 cur = properties["CurrentBSS"] 6081 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, cur) 6082 res = bss_obj.Get(WPAS_DBUS_BSS, 'BSSID', 6083 dbus_interface=dbus.PROPERTIES_IFACE) 6084 bssid_str = '' 6085 for item in res: 6086 if len(bssid_str) > 0: 6087 bssid_str += ':' 6088 bssid_str += '%02x' % item 6089 dst = bssid if bssid_str == bssid2 else bssid2 6090 iface.Roam(dst) 6091 elif self.state == 1: 6092 if "RoamComplete" in properties and \ 6093 properties["RoamComplete"]: 6094 self.state = 2 6095 self.loop.quit() 6096 6097 def run_connect(self, *args): 6098 logger.debug("run_connect") 6099 args = dbus.Dictionary({'ssid': ssid, 6100 'key_mgmt': 'WPA-PSK', 6101 'psk': passphrase, 6102 'scan_freq': 2412}, 6103 signature='sv') 6104 self.netw = iface.AddNetwork(args) 6105 iface.SelectNetwork(self.netw) 6106 return False 6107 6108 def success(self): 6109 return self.state == 2 6110 6111 with TestDbusConnect(bus) as t: 6112 if not t.success(): 6113 raise Exception("Expected signals not seen") 6114 6115def test_dbus_creds(dev, apdev): 6116 """D-Bus interworking credentials""" 6117 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 6118 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6119 6120 args = {'domain': ['server.w1.fi','server2.w1.fi'], 6121 'realm': 'server.w1.fi', 6122 'home_ois': '50a9bf', 6123 'required_home_ois': '23bf50', 6124 'eap': 'TTLS', 6125 'phase2': 'auth=MSCHAPV2', 6126 'username': 'user', 6127 'password': 'password', 6128 'domain_suffix_match': 'server.w1.fi', 6129 'ca_cert': 'auth_serv/ca.pem'} 6130 6131 path = iface.AddCred(dbus.Dictionary(args, signature='sv')) 6132 for k, v in args.items(): 6133 if k == 'password': 6134 continue 6135 prop = dev[0].get_cred(0, k) 6136 if isinstance(v, list): 6137 v = '\n'.join(v) 6138 if prop != v: 6139 raise Exception('Credential add failed: %s does not match %s' % (prop, v)) 6140 6141 iface.RemoveCred(path) 6142 if not "FAIL" in dev[0].get_cred(0, 'domain'): 6143 raise Exception("Credential remove failed") 6144 6145 # Removal of multiple credentials 6146 cred1 = {'domain': 'server1.w1.fi','realm': 'server1.w1.fi','eap': 'TTLS'} 6147 iface.AddCred(dbus.Dictionary(cred1, signature='sv')) 6148 if "FAIL" in dev[0].get_cred(0, 'domain'): 6149 raise Exception("Failed to add credential") 6150 6151 cred2 = {'domain': 'server2.w1.fi','realm': 'server2.w1.fi','eap': 'TTLS'} 6152 iface.AddCred(dbus.Dictionary(cred2, signature='sv')) 6153 if "FAIL" in dev[0].get_cred(1, 'domain'): 6154 raise Exception("Failed to add credential") 6155 6156 iface.RemoveAllCreds() 6157 if not "FAIL" in dev[0].get_cred(0, 'domain'): 6158 raise Exception("Credential remove failed") 6159 if not "FAIL" in dev[0].get_cred(1, 'domain'): 6160 raise Exception("Credential remove failed") 6161 6162def test_dbus_interworking(dev, apdev): 6163 """D-Bus interworking selection""" 6164 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 6165 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6166 6167 params = {"ssid": "test-interworking", "wpa": "2", 6168 "wpa_key_mgmt": "WPA-EAP", "rsn_pairwise": "CCMP", 6169 "ieee8021x": "1", "eapol_version": "2", 6170 "eap_server": "1", "eap_user_file": "auth_serv/eap_user.conf", 6171 "ca_cert": "auth_serv/ca.pem", 6172 "server_cert": "auth_serv/server.pem", 6173 "private_key": "auth_serv/server.key", 6174 "interworking": "1", 6175 "domain_name": "server.w1.fi", 6176 "nai_realm": "0,server.w1.fi,21[2:4][5:7]", 6177 "roaming_consortium": "2233445566", 6178 "hs20": "1", "anqp_domain_id": "1234"} 6179 6180 hapd = hostapd.add_ap(apdev[0], params) 6181 6182 class TestDbusInterworking(TestDbus): 6183 def __init__(self, bus): 6184 TestDbus.__init__(self, bus) 6185 self.interworking_ap_seen = False 6186 self.interworking_select_done = False 6187 6188 def __enter__(self): 6189 gobject.timeout_add(1, self.run_select) 6190 gobject.timeout_add(15000, self.timeout) 6191 self.add_signal(self.interworkingAPAdded, WPAS_DBUS_IFACE, 6192 "InterworkingAPAdded") 6193 self.add_signal(self.interworkingSelectDone, WPAS_DBUS_IFACE, 6194 "InterworkingSelectDone") 6195 self.loop.run() 6196 return self 6197 6198 def interworkingAPAdded(self, bss, cred, properties): 6199 logger.debug("interworkingAPAdded: bss=%s cred=%s %s" % (bss, cred, str(properties))) 6200 if self.cred == cred: 6201 self.interworking_ap_seen = True 6202 6203 def interworkingSelectDone(self): 6204 logger.debug("interworkingSelectDone") 6205 self.interworking_select_done = True 6206 self.loop.quit() 6207 6208 def run_select(self, *args): 6209 args = {"domain": "server.w1.fi", 6210 "realm": "server.w1.fi", 6211 "eap": "TTLS", 6212 "phase2": "auth=MSCHAPV2", 6213 "username": "user", 6214 "password": "password", 6215 "domain_suffix_match": "server.w1.fi", 6216 "ca_cert": "auth_serv/ca.pem"} 6217 self.cred = iface.AddCred(dbus.Dictionary(args, signature='sv')) 6218 iface.InterworkingSelect() 6219 return False 6220 6221 def success(self): 6222 return self.interworking_ap_seen and self.interworking_select_done 6223 6224 with TestDbusInterworking(bus) as t: 6225 if not t.success(): 6226 raise Exception("Expected signals not seen") 6227 6228def test_dbus_hs20_terms_and_conditions(dev, apdev): 6229 """D-Bus HS2.0 Terms and Conditions acceptance""" 6230 check_eap_capa(dev[0], "MSCHAPV2") 6231 6232 (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0]) 6233 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6234 6235 bssid = apdev[0]['bssid'] 6236 params = {"ssid": "test-hs20", "hessid": bssid, "wpa": "2", 6237 "rsn_pairwise": "CCMP", "wpa_key_mgmt": "WPA-EAP", 6238 "ieee80211w": "1", "ieee8021x": "1", 6239 "auth_server_addr": "127.0.0.1", "auth_server_port": "1812", 6240 "auth_server_shared_secret": "radius", "interworking": "1", 6241 "access_network_type": "14", "internet": "1", "asra": "0", 6242 "esr": "0", "uesa": "0", "venue_group": "7", "venue_type": "1", 6243 "venue_name": ["eng:Example venue", "fin:Esimerkkipaikka"], 6244 "roaming_consortium": ["112233", "1020304050", "010203040506", 6245 "fedcba"], "domain_name": "example.com,another.example.com", 6246 "nai_realm": ["0,example.com,13[5:6],21[2:4][5:7]", 6247 "0,another.example.com"], "hs20": "1", 6248 "hs20_wan_metrics": "01:8000:1000:80:240:3000", 6249 "hs20_conn_capab": ["1:0:2", "6:22:1", "17:5060:0"], 6250 "hs20_operating_class": "5173", "anqp_3gpp_cell_net": "244,91", 6251 "hs20_t_c_filename": "terms-and-conditions", 6252 "hs20_t_c_timestamp": "123456789"} 6253 6254 hapd = hostapd.add_ap(apdev[0], params) 6255 6256 class TestDbusInterworking(TestDbus): 6257 def __init__(self, bus): 6258 TestDbus.__init__(self, bus) 6259 self.hs20_t_and_c_seen = False 6260 6261 def __enter__(self): 6262 gobject.timeout_add(1, self.run_connect) 6263 gobject.timeout_add(15000, self.timeout) 6264 self.add_signal(self.hs20TermsAndConditions, WPAS_DBUS_IFACE, 6265 "HS20TermsAndConditions") 6266 self.loop.run() 6267 return self 6268 6269 def hs20TermsAndConditions(self, t_c_url): 6270 logger.debug("hs20TermsAndConditions: url=%s" % (t_c_url)) 6271 url = "https://example.com/t_and_c?addr=%s&ap=123" % dev[0].own_addr() 6272 if url in t_c_url: 6273 self.hs20_t_and_c_seen = True 6274 6275 def run_connect(self, *args): 6276 dev[0].hs20_enable() 6277 dev[0].connect("test-hs20", proto="RSN", key_mgmt="WPA-EAP", eap="TTLS", 6278 identity="hs20-t-c-test", password="password", 6279 ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2", 6280 ieee80211w='2', scan_freq="2412") 6281 return False 6282 6283 def success(self): 6284 return self.hs20_t_and_c_seen 6285 6286 with TestDbusInterworking(bus) as t: 6287 if not t.success(): 6288 raise Exception("Expected signals not seen") 6289 6290def test_dbus_anqp_get(dev, apdev): 6291 """D-Bus ANQP get test""" 6292 (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0]) 6293 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6294 6295 bssid = apdev[0]['bssid'] 6296 params = hs20_ap_params(ssid="test-anqp") 6297 params["hessid"] = bssid 6298 params['mbo'] = '1' 6299 params['mbo_cell_data_conn_pref'] = '1' 6300 params['hs20_oper_friendly_name'] = ["eng:Example operator", 6301 "fin:Esimerkkioperaattori"] 6302 hapd = hostapd.add_ap(apdev[0], params) 6303 6304 dev[0].flush_scan_cache() 6305 dev[0].scan_for_bss(bssid, freq="2412", force_scan=True) 6306 iface.ANQPGet({"addr": bssid, 6307 "ids": dbus.Array([257], dbus.Signature("q")), 6308 "mbo_ids": dbus.Array([2], dbus.Signature("y")), 6309 "hs20_ids": dbus.Array([3, 4], dbus.Signature("y"))}) 6310 6311 ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10) 6312 if ev is None: 6313 raise Exception("GAS query timed out") 6314 6315 ev = dev[0].wait_event(["RX-ANQP"], timeout=1) 6316 if ev is None or "ANQP Capability list" not in ev: 6317 raise Exception("Did not receive Capability list") 6318 6319 ev = dev[0].wait_event(["RX-HS20-ANQP"], timeout=1) 6320 if ev is None or "Operator Friendly Name" not in ev: 6321 raise Exception("Did not receive Operator Friendly Name") 6322 6323 ev = dev[0].wait_event(["RX-MBO-ANQP"], timeout=1) 6324 if ev is None or "cell_conn_pref" not in ev: 6325 raise Exception("Did not receive MBO Cellular Data Connection Preference") 6326 6327 bss = dev[0].get_bss(bssid) 6328 if 'anqp_capability_list' not in bss: 6329 raise Exception("Capability List ANQP-element not seen") 6330 6331def test_dbus_anqp_query_done(dev, apdev): 6332 """D-Bus ANQP get test""" 6333 (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0]) 6334 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6335 6336 bssid = apdev[0]['bssid'] 6337 params = hs20_ap_params(ssid="test-anqp") 6338 params["hessid"] = bssid 6339 params['mbo'] = '1' 6340 params['mbo_cell_data_conn_pref'] = '1' 6341 params['hs20_oper_friendly_name'] = ["eng:Example operator", 6342 "fin:Esimerkkioperaattori"] 6343 hapd = hostapd.add_ap(apdev[0], params) 6344 6345 class TestDbusANQPGet(TestDbus): 6346 def __init__(self, bus): 6347 TestDbus.__init__(self, bus) 6348 self.anqp_query_done = False 6349 6350 def __enter__(self): 6351 gobject.timeout_add(1, self.run_query) 6352 gobject.timeout_add(15000, self.timeout) 6353 self.add_signal(self.anqpQueryDone, WPAS_DBUS_IFACE, 6354 "ANQPQueryDone") 6355 self.loop.run() 6356 return self 6357 6358 def anqpQueryDone(self, addr, result): 6359 logger.debug("anqpQueryDone: addr=%s result=%s" % (addr, result)) 6360 if addr == bssid and "SUCCESS" in result: 6361 self.anqp_query_done = True 6362 6363 def run_query(self, *args): 6364 dev[0].scan_for_bss(bssid, freq="2412", force_scan=True) 6365 iface.ANQPGet({"addr": bssid, 6366 "ids": dbus.Array([257], dbus.Signature("q"))}) 6367 return False 6368 6369 def success(self): 6370 return self.anqp_query_done 6371 6372 with TestDbusANQPGet(bus) as t: 6373 if not t.success(): 6374 raise Exception("Expected signals not seen") 6375 6376def test_dbus_bss_anqp_properties(dev, apdev): 6377 """D-Bus ANQP BSS properties changed""" 6378 (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0]) 6379 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6380 6381 bssid = apdev[0]['bssid'] 6382 params = hs20_ap_params(ssid="test-anqp") 6383 params["hessid"] = bssid 6384 params['mbo'] = '1' 6385 params['mbo_cell_data_conn_pref'] = '1' 6386 params['hs20_oper_friendly_name'] = ["eng:Example operator", 6387 "fin:Esimerkkioperaattori"] 6388 hapd = hostapd.add_ap(apdev[0], params) 6389 6390 class TestDbusANQPBSSPropertiesChanged(TestDbus): 6391 def __init__(self, bus): 6392 TestDbus.__init__(self, bus) 6393 self.capability_list = False 6394 self.venue_name = False 6395 self.roaming_consortium = False 6396 self.nai_realm = False 6397 6398 def __enter__(self): 6399 gobject.timeout_add(1, self.run_query) 6400 gobject.timeout_add(15000, self.timeout) 6401 self.add_signal(self.propertiesChanged, WPAS_DBUS_BSS, 6402 "PropertiesChanged") 6403 self.loop.run() 6404 return self 6405 6406 def propertiesChanged(self, properties): 6407 logger.debug("propertiesChanged: %s" % str(properties)) 6408 if 'ANQP' in properties: 6409 anqp_properties = properties['ANQP'] 6410 self.capability_list = 'CapabilityList' in anqp_properties 6411 self.venue_name = 'VenueName' in anqp_properties 6412 self.roaming_consortium = 'RoamingConsortium' in anqp_properties 6413 self.nai_realm = 'NAIRealm' in anqp_properties 6414 6415 def run_query(self, *args): 6416 dev[0].scan_for_bss(bssid, freq="2412", force_scan=True) 6417 iface.ANQPGet({"addr": bssid, 6418 "ids": dbus.Array([257,258,261,263], dbus.Signature("q"))}) 6419 return False 6420 6421 def success(self): 6422 return self.capability_list and self.venue_name and self.roaming_consortium and self.nai_realm 6423 6424 with TestDbusANQPBSSPropertiesChanged(bus) as t: 6425 if not t.success(): 6426 raise Exception("Expected signals not seen") 6427