1# wpa_supplicant D-Bus interface tests 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi> 3# 4# This software may be distributed under the terms of the BSD license. 5# See README for more details. 6 7import binascii 8import logging 9logger = logging.getLogger() 10import subprocess 11import time 12import shutil 13import struct 14import sys 15 16try: 17 if sys.version_info[0] > 2: 18 from gi.repository import GObject as gobject 19 else: 20 import gobject 21 import dbus 22 dbus_imported = True 23except ImportError: 24 dbus_imported = False 25 26import hostapd 27from wpasupplicant import WpaSupplicant 28from utils import * 29from p2p_utils import * 30from test_ap_tdls import connect_2sta_open 31from test_ap_eap import check_altsubject_match_support 32from test_nfc_p2p import set_ip_addr_info 33from test_wpas_mesh import check_mesh_support, add_open_mesh_network 34 35WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1" 36WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1" 37WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface" 38WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS" 39WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network" 40WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS" 41WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice" 42WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer" 43WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group" 44WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup" 45WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh" 46 47def prepare_dbus(dev): 48 if not dbus_imported: 49 logger.info("No dbus module available") 50 raise HwsimSkip("No dbus module available") 51 try: 52 from dbus.mainloop.glib import DBusGMainLoop 53 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) 54 bus = dbus.SystemBus() 55 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH) 56 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 57 path = wpas.GetInterface(dev.ifname) 58 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 59 return (bus, wpas_obj, path, if_obj) 60 except Exception as e: 61 raise HwsimSkip("Could not connect to D-Bus: %s" % e) 62 63class TestDbus(object): 64 def __init__(self, bus): 65 self.loop = gobject.MainLoop() 66 self.signals = [] 67 self.bus = bus 68 69 def __exit__(self, type, value, traceback): 70 for s in self.signals: 71 s.remove() 72 73 def add_signal(self, handler, interface, name, byte_arrays=False): 74 s = self.bus.add_signal_receiver(handler, dbus_interface=interface, 75 signal_name=name, 76 byte_arrays=byte_arrays) 77 self.signals.append(s) 78 79 def timeout(self, *args): 80 logger.debug("timeout") 81 self.loop.quit() 82 return False 83 84class alloc_fail_dbus(object): 85 def __init__(self, dev, count, funcs, operation="Operation", 86 expected="NoMemory"): 87 self._dev = dev 88 self._count = count 89 self._funcs = funcs 90 self._operation = operation 91 self._expected = expected 92 def __enter__(self): 93 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs) 94 if "OK" not in self._dev.request(cmd): 95 raise HwsimSkip("TEST_ALLOC_FAIL not supported") 96 def __exit__(self, type, value, traceback): 97 if type is None: 98 raise Exception("%s succeeded during out-of-memory" % self._operation) 99 if type == dbus.exceptions.DBusException and self._expected in str(value): 100 return True 101 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs: 102 raise Exception("%s did not trigger allocation failure" % self._operation) 103 return False 104 105def start_ap(ap, ssid="test-wps", 106 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"): 107 params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", 108 "wpa_passphrase": "12345678", "wpa": "2", 109 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", 110 "ap_pin": "12345670", "uuid": ap_uuid} 111 return hostapd.add_ap(ap, params) 112 113def test_dbus_getall(dev, apdev): 114 """D-Bus GetAll""" 115 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 116 117 dev[0].flush_scan_cache() 118 119 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE, 120 dbus_interface=dbus.PROPERTIES_IFACE) 121 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props)) 122 123 props = if_obj.GetAll(WPAS_DBUS_IFACE, 124 dbus_interface=dbus.PROPERTIES_IFACE) 125 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props))) 126 127 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS, 128 dbus_interface=dbus.PROPERTIES_IFACE) 129 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props))) 130 131 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 132 dbus_interface=dbus.PROPERTIES_IFACE) 133 if len(res) != 0: 134 raise Exception("Unexpected BSSs entry: " + str(res)) 135 136 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 137 dbus_interface=dbus.PROPERTIES_IFACE) 138 if len(res) != 0: 139 raise Exception("Unexpected Networks entry: " + str(res)) 140 141 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 142 bssid = apdev[0]['bssid'] 143 dev[0].scan_for_bss(bssid, freq=2412) 144 id = dev[0].add_network() 145 dev[0].set_network(id, "disabled", "0") 146 dev[0].set_network_quoted(id, "ssid", "test") 147 148 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 149 dbus_interface=dbus.PROPERTIES_IFACE) 150 if len(res) < 1: 151 raise Exception("Missing BSSs entry: " + str(res)) 152 if len(res) > 1: 153 raise Exception("Too manu BSSs entries: " + str(res)) 154 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 155 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE) 156 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props))) 157 bssid_str = '' 158 for item in props['BSSID']: 159 if len(bssid_str) > 0: 160 bssid_str += ':' 161 bssid_str += '%02x' % item 162 if bssid_str != bssid: 163 raise Exception("Unexpected BSSID in BSSs entry") 164 165 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 166 dbus_interface=dbus.PROPERTIES_IFACE) 167 if len(res) != 1: 168 raise Exception("Missing Networks entry: " + str(res)) 169 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 170 props = net_obj.GetAll(WPAS_DBUS_NETWORK, 171 dbus_interface=dbus.PROPERTIES_IFACE) 172 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props))) 173 ssid = props['Properties']['ssid'] 174 if ssid != '"test"': 175 raise Exception("Unexpected SSID in network entry") 176 177def test_dbus_getall_oom(dev, apdev): 178 """D-Bus GetAll wpa_config_get_all() OOM""" 179 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 180 181 id = dev[0].add_network() 182 dev[0].set_network(id, "disabled", "0") 183 dev[0].set_network_quoted(id, "ssid", "test") 184 185 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 186 dbus_interface=dbus.PROPERTIES_IFACE) 187 if len(res) != 1: 188 raise Exception("Missing Networks entry: " + str(res)) 189 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 190 for i in range(1, 50): 191 with alloc_fail(dev[0], i, "wpa_config_get_all"): 192 try: 193 props = net_obj.GetAll(WPAS_DBUS_NETWORK, 194 dbus_interface=dbus.PROPERTIES_IFACE) 195 except dbus.exceptions.DBusException as e: 196 pass 197 198def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False): 199 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop, 200 dbus_interface=dbus.PROPERTIES_IFACE, 201 byte_arrays=byte_arrays) 202 if expect is not None and val != expect: 203 raise Exception("Unexpected %s: %s (expected: %s)" % 204 (prop, str(val), str(expect))) 205 return val 206 207def dbus_set(dbus, wpas_obj, prop, val): 208 wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, 209 dbus_interface=dbus.PROPERTIES_IFACE) 210 211def test_dbus_properties(dev, apdev): 212 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties""" 213 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 214 215 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump") 216 dbus_set(dbus, wpas_obj, "DebugLevel", "debug") 217 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug") 218 for (val, err) in [(3, "Error.Failed: wrong property type"), 219 ("foo", "Error.Failed: wrong debug level value")]: 220 try: 221 dbus_set(dbus, wpas_obj, "DebugLevel", val) 222 raise Exception("Invalid DebugLevel value accepted: " + str(val)) 223 except dbus.exceptions.DBusException as e: 224 if err not in str(e): 225 raise Exception("Unexpected error message: " + str(e)) 226 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump") 227 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump") 228 229 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True) 230 dbus_set(dbus, wpas_obj, "DebugTimestamp", False) 231 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False) 232 try: 233 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo") 234 raise Exception("Invalid DebugTimestamp value accepted") 235 except dbus.exceptions.DBusException as e: 236 if "Error.Failed: wrong property type" not in str(e): 237 raise Exception("Unexpected error message: " + str(e)) 238 dbus_set(dbus, wpas_obj, "DebugTimestamp", True) 239 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True) 240 241 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True) 242 dbus_set(dbus, wpas_obj, "DebugShowKeys", False) 243 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False) 244 try: 245 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo") 246 raise Exception("Invalid DebugShowKeys value accepted") 247 except dbus.exceptions.DBusException as e: 248 if "Error.Failed: wrong property type" not in str(e): 249 raise Exception("Unexpected error message: " + str(e)) 250 dbus_set(dbus, wpas_obj, "DebugShowKeys", True) 251 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True) 252 253 res = dbus_get(dbus, wpas_obj, "Interfaces") 254 if len(res) != 1: 255 raise Exception("Unexpected Interfaces value: " + str(res)) 256 257 res = dbus_get(dbus, wpas_obj, "EapMethods") 258 if len(res) < 5 or "TTLS" not in res: 259 raise Exception("Unexpected EapMethods value: " + str(res)) 260 261 res = dbus_get(dbus, wpas_obj, "Capabilities") 262 if len(res) < 2 or "p2p" not in res: 263 raise Exception("Unexpected Capabilities value: " + str(res)) 264 265 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 266 val = binascii.unhexlify("010006020304050608") 267 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val)) 268 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 269 if val != res: 270 raise Exception("WFDIEs value changed") 271 try: 272 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00')) 273 raise Exception("Invalid WFDIEs value accepted") 274 except dbus.exceptions.DBusException as e: 275 if "InvalidArgs" not in str(e): 276 raise Exception("Unexpected error message: " + str(e)) 277 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'')) 278 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val)) 279 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'')) 280 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 281 if len(res) != 0: 282 raise Exception("WFDIEs not cleared properly") 283 284 res = dbus_get(dbus, wpas_obj, "EapMethods") 285 try: 286 dbus_set(dbus, wpas_obj, "EapMethods", res) 287 raise Exception("Invalid Set accepted") 288 except dbus.exceptions.DBusException as e: 289 if "InvalidArgs: Property is read-only" not in str(e): 290 raise Exception("Unexpected error message: " + str(e)) 291 292 try: 293 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True, 294 dbus_interface=dbus.PROPERTIES_IFACE) 295 raise Exception("Unknown method accepted") 296 except dbus.exceptions.DBusException as e: 297 if "UnknownMethod" not in str(e): 298 raise Exception("Unexpected error message: " + str(e)) 299 300 try: 301 wpas_obj.Get("foo", "DebugShowKeys", 302 dbus_interface=dbus.PROPERTIES_IFACE) 303 raise Exception("Invalid Get accepted") 304 except dbus.exceptions.DBusException as e: 305 if "InvalidArgs: No such property" not in str(e): 306 raise Exception("Unexpected error message: " + str(e)) 307 308 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH, 309 introspect=False) 310 try: 311 test_obj.Get(123, "DebugShowKeys", 312 dbus_interface=dbus.PROPERTIES_IFACE) 313 raise Exception("Invalid Get accepted") 314 except dbus.exceptions.DBusException as e: 315 if "InvalidArgs: Invalid arguments" not in str(e): 316 raise Exception("Unexpected error message: " + str(e)) 317 try: 318 test_obj.Get(WPAS_DBUS_SERVICE, 123, 319 dbus_interface=dbus.PROPERTIES_IFACE) 320 raise Exception("Invalid Get accepted") 321 except dbus.exceptions.DBusException as e: 322 if "InvalidArgs: Invalid arguments" not in str(e): 323 raise Exception("Unexpected error message: " + str(e)) 324 325 try: 326 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs", 327 dbus.ByteArray(b'', variant_level=2), 328 dbus_interface=dbus.PROPERTIES_IFACE) 329 raise Exception("Invalid Set accepted") 330 except dbus.exceptions.DBusException as e: 331 if "InvalidArgs: invalid message format" not in str(e): 332 raise Exception("Unexpected error message: " + str(e)) 333 334def test_dbus_set_global_properties(dev, apdev): 335 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties""" 336 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 337 338 dev[0].set("model_name", "") 339 props = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')] 340 341 for p in props: 342 res = if_obj.Get(WPAS_DBUS_IFACE, p[0], 343 dbus_interface=dbus.PROPERTIES_IFACE) 344 if res != p[1]: 345 raise Exception("Unexpected " + p[0] + " value: " + str(res)) 346 347 if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2], 348 dbus_interface=dbus.PROPERTIES_IFACE) 349 350 res = if_obj.Get(WPAS_DBUS_IFACE, p[0], 351 dbus_interface=dbus.PROPERTIES_IFACE) 352 if res != p[2]: 353 raise Exception("Unexpected " + p[0] + " value after set: " + str(res)) 354 dev[0].set("model_name", "") 355 356def test_dbus_invalid_method(dev, apdev): 357 """D-Bus invalid method""" 358 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 359 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 360 361 try: 362 wps.Foo() 363 raise Exception("Unknown method accepted") 364 except dbus.exceptions.DBusException as e: 365 if "UnknownMethod" not in str(e): 366 raise Exception("Unexpected error message: " + str(e)) 367 368 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False) 369 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS) 370 try: 371 test_wps.Start(123) 372 raise Exception("WPS.Start with incorrect signature accepted") 373 except dbus.exceptions.DBusException as e: 374 if "InvalidArgs: Invalid arg" not in str(e): 375 raise Exception("Unexpected error message: " + str(e)) 376 377def test_dbus_get_set_wps(dev, apdev): 378 """D-Bus Get/Set for WPS properties""" 379 try: 380 _test_dbus_get_set_wps(dev, apdev) 381 finally: 382 dev[0].request("SET wps_cred_processing 0") 383 dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps") 384 dev[0].set("device_name", "Device A") 385 dev[0].set("manufacturer", "") 386 dev[0].set("model_name", "") 387 dev[0].set("model_number", "") 388 dev[0].set("serial_number", "") 389 dev[0].set("device_type", "0-00000000-0") 390 391def _test_dbus_get_set_wps(dev, apdev): 392 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 393 394 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 395 dbus_interface=dbus.PROPERTIES_IFACE) 396 397 val = "display keypad virtual_display nfc_interface" 398 dev[0].request("SET config_methods " + val) 399 400 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 401 dbus_interface=dbus.PROPERTIES_IFACE) 402 if config != val: 403 raise Exception("Unexpected Get(ConfigMethods) result: " + config) 404 405 val2 = "push_button display" 406 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2, 407 dbus_interface=dbus.PROPERTIES_IFACE) 408 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 409 dbus_interface=dbus.PROPERTIES_IFACE) 410 if config != val2: 411 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config) 412 413 dev[0].request("SET config_methods " + val) 414 415 for i in range(3): 416 dev[0].request("SET wps_cred_processing " + str(i)) 417 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 418 dbus_interface=dbus.PROPERTIES_IFACE) 419 expected_val = False if i == 1 else True 420 if val != expected_val: 421 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val)) 422 423 tests = [("device_name", "DeviceName"), 424 ("manufacturer", "Manufacturer"), 425 ("model_name", "ModelName"), 426 ("model_number", "ModelNumber"), 427 ("serial_number", "SerialNumber")] 428 429 for f1, f2 in tests: 430 val2 = "test-value-test" 431 dev[0].set(f1, val2) 432 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2, 433 dbus_interface=dbus.PROPERTIES_IFACE) 434 if val != val2: 435 raise Exception("Get(%s) returned unexpected value" % f2) 436 val2 = "TEST-value" 437 if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2, 438 dbus_interface=dbus.PROPERTIES_IFACE) 439 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2, 440 dbus_interface=dbus.PROPERTIES_IFACE) 441 if val != val2: 442 raise Exception("Get(%s) returned unexpected value after Set" % f2) 443 444 dev[0].set("device_type", "5-0050F204-1") 445 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 446 dbus_interface=dbus.PROPERTIES_IFACE) 447 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01: 448 raise Exception("DeviceType mismatch") 449 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val, 450 dbus_interface=dbus.PROPERTIES_IFACE) 451 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 452 dbus_interface=dbus.PROPERTIES_IFACE) 453 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01: 454 raise Exception("DeviceType mismatch after Set") 455 456 val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08' 457 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2), 458 dbus_interface=dbus.PROPERTIES_IFACE) 459 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 460 dbus_interface=dbus.PROPERTIES_IFACE, 461 byte_arrays=True) 462 if val != val2: 463 raise Exception("DeviceType mismatch after Set (2)") 464 465 class TestDbusGetSet(TestDbus): 466 def __init__(self, bus): 467 TestDbus.__init__(self, bus) 468 self.signal_received = False 469 self.signal_received_deprecated = False 470 self.sets_done = False 471 472 def __enter__(self): 473 gobject.timeout_add(1, self.run_sets) 474 gobject.timeout_add(1000, self.timeout) 475 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS, 476 "PropertiesChanged") 477 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE, 478 "PropertiesChanged") 479 self.loop.run() 480 return self 481 482 def propertiesChanged(self, properties): 483 logger.debug("PropertiesChanged: " + str(properties)) 484 if "ProcessCredentials" in properties: 485 self.signal_received_deprecated = True 486 if self.sets_done and self.signal_received: 487 self.loop.quit() 488 489 def propertiesChanged2(self, interface_name, changed_properties, 490 invalidated_properties): 491 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 492 if interface_name != WPAS_DBUS_IFACE_WPS: 493 return 494 if "ProcessCredentials" in changed_properties: 495 self.signal_received = True 496 if self.sets_done and self.signal_received_deprecated: 497 self.loop.quit() 498 499 def run_sets(self, *args): 500 logger.debug("run_sets") 501 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 502 dbus.Boolean(1), 503 dbus_interface=dbus.PROPERTIES_IFACE) 504 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 505 dbus_interface=dbus.PROPERTIES_IFACE) != True: 506 raise Exception("Unexpected Get(ProcessCredentials) result after Set") 507 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 508 dbus.Boolean(0), 509 dbus_interface=dbus.PROPERTIES_IFACE) 510 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 511 dbus_interface=dbus.PROPERTIES_IFACE) != False: 512 raise Exception("Unexpected Get(ProcessCredentials) result after Set") 513 514 self.dbus_sets_done = True 515 return False 516 517 def success(self): 518 return self.signal_received and self.signal_received_deprecated 519 520 with TestDbusGetSet(bus) as t: 521 if not t.success(): 522 raise Exception("No signal received for ProcessCredentials change") 523 524def test_dbus_wps_invalid(dev, apdev): 525 """D-Bus invaldi WPS operation""" 526 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 527 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 528 529 failures = [{'Role': 'foo', 'Type': 'pbc'}, 530 {'Role': 123, 'Type': 'pbc'}, 531 {'Type': 'pbc'}, 532 {'Role': 'enrollee'}, 533 {'Role': 'registrar'}, 534 {'Role': 'enrollee', 'Type': 123}, 535 {'Role': 'enrollee', 'Type': 'foo'}, 536 {'Role': 'enrollee', 'Type': 'pbc', 537 'Bssid': '02:33:44:55:66:77'}, 538 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123}, 539 {'Role': 'enrollee', 'Type': 'pbc', 540 'Bssid': dbus.ByteArray(b'12345')}, 541 {'Role': 'enrollee', 'Type': 'pbc', 542 'P2PDeviceAddress': 12345}, 543 {'Role': 'enrollee', 'Type': 'pbc', 544 'P2PDeviceAddress': dbus.ByteArray(b'12345')}, 545 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'}] 546 for args in failures: 547 try: 548 wps.Start(args) 549 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args)) 550 except dbus.exceptions.DBusException as e: 551 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"): 552 raise Exception("Unexpected error message: " + str(e)) 553 554def test_dbus_wps_oom(dev, apdev): 555 """D-Bus WPS operation (OOM)""" 556 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 557 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 558 559 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"): 560 if_obj.Get(WPAS_DBUS_IFACE, "State", 561 dbus_interface=dbus.PROPERTIES_IFACE) 562 563 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 564 bssid = apdev[0]['bssid'] 565 dev[0].scan_for_bss(bssid, freq=2412) 566 567 time.sleep(0.05) 568 for i in range(1, 3): 569 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"): 570 if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 571 dbus_interface=dbus.PROPERTIES_IFACE) 572 573 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 574 dbus_interface=dbus.PROPERTIES_IFACE) 575 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 576 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"): 577 bss_obj.Get(WPAS_DBUS_BSS, "Rates", 578 dbus_interface=dbus.PROPERTIES_IFACE) 579 with alloc_fail(dev[0], 1, 580 "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"): 581 try: 582 bss_obj.Get(WPAS_DBUS_BSS, "Rates", 583 dbus_interface=dbus.PROPERTIES_IFACE) 584 except dbus.exceptions.DBusException as e: 585 pass 586 587 id = dev[0].add_network() 588 dev[0].set_network(id, "disabled", "0") 589 dev[0].set_network_quoted(id, "ssid", "test") 590 591 for i in range(1, 3): 592 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"): 593 if_obj.Get(WPAS_DBUS_IFACE, "Networks", 594 dbus_interface=dbus.PROPERTIES_IFACE) 595 596 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"): 597 dbus_get(dbus, wpas_obj, "Interfaces") 598 599 for i in range(1, 6): 600 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"): 601 dbus_get(dbus, wpas_obj, "EapMethods") 602 603 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set", 604 expected="Error.Failed: Failed to set property"): 605 val2 = "push_button display" 606 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2, 607 dbus_interface=dbus.PROPERTIES_IFACE) 608 609 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start", 610 "WPS.Start", 611 expected="UnknownError: WPS start failed"): 612 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'}) 613 614def test_dbus_wps_pbc(dev, apdev): 615 """D-Bus WPS/PBC operation and signals""" 616 try: 617 _test_dbus_wps_pbc(dev, apdev) 618 finally: 619 dev[0].request("SET wps_cred_processing 0") 620 621def _test_dbus_wps_pbc(dev, apdev): 622 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 623 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 624 625 hapd = start_ap(apdev[0]) 626 hapd.request("WPS_PBC") 627 bssid = apdev[0]['bssid'] 628 dev[0].flush_scan_cache() 629 dev[0].scan_for_bss(bssid, freq="2412") 630 dev[0].request("SET wps_cred_processing 2") 631 632 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 633 dbus_interface=dbus.PROPERTIES_IFACE) 634 if len(res) != 1: 635 raise Exception("Missing BSSs entry: " + str(res)) 636 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 637 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE) 638 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props))) 639 if 'WPS' not in props: 640 raise Exception("No WPS information in the BSS entry") 641 if 'Type' not in props['WPS']: 642 raise Exception("No Type field in the WPS dictionary") 643 if props['WPS']['Type'] != 'pbc': 644 raise Exception("Unexpected WPS Type: " + props['WPS']['Type']) 645 646 class TestDbusWps(TestDbus): 647 def __init__(self, bus, wps): 648 TestDbus.__init__(self, bus) 649 self.success_seen = False 650 self.credentials_received = False 651 self.wps = wps 652 653 def __enter__(self): 654 gobject.timeout_add(1, self.start_pbc) 655 gobject.timeout_add(15000, self.timeout) 656 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 657 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 658 "Credentials") 659 self.loop.run() 660 return self 661 662 def wpsEvent(self, name, args): 663 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 664 if name == "success": 665 self.success_seen = True 666 if self.credentials_received: 667 self.loop.quit() 668 669 def credentials(self, args): 670 logger.debug("credentials: " + str(args)) 671 self.credentials_received = True 672 if self.success_seen: 673 self.loop.quit() 674 675 def start_pbc(self, *args): 676 logger.debug("start_pbc") 677 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'}) 678 return False 679 680 def success(self): 681 return self.success_seen and self.credentials_received 682 683 with TestDbusWps(bus, wps) as t: 684 if not t.success(): 685 raise Exception("Failure in D-Bus operations") 686 687 dev[0].wait_connected(timeout=10) 688 dev[0].request("DISCONNECT") 689 hapd.disable() 690 dev[0].flush_scan_cache() 691 692def test_dbus_wps_pbc_overlap(dev, apdev): 693 """D-Bus WPS/PBC operation and signal for PBC overlap""" 694 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 695 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 696 697 hapd = start_ap(apdev[0]) 698 hapd2 = start_ap(apdev[1], ssid="test-wps2", 699 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f") 700 hapd.request("WPS_PBC") 701 hapd2.request("WPS_PBC") 702 bssid = apdev[0]['bssid'] 703 dev[0].scan_for_bss(bssid, freq="2412") 704 bssid2 = apdev[1]['bssid'] 705 dev[0].scan_for_bss(bssid2, freq="2412") 706 707 class TestDbusWps(TestDbus): 708 def __init__(self, bus, wps): 709 TestDbus.__init__(self, bus) 710 self.overlap_seen = False 711 self.wps = wps 712 713 def __enter__(self): 714 gobject.timeout_add(1, self.start_pbc) 715 gobject.timeout_add(15000, self.timeout) 716 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 717 self.loop.run() 718 return self 719 720 def wpsEvent(self, name, args): 721 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 722 if name == "pbc-overlap": 723 self.overlap_seen = True 724 self.loop.quit() 725 726 def start_pbc(self, *args): 727 logger.debug("start_pbc") 728 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'}) 729 return False 730 731 def success(self): 732 return self.overlap_seen 733 734 with TestDbusWps(bus, wps) as t: 735 if not t.success(): 736 raise Exception("Failure in D-Bus operations") 737 738 dev[0].request("WPS_CANCEL") 739 dev[0].request("DISCONNECT") 740 hapd.disable() 741 dev[0].flush_scan_cache() 742 743def test_dbus_wps_pin(dev, apdev): 744 """D-Bus WPS/PIN operation and signals""" 745 try: 746 _test_dbus_wps_pin(dev, apdev) 747 finally: 748 dev[0].request("SET wps_cred_processing 0") 749 750def _test_dbus_wps_pin(dev, apdev): 751 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 752 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 753 754 hapd = start_ap(apdev[0]) 755 hapd.request("WPS_PIN any 12345670") 756 bssid = apdev[0]['bssid'] 757 dev[0].scan_for_bss(bssid, freq="2412") 758 dev[0].request("SET wps_cred_processing 2") 759 760 class TestDbusWps(TestDbus): 761 def __init__(self, bus): 762 TestDbus.__init__(self, bus) 763 self.success_seen = False 764 self.credentials_received = False 765 766 def __enter__(self): 767 gobject.timeout_add(1, self.start_pin) 768 gobject.timeout_add(15000, self.timeout) 769 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 770 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 771 "Credentials") 772 self.loop.run() 773 return self 774 775 def wpsEvent(self, name, args): 776 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 777 if name == "success": 778 self.success_seen = True 779 if self.credentials_received: 780 self.loop.quit() 781 782 def credentials(self, args): 783 logger.debug("credentials: " + str(args)) 784 self.credentials_received = True 785 if self.success_seen: 786 self.loop.quit() 787 788 def start_pin(self, *args): 789 logger.debug("start_pin") 790 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 791 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 792 'Bssid': bssid_ay}) 793 return False 794 795 def success(self): 796 return self.success_seen and self.credentials_received 797 798 with TestDbusWps(bus) as t: 799 if not t.success(): 800 raise Exception("Failure in D-Bus operations") 801 802 dev[0].wait_connected(timeout=10) 803 804def test_dbus_wps_pin2(dev, apdev): 805 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)""" 806 try: 807 _test_dbus_wps_pin2(dev, apdev) 808 finally: 809 dev[0].request("SET wps_cred_processing 0") 810 811def _test_dbus_wps_pin2(dev, apdev): 812 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 813 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 814 815 hapd = start_ap(apdev[0]) 816 bssid = apdev[0]['bssid'] 817 dev[0].scan_for_bss(bssid, freq="2412") 818 dev[0].request("SET wps_cred_processing 2") 819 820 class TestDbusWps(TestDbus): 821 def __init__(self, bus): 822 TestDbus.__init__(self, bus) 823 self.success_seen = False 824 self.failed = False 825 826 def __enter__(self): 827 gobject.timeout_add(1, self.start_pin) 828 gobject.timeout_add(15000, self.timeout) 829 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 830 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 831 "Credentials") 832 self.loop.run() 833 return self 834 835 def wpsEvent(self, name, args): 836 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 837 if name == "success": 838 self.success_seen = True 839 if self.credentials_received: 840 self.loop.quit() 841 842 def credentials(self, args): 843 logger.debug("credentials: " + str(args)) 844 self.credentials_received = True 845 if self.success_seen: 846 self.loop.quit() 847 848 def start_pin(self, *args): 849 logger.debug("start_pin") 850 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 851 res = wps.Start({'Role': 'enrollee', 'Type': 'pin', 852 'Bssid': bssid_ay}) 853 pin = res['Pin'] 854 h = hostapd.Hostapd(apdev[0]['ifname']) 855 h.request("WPS_PIN any " + pin) 856 return False 857 858 def success(self): 859 return self.success_seen and self.credentials_received 860 861 with TestDbusWps(bus) as t: 862 if not t.success(): 863 raise Exception("Failure in D-Bus operations") 864 865 dev[0].wait_connected(timeout=10) 866 867def test_dbus_wps_pin_m2d(dev, apdev): 868 """D-Bus WPS/PIN operation and signals with M2D""" 869 try: 870 _test_dbus_wps_pin_m2d(dev, apdev) 871 finally: 872 dev[0].request("SET wps_cred_processing 0") 873 874def _test_dbus_wps_pin_m2d(dev, apdev): 875 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 876 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 877 878 hapd = start_ap(apdev[0]) 879 bssid = apdev[0]['bssid'] 880 dev[0].scan_for_bss(bssid, freq="2412") 881 dev[0].request("SET wps_cred_processing 2") 882 883 class TestDbusWps(TestDbus): 884 def __init__(self, bus): 885 TestDbus.__init__(self, bus) 886 self.success_seen = False 887 self.credentials_received = False 888 889 def __enter__(self): 890 gobject.timeout_add(1, self.start_pin) 891 gobject.timeout_add(15000, self.timeout) 892 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 893 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 894 "Credentials") 895 self.loop.run() 896 return self 897 898 def wpsEvent(self, name, args): 899 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 900 if name == "success": 901 self.success_seen = True 902 if self.credentials_received: 903 self.loop.quit() 904 elif name == "m2d": 905 h = hostapd.Hostapd(apdev[0]['ifname']) 906 h.request("WPS_PIN any 12345670") 907 908 def credentials(self, args): 909 logger.debug("credentials: " + str(args)) 910 self.credentials_received = True 911 if self.success_seen: 912 self.loop.quit() 913 914 def start_pin(self, *args): 915 logger.debug("start_pin") 916 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 917 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 918 'Bssid': bssid_ay}) 919 return False 920 921 def success(self): 922 return self.success_seen and self.credentials_received 923 924 with TestDbusWps(bus) as t: 925 if not t.success(): 926 raise Exception("Failure in D-Bus operations") 927 928 dev[0].wait_connected(timeout=10) 929 930def test_dbus_wps_reg(dev, apdev): 931 """D-Bus WPS/Registrar operation and signals""" 932 try: 933 _test_dbus_wps_reg(dev, apdev) 934 finally: 935 dev[0].request("SET wps_cred_processing 0") 936 937def _test_dbus_wps_reg(dev, apdev): 938 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 939 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 940 941 hapd = start_ap(apdev[0]) 942 hapd.request("WPS_PIN any 12345670") 943 bssid = apdev[0]['bssid'] 944 dev[0].scan_for_bss(bssid, freq="2412") 945 dev[0].request("SET wps_cred_processing 2") 946 947 class TestDbusWps(TestDbus): 948 def __init__(self, bus): 949 TestDbus.__init__(self, bus) 950 self.credentials_received = False 951 952 def __enter__(self): 953 gobject.timeout_add(100, self.start_reg) 954 gobject.timeout_add(15000, self.timeout) 955 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 956 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 957 "Credentials") 958 self.loop.run() 959 return self 960 961 def wpsEvent(self, name, args): 962 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 963 964 def credentials(self, args): 965 logger.debug("credentials: " + str(args)) 966 self.credentials_received = True 967 self.loop.quit() 968 969 def start_reg(self, *args): 970 logger.debug("start_reg") 971 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 972 wps.Start({'Role': 'registrar', 'Type': 'pin', 973 'Pin': '12345670', 'Bssid': bssid_ay}) 974 return False 975 976 def success(self): 977 return self.credentials_received 978 979 with TestDbusWps(bus) as t: 980 if not t.success(): 981 raise Exception("Failure in D-Bus operations") 982 983 dev[0].wait_connected(timeout=10) 984 985def test_dbus_wps_cancel(dev, apdev): 986 """D-Bus WPS Cancel operation""" 987 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 988 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 989 990 hapd = start_ap(apdev[0]) 991 bssid = apdev[0]['bssid'] 992 993 wps.Cancel() 994 dev[0].scan_for_bss(bssid, freq="2412") 995 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 996 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 997 'Bssid': bssid_ay}) 998 wps.Cancel() 999 dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1) 1000 1001def test_dbus_scan_invalid(dev, apdev): 1002 """D-Bus invalid scan method""" 1003 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1004 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1005 1006 tests = [({}, "InvalidArgs"), 1007 ({'Type': 123}, "InvalidArgs"), 1008 ({'Type': 'foo'}, "InvalidArgs"), 1009 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"), 1010 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"), 1011 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"), 1012 ({'Type': 'active', 1013 'SSIDs': [dbus.ByteArray(b"1"), dbus.ByteArray(b"2"), 1014 dbus.ByteArray(b"3"), dbus.ByteArray(b"4"), 1015 dbus.ByteArray(b"5"), dbus.ByteArray(b"6"), 1016 dbus.ByteArray(b"7"), dbus.ByteArray(b"8"), 1017 dbus.ByteArray(b"9"), dbus.ByteArray(b"10"), 1018 dbus.ByteArray(b"11"), dbus.ByteArray(b"12"), 1019 dbus.ByteArray(b"13"), dbus.ByteArray(b"14"), 1020 dbus.ByteArray(b"15"), dbus.ByteArray(b"16"), 1021 dbus.ByteArray(b"17")]}, 1022 "InvalidArgs"), 1023 ({'Type': 'active', 1024 'SSIDs': [dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")]}, 1025 "InvalidArgs"), 1026 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"), 1027 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"), 1028 ({'Type': 'active', 'Channels': 2412}, "InvalidArgs"), 1029 ({'Type': 'active', 'Channels': [2412]}, "InvalidArgs"), 1030 ({'Type': 'active', 1031 'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]}, 1032 "InvalidArgs"), 1033 ({'Type': 'active', 1034 'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]}, 1035 "InvalidArgs"), 1036 ({'Type': 'active', 'AllowRoam': "yes"}, "InvalidArgs"), 1037 ({'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]}, 1038 "InvalidArgs"), 1039 ({'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]}, 1040 "InvalidArgs")] 1041 for (t, err) in tests: 1042 try: 1043 iface.Scan(t) 1044 raise Exception("Invalid Scan() arguments accepted: " + str(t)) 1045 except dbus.exceptions.DBusException as e: 1046 if err not in str(e): 1047 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e))) 1048 1049def test_dbus_scan_oom(dev, apdev): 1050 """D-Bus scan method and OOM""" 1051 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1052 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1053 1054 with alloc_fail_dbus(dev[0], 1, 1055 "wpa_scan_clone_params;wpas_dbus_handler_scan", 1056 "Scan", expected="ScanError: Scan request rejected"): 1057 iface.Scan({'Type': 'passive', 1058 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1059 1060 with alloc_fail_dbus(dev[0], 1, 1061 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan", 1062 "Scan"): 1063 iface.Scan({'Type': 'passive', 1064 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1065 1066 with alloc_fail_dbus(dev[0], 1, 1067 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan", 1068 "Scan"): 1069 iface.Scan({'Type': 'active', 1070 'IEs': [dbus.ByteArray(b"\xdd\x00")], 1071 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1072 1073 with alloc_fail_dbus(dev[0], 1, 1074 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan", 1075 "Scan"): 1076 iface.Scan({'Type': 'active', 1077 'SSIDs': [dbus.ByteArray(b"open"), 1078 dbus.ByteArray()], 1079 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1080 1081def test_dbus_scan(dev, apdev): 1082 """D-Bus scan and related signals""" 1083 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1084 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1085 1086 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 1087 1088 class TestDbusScan(TestDbus): 1089 def __init__(self, bus): 1090 TestDbus.__init__(self, bus) 1091 self.scan_completed = 0 1092 self.bss_added = False 1093 self.fail_reason = None 1094 1095 def __enter__(self): 1096 gobject.timeout_add(1, self.run_scan) 1097 gobject.timeout_add(15000, self.timeout) 1098 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone") 1099 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded") 1100 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved") 1101 self.loop.run() 1102 return self 1103 1104 def scanDone(self, success): 1105 logger.debug("scanDone: success=%s" % success) 1106 self.scan_completed += 1 1107 if self.scan_completed == 1: 1108 iface.Scan({'Type': 'passive', 1109 'AllowRoam': True, 1110 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1111 elif self.scan_completed == 2: 1112 iface.Scan({'Type': 'passive', 1113 'AllowRoam': False}) 1114 elif self.bss_added and self.scan_completed == 3: 1115 self.loop.quit() 1116 1117 def bssAdded(self, bss, properties): 1118 logger.debug("bssAdded: %s" % bss) 1119 logger.debug(str(properties)) 1120 if 'WPS' in properties: 1121 if 'Type' in properties['WPS']: 1122 self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS" 1123 self.loop.quit() 1124 self.bss_added = True 1125 if self.scan_completed == 3: 1126 self.loop.quit() 1127 1128 def bssRemoved(self, bss): 1129 logger.debug("bssRemoved: %s" % bss) 1130 1131 def run_scan(self, *args): 1132 logger.debug("run_scan") 1133 iface.Scan({'Type': 'active', 1134 'SSIDs': [dbus.ByteArray(b"open"), 1135 dbus.ByteArray()], 1136 'IEs': [dbus.ByteArray(b"\xdd\x00"), 1137 dbus.ByteArray()], 1138 'AllowRoam': False, 1139 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1140 return False 1141 1142 def success(self): 1143 return self.scan_completed == 3 and self.bss_added 1144 1145 with TestDbusScan(bus) as t: 1146 if t.fail_reason: 1147 raise Exception(t.fail_reason) 1148 if not t.success(): 1149 raise Exception("Expected signals not seen") 1150 1151 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 1152 dbus_interface=dbus.PROPERTIES_IFACE) 1153 if len(res) < 1: 1154 raise Exception("Scan result not in BSSs property") 1155 iface.FlushBSS(0) 1156 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 1157 dbus_interface=dbus.PROPERTIES_IFACE) 1158 if len(res) != 0: 1159 raise Exception("FlushBSS() did not remove scan results from BSSs property") 1160 iface.FlushBSS(1) 1161 1162def test_dbus_scan_rand(dev, apdev): 1163 """D-Bus MACAddressRandomizationMask property Get/Set""" 1164 try: 1165 run_dbus_scan_rand(dev, apdev) 1166 finally: 1167 dev[0].request("MAC_RAND_SCAN all enable=0") 1168 1169def run_dbus_scan_rand(dev, apdev): 1170 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1171 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1172 1173 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1174 dbus_interface=dbus.PROPERTIES_IFACE) 1175 if len(res) != 0: 1176 logger.info(str(res)) 1177 raise Exception("Unexpected initial MACAddressRandomizationMask value") 1178 1179 try: 1180 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", "foo", 1181 dbus_interface=dbus.PROPERTIES_IFACE) 1182 raise Exception("Invalid Set accepted") 1183 except dbus.exceptions.DBusException as e: 1184 if "InvalidArgs: invalid message format" not in str(e): 1185 raise Exception("Unexpected error message: " + str(e)) 1186 1187 try: 1188 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1189 {"foo": "bar"}, 1190 dbus_interface=dbus.PROPERTIES_IFACE) 1191 raise Exception("Invalid Set accepted") 1192 except dbus.exceptions.DBusException as e: 1193 if "wpas_dbus_setter_mac_address_randomization_mask: mask was not a byte array" not in str(e): 1194 raise Exception("Unexpected error message: " + str(e)) 1195 1196 try: 1197 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1198 {"foo": dbus.ByteArray(b'123456')}, 1199 dbus_interface=dbus.PROPERTIES_IFACE) 1200 raise Exception("Invalid Set accepted") 1201 except dbus.exceptions.DBusException as e: 1202 if 'wpas_dbus_setter_mac_address_randomization_mask: bad scan type "foo"' not in str(e): 1203 raise Exception("Unexpected error message: " + str(e)) 1204 1205 try: 1206 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1207 {"scan": dbus.ByteArray(b'12345')}, 1208 dbus_interface=dbus.PROPERTIES_IFACE) 1209 raise Exception("Invalid Set accepted") 1210 except dbus.exceptions.DBusException as e: 1211 if 'wpas_dbus_setter_mac_address_randomization_mask: malformed MAC mask given' not in str(e): 1212 raise Exception("Unexpected error message: " + str(e)) 1213 1214 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1215 {"scan": dbus.ByteArray(b'123456')}, 1216 dbus_interface=dbus.PROPERTIES_IFACE) 1217 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1218 dbus_interface=dbus.PROPERTIES_IFACE) 1219 if len(res) != 1: 1220 logger.info(str(res)) 1221 raise Exception("Unexpected MACAddressRandomizationMask value") 1222 1223 try: 1224 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1225 {"scan": dbus.ByteArray(b'123456'), 1226 "sched_scan": dbus.ByteArray(b'987654')}, 1227 dbus_interface=dbus.PROPERTIES_IFACE) 1228 except dbus.exceptions.DBusException as e: 1229 # sched_scan is unlikely to be supported 1230 pass 1231 1232 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1233 dbus.Dictionary({}, signature='sv'), 1234 dbus_interface=dbus.PROPERTIES_IFACE) 1235 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1236 dbus_interface=dbus.PROPERTIES_IFACE) 1237 if len(res) != 0: 1238 logger.info(str(res)) 1239 raise Exception("Unexpected MACAddressRandomizationMask value") 1240 1241def test_dbus_scan_busy(dev, apdev): 1242 """D-Bus scan trigger rejection when busy with previous scan""" 1243 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1244 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1245 1246 if "OK" not in dev[0].request("SCAN freq=2412-2462"): 1247 raise Exception("Failed to start scan") 1248 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) 1249 if ev is None: 1250 raise Exception("Scan start timed out") 1251 1252 try: 1253 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1254 raise Exception("Scan() accepted when busy") 1255 except dbus.exceptions.DBusException as e: 1256 if "ScanError: Scan request reject" not in str(e): 1257 raise Exception("Unexpected error message: " + str(e)) 1258 1259 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1260 if ev is None: 1261 raise Exception("Scan timed out") 1262 1263def test_dbus_scan_abort(dev, apdev): 1264 """D-Bus scan trigger and abort""" 1265 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1266 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1267 1268 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1269 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) 1270 if ev is None: 1271 raise Exception("Scan start timed out") 1272 1273 iface.AbortScan() 1274 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1275 if ev is None: 1276 raise Exception("Scan abort result timed out") 1277 dev[0].dump_monitor() 1278 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1279 iface.AbortScan() 1280 1281 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1282 if ev is None: 1283 raise Exception("Scan timed out") 1284 1285def test_dbus_connect(dev, apdev): 1286 """D-Bus AddNetwork and connect""" 1287 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1288 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1289 1290 ssid = "test-wpa2-psk" 1291 passphrase = 'qwertyuiop' 1292 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1293 hapd = hostapd.add_ap(apdev[0], params) 1294 1295 class TestDbusConnect(TestDbus): 1296 def __init__(self, bus): 1297 TestDbus.__init__(self, bus) 1298 self.network_added = False 1299 self.network_selected = False 1300 self.network_removed = False 1301 self.state = 0 1302 1303 def __enter__(self): 1304 gobject.timeout_add(1, self.run_connect) 1305 gobject.timeout_add(15000, self.timeout) 1306 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1307 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1308 "NetworkRemoved") 1309 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1310 "NetworkSelected") 1311 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1312 "PropertiesChanged") 1313 self.loop.run() 1314 return self 1315 1316 def networkAdded(self, network, properties): 1317 logger.debug("networkAdded: %s" % str(network)) 1318 logger.debug(str(properties)) 1319 self.network_added = True 1320 1321 def networkRemoved(self, network): 1322 logger.debug("networkRemoved: %s" % str(network)) 1323 self.network_removed = True 1324 1325 def networkSelected(self, network): 1326 logger.debug("networkSelected: %s" % str(network)) 1327 self.network_selected = True 1328 1329 def propertiesChanged(self, properties): 1330 logger.debug("propertiesChanged: %s" % str(properties)) 1331 if 'State' in properties and properties['State'] == "completed": 1332 if self.state == 0: 1333 self.state = 1 1334 iface.Disconnect() 1335 elif self.state == 2: 1336 self.state = 3 1337 iface.Disconnect() 1338 elif self.state == 4: 1339 self.state = 5 1340 iface.Reattach() 1341 elif self.state == 5: 1342 self.state = 6 1343 iface.Disconnect() 1344 elif self.state == 7: 1345 self.state = 8 1346 res = iface.SignalPoll() 1347 logger.debug("SignalPoll: " + str(res)) 1348 if 'frequency' not in res or res['frequency'] != 2412: 1349 self.state = -1 1350 logger.info("Unexpected SignalPoll result") 1351 iface.RemoveNetwork(self.netw) 1352 if 'State' in properties and properties['State'] == "disconnected": 1353 if self.state == 1: 1354 self.state = 2 1355 iface.SelectNetwork(self.netw) 1356 elif self.state == 3: 1357 self.state = 4 1358 iface.Reassociate() 1359 elif self.state == 6: 1360 self.state = 7 1361 iface.Reconnect() 1362 elif self.state == 8: 1363 self.state = 9 1364 self.loop.quit() 1365 1366 def run_connect(self, *args): 1367 logger.debug("run_connect") 1368 args = dbus.Dictionary({'ssid': ssid, 1369 'key_mgmt': 'WPA-PSK', 1370 'psk': passphrase, 1371 'scan_freq': 2412}, 1372 signature='sv') 1373 self.netw = iface.AddNetwork(args) 1374 iface.SelectNetwork(self.netw) 1375 return False 1376 1377 def success(self): 1378 if not self.network_added or \ 1379 not self.network_removed or \ 1380 not self.network_selected: 1381 return False 1382 return self.state == 9 1383 1384 with TestDbusConnect(bus) as t: 1385 if not t.success(): 1386 raise Exception("Expected signals not seen") 1387 1388def test_dbus_remove_connected(dev, apdev): 1389 """D-Bus RemoveAllNetworks while connected""" 1390 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1391 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1392 1393 ssid = "test-open" 1394 hapd = hostapd.add_ap(apdev[0], {"ssid": ssid}) 1395 1396 class TestDbusConnect(TestDbus): 1397 def __init__(self, bus): 1398 TestDbus.__init__(self, bus) 1399 self.network_added = False 1400 self.network_selected = False 1401 self.network_removed = False 1402 self.state = 0 1403 1404 def __enter__(self): 1405 gobject.timeout_add(1, self.run_connect) 1406 gobject.timeout_add(15000, self.timeout) 1407 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1408 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1409 "NetworkRemoved") 1410 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1411 "NetworkSelected") 1412 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1413 "PropertiesChanged") 1414 self.loop.run() 1415 return self 1416 1417 def networkAdded(self, network, properties): 1418 logger.debug("networkAdded: %s" % str(network)) 1419 logger.debug(str(properties)) 1420 self.network_added = True 1421 1422 def networkRemoved(self, network): 1423 logger.debug("networkRemoved: %s" % str(network)) 1424 self.network_removed = True 1425 1426 def networkSelected(self, network): 1427 logger.debug("networkSelected: %s" % str(network)) 1428 self.network_selected = True 1429 1430 def propertiesChanged(self, properties): 1431 logger.debug("propertiesChanged: %s" % str(properties)) 1432 if 'State' in properties and properties['State'] == "completed": 1433 if self.state == 0: 1434 self.state = 1 1435 iface.Disconnect() 1436 elif self.state == 2: 1437 self.state = 3 1438 iface.Disconnect() 1439 elif self.state == 4: 1440 self.state = 5 1441 iface.Reattach() 1442 elif self.state == 5: 1443 self.state = 6 1444 iface.Disconnect() 1445 elif self.state == 7: 1446 self.state = 8 1447 res = iface.SignalPoll() 1448 logger.debug("SignalPoll: " + str(res)) 1449 if 'frequency' not in res or res['frequency'] != 2412: 1450 self.state = -1 1451 logger.info("Unexpected SignalPoll result") 1452 iface.RemoveAllNetworks() 1453 if 'State' in properties and properties['State'] == "disconnected": 1454 if self.state == 1: 1455 self.state = 2 1456 iface.SelectNetwork(self.netw) 1457 elif self.state == 3: 1458 self.state = 4 1459 iface.Reassociate() 1460 elif self.state == 6: 1461 self.state = 7 1462 iface.Reconnect() 1463 elif self.state == 8: 1464 self.state = 9 1465 self.loop.quit() 1466 1467 def run_connect(self, *args): 1468 logger.debug("run_connect") 1469 args = dbus.Dictionary({'ssid': ssid, 1470 'key_mgmt': 'NONE', 1471 'scan_freq': 2412}, 1472 signature='sv') 1473 self.netw = iface.AddNetwork(args) 1474 iface.SelectNetwork(self.netw) 1475 return False 1476 1477 def success(self): 1478 if not self.network_added or \ 1479 not self.network_removed or \ 1480 not self.network_selected: 1481 return False 1482 return self.state == 9 1483 1484 with TestDbusConnect(bus) as t: 1485 if not t.success(): 1486 raise Exception("Expected signals not seen") 1487 1488def test_dbus_connect_psk_mem(dev, apdev): 1489 """D-Bus AddNetwork and connect with memory-only PSK""" 1490 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1491 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1492 1493 ssid = "test-wpa2-psk" 1494 passphrase = 'qwertyuiop' 1495 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1496 hapd = hostapd.add_ap(apdev[0], params) 1497 1498 class TestDbusConnect(TestDbus): 1499 def __init__(self, bus): 1500 TestDbus.__init__(self, bus) 1501 self.connected = False 1502 1503 def __enter__(self): 1504 gobject.timeout_add(1, self.run_connect) 1505 gobject.timeout_add(15000, self.timeout) 1506 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1507 "PropertiesChanged") 1508 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE, 1509 "NetworkRequest") 1510 self.loop.run() 1511 return self 1512 1513 def propertiesChanged(self, properties): 1514 logger.debug("propertiesChanged: %s" % str(properties)) 1515 if 'State' in properties and properties['State'] == "completed": 1516 self.connected = True 1517 self.loop.quit() 1518 1519 def networkRequest(self, path, field, txt): 1520 logger.debug("networkRequest: %s %s %s" % (path, field, txt)) 1521 if field == "PSK_PASSPHRASE": 1522 iface.NetworkReply(path, field, '"' + passphrase + '"') 1523 1524 def run_connect(self, *args): 1525 logger.debug("run_connect") 1526 args = dbus.Dictionary({'ssid': ssid, 1527 'key_mgmt': 'WPA-PSK', 1528 'mem_only_psk': 1, 1529 'scan_freq': 2412}, 1530 signature='sv') 1531 self.netw = iface.AddNetwork(args) 1532 iface.SelectNetwork(self.netw) 1533 return False 1534 1535 def success(self): 1536 return self.connected 1537 1538 with TestDbusConnect(bus) as t: 1539 if not t.success(): 1540 raise Exception("Expected signals not seen") 1541 1542def test_dbus_connect_oom(dev, apdev): 1543 """D-Bus AddNetwork and connect when out-of-memory""" 1544 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1545 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1546 1547 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"): 1548 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build") 1549 1550 ssid = "test-wpa2-psk" 1551 passphrase = 'qwertyuiop' 1552 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1553 hapd = hostapd.add_ap(apdev[0], params) 1554 1555 class TestDbusConnect(TestDbus): 1556 def __init__(self, bus): 1557 TestDbus.__init__(self, bus) 1558 self.network_added = False 1559 self.network_selected = False 1560 self.network_removed = False 1561 self.state = 0 1562 1563 def __enter__(self): 1564 gobject.timeout_add(1, self.run_connect) 1565 gobject.timeout_add(1500, self.timeout) 1566 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1567 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1568 "NetworkRemoved") 1569 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1570 "NetworkSelected") 1571 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1572 "PropertiesChanged") 1573 self.loop.run() 1574 return self 1575 1576 def networkAdded(self, network, properties): 1577 logger.debug("networkAdded: %s" % str(network)) 1578 logger.debug(str(properties)) 1579 self.network_added = True 1580 1581 def networkRemoved(self, network): 1582 logger.debug("networkRemoved: %s" % str(network)) 1583 self.network_removed = True 1584 1585 def networkSelected(self, network): 1586 logger.debug("networkSelected: %s" % str(network)) 1587 self.network_selected = True 1588 1589 def propertiesChanged(self, properties): 1590 logger.debug("propertiesChanged: %s" % str(properties)) 1591 if 'State' in properties and properties['State'] == "completed": 1592 if self.state == 0: 1593 self.state = 1 1594 iface.Disconnect() 1595 elif self.state == 2: 1596 self.state = 3 1597 iface.Disconnect() 1598 elif self.state == 4: 1599 self.state = 5 1600 iface.Reattach() 1601 elif self.state == 5: 1602 self.state = 6 1603 res = iface.SignalPoll() 1604 logger.debug("SignalPoll: " + str(res)) 1605 if 'frequency' not in res or res['frequency'] != 2412: 1606 self.state = -1 1607 logger.info("Unexpected SignalPoll result") 1608 iface.RemoveNetwork(self.netw) 1609 if 'State' in properties and properties['State'] == "disconnected": 1610 if self.state == 1: 1611 self.state = 2 1612 iface.SelectNetwork(self.netw) 1613 elif self.state == 3: 1614 self.state = 4 1615 iface.Reassociate() 1616 elif self.state == 6: 1617 self.state = 7 1618 self.loop.quit() 1619 1620 def run_connect(self, *args): 1621 logger.debug("run_connect") 1622 args = dbus.Dictionary({'ssid': ssid, 1623 'key_mgmt': 'WPA-PSK', 1624 'psk': passphrase, 1625 'scan_freq': 2412}, 1626 signature='sv') 1627 try: 1628 self.netw = iface.AddNetwork(args) 1629 except Exception as e: 1630 logger.info("Exception on AddNetwork: " + str(e)) 1631 self.loop.quit() 1632 return False 1633 try: 1634 iface.SelectNetwork(self.netw) 1635 except Exception as e: 1636 logger.info("Exception on SelectNetwork: " + str(e)) 1637 self.loop.quit() 1638 1639 return False 1640 1641 def success(self): 1642 if not self.network_added or \ 1643 not self.network_removed or \ 1644 not self.network_selected: 1645 return False 1646 return self.state == 7 1647 1648 count = 0 1649 for i in range(1, 1000): 1650 for j in range(3): 1651 dev[j].dump_monitor() 1652 dev[0].request("TEST_ALLOC_FAIL %d:main" % i) 1653 try: 1654 with TestDbusConnect(bus) as t: 1655 if not t.success(): 1656 logger.info("Iteration %d - Expected signals not seen" % i) 1657 else: 1658 logger.info("Iteration %d - success" % i) 1659 1660 state = dev[0].request('GET_ALLOC_FAIL') 1661 logger.info("GET_ALLOC_FAIL: " + state) 1662 dev[0].dump_monitor() 1663 dev[0].request("TEST_ALLOC_FAIL 0:") 1664 if i < 3: 1665 raise Exception("Connection succeeded during out-of-memory") 1666 if not state.startswith('0:'): 1667 count += 1 1668 if count == 5: 1669 break 1670 except: 1671 pass 1672 1673 # Force regulatory update to re-fetch hw capabilities for the following 1674 # test cases. 1675 try: 1676 dev[0].dump_monitor() 1677 subprocess.call(['iw', 'reg', 'set', 'US']) 1678 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 1679 finally: 1680 dev[0].dump_monitor() 1681 subprocess.call(['iw', 'reg', 'set', '00']) 1682 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 1683 1684def test_dbus_while_not_connected(dev, apdev): 1685 """D-Bus invalid operations while not connected""" 1686 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1687 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1688 1689 try: 1690 iface.Disconnect() 1691 raise Exception("Disconnect() accepted when not connected") 1692 except dbus.exceptions.DBusException as e: 1693 if "NotConnected" not in str(e): 1694 raise Exception("Unexpected error message for invalid Disconnect: " + str(e)) 1695 1696 try: 1697 iface.Reattach() 1698 raise Exception("Reattach() accepted when not connected") 1699 except dbus.exceptions.DBusException as e: 1700 if "NotConnected" not in str(e): 1701 raise Exception("Unexpected error message for invalid Reattach: " + str(e)) 1702 1703def test_dbus_connect_eap(dev, apdev): 1704 """D-Bus AddNetwork and connect to EAP network""" 1705 check_altsubject_match_support(dev[0]) 1706 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1707 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1708 1709 ssid = "ieee8021x-open" 1710 params = hostapd.radius_params() 1711 params["ssid"] = ssid 1712 params["ieee8021x"] = "1" 1713 hapd = hostapd.add_ap(apdev[0], params) 1714 1715 class TestDbusConnect(TestDbus): 1716 def __init__(self, bus): 1717 TestDbus.__init__(self, bus) 1718 self.certification_received = False 1719 self.eap_status = False 1720 self.state = 0 1721 1722 def __enter__(self): 1723 gobject.timeout_add(1, self.run_connect) 1724 gobject.timeout_add(15000, self.timeout) 1725 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1726 "PropertiesChanged") 1727 self.add_signal(self.certification, WPAS_DBUS_IFACE, 1728 "Certification", byte_arrays=True) 1729 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE, 1730 "NetworkRequest") 1731 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP") 1732 self.loop.run() 1733 return self 1734 1735 def propertiesChanged(self, properties): 1736 logger.debug("propertiesChanged: %s" % str(properties)) 1737 if 'State' in properties and properties['State'] == "completed": 1738 if self.state == 0: 1739 self.state = 1 1740 iface.EAPLogoff() 1741 logger.info("Set dNSName constraint") 1742 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw) 1743 args = dbus.Dictionary({'altsubject_match': 1744 self.server_dnsname}, 1745 signature='sv') 1746 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1747 dbus_interface=dbus.PROPERTIES_IFACE) 1748 elif self.state == 2: 1749 self.state = 3 1750 iface.Disconnect() 1751 logger.info("Set non-matching dNSName constraint") 1752 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw) 1753 args = dbus.Dictionary({'altsubject_match': 1754 self.server_dnsname + "FOO"}, 1755 signature='sv') 1756 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1757 dbus_interface=dbus.PROPERTIES_IFACE) 1758 if 'State' in properties and properties['State'] == "disconnected": 1759 if self.state == 1: 1760 self.state = 2 1761 iface.EAPLogon() 1762 iface.SelectNetwork(self.netw) 1763 if self.state == 3: 1764 self.state = 4 1765 iface.SelectNetwork(self.netw) 1766 1767 def certification(self, args): 1768 logger.debug("certification: %s" % str(args)) 1769 self.certification_received = True 1770 if args['depth'] == 0: 1771 # The test server certificate is supposed to have dNSName 1772 if len(args['altsubject']) < 1: 1773 raise Exception("Missing dNSName") 1774 dnsname = args['altsubject'][0] 1775 if not dnsname.startswith("DNS:"): 1776 raise Exception("Expected dNSName not found: " + dnsname) 1777 logger.info("altsubject: " + dnsname) 1778 self.server_dnsname = dnsname 1779 1780 def eap(self, status, parameter): 1781 logger.debug("EAP: status=%s parameter=%s" % (status, parameter)) 1782 if status == 'completion' and parameter == 'success': 1783 self.eap_status = True 1784 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch': 1785 self.state = 5 1786 self.loop.quit() 1787 1788 def networkRequest(self, path, field, txt): 1789 logger.debug("networkRequest: %s %s %s" % (path, field, txt)) 1790 if field == "PASSWORD": 1791 iface.NetworkReply(path, field, "password") 1792 1793 def run_connect(self, *args): 1794 logger.debug("run_connect") 1795 args = dbus.Dictionary({'ssid': ssid, 1796 'key_mgmt': 'IEEE8021X', 1797 'eapol_flags': 0, 1798 'eap': 'TTLS', 1799 'anonymous_identity': 'ttls', 1800 'identity': 'pap user', 1801 'ca_cert': 'auth_serv/ca.pem', 1802 'phase2': 'auth=PAP', 1803 'scan_freq': 2412}, 1804 signature='sv') 1805 self.netw = iface.AddNetwork(args) 1806 iface.SelectNetwork(self.netw) 1807 return False 1808 1809 def success(self): 1810 if not self.eap_status or not self.certification_received: 1811 return False 1812 return self.state == 5 1813 1814 with TestDbusConnect(bus) as t: 1815 if not t.success(): 1816 raise Exception("Expected signals not seen") 1817 1818def test_dbus_network(dev, apdev): 1819 """D-Bus AddNetwork/RemoveNetwork parameters and error cases""" 1820 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1821 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1822 1823 args = dbus.Dictionary({'ssid': "foo", 1824 'key_mgmt': 'WPA-PSK', 1825 'psk': "12345678", 1826 'identity': dbus.ByteArray([1, 2]), 1827 'priority': dbus.Int32(0), 1828 'scan_freq': dbus.UInt32(2412)}, 1829 signature='sv') 1830 netw = iface.AddNetwork(args) 1831 id = int(dev[0].list_networks()[0]['id']) 1832 val = dev[0].get_network(id, "scan_freq") 1833 if val != "2412": 1834 raise Exception("Invalid scan_freq value: " + str(val)) 1835 iface.RemoveNetwork(netw) 1836 1837 args = dbus.Dictionary({'ssid': "foo", 1838 'key_mgmt': 'NONE', 1839 'scan_freq': "2412 2432", 1840 'freq_list': "2412 2417 2432"}, 1841 signature='sv') 1842 netw = iface.AddNetwork(args) 1843 id = int(dev[0].list_networks()[0]['id']) 1844 val = dev[0].get_network(id, "scan_freq") 1845 if val != "2412 2432": 1846 raise Exception("Invalid scan_freq value (2): " + str(val)) 1847 val = dev[0].get_network(id, "freq_list") 1848 if val != "2412 2417 2432": 1849 raise Exception("Invalid freq_list value: " + str(val)) 1850 iface.RemoveNetwork(netw) 1851 try: 1852 iface.RemoveNetwork(netw) 1853 raise Exception("Invalid RemoveNetwork() accepted") 1854 except dbus.exceptions.DBusException as e: 1855 if "NetworkUnknown" not in str(e): 1856 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e)) 1857 try: 1858 iface.SelectNetwork(netw) 1859 raise Exception("Invalid SelectNetwork() accepted") 1860 except dbus.exceptions.DBusException as e: 1861 if "NetworkUnknown" not in str(e): 1862 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e)) 1863 1864 args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE', 1865 'identity': "testuser", 'scan_freq': '2412'}, 1866 signature='sv') 1867 netw1 = iface.AddNetwork(args) 1868 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1869 signature='sv') 1870 netw2 = iface.AddNetwork(args) 1871 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks", 1872 dbus_interface=dbus.PROPERTIES_IFACE) 1873 if len(res) != 2: 1874 raise Exception("Unexpected number of networks") 1875 1876 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1) 1877 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1878 dbus_interface=dbus.PROPERTIES_IFACE) 1879 if res != False: 1880 raise Exception("Added network was unexpectedly enabled by default") 1881 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True), 1882 dbus_interface=dbus.PROPERTIES_IFACE) 1883 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1884 dbus_interface=dbus.PROPERTIES_IFACE) 1885 if res != True: 1886 raise Exception("Set(Enabled,True) did not seem to change property value") 1887 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False), 1888 dbus_interface=dbus.PROPERTIES_IFACE) 1889 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1890 dbus_interface=dbus.PROPERTIES_IFACE) 1891 if res != False: 1892 raise Exception("Set(Enabled,False) did not seem to change property value") 1893 try: 1894 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1), 1895 dbus_interface=dbus.PROPERTIES_IFACE) 1896 raise Exception("Invalid Set(Enabled,1) accepted") 1897 except dbus.exceptions.DBusException as e: 1898 if "Error.Failed: wrong property type" not in str(e): 1899 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e)) 1900 1901 args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv') 1902 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1903 dbus_interface=dbus.PROPERTIES_IFACE) 1904 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties", 1905 dbus_interface=dbus.PROPERTIES_IFACE) 1906 if res['ssid'] != '"foo1new"': 1907 raise Exception("Set(Properties) failed to update ssid") 1908 if res['identity'] != '"testuser"': 1909 raise Exception("Set(Properties) unexpectedly changed unrelated parameter") 1910 1911 iface.RemoveAllNetworks() 1912 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks", 1913 dbus_interface=dbus.PROPERTIES_IFACE) 1914 if len(res) != 0: 1915 raise Exception("Unexpected number of networks") 1916 iface.RemoveAllNetworks() 1917 1918 tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'), 1919 dbus.Dictionary({'identity': dbus.ByteArray()}, 1920 signature='sv'), 1921 dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv')] 1922 for args in tests: 1923 try: 1924 iface.AddNetwork(args) 1925 raise Exception("Invalid AddNetwork args accepted: " + str(args)) 1926 except dbus.exceptions.DBusException as e: 1927 if "InvalidArgs" not in str(e): 1928 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e)) 1929 1930def test_dbus_network_oom(dev, apdev): 1931 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases""" 1932 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1933 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1934 1935 args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE', 1936 'identity': "testuser", 'scan_freq': '2412'}, 1937 signature='sv') 1938 netw1 = iface.AddNetwork(args) 1939 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1) 1940 1941 with alloc_fail_dbus(dev[0], 1, 1942 "wpa_config_get_all;wpas_dbus_getter_network_properties", 1943 "Get"): 1944 net_obj.Get(WPAS_DBUS_NETWORK, "Properties", 1945 dbus_interface=dbus.PROPERTIES_IFACE) 1946 1947 iface.RemoveAllNetworks() 1948 1949 with alloc_fail_dbus(dev[0], 1, 1950 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network", 1951 "RemoveNetwork", "InvalidArgs"): 1952 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234")) 1953 1954 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"): 1955 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1956 signature='sv') 1957 try: 1958 netw = iface.AddNetwork(args) 1959 # Currently, AddNetwork() succeeds even if os_strdup() for path 1960 # fails, so remove the network if that occurs. 1961 iface.RemoveNetwork(netw) 1962 except dbus.exceptions.DBusException as e: 1963 pass 1964 1965 for i in range(1, 3): 1966 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"): 1967 try: 1968 netw = iface.AddNetwork(args) 1969 # Currently, AddNetwork() succeeds even if network registration 1970 # fails, so remove the network if that occurs. 1971 iface.RemoveNetwork(netw) 1972 except dbus.exceptions.DBusException as e: 1973 pass 1974 1975 with alloc_fail_dbus(dev[0], 1, 1976 "=wpa_config_add_network;wpas_dbus_handler_add_network", 1977 "AddNetwork", 1978 "UnknownError: wpa_supplicant could not add a network"): 1979 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1980 signature='sv') 1981 netw = iface.AddNetwork(args) 1982 1983 tests = [(1, 1984 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network', 1985 dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, 1986 signature='sv')), 1987 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1988 dbus.Dictionary({'ssid': 'foo'}, signature='sv')), 1989 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1990 dbus.Dictionary({'eap': 'foo'}, signature='sv')), 1991 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1992 dbus.Dictionary({'priority': dbus.UInt32(1)}, 1993 signature='sv')), 1994 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1995 dbus.Dictionary({'priority': dbus.Int32(1)}, 1996 signature='sv')), 1997 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1998 dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, 1999 signature='sv'))] 2000 for (count, funcs, args) in tests: 2001 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"): 2002 netw = iface.AddNetwork(args) 2003 2004 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 2005 dbus_interface=dbus.PROPERTIES_IFACE)) > 0: 2006 raise Exception("Unexpected network block added") 2007 if len(dev[0].list_networks()) > 0: 2008 raise Exception("Unexpected network block visible") 2009 2010def test_dbus_interface(dev, apdev): 2011 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases""" 2012 try: 2013 _test_dbus_interface(dev, apdev) 2014 finally: 2015 # Need to force P2P channel list update since the 'lo' interface 2016 # with driver=none ends up configuring default dualband channels. 2017 dev[0].request("SET country US") 2018 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2019 if ev is None: 2020 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], 2021 timeout=1) 2022 dev[0].request("SET country 00") 2023 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2024 if ev is None: 2025 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], 2026 timeout=1) 2027 subprocess.call(['iw', 'reg', 'set', '00']) 2028 2029def _test_dbus_interface(dev, apdev): 2030 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2031 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 2032 2033 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'}, 2034 signature='sv') 2035 path = wpas.CreateInterface(params) 2036 logger.debug("New interface path: " + str(path)) 2037 path2 = wpas.GetInterface("lo") 2038 if path != path2: 2039 raise Exception("Interface object mismatch") 2040 2041 params = dbus.Dictionary({'Ifname': 'lo', 2042 'Driver': 'none', 2043 'ConfigFile': 'foo', 2044 'BridgeIfname': 'foo',}, 2045 signature='sv') 2046 try: 2047 wpas.CreateInterface(params) 2048 raise Exception("Invalid CreateInterface() accepted") 2049 except dbus.exceptions.DBusException as e: 2050 if "InterfaceExists" not in str(e): 2051 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2052 2053 wpas.RemoveInterface(path) 2054 try: 2055 wpas.RemoveInterface(path) 2056 raise Exception("Invalid RemoveInterface() accepted") 2057 except dbus.exceptions.DBusException as e: 2058 if "InterfaceUnknown" not in str(e): 2059 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) 2060 2061 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none', 2062 'Foo': 123}, 2063 signature='sv') 2064 try: 2065 wpas.CreateInterface(params) 2066 raise Exception("Invalid CreateInterface() accepted") 2067 except dbus.exceptions.DBusException as e: 2068 if "InvalidArgs" not in str(e): 2069 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2070 2071 params = dbus.Dictionary({'Driver': 'none'}, signature='sv') 2072 try: 2073 wpas.CreateInterface(params) 2074 raise Exception("Invalid CreateInterface() accepted") 2075 except dbus.exceptions.DBusException as e: 2076 if "InvalidArgs" not in str(e): 2077 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2078 2079 try: 2080 wpas.GetInterface("lo") 2081 raise Exception("Invalid GetInterface() accepted") 2082 except dbus.exceptions.DBusException as e: 2083 if "InterfaceUnknown" not in str(e): 2084 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) 2085 2086def test_dbus_interface_oom(dev, apdev): 2087 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases""" 2088 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2089 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 2090 2091 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"): 2092 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'}, 2093 signature='sv') 2094 wpas.CreateInterface(params) 2095 2096 for i in range(1, 1000): 2097 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i) 2098 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'}, 2099 signature='sv') 2100 try: 2101 npath = wpas.CreateInterface(params) 2102 wpas.RemoveInterface(npath) 2103 logger.info("CreateInterface succeeds after %d allocation failures" % i) 2104 state = dev[0].request('GET_ALLOC_FAIL') 2105 logger.info("GET_ALLOC_FAIL: " + state) 2106 dev[0].dump_monitor() 2107 dev[0].request("TEST_ALLOC_FAIL 0:") 2108 if i < 5: 2109 raise Exception("CreateInterface succeeded during out-of-memory") 2110 if not state.startswith('0:'): 2111 break 2112 except dbus.exceptions.DBusException as e: 2113 pass 2114 2115 for arg in ['Driver', 'Ifname', 'ConfigFile', 'BridgeIfname']: 2116 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface", 2117 "CreateInterface"): 2118 params = dbus.Dictionary({arg: 'foo'}, signature='sv') 2119 wpas.CreateInterface(params) 2120 2121def test_dbus_blob(dev, apdev): 2122 """D-Bus AddNetwork/RemoveNetwork parameters and error cases""" 2123 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2124 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2125 2126 blob = dbus.ByteArray(b"\x01\x02\x03") 2127 iface.AddBlob('blob1', blob) 2128 try: 2129 iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04")) 2130 raise Exception("Invalid AddBlob() accepted") 2131 except dbus.exceptions.DBusException as e: 2132 if "BlobExists" not in str(e): 2133 raise Exception("Unexpected error message for invalid AddBlob: " + str(e)) 2134 res = iface.GetBlob('blob1') 2135 if len(res) != len(blob): 2136 raise Exception("Unexpected blob data length") 2137 for i in range(len(res)): 2138 if res[i] != dbus.Byte(blob[i]): 2139 raise Exception("Unexpected blob data") 2140 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs", 2141 dbus_interface=dbus.PROPERTIES_IFACE) 2142 if 'blob1' not in res: 2143 raise Exception("Added blob missing from Blobs property") 2144 iface.RemoveBlob('blob1') 2145 try: 2146 iface.RemoveBlob('blob1') 2147 raise Exception("Invalid RemoveBlob() accepted") 2148 except dbus.exceptions.DBusException as e: 2149 if "BlobUnknown" not in str(e): 2150 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e)) 2151 try: 2152 iface.GetBlob('blob1') 2153 raise Exception("Invalid GetBlob() accepted") 2154 except dbus.exceptions.DBusException as e: 2155 if "BlobUnknown" not in str(e): 2156 raise Exception("Unexpected error message for invalid GetBlob: " + str(e)) 2157 2158 class TestDbusBlob(TestDbus): 2159 def __init__(self, bus): 2160 TestDbus.__init__(self, bus) 2161 self.blob_added = False 2162 self.blob_removed = False 2163 2164 def __enter__(self): 2165 gobject.timeout_add(1, self.run_blob) 2166 gobject.timeout_add(15000, self.timeout) 2167 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded") 2168 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved") 2169 self.loop.run() 2170 return self 2171 2172 def blobAdded(self, blobName): 2173 logger.debug("blobAdded: %s" % blobName) 2174 if blobName == 'blob2': 2175 self.blob_added = True 2176 2177 def blobRemoved(self, blobName): 2178 logger.debug("blobRemoved: %s" % blobName) 2179 if blobName == 'blob2': 2180 self.blob_removed = True 2181 self.loop.quit() 2182 2183 def run_blob(self, *args): 2184 logger.debug("run_blob") 2185 iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04")) 2186 iface.RemoveBlob('blob2') 2187 return False 2188 2189 def success(self): 2190 return self.blob_added and self.blob_removed 2191 2192 with TestDbusBlob(bus) as t: 2193 if not t.success(): 2194 raise Exception("Expected signals not seen") 2195 2196def test_dbus_blob_oom(dev, apdev): 2197 """D-Bus AddNetwork/RemoveNetwork OOM error cases""" 2198 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2199 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2200 2201 for i in range(1, 4): 2202 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob", 2203 "AddBlob"): 2204 iface.AddBlob('blob_no_mem', dbus.ByteArray(b"\x01\x02\x03\x04")) 2205 2206def test_dbus_autoscan(dev, apdev): 2207 """D-Bus Autoscan()""" 2208 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2209 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2210 2211 iface.AutoScan("foo") 2212 iface.AutoScan("periodic:1") 2213 iface.AutoScan("") 2214 dev[0].request("AUTOSCAN ") 2215 2216def test_dbus_autoscan_oom(dev, apdev): 2217 """D-Bus Autoscan() OOM""" 2218 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2219 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2220 2221 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"): 2222 iface.AutoScan("foo") 2223 dev[0].request("AUTOSCAN ") 2224 2225def test_dbus_tdls_invalid(dev, apdev): 2226 """D-Bus invalid TDLS operations""" 2227 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2228 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2229 2230 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2231 connect_2sta_open(dev, hapd) 2232 addr1 = dev[1].p2p_interface_addr() 2233 2234 try: 2235 iface.TDLSDiscover("foo") 2236 raise Exception("Invalid TDLSDiscover() accepted") 2237 except dbus.exceptions.DBusException as e: 2238 if "InvalidArgs" not in str(e): 2239 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e)) 2240 2241 try: 2242 iface.TDLSStatus("foo") 2243 raise Exception("Invalid TDLSStatus() accepted") 2244 except dbus.exceptions.DBusException as e: 2245 if "InvalidArgs" not in str(e): 2246 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e)) 2247 2248 res = iface.TDLSStatus(addr1) 2249 if res != "peer does not exist": 2250 raise Exception("Unexpected TDLSStatus response") 2251 2252 try: 2253 iface.TDLSSetup("foo") 2254 raise Exception("Invalid TDLSSetup() accepted") 2255 except dbus.exceptions.DBusException as e: 2256 if "InvalidArgs" not in str(e): 2257 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e)) 2258 2259 try: 2260 iface.TDLSTeardown("foo") 2261 raise Exception("Invalid TDLSTeardown() accepted") 2262 except dbus.exceptions.DBusException as e: 2263 if "InvalidArgs" not in str(e): 2264 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e)) 2265 2266 try: 2267 iface.TDLSTeardown("00:11:22:33:44:55") 2268 raise Exception("TDLSTeardown accepted for unknown peer") 2269 except dbus.exceptions.DBusException as e: 2270 if "UnknownError: error performing TDLS teardown" not in str(e): 2271 raise Exception("Unexpected error message: " + str(e)) 2272 2273 try: 2274 iface.TDLSChannelSwitch({}) 2275 raise Exception("Invalid TDLSChannelSwitch() accepted") 2276 except dbus.exceptions.DBusException as e: 2277 if "InvalidArgs" not in str(e): 2278 raise Exception("Unexpected error message for invalid TDLSChannelSwitch: " + str(e)) 2279 2280 try: 2281 iface.TDLSCancelChannelSwitch("foo") 2282 raise Exception("Invalid TDLSCancelChannelSwitch() accepted") 2283 except dbus.exceptions.DBusException as e: 2284 if "InvalidArgs" not in str(e): 2285 raise Exception("Unexpected error message for invalid TDLSCancelChannelSwitch: " + str(e)) 2286 2287def test_dbus_tdls_oom(dev, apdev): 2288 """D-Bus TDLS operations during OOM""" 2289 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2290 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2291 2292 with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup", 2293 "UnknownError: error performing TDLS setup"): 2294 iface.TDLSSetup("00:11:22:33:44:55") 2295 2296def test_dbus_tdls(dev, apdev): 2297 """D-Bus TDLS""" 2298 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2299 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2300 2301 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2302 connect_2sta_open(dev, hapd) 2303 2304 addr1 = dev[1].p2p_interface_addr() 2305 2306 class TestDbusTdls(TestDbus): 2307 def __init__(self, bus): 2308 TestDbus.__init__(self, bus) 2309 self.tdls_setup = False 2310 self.tdls_teardown = False 2311 2312 def __enter__(self): 2313 gobject.timeout_add(1, self.run_tdls) 2314 gobject.timeout_add(15000, self.timeout) 2315 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 2316 "PropertiesChanged") 2317 self.loop.run() 2318 return self 2319 2320 def propertiesChanged(self, properties): 2321 logger.debug("propertiesChanged: %s" % str(properties)) 2322 2323 def run_tdls(self, *args): 2324 logger.debug("run_tdls") 2325 iface.TDLSDiscover(addr1) 2326 gobject.timeout_add(100, self.run_tdls2) 2327 return False 2328 2329 def run_tdls2(self, *args): 2330 logger.debug("run_tdls2") 2331 iface.TDLSSetup(addr1) 2332 gobject.timeout_add(500, self.run_tdls3) 2333 return False 2334 2335 def run_tdls3(self, *args): 2336 logger.debug("run_tdls3") 2337 res = iface.TDLSStatus(addr1) 2338 if res == "connected": 2339 self.tdls_setup = True 2340 else: 2341 logger.info("Unexpected TDLSStatus: " + res) 2342 iface.TDLSTeardown(addr1) 2343 gobject.timeout_add(200, self.run_tdls4) 2344 return False 2345 2346 def run_tdls4(self, *args): 2347 logger.debug("run_tdls4") 2348 res = iface.TDLSStatus(addr1) 2349 if res == "peer does not exist": 2350 self.tdls_teardown = True 2351 else: 2352 logger.info("Unexpected TDLSStatus: " + res) 2353 self.loop.quit() 2354 return False 2355 2356 def success(self): 2357 return self.tdls_setup and self.tdls_teardown 2358 2359 with TestDbusTdls(bus) as t: 2360 if not t.success(): 2361 raise Exception("Expected signals not seen") 2362 2363def test_dbus_tdls_channel_switch(dev, apdev): 2364 """D-Bus TDLS channel switch configuration""" 2365 flags = int(dev[0].get_driver_status_field('capa.flags'), 16) 2366 if flags & 0x800000000 == 0: 2367 raise HwsimSkip("Driver does not support TDLS channel switching") 2368 2369 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2370 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2371 2372 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2373 connect_2sta_open(dev, hapd) 2374 2375 addr1 = dev[1].p2p_interface_addr() 2376 2377 class TestDbusTdls(TestDbus): 2378 def __init__(self, bus): 2379 TestDbus.__init__(self, bus) 2380 self.tdls_setup = False 2381 self.tdls_done = False 2382 2383 def __enter__(self): 2384 gobject.timeout_add(1, self.run_tdls) 2385 gobject.timeout_add(15000, self.timeout) 2386 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 2387 "PropertiesChanged") 2388 self.loop.run() 2389 return self 2390 2391 def propertiesChanged(self, properties): 2392 logger.debug("propertiesChanged: %s" % str(properties)) 2393 2394 def run_tdls(self, *args): 2395 logger.debug("run_tdls") 2396 iface.TDLSDiscover(addr1) 2397 gobject.timeout_add(100, self.run_tdls2) 2398 return False 2399 2400 def run_tdls2(self, *args): 2401 logger.debug("run_tdls2") 2402 iface.TDLSSetup(addr1) 2403 gobject.timeout_add(500, self.run_tdls3) 2404 return False 2405 2406 def run_tdls3(self, *args): 2407 logger.debug("run_tdls3") 2408 res = iface.TDLSStatus(addr1) 2409 if res == "connected": 2410 self.tdls_setup = True 2411 else: 2412 logger.info("Unexpected TDLSStatus: " + res) 2413 2414 # Unknown dict entry 2415 args = dbus.Dictionary({'Foobar': dbus.Byte(1)}, 2416 signature='sv') 2417 try: 2418 iface.TDLSChannelSwitch(args) 2419 except Exception as e: 2420 if "InvalidArgs" not in str(e): 2421 raise Exception("Unexpected exception") 2422 2423 # Missing OperClass 2424 args = dbus.Dictionary({}, signature='sv') 2425 try: 2426 iface.TDLSChannelSwitch(args) 2427 except Exception as e: 2428 if "InvalidArgs" not in str(e): 2429 raise Exception("Unexpected exception") 2430 2431 # Missing Frequency 2432 args = dbus.Dictionary({'OperClass': dbus.Byte(1)}, 2433 signature='sv') 2434 try: 2435 iface.TDLSChannelSwitch(args) 2436 except Exception as e: 2437 if "InvalidArgs" not in str(e): 2438 raise Exception("Unexpected exception") 2439 2440 # Missing PeerAddress 2441 args = dbus.Dictionary({'OperClass': dbus.Byte(1), 2442 'Frequency': dbus.UInt32(2417)}, 2443 signature='sv') 2444 try: 2445 iface.TDLSChannelSwitch(args) 2446 except Exception as e: 2447 if "InvalidArgs" not in str(e): 2448 raise Exception("Unexpected exception") 2449 2450 # Valid parameters 2451 args = dbus.Dictionary({'OperClass': dbus.Byte(1), 2452 'Frequency': dbus.UInt32(2417), 2453 'PeerAddress': addr1, 2454 'SecChannelOffset': dbus.UInt32(0), 2455 'CenterFrequency1': dbus.UInt32(0), 2456 'CenterFrequency2': dbus.UInt32(0), 2457 'Bandwidth': dbus.UInt32(20), 2458 'HT': dbus.Boolean(False), 2459 'VHT': dbus.Boolean(False)}, 2460 signature='sv') 2461 iface.TDLSChannelSwitch(args) 2462 2463 gobject.timeout_add(200, self.run_tdls4) 2464 return False 2465 2466 def run_tdls4(self, *args): 2467 logger.debug("run_tdls4") 2468 iface.TDLSCancelChannelSwitch(addr1) 2469 self.tdls_done = True 2470 self.loop.quit() 2471 return False 2472 2473 def success(self): 2474 return self.tdls_setup and self.tdls_done 2475 2476 with TestDbusTdls(bus) as t: 2477 if not t.success(): 2478 raise Exception("Expected signals not seen") 2479 2480def test_dbus_pkcs11(dev, apdev): 2481 """D-Bus SetPKCS11EngineAndModulePath()""" 2482 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2483 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2484 2485 try: 2486 iface.SetPKCS11EngineAndModulePath("foo", "bar") 2487 except dbus.exceptions.DBusException as e: 2488 if "Error.Failed: Reinit of the EAPOL" not in str(e): 2489 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e)) 2490 2491 try: 2492 iface.SetPKCS11EngineAndModulePath("foo", "") 2493 except dbus.exceptions.DBusException as e: 2494 if "Error.Failed: Reinit of the EAPOL" not in str(e): 2495 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e)) 2496 2497 iface.SetPKCS11EngineAndModulePath("", "bar") 2498 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath", 2499 dbus_interface=dbus.PROPERTIES_IFACE) 2500 if res != "": 2501 raise Exception("Unexpected PKCS11EnginePath value: " + res) 2502 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath", 2503 dbus_interface=dbus.PROPERTIES_IFACE) 2504 if res != "bar": 2505 raise Exception("Unexpected PKCS11ModulePath value: " + res) 2506 2507 iface.SetPKCS11EngineAndModulePath("", "") 2508 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath", 2509 dbus_interface=dbus.PROPERTIES_IFACE) 2510 if res != "": 2511 raise Exception("Unexpected PKCS11EnginePath value: " + res) 2512 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath", 2513 dbus_interface=dbus.PROPERTIES_IFACE) 2514 if res != "": 2515 raise Exception("Unexpected PKCS11ModulePath value: " + res) 2516 2517def test_dbus_apscan(dev, apdev): 2518 """D-Bus Get/Set ApScan""" 2519 try: 2520 _test_dbus_apscan(dev, apdev) 2521 finally: 2522 dev[0].request("AP_SCAN 1") 2523 2524def _test_dbus_apscan(dev, apdev): 2525 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2526 2527 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan", 2528 dbus_interface=dbus.PROPERTIES_IFACE) 2529 if res != 1: 2530 raise Exception("Unexpected initial ApScan value: %d" % res) 2531 2532 for i in range(3): 2533 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i), 2534 dbus_interface=dbus.PROPERTIES_IFACE) 2535 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan", 2536 dbus_interface=dbus.PROPERTIES_IFACE) 2537 if res != i: 2538 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i)) 2539 2540 try: 2541 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1), 2542 dbus_interface=dbus.PROPERTIES_IFACE) 2543 raise Exception("Invalid Set(ApScan,-1) accepted") 2544 except dbus.exceptions.DBusException as e: 2545 if "Error.Failed: wrong property type" not in str(e): 2546 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e)) 2547 2548 try: 2549 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123), 2550 dbus_interface=dbus.PROPERTIES_IFACE) 2551 raise Exception("Invalid Set(ApScan,123) accepted") 2552 except dbus.exceptions.DBusException as e: 2553 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e): 2554 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e)) 2555 2556 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1), 2557 dbus_interface=dbus.PROPERTIES_IFACE) 2558 2559def test_dbus_pmf(dev, apdev): 2560 """D-Bus Get/Set Pmf""" 2561 try: 2562 _test_dbus_pmf(dev, apdev) 2563 finally: 2564 dev[0].request("SET pmf 0") 2565 2566def _test_dbus_pmf(dev, apdev): 2567 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2568 2569 dev[0].set("pmf", "0") 2570 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf", 2571 dbus_interface=dbus.PROPERTIES_IFACE) 2572 if res != "0": 2573 raise Exception("Unexpected initial Pmf value: %s" % res) 2574 2575 for i in range(3): 2576 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", str(i), 2577 dbus_interface=dbus.PROPERTIES_IFACE) 2578 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf", 2579 dbus_interface=dbus.PROPERTIES_IFACE) 2580 if res != str(i): 2581 raise Exception("Unexpected Pmf value %s (expected %d)" % (res, i)) 2582 2583 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", "1", 2584 dbus_interface=dbus.PROPERTIES_IFACE) 2585 2586def test_dbus_fastreauth(dev, apdev): 2587 """D-Bus Get/Set FastReauth""" 2588 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2589 2590 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth", 2591 dbus_interface=dbus.PROPERTIES_IFACE) 2592 if res != True: 2593 raise Exception("Unexpected initial FastReauth value: " + str(res)) 2594 2595 for i in [False, True]: 2596 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i), 2597 dbus_interface=dbus.PROPERTIES_IFACE) 2598 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth", 2599 dbus_interface=dbus.PROPERTIES_IFACE) 2600 if res != i: 2601 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i)) 2602 2603 try: 2604 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1), 2605 dbus_interface=dbus.PROPERTIES_IFACE) 2606 raise Exception("Invalid Set(FastReauth,-1) accepted") 2607 except dbus.exceptions.DBusException as e: 2608 if "Error.Failed: wrong property type" not in str(e): 2609 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e)) 2610 2611 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True), 2612 dbus_interface=dbus.PROPERTIES_IFACE) 2613 2614def test_dbus_bss_expire(dev, apdev): 2615 """D-Bus Get/Set BSSExpireAge and BSSExpireCount""" 2616 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2617 2618 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179), 2619 dbus_interface=dbus.PROPERTIES_IFACE) 2620 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge", 2621 dbus_interface=dbus.PROPERTIES_IFACE) 2622 if res != 179: 2623 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i)) 2624 2625 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3), 2626 dbus_interface=dbus.PROPERTIES_IFACE) 2627 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount", 2628 dbus_interface=dbus.PROPERTIES_IFACE) 2629 if res != 3: 2630 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i)) 2631 2632 try: 2633 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1), 2634 dbus_interface=dbus.PROPERTIES_IFACE) 2635 raise Exception("Invalid Set(BSSExpireAge,-1) accepted") 2636 except dbus.exceptions.DBusException as e: 2637 if "Error.Failed: wrong property type" not in str(e): 2638 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e)) 2639 2640 try: 2641 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9), 2642 dbus_interface=dbus.PROPERTIES_IFACE) 2643 raise Exception("Invalid Set(BSSExpireAge,9) accepted") 2644 except dbus.exceptions.DBusException as e: 2645 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e): 2646 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e)) 2647 2648 try: 2649 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1), 2650 dbus_interface=dbus.PROPERTIES_IFACE) 2651 raise Exception("Invalid Set(BSSExpireCount,-1) accepted") 2652 except dbus.exceptions.DBusException as e: 2653 if "Error.Failed: wrong property type" not in str(e): 2654 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e)) 2655 2656 try: 2657 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0), 2658 dbus_interface=dbus.PROPERTIES_IFACE) 2659 raise Exception("Invalid Set(BSSExpireCount,0) accepted") 2660 except dbus.exceptions.DBusException as e: 2661 if "Error.Failed: BSSExpireCount must be > 0" not in str(e): 2662 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e)) 2663 2664 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180), 2665 dbus_interface=dbus.PROPERTIES_IFACE) 2666 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2), 2667 dbus_interface=dbus.PROPERTIES_IFACE) 2668 2669def test_dbus_country(dev, apdev): 2670 """D-Bus Get/Set Country""" 2671 try: 2672 _test_dbus_country(dev, apdev) 2673 finally: 2674 dev[0].request("SET country 00") 2675 subprocess.call(['iw', 'reg', 'set', '00']) 2676 2677def _test_dbus_country(dev, apdev): 2678 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2679 2680 # work around issues with possible pending regdom event from the end of 2681 # the previous test case 2682 time.sleep(0.2) 2683 dev[0].dump_monitor() 2684 2685 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI", 2686 dbus_interface=dbus.PROPERTIES_IFACE) 2687 res = if_obj.Get(WPAS_DBUS_IFACE, "Country", 2688 dbus_interface=dbus.PROPERTIES_IFACE) 2689 if res != "FI": 2690 raise Exception("Unexpected Country value %s (expected FI)" % res) 2691 2692 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"]) 2693 if ev is None: 2694 # For now, work around separate P2P Device interface event delivery 2695 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2696 if ev is None: 2697 raise Exception("regdom change event not seen") 2698 if "init=USER type=COUNTRY alpha2=FI" not in ev: 2699 raise Exception("Unexpected event contents: " + ev) 2700 2701 try: 2702 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1), 2703 dbus_interface=dbus.PROPERTIES_IFACE) 2704 raise Exception("Invalid Set(Country,-1) accepted") 2705 except dbus.exceptions.DBusException as e: 2706 if "Error.Failed: wrong property type" not in str(e): 2707 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e)) 2708 2709 try: 2710 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F", 2711 dbus_interface=dbus.PROPERTIES_IFACE) 2712 raise Exception("Invalid Set(Country,F) accepted") 2713 except dbus.exceptions.DBusException as e: 2714 if "Error.Failed: invalid country code" not in str(e): 2715 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e)) 2716 2717 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00", 2718 dbus_interface=dbus.PROPERTIES_IFACE) 2719 2720 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"]) 2721 if ev is None: 2722 # For now, work around separate P2P Device interface event delivery 2723 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2724 if ev is None: 2725 raise Exception("regdom change event not seen") 2726 # init=CORE was previously used due to invalid db.txt data for 00. For 2727 # now, allow both it and the new init=USER after fixed db.txt. 2728 if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev: 2729 raise Exception("Unexpected event contents: " + ev) 2730 2731def test_dbus_scan_interval(dev, apdev): 2732 """D-Bus Get/Set ScanInterval""" 2733 try: 2734 _test_dbus_scan_interval(dev, apdev) 2735 finally: 2736 dev[0].request("SCAN_INTERVAL 5") 2737 2738def _test_dbus_scan_interval(dev, apdev): 2739 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2740 2741 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3), 2742 dbus_interface=dbus.PROPERTIES_IFACE) 2743 res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval", 2744 dbus_interface=dbus.PROPERTIES_IFACE) 2745 if res != 3: 2746 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i)) 2747 2748 try: 2749 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100), 2750 dbus_interface=dbus.PROPERTIES_IFACE) 2751 raise Exception("Invalid Set(ScanInterval,100) accepted") 2752 except dbus.exceptions.DBusException as e: 2753 if "Error.Failed: wrong property type" not in str(e): 2754 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e)) 2755 2756 try: 2757 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1), 2758 dbus_interface=dbus.PROPERTIES_IFACE) 2759 raise Exception("Invalid Set(ScanInterval,-1) accepted") 2760 except dbus.exceptions.DBusException as e: 2761 if "Error.Failed: scan_interval must be >= 0" not in str(e): 2762 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e)) 2763 2764 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5), 2765 dbus_interface=dbus.PROPERTIES_IFACE) 2766 2767def test_dbus_probe_req_reporting(dev, apdev): 2768 """D-Bus Probe Request reporting""" 2769 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2770 2771 dev[1].p2p_find(social=True) 2772 2773 class TestDbusProbe(TestDbus): 2774 def __init__(self, bus): 2775 TestDbus.__init__(self, bus) 2776 self.reported = False 2777 2778 def __enter__(self): 2779 gobject.timeout_add(1, self.run_test) 2780 gobject.timeout_add(15000, self.timeout) 2781 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 2782 "GroupStarted") 2783 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest", 2784 byte_arrays=True) 2785 self.loop.run() 2786 return self 2787 2788 def groupStarted(self, properties): 2789 logger.debug("groupStarted: " + str(properties)) 2790 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 2791 properties['interface_object']) 2792 self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE) 2793 self.iface.SubscribeProbeReq() 2794 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2795 2796 def probeRequest(self, args): 2797 logger.debug("probeRequest: args=%s" % str(args)) 2798 self.reported = True 2799 self.loop.quit() 2800 2801 def run_test(self, *args): 2802 logger.debug("run_test") 2803 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2804 params = dbus.Dictionary({'frequency': 2412}) 2805 p2p.GroupAdd(params) 2806 return False 2807 2808 def success(self): 2809 return self.reported 2810 2811 with TestDbusProbe(bus) as t: 2812 if not t.success(): 2813 raise Exception("Expected signals not seen") 2814 t.iface.UnsubscribeProbeReq() 2815 try: 2816 t.iface.UnsubscribeProbeReq() 2817 raise Exception("Invalid UnsubscribeProbeReq() accepted") 2818 except dbus.exceptions.DBusException as e: 2819 if "NoSubscription" not in str(e): 2820 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e)) 2821 t.group_p2p.Disconnect() 2822 2823 with TestDbusProbe(bus) as t: 2824 if not t.success(): 2825 raise Exception("Expected signals not seen") 2826 # On purpose, leave ProbeReq subscription in place to test automatic 2827 # cleanup. 2828 2829 dev[1].p2p_stop_find() 2830 2831def test_dbus_probe_req_reporting_oom(dev, apdev): 2832 """D-Bus Probe Request reporting (OOM)""" 2833 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2834 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2835 2836 # Need to make sure this process has not already subscribed to avoid false 2837 # failures due to the operation succeeding due to os_strdup() not even 2838 # getting called. 2839 try: 2840 iface.UnsubscribeProbeReq() 2841 was_subscribed = True 2842 except dbus.exceptions.DBusException as e: 2843 was_subscribed = False 2844 pass 2845 2846 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq", 2847 "SubscribeProbeReq"): 2848 iface.SubscribeProbeReq() 2849 2850 if was_subscribed: 2851 # On purpose, leave ProbeReq subscription in place to test automatic 2852 # cleanup. 2853 iface.SubscribeProbeReq() 2854 2855def test_dbus_p2p_invalid(dev, apdev): 2856 """D-Bus invalid P2P operations""" 2857 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2858 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2859 2860 try: 2861 p2p.RejectPeer(path + "/Peers/00112233445566") 2862 raise Exception("Invalid RejectPeer accepted") 2863 except dbus.exceptions.DBusException as e: 2864 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e): 2865 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 2866 2867 try: 2868 p2p.RejectPeer("/foo") 2869 raise Exception("Invalid RejectPeer accepted") 2870 except dbus.exceptions.DBusException as e: 2871 if "InvalidArgs" not in str(e): 2872 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 2873 2874 tests = [{}, 2875 {'peer': 'foo'}, 2876 {'foo': "bar"}, 2877 {'iface': "abc"}, 2878 {'iface': 123}] 2879 for t in tests: 2880 try: 2881 p2p.RemoveClient(t) 2882 raise Exception("Invalid RemoveClient accepted") 2883 except dbus.exceptions.DBusException as e: 2884 if "InvalidArgs" not in str(e): 2885 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e)) 2886 2887 tests = [{'DiscoveryType': 'foo'}, 2888 {'RequestedDeviceTypes': 'foo'}, 2889 {'RequestedDeviceTypes': ['foo']}, 2890 {'RequestedDeviceTypes': ['1', '2', '3', '4', '5', '6', '7', '8', 2891 '9', '10', '11', '12', '13', '14', '15', 2892 '16', '17']}, 2893 {'RequestedDeviceTypes': dbus.Array([], signature="s")}, 2894 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")}, 2895 {'RequestedDeviceTypes': dbus.Array([], signature="i")}, 2896 {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'), 2897 dbus.ByteArray(b'1234567')]}, 2898 {'Foo': dbus.Int16(1)}, 2899 {'Foo': dbus.UInt16(1)}, 2900 {'Foo': dbus.Int64(1)}, 2901 {'Foo': dbus.UInt64(1)}, 2902 {'Foo': dbus.Double(1.23)}, 2903 {'Foo': dbus.Signature('s')}, 2904 {'Foo': 'bar'}] 2905 for t in tests: 2906 try: 2907 p2p.Find(dbus.Dictionary(t)) 2908 raise Exception("Invalid Find accepted") 2909 except dbus.exceptions.DBusException as e: 2910 if "InvalidArgs" not in str(e): 2911 raise Exception("Unexpected error message for invalid Find(): " + str(e)) 2912 2913 for p in ["/foo", 2914 "/fi/w1/wpa_supplicant1/Interfaces/1234", 2915 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"]: 2916 try: 2917 p2p.RemovePersistentGroup(dbus.ObjectPath(p)) 2918 raise Exception("Invalid RemovePersistentGroup accepted") 2919 except dbus.exceptions.DBusException as e: 2920 if "InvalidArgs" not in str(e): 2921 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e)) 2922 2923 try: 2924 dev[0].request("P2P_SET disabled 1") 2925 p2p.Listen(5) 2926 raise Exception("Invalid Listen accepted") 2927 except dbus.exceptions.DBusException as e: 2928 if "UnknownError: Could not start P2P listen" not in str(e): 2929 raise Exception("Unexpected error message for invalid Listen: " + str(e)) 2930 finally: 2931 dev[0].request("P2P_SET disabled 0") 2932 2933 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False) 2934 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2935 try: 2936 test_p2p.Listen("foo") 2937 raise Exception("Invalid Listen accepted") 2938 except dbus.exceptions.DBusException as e: 2939 if "InvalidArgs" not in str(e): 2940 raise Exception("Unexpected error message for invalid Listen: " + str(e)) 2941 2942 try: 2943 dev[0].request("P2P_SET disabled 1") 2944 p2p.ExtendedListen(dbus.Dictionary({})) 2945 raise Exception("Invalid ExtendedListen accepted") 2946 except dbus.exceptions.DBusException as e: 2947 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e): 2948 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e)) 2949 finally: 2950 dev[0].request("P2P_SET disabled 0") 2951 2952 try: 2953 dev[0].request("P2P_SET disabled 1") 2954 args = {'duration1': 30000, 'interval1': 102400, 2955 'duration2': 20000, 'interval2': 102400} 2956 p2p.PresenceRequest(args) 2957 raise Exception("Invalid PresenceRequest accepted") 2958 except dbus.exceptions.DBusException as e: 2959 if "UnknownError: Failed to invoke presence request" not in str(e): 2960 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e)) 2961 finally: 2962 dev[0].request("P2P_SET disabled 0") 2963 2964 try: 2965 params = dbus.Dictionary({'frequency': dbus.Int32(-1)}) 2966 p2p.GroupAdd(params) 2967 raise Exception("Invalid GroupAdd accepted") 2968 except dbus.exceptions.DBusException as e: 2969 if "InvalidArgs" not in str(e): 2970 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e)) 2971 2972 try: 2973 params = dbus.Dictionary({'persistent_group_object': 2974 dbus.ObjectPath(path), 2975 'frequency': 2412}) 2976 p2p.GroupAdd(params) 2977 raise Exception("Invalid GroupAdd accepted") 2978 except dbus.exceptions.DBusException as e: 2979 if "InvalidArgs" not in str(e): 2980 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e)) 2981 2982 try: 2983 p2p.Disconnect() 2984 raise Exception("Invalid Disconnect accepted") 2985 except dbus.exceptions.DBusException as e: 2986 if "UnknownError: failed to disconnect" not in str(e): 2987 raise Exception("Unexpected error message for invalid Disconnect: " + str(e)) 2988 2989 try: 2990 dev[0].request("P2P_SET disabled 1") 2991 p2p.Flush() 2992 raise Exception("Invalid Flush accepted") 2993 except dbus.exceptions.DBusException as e: 2994 if "Error.Failed: P2P is not available for this interface" not in str(e): 2995 raise Exception("Unexpected error message for invalid Flush: " + str(e)) 2996 finally: 2997 dev[0].request("P2P_SET disabled 0") 2998 2999 try: 3000 dev[0].request("P2P_SET disabled 1") 3001 args = {'peer': path, 3002 'join': True, 3003 'wps_method': 'pbc', 3004 'frequency': 2412} 3005 pin = p2p.Connect(args) 3006 raise Exception("Invalid Connect accepted") 3007 except dbus.exceptions.DBusException as e: 3008 if "Error.Failed: P2P is not available for this interface" not in str(e): 3009 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3010 finally: 3011 dev[0].request("P2P_SET disabled 0") 3012 3013 tests = [{'frequency': dbus.Int32(-1)}, 3014 {'wps_method': 'pbc'}, 3015 {'wps_method': 'foo'}] 3016 for args in tests: 3017 try: 3018 pin = p2p.Connect(args) 3019 raise Exception("Invalid Connect accepted") 3020 except dbus.exceptions.DBusException as e: 3021 if "InvalidArgs" not in str(e): 3022 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3023 3024 try: 3025 dev[0].request("P2P_SET disabled 1") 3026 args = {'peer': path} 3027 pin = p2p.Invite(args) 3028 raise Exception("Invalid Invite accepted") 3029 except dbus.exceptions.DBusException as e: 3030 if "Error.Failed: P2P is not available for this interface" not in str(e): 3031 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 3032 finally: 3033 dev[0].request("P2P_SET disabled 0") 3034 3035 try: 3036 args = {'foo': 'bar'} 3037 pin = p2p.Invite(args) 3038 raise Exception("Invalid Invite accepted") 3039 except dbus.exceptions.DBusException as e: 3040 if "InvalidArgs" not in str(e): 3041 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3042 3043 tests = [(path, 'display', "InvalidArgs"), 3044 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3045 'display', 3046 "UnknownError: Failed to send provision discovery request"), 3047 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3048 'keypad', 3049 "UnknownError: Failed to send provision discovery request"), 3050 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3051 'pbc', 3052 "UnknownError: Failed to send provision discovery request"), 3053 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3054 'pushbutton', 3055 "UnknownError: Failed to send provision discovery request"), 3056 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3057 'foo', "InvalidArgs")] 3058 for (p, method, err) in tests: 3059 try: 3060 p2p.ProvisionDiscoveryRequest(p, method) 3061 raise Exception("Invalid ProvisionDiscoveryRequest accepted") 3062 except dbus.exceptions.DBusException as e: 3063 if err not in str(e): 3064 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e)) 3065 3066 try: 3067 dev[0].request("P2P_SET disabled 1") 3068 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers", 3069 dbus_interface=dbus.PROPERTIES_IFACE) 3070 raise Exception("Invalid Get(Peers) accepted") 3071 except dbus.exceptions.DBusException as e: 3072 if "Error.Failed: P2P is not available for this interface" not in str(e): 3073 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e)) 3074 finally: 3075 dev[0].request("P2P_SET disabled 0") 3076 3077def test_dbus_p2p_oom(dev, apdev): 3078 """D-Bus P2P operations and OOM""" 3079 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3080 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3081 3082 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array", 3083 "Find", "InvalidArgs"): 3084 p2p.Find(dbus.Dictionary({'Foo': ['bar']})) 3085 3086 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array", 3087 "Find", "InvalidArgs"): 3088 p2p.Find(dbus.Dictionary({'Foo': ['bar']})) 3089 3090 with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array", 3091 "Find", "InvalidArgs"): 3092 p2p.Find(dbus.Dictionary({'Foo': ['1', '2', '3', '4', '5', '6', '7', 3093 '8', '9']})) 3094 3095 with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray", 3096 "Find", "InvalidArgs"): 3097 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3098 3099 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray", 3100 "Find", "InvalidArgs"): 3101 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3102 3103 with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray", 3104 "Find", "InvalidArgs"): 3105 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123'), 3106 dbus.ByteArray(b'123'), 3107 dbus.ByteArray(b'123'), 3108 dbus.ByteArray(b'123'), 3109 dbus.ByteArray(b'123'), 3110 dbus.ByteArray(b'123'), 3111 dbus.ByteArray(b'123'), 3112 dbus.ByteArray(b'123'), 3113 dbus.ByteArray(b'123'), 3114 dbus.ByteArray(b'123'), 3115 dbus.ByteArray(b'123')]})) 3116 3117 with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray", 3118 "Find", "InvalidArgs"): 3119 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3120 3121 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find", 3122 "Find", "InvalidArgs"): 3123 p2p.Find(dbus.Dictionary({'Foo': path})) 3124 3125 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array", 3126 "AddService", "InvalidArgs"): 3127 args = {'service_type': 'bonjour', 3128 'response': dbus.ByteArray(500*b'b')} 3129 p2p.AddService(args) 3130 3131 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array", 3132 "AddService", "InvalidArgs"): 3133 p2p.AddService(args) 3134 3135def test_dbus_p2p_discovery(dev, apdev): 3136 """D-Bus P2P discovery""" 3137 try: 3138 run_dbus_p2p_discovery(dev, apdev) 3139 finally: 3140 dev[1].request("VENDOR_ELEM_REMOVE 1 *") 3141 3142def run_dbus_p2p_discovery(dev, apdev): 3143 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3144 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3145 3146 addr0 = dev[0].p2p_dev_addr() 3147 3148 dev[1].request("SET sec_device_type 1-0050F204-2") 3149 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344") 3150 dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566") 3151 dev[1].p2p_listen() 3152 addr1 = dev[1].p2p_dev_addr() 3153 a1 = binascii.unhexlify(addr1.replace(':', '')) 3154 3155 wfd_devinfo = "00001c440028" 3156 dev[2].request("SET wifi_display 1") 3157 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo) 3158 wfd = binascii.unhexlify('000006' + wfd_devinfo) 3159 dev[2].p2p_listen() 3160 addr2 = dev[2].p2p_dev_addr() 3161 a2 = binascii.unhexlify(addr2.replace(':', '')) 3162 3163 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 3164 dbus_interface=dbus.PROPERTIES_IFACE) 3165 if 'Peers' not in res: 3166 raise Exception("GetAll result missing Peers") 3167 if len(res['Peers']) != 0: 3168 raise Exception("Unexpected peer(s) in the list") 3169 3170 args = {'DiscoveryType': 'social', 3171 'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')], 3172 'Timeout': dbus.Int32(1)} 3173 p2p.Find(dbus.Dictionary(args)) 3174 p2p.StopFind() 3175 3176 class TestDbusP2p(TestDbus): 3177 def __init__(self, bus): 3178 TestDbus.__init__(self, bus) 3179 self.found = False 3180 self.found2 = False 3181 self.found_prop = False 3182 self.lost = False 3183 self.find_stopped = False 3184 3185 def __enter__(self): 3186 gobject.timeout_add(1, self.run_test) 3187 gobject.timeout_add(15000, self.timeout) 3188 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3189 "DeviceFound") 3190 self.add_signal(self.deviceFoundProperties, 3191 WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties") 3192 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE, 3193 "DeviceLost") 3194 self.add_signal(self.provisionDiscoveryResponseEnterPin, 3195 WPAS_DBUS_IFACE_P2PDEVICE, 3196 "ProvisionDiscoveryResponseEnterPin") 3197 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE, 3198 "FindStopped") 3199 self.loop.run() 3200 return self 3201 3202 def deviceFound(self, path): 3203 logger.debug("deviceFound: path=%s" % path) 3204 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers", 3205 dbus_interface=dbus.PROPERTIES_IFACE) 3206 if len(res) < 1: 3207 raise Exception("Unexpected number of peers") 3208 if path not in res: 3209 raise Exception("Mismatch in peer object path") 3210 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3211 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3212 dbus_interface=dbus.PROPERTIES_IFACE, 3213 byte_arrays=True) 3214 logger.debug("peer properties: " + str(res)) 3215 3216 if res['DeviceAddress'] == a1: 3217 if 'SecondaryDeviceTypes' not in res: 3218 raise Exception("Missing SecondaryDeviceTypes") 3219 sec = res['SecondaryDeviceTypes'] 3220 if len(sec) < 1: 3221 raise Exception("Secondary device type missing") 3222 if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec: 3223 raise Exception("Secondary device type mismatch") 3224 3225 if 'VendorExtension' not in res: 3226 raise Exception("Missing VendorExtension") 3227 vendor = res['VendorExtension'] 3228 if len(vendor) < 1: 3229 raise Exception("Vendor extension missing") 3230 if b"\x11\x22\x33\x44" not in vendor: 3231 raise Exception("Secondary device type mismatch") 3232 3233 if 'VSIE' not in res: 3234 raise Exception("Missing VSIE") 3235 vendor = res['VSIE'] 3236 if len(vendor) < 1: 3237 raise Exception("VSIE missing") 3238 if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66": 3239 raise Exception("VSIE mismatch") 3240 3241 self.found = True 3242 elif res['DeviceAddress'] == a2: 3243 if 'IEs' not in res: 3244 raise Exception("IEs missing") 3245 if res['IEs'] != wfd: 3246 raise Exception("IEs mismatch") 3247 self.found2 = True 3248 else: 3249 raise Exception("Unexpected peer device address") 3250 3251 if self.found and self.found2: 3252 p2p.StopFind() 3253 p2p.RejectPeer(path) 3254 p2p.ProvisionDiscoveryRequest(path, 'display') 3255 3256 def deviceLost(self, path): 3257 logger.debug("deviceLost: path=%s" % path) 3258 if not self.found or not self.found2: 3259 # This may happen if a previous test case ended up scheduling 3260 # deviceLost event and that event did not get delivered before 3261 # starting the next test execution. 3262 logger.debug("Ignore deviceLost before the deviceFound events") 3263 return 3264 self.lost = True 3265 try: 3266 p2p.RejectPeer(path) 3267 raise Exception("Invalid RejectPeer accepted") 3268 except dbus.exceptions.DBusException as e: 3269 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e): 3270 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 3271 self.loop.quit() 3272 3273 def deviceFoundProperties(self, path, properties): 3274 logger.debug("deviceFoundProperties: path=%s" % path) 3275 logger.debug("peer properties: " + str(properties)) 3276 if properties['DeviceAddress'] == a1: 3277 self.found_prop = True 3278 3279 def provisionDiscoveryResponseEnterPin(self, peer_object): 3280 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object) 3281 p2p.Flush() 3282 3283 def findStopped(self): 3284 logger.debug("findStopped") 3285 self.find_stopped = True 3286 3287 def run_test(self, *args): 3288 logger.debug("run_test") 3289 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social', 3290 'Timeout': dbus.Int32(10)})) 3291 return False 3292 3293 def success(self): 3294 return self.found and self.lost and self.found2 and self.find_stopped 3295 3296 with TestDbusP2p(bus) as t: 3297 if not t.success(): 3298 raise Exception("Expected signals not seen") 3299 3300 dev[1].request("VENDOR_ELEM_REMOVE 1 *") 3301 dev[1].p2p_stop_find() 3302 3303 p2p.Listen(1) 3304 dev[2].p2p_stop_find() 3305 dev[2].request("P2P_FLUSH") 3306 if not dev[2].discover_peer(addr0): 3307 raise Exception("Peer not found") 3308 p2p.StopFind() 3309 dev[2].p2p_stop_find() 3310 3311 try: 3312 p2p.ExtendedListen(dbus.Dictionary({'foo': 100})) 3313 raise Exception("Invalid ExtendedListen accepted") 3314 except dbus.exceptions.DBusException as e: 3315 if "InvalidArgs" not in str(e): 3316 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e)) 3317 3318 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000})) 3319 p2p.ExtendedListen(dbus.Dictionary({})) 3320 dev[0].global_request("P2P_EXT_LISTEN") 3321 3322def test_dbus_p2p_discovery_freq(dev, apdev): 3323 """D-Bus P2P discovery on a specific non-social channel""" 3324 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3325 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3326 3327 addr1 = dev[1].p2p_dev_addr() 3328 autogo(dev[1], freq=2422) 3329 3330 class TestDbusP2p(TestDbus): 3331 def __init__(self, bus): 3332 TestDbus.__init__(self, bus) 3333 self.found = False 3334 3335 def __enter__(self): 3336 gobject.timeout_add(1, self.run_test) 3337 gobject.timeout_add(5000, self.timeout) 3338 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3339 "DeviceFound") 3340 self.loop.run() 3341 return self 3342 3343 def deviceFound(self, path): 3344 logger.debug("deviceFound: path=%s" % path) 3345 self.found = True 3346 self.loop.quit() 3347 3348 def run_test(self, *args): 3349 logger.debug("run_test") 3350 p2p.Find(dbus.Dictionary({'freq': 2422})) 3351 return False 3352 3353 def success(self): 3354 return self.found 3355 3356 with TestDbusP2p(bus) as t: 3357 if not t.success(): 3358 raise Exception("Expected signals not seen") 3359 3360 dev[1].remove_group() 3361 p2p.StopFind() 3362 3363def test_dbus_p2p_service_discovery(dev, apdev): 3364 """D-Bus P2P service discovery""" 3365 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3366 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3367 3368 addr0 = dev[0].p2p_dev_addr() 3369 addr1 = dev[1].p2p_dev_addr() 3370 3371 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01')) 3372 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027')) 3373 3374 args = {'service_type': 'bonjour', 3375 'query': bonjour_query, 3376 'response': bonjour_response} 3377 p2p.AddService(args) 3378 p2p.FlushService() 3379 p2p.AddService(args) 3380 3381 try: 3382 p2p.DeleteService(args) 3383 raise Exception("Invalid DeleteService() accepted") 3384 except dbus.exceptions.DBusException as e: 3385 if "InvalidArgs" not in str(e): 3386 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3387 3388 args = {'service_type': 'bonjour', 3389 'query': bonjour_query} 3390 p2p.DeleteService(args) 3391 try: 3392 p2p.DeleteService(args) 3393 raise Exception("Invalid DeleteService() accepted") 3394 except dbus.exceptions.DBusException as e: 3395 if "InvalidArgs" not in str(e): 3396 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3397 3398 args = {'service_type': 'upnp', 3399 'version': 0x10, 3400 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'} 3401 p2p.AddService(args) 3402 p2p.DeleteService(args) 3403 try: 3404 p2p.DeleteService(args) 3405 raise Exception("Invalid DeleteService() accepted") 3406 except dbus.exceptions.DBusException as e: 3407 if "InvalidArgs" not in str(e): 3408 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3409 3410 tests = [{'service_type': 'foo'}, 3411 {'service_type': 'foo', 'query': bonjour_query}, 3412 {'service_type': 'upnp'}, 3413 {'service_type': 'upnp', 'version': 0x10}, 3414 {'service_type': 'upnp', 3415 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3416 {'version': 0x10, 3417 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3418 {'service_type': 'upnp', 'foo': 'bar'}, 3419 {'service_type': 'bonjour'}, 3420 {'service_type': 'bonjour', 'query': 'foo'}, 3421 {'service_type': 'bonjour', 'foo': 'bar'}] 3422 for args in tests: 3423 try: 3424 p2p.DeleteService(args) 3425 raise Exception("Invalid DeleteService() accepted") 3426 except dbus.exceptions.DBusException as e: 3427 if "InvalidArgs" not in str(e): 3428 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3429 3430 tests = [{'service_type': 'foo'}, 3431 {'service_type': 'upnp'}, 3432 {'service_type': 'upnp', 'version': 0x10}, 3433 {'service_type': 'upnp', 3434 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3435 {'version': 0x10, 3436 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3437 {'service_type': 'upnp', 'foo': 'bar'}, 3438 {'service_type': 'bonjour'}, 3439 {'service_type': 'bonjour', 'query': 'foo'}, 3440 {'service_type': 'bonjour', 'response': 'foo'}, 3441 {'service_type': 'bonjour', 'query': bonjour_query}, 3442 {'service_type': 'bonjour', 'response': bonjour_response}, 3443 {'service_type': 'bonjour', 'query': dbus.ByteArray(500*b'a')}, 3444 {'service_type': 'bonjour', 'foo': 'bar'}] 3445 for args in tests: 3446 try: 3447 p2p.AddService(args) 3448 raise Exception("Invalid AddService() accepted") 3449 except dbus.exceptions.DBusException as e: 3450 if "InvalidArgs" not in str(e): 3451 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3452 3453 args = {'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")} 3454 ref = p2p.ServiceDiscoveryRequest(args) 3455 p2p.ServiceDiscoveryCancelRequest(ref) 3456 try: 3457 p2p.ServiceDiscoveryCancelRequest(ref) 3458 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted") 3459 except dbus.exceptions.DBusException as e: 3460 if "InvalidArgs" not in str(e): 3461 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3462 try: 3463 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0)) 3464 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted") 3465 except dbus.exceptions.DBusException as e: 3466 if "InvalidArgs" not in str(e): 3467 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3468 3469 args = {'service_type': 'upnp', 3470 'version': 0x10, 3471 'service': 'ssdp:foo'} 3472 ref = p2p.ServiceDiscoveryRequest(args) 3473 p2p.ServiceDiscoveryCancelRequest(ref) 3474 3475 tests = [{'service_type': 'foo'}, 3476 {'foo': 'bar'}, 3477 {'tlv': 'foo'}, 3478 {}, 3479 {'version': 0}, 3480 {'service_type': 'upnp', 3481 'service': 'ssdp:foo'}, 3482 {'service_type': 'upnp', 3483 'version': 0x10}, 3484 {'service_type': 'upnp', 3485 'version': 0x10, 3486 'service': 'ssdp:foo', 3487 'peer_object': dbus.ObjectPath(path + "/Peers")}, 3488 {'service_type': 'upnp', 3489 'version': 0x10, 3490 'service': 'ssdp:foo', 3491 'peer_object': path + "/Peers"}, 3492 {'service_type': 'upnp', 3493 'version': 0x10, 3494 'service': 'ssdp:foo', 3495 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566")}] 3496 for args in tests: 3497 try: 3498 p2p.ServiceDiscoveryRequest(args) 3499 raise Exception("Invalid ServiceDiscoveryRequest accepted") 3500 except dbus.exceptions.DBusException as e: 3501 if "InvalidArgs" not in str(e): 3502 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e)) 3503 3504 args = {'foo': 'bar'} 3505 try: 3506 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv')) 3507 raise Exception("Invalid ServiceDiscoveryResponse accepted") 3508 except dbus.exceptions.DBusException as e: 3509 if "InvalidArgs" not in str(e): 3510 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e)) 3511 3512def test_dbus_p2p_service_discovery_query(dev, apdev): 3513 """D-Bus P2P service discovery query""" 3514 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3515 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3516 3517 addr0 = dev[0].p2p_dev_addr() 3518 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027") 3519 dev[1].p2p_listen() 3520 addr1 = dev[1].p2p_dev_addr() 3521 3522 class TestDbusP2p(TestDbus): 3523 def __init__(self, bus): 3524 TestDbus.__init__(self, bus) 3525 self.done = False 3526 3527 def __enter__(self): 3528 gobject.timeout_add(1, self.run_test) 3529 gobject.timeout_add(15000, self.timeout) 3530 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3531 "DeviceFound") 3532 self.add_signal(self.serviceDiscoveryResponse, 3533 WPAS_DBUS_IFACE_P2PDEVICE, 3534 "ServiceDiscoveryResponse", byte_arrays=True) 3535 self.loop.run() 3536 return self 3537 3538 def deviceFound(self, path): 3539 logger.debug("deviceFound: path=%s" % path) 3540 args = {'peer_object': path, 3541 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")} 3542 p2p.ServiceDiscoveryRequest(args) 3543 3544 def serviceDiscoveryResponse(self, sd_request): 3545 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request)) 3546 self.done = True 3547 self.loop.quit() 3548 3549 def run_test(self, *args): 3550 logger.debug("run_test") 3551 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social', 3552 'Timeout': dbus.Int32(10)})) 3553 return False 3554 3555 def success(self): 3556 return self.done 3557 3558 with TestDbusP2p(bus) as t: 3559 if not t.success(): 3560 raise Exception("Expected signals not seen") 3561 3562 dev[1].p2p_stop_find() 3563 3564def test_dbus_p2p_service_discovery_external(dev, apdev): 3565 """D-Bus P2P service discovery with external response""" 3566 try: 3567 _test_dbus_p2p_service_discovery_external(dev, apdev) 3568 finally: 3569 dev[0].request("P2P_SERV_DISC_EXTERNAL 0") 3570 3571def _test_dbus_p2p_service_discovery_external(dev, apdev): 3572 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3573 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3574 3575 addr0 = dev[0].p2p_dev_addr() 3576 addr1 = dev[1].p2p_dev_addr() 3577 resp = "0300000101" 3578 3579 dev[1].request("P2P_FLUSH") 3580 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001") 3581 dev[1].p2p_find(social=True) 3582 3583 class TestDbusP2p(TestDbus): 3584 def __init__(self, bus): 3585 TestDbus.__init__(self, bus) 3586 self.sd = False 3587 3588 def __enter__(self): 3589 gobject.timeout_add(1, self.run_test) 3590 gobject.timeout_add(15000, self.timeout) 3591 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3592 "DeviceFound") 3593 self.add_signal(self.serviceDiscoveryRequest, 3594 WPAS_DBUS_IFACE_P2PDEVICE, 3595 "ServiceDiscoveryRequest") 3596 self.loop.run() 3597 return self 3598 3599 def deviceFound(self, path): 3600 logger.debug("deviceFound: path=%s" % path) 3601 3602 def serviceDiscoveryRequest(self, sd_request): 3603 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request)) 3604 self.sd = True 3605 args = {'peer_object': sd_request['peer_object'], 3606 'frequency': sd_request['frequency'], 3607 'dialog_token': sd_request['dialog_token'], 3608 'tlvs': dbus.ByteArray(binascii.unhexlify(resp))} 3609 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv')) 3610 self.loop.quit() 3611 3612 def run_test(self, *args): 3613 logger.debug("run_test") 3614 p2p.ServiceDiscoveryExternal(1) 3615 p2p.ServiceUpdate() 3616 p2p.Listen(15) 3617 return False 3618 3619 def success(self): 3620 return self.sd 3621 3622 with TestDbusP2p(bus) as t: 3623 if not t.success(): 3624 raise Exception("Expected signals not seen") 3625 3626 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5) 3627 if ev is None: 3628 raise Exception("Service discovery timed out") 3629 if addr0 not in ev: 3630 raise Exception("Unexpected address in SD Response: " + ev) 3631 if ev.split(' ')[4] != resp: 3632 raise Exception("Unexpected response data SD Response: " + ev) 3633 dev[1].p2p_stop_find() 3634 3635 p2p.StopFind() 3636 p2p.ServiceDiscoveryExternal(0) 3637 3638def test_dbus_p2p_autogo(dev, apdev): 3639 """D-Bus P2P autonomous GO""" 3640 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3641 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3642 3643 addr0 = dev[0].p2p_dev_addr() 3644 3645 class TestDbusP2p(TestDbus): 3646 def __init__(self, bus): 3647 TestDbus.__init__(self, bus) 3648 self.first = True 3649 self.waiting_end = False 3650 self.exceptions = False 3651 self.deauthorized = False 3652 self.done = False 3653 3654 def __enter__(self): 3655 gobject.timeout_add(1, self.run_test) 3656 gobject.timeout_add(15000, self.timeout) 3657 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3658 "DeviceFound") 3659 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 3660 "GroupStarted") 3661 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 3662 "GroupFinished") 3663 self.add_signal(self.persistentGroupAdded, 3664 WPAS_DBUS_IFACE_P2PDEVICE, 3665 "PersistentGroupAdded") 3666 self.add_signal(self.persistentGroupRemoved, 3667 WPAS_DBUS_IFACE_P2PDEVICE, 3668 "PersistentGroupRemoved") 3669 self.add_signal(self.provisionDiscoveryRequestDisplayPin, 3670 WPAS_DBUS_IFACE_P2PDEVICE, 3671 "ProvisionDiscoveryRequestDisplayPin") 3672 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 3673 "StaAuthorized") 3674 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 3675 "StaDeauthorized") 3676 self.loop.run() 3677 return self 3678 3679 def groupStarted(self, properties): 3680 logger.debug("groupStarted: " + str(properties)) 3681 self.group = properties['group_object'] 3682 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 3683 properties['interface_object']) 3684 role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role", 3685 dbus_interface=dbus.PROPERTIES_IFACE) 3686 if role != "GO": 3687 self.exceptions = True 3688 raise Exception("Unexpected role reported: " + role) 3689 group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group", 3690 dbus_interface=dbus.PROPERTIES_IFACE) 3691 if group != properties['group_object']: 3692 self.exceptions = True 3693 raise Exception("Unexpected Group reported: " + str(group)) 3694 go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO", 3695 dbus_interface=dbus.PROPERTIES_IFACE) 3696 if go != '/': 3697 self.exceptions = True 3698 raise Exception("Unexpected PeerGO value: " + str(go)) 3699 if self.first: 3700 self.first = False 3701 logger.info("Remove persistent group instance") 3702 group_p2p = dbus.Interface(self.g_if_obj, 3703 WPAS_DBUS_IFACE_P2PDEVICE) 3704 group_p2p.Disconnect() 3705 else: 3706 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 3707 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join") 3708 3709 def groupFinished(self, properties): 3710 logger.debug("groupFinished: " + str(properties)) 3711 if self.waiting_end: 3712 logger.info("Remove persistent group") 3713 p2p.RemovePersistentGroup(self.persistent) 3714 else: 3715 logger.info("Re-start persistent group") 3716 params = dbus.Dictionary({'persistent_group_object': 3717 self.persistent, 3718 'frequency': 2412}) 3719 p2p.GroupAdd(params) 3720 3721 def persistentGroupAdded(self, path, properties): 3722 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 3723 self.persistent = path 3724 3725 def persistentGroupRemoved(self, path): 3726 logger.debug("persistentGroupRemoved: %s" % path) 3727 self.done = True 3728 self.loop.quit() 3729 3730 def deviceFound(self, path): 3731 logger.debug("deviceFound: path=%s" % path) 3732 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3733 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3734 dbus_interface=dbus.PROPERTIES_IFACE, 3735 byte_arrays=True) 3736 logger.debug('peer properties: ' + str(self.peer)) 3737 3738 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin): 3739 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin)) 3740 self.peer_path = peer_object 3741 peer = binascii.unhexlify(peer_object.split('/')[-1]) 3742 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 3743 3744 params = {'Role': 'registrar', 3745 'P2PDeviceAddress': self.peer['DeviceAddress'], 3746 'Bssid': self.peer['DeviceAddress'], 3747 'Type': 'pin'} 3748 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 3749 try: 3750 wps.Start(params) 3751 self.exceptions = True 3752 raise Exception("Invalid WPS.Start() accepted") 3753 except dbus.exceptions.DBusException as e: 3754 if "InvalidArgs" not in str(e): 3755 self.exceptions = True 3756 raise Exception("Unexpected error message: " + str(e)) 3757 params = {'Role': 'registrar', 3758 'P2PDeviceAddress': self.peer['DeviceAddress'], 3759 'Type': 'pin', 3760 'Pin': '12345670'} 3761 logger.info("Authorize peer to connect to the group") 3762 wps.Start(params) 3763 3764 def staAuthorized(self, name): 3765 logger.debug("staAuthorized: " + name) 3766 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path) 3767 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3768 dbus_interface=dbus.PROPERTIES_IFACE, 3769 byte_arrays=True) 3770 logger.debug("Peer properties: " + str(res)) 3771 if 'Groups' not in res or len(res['Groups']) != 1: 3772 self.exceptions = True 3773 raise Exception("Unexpected number of peer Groups entries") 3774 if res['Groups'][0] != self.group: 3775 self.exceptions = True 3776 raise Exception("Unexpected peer Groups[0] value") 3777 3778 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group) 3779 res = g_obj.GetAll(WPAS_DBUS_GROUP, 3780 dbus_interface=dbus.PROPERTIES_IFACE, 3781 byte_arrays=True) 3782 logger.debug("Group properties: " + str(res)) 3783 if 'Members' not in res or len(res['Members']) != 1: 3784 self.exceptions = True 3785 raise Exception("Unexpected number of group members") 3786 3787 ext = dbus.ByteArray(b"\x11\x22\x33\x44") 3788 # Earlier implementation of this interface was a bit strange. The 3789 # property is defined to have aay signature and that is what the 3790 # getter returned. However, the setter expected there to be a 3791 # dictionary with 'WPSVendorExtensions' as the key surrounding these 3792 # values.. The current implementations maintains support for that 3793 # for backwards compability reasons. Verify that encoding first. 3794 vals = dbus.Dictionary({'WPSVendorExtensions': [ext]}, 3795 signature='sv') 3796 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3797 dbus_interface=dbus.PROPERTIES_IFACE) 3798 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions', 3799 dbus_interface=dbus.PROPERTIES_IFACE, 3800 byte_arrays=True) 3801 if len(res) != 1: 3802 self.exceptions = True 3803 raise Exception("Unexpected number of vendor extensions") 3804 if res[0] != ext: 3805 self.exceptions = True 3806 raise Exception("Vendor extension value changed") 3807 3808 # And now verify that the more appropriate encoding is accepted as 3809 # well. 3810 res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff')) 3811 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 3812 dbus_interface=dbus.PROPERTIES_IFACE) 3813 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions', 3814 dbus_interface=dbus.PROPERTIES_IFACE, 3815 byte_arrays=True) 3816 if len(res) != 2: 3817 self.exceptions = True 3818 raise Exception("Unexpected number of vendor extensions") 3819 if res[0] != res2[0] or res[1] != res2[1]: 3820 self.exceptions = True 3821 raise Exception("Vendor extension value changed") 3822 3823 for i in range(10): 3824 res.append(dbus.ByteArray(b'\xaa\xbb')) 3825 try: 3826 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 3827 dbus_interface=dbus.PROPERTIES_IFACE) 3828 self.exceptions = True 3829 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3830 except dbus.exceptions.DBusException as e: 3831 if "Error.Failed" not in str(e): 3832 self.exceptions = True 3833 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3834 3835 vals = dbus.Dictionary({'Foo': [ext]}, signature='sv') 3836 try: 3837 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3838 dbus_interface=dbus.PROPERTIES_IFACE) 3839 self.exceptions = True 3840 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3841 except dbus.exceptions.DBusException as e: 3842 if "InvalidArgs" not in str(e): 3843 self.exceptions = True 3844 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3845 3846 vals = ["foo"] 3847 try: 3848 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3849 dbus_interface=dbus.PROPERTIES_IFACE) 3850 self.exceptions = True 3851 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3852 except dbus.exceptions.DBusException as e: 3853 if "Error.Failed" not in str(e): 3854 self.exceptions = True 3855 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3856 3857 vals = [["foo"]] 3858 try: 3859 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3860 dbus_interface=dbus.PROPERTIES_IFACE) 3861 self.exceptions = True 3862 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3863 except dbus.exceptions.DBusException as e: 3864 if "Error.Failed" not in str(e): 3865 self.exceptions = True 3866 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3867 3868 p2p.RemoveClient({'peer': self.peer_path}) 3869 3870 self.waiting_end = True 3871 group_p2p = dbus.Interface(self.g_if_obj, 3872 WPAS_DBUS_IFACE_P2PDEVICE) 3873 group_p2p.Disconnect() 3874 3875 def staDeauthorized(self, name): 3876 logger.debug("staDeauthorized: " + name) 3877 self.deauthorized = True 3878 3879 def run_test(self, *args): 3880 logger.debug("run_test") 3881 params = dbus.Dictionary({'persistent': True, 3882 'frequency': 2412}) 3883 logger.info("Add a persistent group") 3884 p2p.GroupAdd(params) 3885 return False 3886 3887 def success(self): 3888 return self.done and self.deauthorized and not self.exceptions 3889 3890 with TestDbusP2p(bus) as t: 3891 if not t.success(): 3892 raise Exception("Expected signals not seen") 3893 3894 dev[1].wait_go_ending_session() 3895 3896def test_dbus_p2p_autogo_pbc(dev, apdev): 3897 """D-Bus P2P autonomous GO and PBC""" 3898 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3899 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3900 3901 addr0 = dev[0].p2p_dev_addr() 3902 3903 class TestDbusP2p(TestDbus): 3904 def __init__(self, bus): 3905 TestDbus.__init__(self, bus) 3906 self.first = True 3907 self.waiting_end = False 3908 self.done = False 3909 3910 def __enter__(self): 3911 gobject.timeout_add(1, self.run_test) 3912 gobject.timeout_add(15000, self.timeout) 3913 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3914 "DeviceFound") 3915 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 3916 "GroupStarted") 3917 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 3918 "GroupFinished") 3919 self.add_signal(self.provisionDiscoveryPBCRequest, 3920 WPAS_DBUS_IFACE_P2PDEVICE, 3921 "ProvisionDiscoveryPBCRequest") 3922 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 3923 "StaAuthorized") 3924 self.loop.run() 3925 return self 3926 3927 def groupStarted(self, properties): 3928 logger.debug("groupStarted: " + str(properties)) 3929 self.group = properties['group_object'] 3930 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 3931 properties['interface_object']) 3932 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 3933 dev1.global_request("P2P_CONNECT " + addr0 + " pbc join") 3934 3935 def groupFinished(self, properties): 3936 logger.debug("groupFinished: " + str(properties)) 3937 self.done = True 3938 self.loop.quit() 3939 3940 def deviceFound(self, path): 3941 logger.debug("deviceFound: path=%s" % path) 3942 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3943 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3944 dbus_interface=dbus.PROPERTIES_IFACE, 3945 byte_arrays=True) 3946 logger.debug('peer properties: ' + str(self.peer)) 3947 3948 def provisionDiscoveryPBCRequest(self, peer_object): 3949 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object) 3950 self.peer_path = peer_object 3951 peer = binascii.unhexlify(peer_object.split('/')[-1]) 3952 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 3953 params = {'Role': 'registrar', 3954 'P2PDeviceAddress': self.peer['DeviceAddress'], 3955 'Type': 'pbc'} 3956 logger.info("Authorize peer to connect to the group") 3957 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 3958 wps.Start(params) 3959 3960 def staAuthorized(self, name): 3961 logger.debug("staAuthorized: " + name) 3962 group_p2p = dbus.Interface(self.g_if_obj, 3963 WPAS_DBUS_IFACE_P2PDEVICE) 3964 group_p2p.Disconnect() 3965 3966 def run_test(self, *args): 3967 logger.debug("run_test") 3968 params = dbus.Dictionary({'frequency': 2412}) 3969 p2p.GroupAdd(params) 3970 return False 3971 3972 def success(self): 3973 return self.done 3974 3975 with TestDbusP2p(bus) as t: 3976 if not t.success(): 3977 raise Exception("Expected signals not seen") 3978 3979 dev[1].wait_go_ending_session() 3980 dev[1].flush_scan_cache() 3981 3982def test_dbus_p2p_autogo_legacy(dev, apdev): 3983 """D-Bus P2P autonomous GO and legacy STA""" 3984 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3985 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3986 3987 addr0 = dev[0].p2p_dev_addr() 3988 3989 class TestDbusP2p(TestDbus): 3990 def __init__(self, bus): 3991 TestDbus.__init__(self, bus) 3992 self.done = False 3993 3994 def __enter__(self): 3995 gobject.timeout_add(1, self.run_test) 3996 gobject.timeout_add(15000, self.timeout) 3997 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 3998 "GroupStarted") 3999 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4000 "GroupFinished") 4001 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 4002 "StaAuthorized") 4003 self.loop.run() 4004 return self 4005 4006 def groupStarted(self, properties): 4007 logger.debug("groupStarted: " + str(properties)) 4008 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4009 properties['group_object']) 4010 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4011 dbus_interface=dbus.PROPERTIES_IFACE, 4012 byte_arrays=True) 4013 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])]) 4014 4015 pin = '12345670' 4016 params = {'Role': 'enrollee', 4017 'Type': 'pin', 4018 'Pin': pin} 4019 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4020 properties['interface_object']) 4021 wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS) 4022 wps.Start(params) 4023 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4024 dev1.scan_for_bss(bssid, freq=2412) 4025 dev1.request("WPS_PIN " + bssid + " " + pin) 4026 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4027 4028 def groupFinished(self, properties): 4029 logger.debug("groupFinished: " + str(properties)) 4030 self.done = True 4031 self.loop.quit() 4032 4033 def staAuthorized(self, name): 4034 logger.debug("staAuthorized: " + name) 4035 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4036 dev1.request("DISCONNECT") 4037 self.group_p2p.Disconnect() 4038 4039 def run_test(self, *args): 4040 logger.debug("run_test") 4041 params = dbus.Dictionary({'frequency': 2412}) 4042 p2p.GroupAdd(params) 4043 return False 4044 4045 def success(self): 4046 return self.done 4047 4048 with TestDbusP2p(bus) as t: 4049 if not t.success(): 4050 raise Exception("Expected signals not seen") 4051 4052def test_dbus_p2p_join(dev, apdev): 4053 """D-Bus P2P join an autonomous GO""" 4054 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4055 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4056 4057 addr1 = dev[1].p2p_dev_addr() 4058 addr2 = dev[2].p2p_dev_addr() 4059 dev[1].p2p_start_go(freq=2412) 4060 dev1_group_ifname = dev[1].group_ifname 4061 dev[2].p2p_listen() 4062 4063 class TestDbusP2p(TestDbus): 4064 def __init__(self, bus): 4065 TestDbus.__init__(self, bus) 4066 self.done = False 4067 self.peer = None 4068 self.go = None 4069 4070 def __enter__(self): 4071 gobject.timeout_add(1, self.run_test) 4072 gobject.timeout_add(15000, self.timeout) 4073 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4074 "DeviceFound") 4075 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4076 "GroupStarted") 4077 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4078 "GroupFinished") 4079 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE, 4080 "InvitationResult") 4081 self.loop.run() 4082 return self 4083 4084 def deviceFound(self, path): 4085 logger.debug("deviceFound: path=%s" % path) 4086 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 4087 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 4088 dbus_interface=dbus.PROPERTIES_IFACE, 4089 byte_arrays=True) 4090 logger.debug('peer properties: ' + str(res)) 4091 if addr2.replace(':', '') in path: 4092 self.peer = path 4093 elif addr1.replace(':', '') in path: 4094 self.go = path 4095 if self.peer and self.go: 4096 logger.info("Join the group") 4097 p2p.StopFind() 4098 args = {'peer': self.go, 4099 'join': True, 4100 'wps_method': 'pin', 4101 'frequency': 2412} 4102 pin = p2p.Connect(args) 4103 4104 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4105 dev1.group_ifname = dev1_group_ifname 4106 dev1.group_request("WPS_PIN any " + pin) 4107 4108 def groupStarted(self, properties): 4109 logger.debug("groupStarted: " + str(properties)) 4110 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4111 properties['interface_object']) 4112 role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role", 4113 dbus_interface=dbus.PROPERTIES_IFACE) 4114 if role != "client": 4115 raise Exception("Unexpected role reported: " + role) 4116 group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group", 4117 dbus_interface=dbus.PROPERTIES_IFACE) 4118 if group != properties['group_object']: 4119 raise Exception("Unexpected Group reported: " + str(group)) 4120 go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO", 4121 dbus_interface=dbus.PROPERTIES_IFACE) 4122 if go != self.go: 4123 raise Exception("Unexpected PeerGO value: " + str(go)) 4124 4125 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4126 properties['group_object']) 4127 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4128 dbus_interface=dbus.PROPERTIES_IFACE, 4129 byte_arrays=True) 4130 logger.debug("Group properties: " + str(res)) 4131 4132 ext = dbus.ByteArray(b"\x11\x22\x33\x44") 4133 try: 4134 # Set(WPSVendorExtensions) not allowed for P2P Client 4135 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 4136 dbus_interface=dbus.PROPERTIES_IFACE) 4137 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 4138 except dbus.exceptions.DBusException as e: 4139 if "Error.Failed: Failed to set property" not in str(e): 4140 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 4141 4142 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4143 args = {'duration1': 30000, 'interval1': 102400, 4144 'duration2': 20000, 'interval2': 102400} 4145 group_p2p.PresenceRequest(args) 4146 4147 args = {'peer': self.peer} 4148 group_p2p.Invite(args) 4149 4150 def groupFinished(self, properties): 4151 logger.debug("groupFinished: " + str(properties)) 4152 self.done = True 4153 self.loop.quit() 4154 4155 def invitationResult(self, result): 4156 logger.debug("invitationResult: " + str(result)) 4157 if result['status'] != 1: 4158 raise Exception("Unexpected invitation result: " + str(result)) 4159 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4160 dev1.group_ifname = dev1_group_ifname 4161 dev1.remove_group() 4162 4163 def run_test(self, *args): 4164 logger.debug("run_test") 4165 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4166 return False 4167 4168 def success(self): 4169 return self.done 4170 4171 with TestDbusP2p(bus) as t: 4172 if not t.success(): 4173 raise Exception("Expected signals not seen") 4174 4175 dev[2].p2p_stop_find() 4176 4177def test_dbus_p2p_invitation_received(dev, apdev): 4178 """D-Bus P2P and InvitationReceived""" 4179 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4180 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4181 4182 form(dev[0], dev[1]) 4183 addr0 = dev[0].p2p_dev_addr() 4184 dev[0].p2p_listen() 4185 dev[0].global_request("SET persistent_reconnect 0") 4186 4187 if not dev[1].discover_peer(addr0, social=True): 4188 raise Exception("Peer " + addr0 + " not found") 4189 peer = dev[1].get_peer(addr0) 4190 4191 class TestDbusP2p(TestDbus): 4192 def __init__(self, bus): 4193 TestDbus.__init__(self, bus) 4194 self.done = False 4195 4196 def __enter__(self): 4197 gobject.timeout_add(1, self.run_test) 4198 gobject.timeout_add(15000, self.timeout) 4199 self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE, 4200 "InvitationReceived") 4201 self.loop.run() 4202 return self 4203 4204 def invitationReceived(self, result): 4205 logger.debug("invitationReceived: " + str(result)) 4206 self.done = True 4207 self.loop.quit() 4208 4209 def run_test(self, *args): 4210 logger.debug("run_test") 4211 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4212 cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0 4213 dev1.global_request(cmd) 4214 return False 4215 4216 def success(self): 4217 return self.done 4218 4219 with TestDbusP2p(bus) as t: 4220 if not t.success(): 4221 raise Exception("Expected signals not seen") 4222 4223 dev[0].p2p_stop_find() 4224 dev[1].p2p_stop_find() 4225 4226def test_dbus_p2p_config(dev, apdev): 4227 """D-Bus Get/Set P2PDeviceConfig""" 4228 try: 4229 _test_dbus_p2p_config(dev, apdev) 4230 finally: 4231 dev[0].request("P2P_SET ssid_postfix ") 4232 4233def _test_dbus_p2p_config(dev, apdev): 4234 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4235 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4236 4237 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4238 dbus_interface=dbus.PROPERTIES_IFACE, 4239 byte_arrays=True) 4240 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res, 4241 dbus_interface=dbus.PROPERTIES_IFACE) 4242 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4243 dbus_interface=dbus.PROPERTIES_IFACE, 4244 byte_arrays=True) 4245 4246 if len(res) != len(res2): 4247 raise Exception("Different number of parameters") 4248 for k in res: 4249 if res[k] != res2[k]: 4250 raise Exception("Parameter %s value changes" % k) 4251 4252 changes = {'SsidPostfix': 'foo', 4253 'VendorExtension': [dbus.ByteArray(b'\x11\x22\x33\x44')], 4254 'SecondaryDeviceTypes': [dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88')]} 4255 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4256 dbus.Dictionary(changes, signature='sv'), 4257 dbus_interface=dbus.PROPERTIES_IFACE) 4258 4259 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4260 dbus_interface=dbus.PROPERTIES_IFACE, 4261 byte_arrays=True) 4262 logger.debug("P2PDeviceConfig: " + str(res2)) 4263 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1: 4264 raise Exception("VendorExtension does not match") 4265 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1: 4266 raise Exception("SecondaryDeviceType does not match") 4267 4268 changes = {'SsidPostfix': '', 4269 'VendorExtension': dbus.Array([], signature="ay"), 4270 'SecondaryDeviceTypes': dbus.Array([], signature="ay")} 4271 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4272 dbus.Dictionary(changes, signature='sv'), 4273 dbus_interface=dbus.PROPERTIES_IFACE) 4274 4275 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4276 dbus_interface=dbus.PROPERTIES_IFACE, 4277 byte_arrays=True) 4278 logger.debug("P2PDeviceConfig: " + str(res3)) 4279 if 'VendorExtension' in res3: 4280 raise Exception("VendorExtension not removed") 4281 if 'SecondaryDeviceTypes' in res3: 4282 raise Exception("SecondaryDeviceType not removed") 4283 4284 try: 4285 dev[0].request("P2P_SET disabled 1") 4286 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4287 dbus_interface=dbus.PROPERTIES_IFACE, 4288 byte_arrays=True) 4289 raise Exception("Invalid Get(P2PDeviceConfig) accepted") 4290 except dbus.exceptions.DBusException as e: 4291 if "Error.Failed: P2P is not available for this interface" not in str(e): 4292 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4293 finally: 4294 dev[0].request("P2P_SET disabled 0") 4295 4296 try: 4297 dev[0].request("P2P_SET disabled 1") 4298 changes = {'SsidPostfix': 'foo'} 4299 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4300 dbus.Dictionary(changes, signature='sv'), 4301 dbus_interface=dbus.PROPERTIES_IFACE) 4302 raise Exception("Invalid Set(P2PDeviceConfig) accepted") 4303 except dbus.exceptions.DBusException as e: 4304 if "Error.Failed: P2P is not available for this interface" not in str(e): 4305 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4306 finally: 4307 dev[0].request("P2P_SET disabled 0") 4308 4309 tests = [{'DeviceName': 123}, 4310 {'SsidPostfix': 123}, 4311 {'Foo': 'Bar'}] 4312 for changes in tests: 4313 try: 4314 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4315 dbus.Dictionary(changes, signature='sv'), 4316 dbus_interface=dbus.PROPERTIES_IFACE) 4317 raise Exception("Invalid Set(P2PDeviceConfig) accepted") 4318 except dbus.exceptions.DBusException as e: 4319 if "InvalidArgs" not in str(e): 4320 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4321 4322def test_dbus_p2p_persistent(dev, apdev): 4323 """D-Bus P2P persistent group""" 4324 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4325 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4326 4327 class TestDbusP2p(TestDbus): 4328 def __init__(self, bus): 4329 TestDbus.__init__(self, bus) 4330 4331 def __enter__(self): 4332 gobject.timeout_add(1, self.run_test) 4333 gobject.timeout_add(15000, self.timeout) 4334 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4335 "GroupStarted") 4336 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4337 "GroupFinished") 4338 self.add_signal(self.persistentGroupAdded, 4339 WPAS_DBUS_IFACE_P2PDEVICE, 4340 "PersistentGroupAdded") 4341 self.loop.run() 4342 return self 4343 4344 def groupStarted(self, properties): 4345 logger.debug("groupStarted: " + str(properties)) 4346 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4347 properties['interface_object']) 4348 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4349 group_p2p.Disconnect() 4350 4351 def groupFinished(self, properties): 4352 logger.debug("groupFinished: " + str(properties)) 4353 self.loop.quit() 4354 4355 def persistentGroupAdded(self, path, properties): 4356 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 4357 self.persistent = path 4358 4359 def run_test(self, *args): 4360 logger.debug("run_test") 4361 params = dbus.Dictionary({'persistent': True, 4362 'frequency': 2412}) 4363 logger.info("Add a persistent group") 4364 p2p.GroupAdd(params) 4365 return False 4366 4367 def success(self): 4368 return True 4369 4370 with TestDbusP2p(bus) as t: 4371 if not t.success(): 4372 raise Exception("Expected signals not seen") 4373 persistent = t.persistent 4374 4375 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent) 4376 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties", 4377 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True) 4378 logger.info("Persistent group Properties: " + str(res)) 4379 vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv') 4380 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals, 4381 dbus_interface=dbus.PROPERTIES_IFACE) 4382 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties", 4383 dbus_interface=dbus.PROPERTIES_IFACE) 4384 if len(res) != len(res2): 4385 raise Exception("Different number of parameters") 4386 for k in res: 4387 if k != 'ssid' and res[k] != res2[k]: 4388 raise Exception("Parameter %s value changes" % k) 4389 if res2['ssid'] != '"DIRECT-foo"': 4390 raise Exception("Unexpected ssid") 4391 4392 args = dbus.Dictionary({'ssid': 'DIRECT-testing', 4393 'psk': '1234567890'}, signature='sv') 4394 group = p2p.AddPersistentGroup(args) 4395 4396 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups", 4397 dbus_interface=dbus.PROPERTIES_IFACE) 4398 if len(groups) != 2: 4399 raise Exception("Unexpected number of persistent groups: " + str(groups)) 4400 4401 p2p.RemoveAllPersistentGroups() 4402 4403 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups", 4404 dbus_interface=dbus.PROPERTIES_IFACE) 4405 if len(groups) != 0: 4406 raise Exception("Unexpected number of persistent groups: " + str(groups)) 4407 4408 try: 4409 p2p.RemovePersistentGroup(persistent) 4410 raise Exception("Invalid RemovePersistentGroup accepted") 4411 except dbus.exceptions.DBusException as e: 4412 if "NetworkUnknown: There is no such persistent group" not in str(e): 4413 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e)) 4414 4415def test_dbus_p2p_reinvoke_persistent(dev, apdev): 4416 """D-Bus P2P reinvoke persistent group""" 4417 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4418 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4419 4420 addr0 = dev[0].p2p_dev_addr() 4421 4422 class TestDbusP2p(TestDbus): 4423 def __init__(self, bus): 4424 TestDbus.__init__(self, bus) 4425 self.first = True 4426 self.waiting_end = False 4427 self.done = False 4428 self.invited = False 4429 4430 def __enter__(self): 4431 gobject.timeout_add(1, self.run_test) 4432 gobject.timeout_add(15000, self.timeout) 4433 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4434 "DeviceFound") 4435 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4436 "GroupStarted") 4437 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4438 "GroupFinished") 4439 self.add_signal(self.persistentGroupAdded, 4440 WPAS_DBUS_IFACE_P2PDEVICE, 4441 "PersistentGroupAdded") 4442 self.add_signal(self.provisionDiscoveryRequestDisplayPin, 4443 WPAS_DBUS_IFACE_P2PDEVICE, 4444 "ProvisionDiscoveryRequestDisplayPin") 4445 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 4446 "StaAuthorized") 4447 self.loop.run() 4448 return self 4449 4450 def groupStarted(self, properties): 4451 logger.debug("groupStarted: " + str(properties)) 4452 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4453 properties['interface_object']) 4454 if not self.invited: 4455 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4456 properties['group_object']) 4457 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4458 dbus_interface=dbus.PROPERTIES_IFACE, 4459 byte_arrays=True) 4460 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])]) 4461 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4462 dev1.scan_for_bss(bssid, freq=2412) 4463 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join") 4464 4465 def groupFinished(self, properties): 4466 logger.debug("groupFinished: " + str(properties)) 4467 if self.invited: 4468 self.done = True 4469 self.loop.quit() 4470 else: 4471 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4472 dev1.global_request("SET persistent_reconnect 1") 4473 dev1.p2p_listen() 4474 4475 args = {'persistent_group_object': dbus.ObjectPath(path), 4476 'peer': self.peer_path} 4477 try: 4478 pin = p2p.Invite(args) 4479 raise Exception("Invalid Invite accepted") 4480 except dbus.exceptions.DBusException as e: 4481 if "InvalidArgs" not in str(e): 4482 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4483 4484 args = {'persistent_group_object': self.persistent, 4485 'peer': self.peer_path} 4486 pin = p2p.Invite(args) 4487 self.invited = True 4488 4489 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], 4490 timeout=15) 4491 if self.sta_group_ev is None: 4492 raise Exception("P2P-GROUP-STARTED event not seen") 4493 4494 def persistentGroupAdded(self, path, properties): 4495 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 4496 self.persistent = path 4497 4498 def deviceFound(self, path): 4499 logger.debug("deviceFound: path=%s" % path) 4500 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 4501 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 4502 dbus_interface=dbus.PROPERTIES_IFACE, 4503 byte_arrays=True) 4504 4505 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin): 4506 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin)) 4507 self.peer_path = peer_object 4508 peer = binascii.unhexlify(peer_object.split('/')[-1]) 4509 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 4510 params = {'Role': 'registrar', 4511 'P2PDeviceAddress': self.peer['DeviceAddress'], 4512 'Bssid': self.peer['DeviceAddress'], 4513 'Type': 'pin', 4514 'Pin': '12345670'} 4515 logger.info("Authorize peer to connect to the group") 4516 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4517 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 4518 wps.Start(params) 4519 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], 4520 timeout=15) 4521 if self.sta_group_ev is None: 4522 raise Exception("P2P-GROUP-STARTED event not seen") 4523 4524 def staAuthorized(self, name): 4525 logger.debug("staAuthorized: " + name) 4526 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4527 dev1.group_form_result(self.sta_group_ev) 4528 dev1.remove_group() 4529 ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10) 4530 if ev is None: 4531 raise Exception("Group removal timed out") 4532 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4533 group_p2p.Disconnect() 4534 4535 def run_test(self, *args): 4536 logger.debug("run_test") 4537 params = dbus.Dictionary({'persistent': True, 4538 'frequency': 2412}) 4539 logger.info("Add a persistent group") 4540 p2p.GroupAdd(params) 4541 return False 4542 4543 def success(self): 4544 return self.done 4545 4546 with TestDbusP2p(bus) as t: 4547 if not t.success(): 4548 raise Exception("Expected signals not seen") 4549 4550def test_dbus_p2p_go_neg_rx(dev, apdev): 4551 """D-Bus P2P GO Negotiation receive""" 4552 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4553 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4554 addr0 = dev[0].p2p_dev_addr() 4555 4556 class TestDbusP2p(TestDbus): 4557 def __init__(self, bus): 4558 TestDbus.__init__(self, bus) 4559 self.done = False 4560 4561 def __enter__(self): 4562 gobject.timeout_add(1, self.run_test) 4563 gobject.timeout_add(15000, self.timeout) 4564 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4565 "DeviceFound") 4566 self.add_signal(self.goNegotiationRequest, 4567 WPAS_DBUS_IFACE_P2PDEVICE, 4568 "GONegotiationRequest", 4569 byte_arrays=True) 4570 self.add_signal(self.goNegotiationSuccess, 4571 WPAS_DBUS_IFACE_P2PDEVICE, 4572 "GONegotiationSuccess", 4573 byte_arrays=True) 4574 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4575 "GroupStarted") 4576 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4577 "GroupFinished") 4578 self.loop.run() 4579 return self 4580 4581 def deviceFound(self, path): 4582 logger.debug("deviceFound: path=%s" % path) 4583 4584 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0): 4585 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent)) 4586 if dev_passwd_id != 1: 4587 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id) 4588 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 4589 'go_intent': 15, 'persistent': False, 'frequency': 5175} 4590 try: 4591 p2p.Connect(args) 4592 raise Exception("Invalid Connect accepted") 4593 except dbus.exceptions.DBusException as e: 4594 if "ConnectChannelUnsupported" not in str(e): 4595 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 4596 4597 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 4598 'go_intent': 15, 'persistent': False} 4599 p2p.Connect(args) 4600 4601 def goNegotiationSuccess(self, properties): 4602 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4603 4604 def groupStarted(self, properties): 4605 logger.debug("groupStarted: " + str(properties)) 4606 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4607 properties['interface_object']) 4608 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4609 group_p2p.Disconnect() 4610 4611 def groupFinished(self, properties): 4612 logger.debug("groupFinished: " + str(properties)) 4613 self.done = True 4614 self.loop.quit() 4615 4616 def run_test(self, *args): 4617 logger.debug("run_test") 4618 p2p.Listen(10) 4619 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4620 if not dev1.discover_peer(addr0): 4621 raise Exception("Peer not found") 4622 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter") 4623 return False 4624 4625 def success(self): 4626 return self.done 4627 4628 with TestDbusP2p(bus) as t: 4629 if not t.success(): 4630 raise Exception("Expected signals not seen") 4631 4632def test_dbus_p2p_go_neg_auth(dev, apdev): 4633 """D-Bus P2P GO Negotiation authorized""" 4634 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4635 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4636 addr0 = dev[0].p2p_dev_addr() 4637 dev[1].p2p_listen() 4638 4639 class TestDbusP2p(TestDbus): 4640 def __init__(self, bus): 4641 TestDbus.__init__(self, bus) 4642 self.done = False 4643 self.peer_joined = False 4644 self.peer_disconnected = False 4645 4646 def __enter__(self): 4647 gobject.timeout_add(1, self.run_test) 4648 gobject.timeout_add(15000, self.timeout) 4649 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4650 "DeviceFound") 4651 self.add_signal(self.goNegotiationSuccess, 4652 WPAS_DBUS_IFACE_P2PDEVICE, 4653 "GONegotiationSuccess", 4654 byte_arrays=True) 4655 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4656 "GroupStarted") 4657 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4658 "GroupFinished") 4659 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 4660 "StaDeauthorized") 4661 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP, 4662 "PeerJoined") 4663 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP, 4664 "PeerDisconnected") 4665 self.loop.run() 4666 return self 4667 4668 def deviceFound(self, path): 4669 logger.debug("deviceFound: path=%s" % path) 4670 args = {'peer': path, 'wps_method': 'keypad', 4671 'go_intent': 15, 'authorize_only': True} 4672 try: 4673 p2p.Connect(args) 4674 raise Exception("Invalid Connect accepted") 4675 except dbus.exceptions.DBusException as e: 4676 if "InvalidArgs" not in str(e): 4677 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 4678 4679 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4680 'go_intent': 15, 'authorize_only': True} 4681 p2p.Connect(args) 4682 p2p.Listen(10) 4683 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4684 if not dev1.discover_peer(addr0): 4685 raise Exception("Peer not found") 4686 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0") 4687 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4688 if ev is None: 4689 raise Exception("Group formation timed out") 4690 self.sta_group_ev = ev 4691 4692 def goNegotiationSuccess(self, properties): 4693 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4694 4695 def groupStarted(self, properties): 4696 logger.debug("groupStarted: " + str(properties)) 4697 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4698 properties['interface_object']) 4699 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4700 dev1.group_form_result(self.sta_group_ev) 4701 dev1.remove_group() 4702 4703 def staDeauthorized(self, name): 4704 logger.debug("staDeuthorized: " + name) 4705 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4706 group_p2p.Disconnect() 4707 4708 def peerJoined(self, peer): 4709 logger.debug("peerJoined: " + peer) 4710 self.peer_joined = True 4711 4712 def peerDisconnected(self, peer): 4713 logger.debug("peerDisconnected: " + peer) 4714 self.peer_disconnected = True 4715 4716 def groupFinished(self, properties): 4717 logger.debug("groupFinished: " + str(properties)) 4718 self.done = True 4719 self.loop.quit() 4720 4721 def run_test(self, *args): 4722 logger.debug("run_test") 4723 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4724 return False 4725 4726 def success(self): 4727 return self.done and self.peer_joined and self.peer_disconnected 4728 4729 with TestDbusP2p(bus) as t: 4730 if not t.success(): 4731 raise Exception("Expected signals not seen") 4732 4733def test_dbus_p2p_go_neg_init(dev, apdev): 4734 """D-Bus P2P GO Negotiation initiation""" 4735 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4736 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4737 addr0 = dev[0].p2p_dev_addr() 4738 dev[1].p2p_listen() 4739 4740 class TestDbusP2p(TestDbus): 4741 def __init__(self, bus): 4742 TestDbus.__init__(self, bus) 4743 self.done = False 4744 self.peer_group_added = False 4745 self.peer_group_removed = False 4746 4747 def __enter__(self): 4748 gobject.timeout_add(1, self.run_test) 4749 gobject.timeout_add(15000, self.timeout) 4750 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4751 "DeviceFound") 4752 self.add_signal(self.goNegotiationSuccess, 4753 WPAS_DBUS_IFACE_P2PDEVICE, 4754 "GONegotiationSuccess", 4755 byte_arrays=True) 4756 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4757 "GroupStarted") 4758 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4759 "GroupFinished") 4760 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4761 "PropertiesChanged") 4762 self.loop.run() 4763 return self 4764 4765 def deviceFound(self, path): 4766 logger.debug("deviceFound: path=%s" % path) 4767 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4768 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4769 'go_intent': 0} 4770 p2p.Connect(args) 4771 4772 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 4773 if ev is None: 4774 raise Exception("Timeout while waiting for GO Neg Request") 4775 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 4776 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4777 if ev is None: 4778 raise Exception("Group formation timed out") 4779 self.sta_group_ev = ev 4780 dev1.close_monitor_global() 4781 dev1.close_monitor_mon() 4782 dev1 = None 4783 4784 def goNegotiationSuccess(self, properties): 4785 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4786 4787 def groupStarted(self, properties): 4788 logger.debug("groupStarted: " + str(properties)) 4789 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4790 properties['interface_object']) 4791 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4792 group_p2p.Disconnect() 4793 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False) 4794 dev1.group_form_result(self.sta_group_ev) 4795 dev1.remove_group() 4796 dev1 = None 4797 4798 def groupFinished(self, properties): 4799 logger.debug("groupFinished: " + str(properties)) 4800 self.done = True 4801 4802 def propertiesChanged(self, interface_name, changed_properties, 4803 invalidated_properties): 4804 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 4805 if interface_name != WPAS_DBUS_P2P_PEER: 4806 return 4807 if "Groups" not in changed_properties: 4808 return 4809 if len(changed_properties["Groups"]) > 0: 4810 self.peer_group_added = True 4811 if len(changed_properties["Groups"]) == 0: 4812 if not self.peer_group_added: 4813 # This is likely a leftover event from an earlier test case, 4814 # ignore it to allow this test case to go through its steps. 4815 logger.info("Ignore propertiesChanged indicating group removal before group has been added") 4816 return 4817 self.peer_group_removed = True 4818 self.loop.quit() 4819 4820 def run_test(self, *args): 4821 logger.debug("run_test") 4822 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4823 return False 4824 4825 def success(self): 4826 return self.done and self.peer_group_added and self.peer_group_removed 4827 4828 with TestDbusP2p(bus) as t: 4829 if not t.success(): 4830 raise Exception("Expected signals not seen") 4831 4832def test_dbus_p2p_group_termination_by_go(dev, apdev): 4833 """D-Bus P2P group removal on GO terminating the group""" 4834 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4835 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4836 addr0 = dev[0].p2p_dev_addr() 4837 dev[1].p2p_listen() 4838 4839 class TestDbusP2p(TestDbus): 4840 def __init__(self, bus): 4841 TestDbus.__init__(self, bus) 4842 self.done = False 4843 self.peer_group_added = False 4844 self.peer_group_removed = False 4845 4846 def __enter__(self): 4847 gobject.timeout_add(1, self.run_test) 4848 gobject.timeout_add(15000, self.timeout) 4849 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4850 "DeviceFound") 4851 self.add_signal(self.goNegotiationSuccess, 4852 WPAS_DBUS_IFACE_P2PDEVICE, 4853 "GONegotiationSuccess", 4854 byte_arrays=True) 4855 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4856 "GroupStarted") 4857 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4858 "GroupFinished") 4859 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4860 "PropertiesChanged") 4861 self.loop.run() 4862 return self 4863 4864 def deviceFound(self, path): 4865 logger.debug("deviceFound: path=%s" % path) 4866 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4867 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4868 'go_intent': 0} 4869 p2p.Connect(args) 4870 4871 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 4872 if ev is None: 4873 raise Exception("Timeout while waiting for GO Neg Request") 4874 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 4875 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4876 if ev is None: 4877 raise Exception("Group formation timed out") 4878 self.sta_group_ev = ev 4879 dev1.close_monitor_global() 4880 dev1.close_monitor_mon() 4881 dev1 = None 4882 4883 def goNegotiationSuccess(self, properties): 4884 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4885 4886 def groupStarted(self, properties): 4887 logger.debug("groupStarted: " + str(properties)) 4888 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4889 properties['interface_object']) 4890 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False) 4891 dev1.group_form_result(self.sta_group_ev) 4892 dev1.remove_group() 4893 4894 def groupFinished(self, properties): 4895 logger.debug("groupFinished: " + str(properties)) 4896 self.done = True 4897 4898 def propertiesChanged(self, interface_name, changed_properties, 4899 invalidated_properties): 4900 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 4901 if interface_name != WPAS_DBUS_P2P_PEER: 4902 return 4903 if "Groups" not in changed_properties: 4904 return 4905 if len(changed_properties["Groups"]) > 0: 4906 self.peer_group_added = True 4907 if len(changed_properties["Groups"]) == 0 and self.peer_group_added: 4908 self.peer_group_removed = True 4909 self.loop.quit() 4910 4911 def run_test(self, *args): 4912 logger.debug("run_test") 4913 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4914 return False 4915 4916 def success(self): 4917 return self.done and self.peer_group_added and self.peer_group_removed 4918 4919 with TestDbusP2p(bus) as t: 4920 if not t.success(): 4921 raise Exception("Expected signals not seen") 4922 4923def test_dbus_p2p_group_idle_timeout(dev, apdev): 4924 """D-Bus P2P group removal on idle timeout""" 4925 try: 4926 dev[0].global_request("SET p2p_group_idle 1") 4927 _test_dbus_p2p_group_idle_timeout(dev, apdev) 4928 finally: 4929 dev[0].global_request("SET p2p_group_idle 0") 4930 4931def _test_dbus_p2p_group_idle_timeout(dev, apdev): 4932 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4933 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4934 addr0 = dev[0].p2p_dev_addr() 4935 dev[1].p2p_listen() 4936 4937 class TestDbusP2p(TestDbus): 4938 def __init__(self, bus): 4939 TestDbus.__init__(self, bus) 4940 self.done = False 4941 self.group_started = False 4942 self.peer_group_added = False 4943 self.peer_group_removed = False 4944 4945 def __enter__(self): 4946 gobject.timeout_add(1, self.run_test) 4947 gobject.timeout_add(15000, self.timeout) 4948 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4949 "DeviceFound") 4950 self.add_signal(self.goNegotiationSuccess, 4951 WPAS_DBUS_IFACE_P2PDEVICE, 4952 "GONegotiationSuccess", 4953 byte_arrays=True) 4954 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4955 "GroupStarted") 4956 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4957 "GroupFinished") 4958 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4959 "PropertiesChanged") 4960 self.loop.run() 4961 return self 4962 4963 def deviceFound(self, path): 4964 logger.debug("deviceFound: path=%s" % path) 4965 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4966 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4967 'go_intent': 0} 4968 p2p.Connect(args) 4969 4970 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 4971 if ev is None: 4972 raise Exception("Timeout while waiting for GO Neg Request") 4973 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 4974 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4975 if ev is None: 4976 raise Exception("Group formation timed out") 4977 self.sta_group_ev = ev 4978 dev1.close_monitor_global() 4979 dev1.close_monitor_mon() 4980 dev1 = None 4981 4982 def goNegotiationSuccess(self, properties): 4983 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4984 4985 def groupStarted(self, properties): 4986 logger.debug("groupStarted: " + str(properties)) 4987 self.group_started = True 4988 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4989 properties['interface_object']) 4990 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False) 4991 dev1.group_form_result(self.sta_group_ev) 4992 ifaddr = dev1.group_request("STA-FIRST").splitlines()[0] 4993 # Force disassociation with different reason code so that the 4994 # P2P Client using D-Bus does not get normal group termination event 4995 # from the GO. 4996 dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0") 4997 dev1.remove_group() 4998 4999 def groupFinished(self, properties): 5000 logger.debug("groupFinished: " + str(properties)) 5001 self.done = True 5002 5003 def propertiesChanged(self, interface_name, changed_properties, 5004 invalidated_properties): 5005 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 5006 if interface_name != WPAS_DBUS_P2P_PEER: 5007 return 5008 if not self.group_started: 5009 return 5010 if "Groups" not in changed_properties: 5011 return 5012 if len(changed_properties["Groups"]) > 0: 5013 self.peer_group_added = True 5014 if len(changed_properties["Groups"]) == 0: 5015 self.peer_group_removed = True 5016 self.loop.quit() 5017 5018 def run_test(self, *args): 5019 logger.debug("run_test") 5020 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5021 return False 5022 5023 def success(self): 5024 return self.done and self.peer_group_added and self.peer_group_removed 5025 5026 with TestDbusP2p(bus) as t: 5027 if not t.success(): 5028 raise Exception("Expected signals not seen") 5029 5030def test_dbus_p2p_wps_failure(dev, apdev): 5031 """D-Bus P2P WPS failure""" 5032 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5033 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5034 addr0 = dev[0].p2p_dev_addr() 5035 5036 class TestDbusP2p(TestDbus): 5037 def __init__(self, bus): 5038 TestDbus.__init__(self, bus) 5039 self.wps_failed = False 5040 self.formation_failure = False 5041 5042 def __enter__(self): 5043 gobject.timeout_add(1, self.run_test) 5044 gobject.timeout_add(15000, self.timeout) 5045 self.add_signal(self.goNegotiationRequest, 5046 WPAS_DBUS_IFACE_P2PDEVICE, 5047 "GONegotiationRequest", 5048 byte_arrays=True) 5049 self.add_signal(self.goNegotiationSuccess, 5050 WPAS_DBUS_IFACE_P2PDEVICE, 5051 "GONegotiationSuccess", 5052 byte_arrays=True) 5053 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5054 "GroupStarted") 5055 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE, 5056 "WpsFailed") 5057 self.add_signal(self.groupFormationFailure, 5058 WPAS_DBUS_IFACE_P2PDEVICE, 5059 "GroupFormationFailure") 5060 self.loop.run() 5061 return self 5062 5063 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0): 5064 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent)) 5065 if dev_passwd_id != 1: 5066 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id) 5067 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 5068 'go_intent': 15} 5069 p2p.Connect(args) 5070 5071 def goNegotiationSuccess(self, properties): 5072 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 5073 5074 def groupStarted(self, properties): 5075 logger.debug("groupStarted: " + str(properties)) 5076 raise Exception("Unexpected GroupStarted") 5077 5078 def wpsFailed(self, name, args): 5079 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args))) 5080 self.wps_failed = True 5081 if self.formation_failure: 5082 self.loop.quit() 5083 5084 def groupFormationFailure(self, reason): 5085 logger.debug("groupFormationFailure - reason=%s" % reason) 5086 self.formation_failure = True 5087 if self.wps_failed: 5088 self.loop.quit() 5089 5090 def run_test(self, *args): 5091 logger.debug("run_test") 5092 p2p.Listen(10) 5093 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 5094 if not dev1.discover_peer(addr0): 5095 raise Exception("Peer not found") 5096 dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter") 5097 return False 5098 5099 def success(self): 5100 return self.wps_failed and self.formation_failure 5101 5102 with TestDbusP2p(bus) as t: 5103 if not t.success(): 5104 raise Exception("Expected signals not seen") 5105 5106def test_dbus_p2p_two_groups(dev, apdev): 5107 """D-Bus P2P with two concurrent groups""" 5108 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5109 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5110 5111 dev[0].request("SET p2p_no_group_iface 0") 5112 addr0 = dev[0].p2p_dev_addr() 5113 addr1 = dev[1].p2p_dev_addr() 5114 addr2 = dev[2].p2p_dev_addr() 5115 dev[1].p2p_start_go(freq=2412) 5116 dev1_group_ifname = dev[1].group_ifname 5117 5118 class TestDbusP2p(TestDbus): 5119 def __init__(self, bus): 5120 TestDbus.__init__(self, bus) 5121 self.done = False 5122 self.peer = None 5123 self.go = None 5124 self.group1 = None 5125 self.group2 = None 5126 self.groups_removed = False 5127 5128 def __enter__(self): 5129 gobject.timeout_add(1, self.run_test) 5130 gobject.timeout_add(15000, self.timeout) 5131 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 5132 "PropertiesChanged", byte_arrays=True) 5133 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 5134 "DeviceFound") 5135 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5136 "GroupStarted") 5137 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 5138 "GroupFinished") 5139 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP, 5140 "PeerJoined") 5141 self.loop.run() 5142 return self 5143 5144 def propertiesChanged(self, interface_name, changed_properties, 5145 invalidated_properties): 5146 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 5147 5148 def deviceFound(self, path): 5149 logger.debug("deviceFound: path=%s" % path) 5150 if addr2.replace(':', '') in path: 5151 self.peer = path 5152 elif addr1.replace(':', '') in path: 5153 self.go = path 5154 if self.go and not self.group1: 5155 logger.info("Join the group") 5156 p2p.StopFind() 5157 pin = '12345670' 5158 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 5159 dev1.group_ifname = dev1_group_ifname 5160 dev1.group_request("WPS_PIN any " + pin) 5161 args = {'peer': self.go, 5162 'join': True, 5163 'wps_method': 'pin', 5164 'pin': pin, 5165 'frequency': 2412} 5166 p2p.Connect(args) 5167 5168 def groupStarted(self, properties): 5169 logger.debug("groupStarted: " + str(properties)) 5170 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 5171 dbus_interface=dbus.PROPERTIES_IFACE) 5172 logger.debug("p2pdevice properties: " + str(prop)) 5173 5174 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 5175 properties['group_object']) 5176 res = g_obj.GetAll(WPAS_DBUS_GROUP, 5177 dbus_interface=dbus.PROPERTIES_IFACE, 5178 byte_arrays=True) 5179 logger.debug("Group properties: " + str(res)) 5180 5181 if not self.group1: 5182 self.group1 = properties['group_object'] 5183 self.group1iface = properties['interface_object'] 5184 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 5185 self.group1iface) 5186 5187 logger.info("Start autonomous GO") 5188 params = dbus.Dictionary({'frequency': 2412}) 5189 p2p.GroupAdd(params) 5190 elif not self.group2: 5191 self.group2 = properties['group_object'] 5192 self.group2iface = properties['interface_object'] 5193 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 5194 self.group2iface) 5195 self.g2_bssid = res['BSSID'] 5196 5197 if self.group1 and self.group2: 5198 logger.info("Authorize peer to join the group") 5199 a2 = binascii.unhexlify(addr2.replace(':', '')) 5200 params = {'Role': 'enrollee', 5201 'P2PDeviceAddress': dbus.ByteArray(a2), 5202 'Bssid': dbus.ByteArray(a2), 5203 'Type': 'pin', 5204 'Pin': '12345670'} 5205 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS) 5206 g_wps.Start(params) 5207 5208 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', self.g2_bssid)]) 5209 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2') 5210 dev2.scan_for_bss(bssid, freq=2412) 5211 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412") 5212 ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 5213 if ev is None: 5214 raise Exception("Group join timed out") 5215 self.dev2_group_ev = ev 5216 5217 def groupFinished(self, properties): 5218 logger.debug("groupFinished: " + str(properties)) 5219 5220 if self.group1 == properties['group_object']: 5221 self.group1 = None 5222 elif self.group2 == properties['group_object']: 5223 self.group2 = None 5224 5225 if not self.group1 and not self.group2: 5226 self.done = True 5227 self.loop.quit() 5228 5229 def peerJoined(self, peer): 5230 logger.debug("peerJoined: " + peer) 5231 if self.groups_removed: 5232 return 5233 self.check_results() 5234 5235 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2') 5236 dev2.group_form_result(self.dev2_group_ev) 5237 dev2.remove_group() 5238 5239 logger.info("Disconnect group2") 5240 group_p2p = dbus.Interface(self.g2_if_obj, 5241 WPAS_DBUS_IFACE_P2PDEVICE) 5242 group_p2p.Disconnect() 5243 5244 logger.info("Disconnect group1") 5245 group_p2p = dbus.Interface(self.g1_if_obj, 5246 WPAS_DBUS_IFACE_P2PDEVICE) 5247 group_p2p.Disconnect() 5248 self.groups_removed = True 5249 5250 def check_results(self): 5251 logger.info("Check results with two concurrent groups in operation") 5252 5253 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1) 5254 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP, 5255 dbus_interface=dbus.PROPERTIES_IFACE, 5256 byte_arrays=True) 5257 5258 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2) 5259 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP, 5260 dbus_interface=dbus.PROPERTIES_IFACE, 5261 byte_arrays=True) 5262 5263 logger.info("group1 = " + self.group1) 5264 logger.debug("Group properties: " + str(res1)) 5265 5266 logger.info("group2 = " + self.group2) 5267 logger.debug("Group properties: " + str(res2)) 5268 5269 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 5270 dbus_interface=dbus.PROPERTIES_IFACE) 5271 logger.debug("p2pdevice properties: " + str(prop)) 5272 5273 if res1['Role'] != 'client': 5274 raise Exception("Group1 role reported incorrectly: " + res1['Role']) 5275 if res2['Role'] != 'GO': 5276 raise Exception("Group2 role reported incorrectly: " + res2['Role']) 5277 if prop['Role'] != 'device': 5278 raise Exception("p2pdevice role reported incorrectly: " + prop['Role']) 5279 5280 if len(res2['Members']) != 1: 5281 raise Exception("Unexpected Members value for group 2") 5282 5283 def run_test(self, *args): 5284 logger.debug("run_test") 5285 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5286 return False 5287 5288 def success(self): 5289 return self.done 5290 5291 with TestDbusP2p(bus) as t: 5292 if not t.success(): 5293 raise Exception("Expected signals not seen") 5294 5295 dev[1].remove_group() 5296 5297def test_dbus_p2p_cancel(dev, apdev): 5298 """D-Bus P2P Cancel""" 5299 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5300 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5301 try: 5302 p2p.Cancel() 5303 raise Exception("Unexpected p2p.Cancel() success") 5304 except dbus.exceptions.DBusException as e: 5305 pass 5306 5307 addr0 = dev[0].p2p_dev_addr() 5308 dev[1].p2p_listen() 5309 5310 class TestDbusP2p(TestDbus): 5311 def __init__(self, bus): 5312 TestDbus.__init__(self, bus) 5313 self.done = False 5314 5315 def __enter__(self): 5316 gobject.timeout_add(1, self.run_test) 5317 gobject.timeout_add(15000, self.timeout) 5318 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 5319 "DeviceFound") 5320 self.loop.run() 5321 return self 5322 5323 def deviceFound(self, path): 5324 logger.debug("deviceFound: path=%s" % path) 5325 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 5326 'go_intent': 0} 5327 p2p.Connect(args) 5328 p2p.Cancel() 5329 self.done = True 5330 self.loop.quit() 5331 5332 def run_test(self, *args): 5333 logger.debug("run_test") 5334 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5335 return False 5336 5337 def success(self): 5338 return self.done 5339 5340 with TestDbusP2p(bus) as t: 5341 if not t.success(): 5342 raise Exception("Expected signals not seen") 5343 5344def test_dbus_p2p_ip_addr(dev, apdev): 5345 """D-Bus P2P and IP address parameters""" 5346 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5347 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5348 5349 vals = [("IpAddrGo", "192.168.43.1"), 5350 ("IpAddrMask", "255.255.255.0"), 5351 ("IpAddrStart", "192.168.43.100"), 5352 ("IpAddrEnd", "192.168.43.199")] 5353 for field, value in vals: 5354 if_obj.Set(WPAS_DBUS_IFACE, field, value, 5355 dbus_interface=dbus.PROPERTIES_IFACE) 5356 val = if_obj.Get(WPAS_DBUS_IFACE, field, 5357 dbus_interface=dbus.PROPERTIES_IFACE) 5358 if val != value: 5359 raise Exception("Unexpected %s value: %s" % (field, val)) 5360 5361 set_ip_addr_info(dev[1]) 5362 5363 dev[0].global_request("SET p2p_go_intent 0") 5364 5365 req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip() 5366 if "FAIL" in req: 5367 raise Exception("Failed to generate NFC connection handover request") 5368 sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip() 5369 if "FAIL" in sel: 5370 raise Exception("Failed to generate NFC connection handover select") 5371 dev[0].dump_monitor() 5372 dev[1].dump_monitor() 5373 res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel) 5374 if "FAIL" in res: 5375 raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)") 5376 res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel) 5377 if "FAIL" in res: 5378 raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)") 5379 5380 class TestDbusP2p(TestDbus): 5381 def __init__(self, bus): 5382 TestDbus.__init__(self, bus) 5383 self.done = False 5384 5385 def __enter__(self): 5386 gobject.timeout_add(1, self.run_test) 5387 gobject.timeout_add(15000, self.timeout) 5388 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5389 "GroupStarted") 5390 self.loop.run() 5391 return self 5392 5393 def groupStarted(self, properties): 5394 logger.debug("groupStarted: " + str(properties)) 5395 self.loop.quit() 5396 5397 if 'IpAddrGo' not in properties: 5398 logger.info("IpAddrGo missing from GroupStarted") 5399 ip_addr_go = properties['IpAddrGo'] 5400 addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3]) 5401 if addr != "192.168.42.1": 5402 logger.info("Unexpected IpAddrGo value: " + addr) 5403 self.done = True 5404 5405 def run_test(self, *args): 5406 logger.debug("run_test") 5407 return False 5408 5409 def success(self): 5410 return self.done 5411 5412 with TestDbusP2p(bus) as t: 5413 if not t.success(): 5414 raise Exception("Expected signals not seen") 5415 5416def test_dbus_introspect(dev, apdev): 5417 """D-Bus introspection""" 5418 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5419 5420 res = if_obj.Introspect(WPAS_DBUS_IFACE, 5421 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5422 logger.info("Initial Introspect: " + str(res)) 5423 if res is None or "Introspectable" not in res or "GroupStarted" not in res: 5424 raise Exception("Unexpected initial Introspect response: " + str(res)) 5425 if "FastReauth" not in res or "PassiveScan" not in res: 5426 raise Exception("Unexpected initial Introspect response: " + str(res)) 5427 5428 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"): 5429 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5430 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5431 logger.info("Introspect: " + str(res2)) 5432 if res2 is not None: 5433 raise Exception("Unexpected Introspect response") 5434 5435 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"): 5436 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5437 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5438 logger.info("Introspect: " + str(res2)) 5439 if res2 is None: 5440 raise Exception("No Introspect response") 5441 if len(res2) >= len(res): 5442 raise Exception("Unexpected Introspect response") 5443 5444 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"): 5445 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5446 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5447 logger.info("Introspect: " + str(res2)) 5448 if res2 is None: 5449 raise Exception("No Introspect response") 5450 if len(res2) >= len(res): 5451 raise Exception("Unexpected Introspect response") 5452 5453 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"): 5454 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5455 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5456 logger.info("Introspect: " + str(res2)) 5457 if res2 is None: 5458 raise Exception("No Introspect response") 5459 if len(res2) >= len(res): 5460 raise Exception("Unexpected Introspect response") 5461 5462def run_busctl(service, obj): 5463 if not shutil.which("busctl"): 5464 raise HwsimSkip("No busctl available") 5465 logger.info("busctl introspect %s %s" % (service, obj)) 5466 cmd = subprocess.Popen(['busctl', 'introspect', service, obj], 5467 stdout=subprocess.PIPE, 5468 stderr=subprocess.PIPE) 5469 out = cmd.communicate() 5470 cmd.wait() 5471 logger.info("busctl stdout:\n%s" % out[0].strip()) 5472 if len(out[1]) > 0: 5473 logger.info("busctl stderr: %s" % out[1].decode().strip()) 5474 if "Duplicate property" in out[1].decode(): 5475 raise Exception("Duplicate property") 5476 5477def test_dbus_introspect_busctl(dev, apdev): 5478 """D-Bus introspection with busctl""" 5479 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5480 ifaces = dbus_get(dbus, wpas_obj, "Interfaces") 5481 run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH) 5482 run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH + "/Interfaces") 5483 run_busctl(WPAS_DBUS_SERVICE, ifaces[0]) 5484 5485 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 5486 bssid = apdev[0]['bssid'] 5487 dev[0].scan_for_bss(bssid, freq=2412) 5488 id = dev[0].add_network() 5489 dev[0].set_network(id, "disabled", "0") 5490 dev[0].set_network_quoted(id, "ssid", "test") 5491 5492 run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/BSSs/0") 5493 run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/Networks/0") 5494 5495def test_dbus_ap(dev, apdev): 5496 """D-Bus AddNetwork for AP mode""" 5497 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5498 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5499 5500 ssid = "test-wpa2-psk" 5501 passphrase = 'qwertyuiop' 5502 5503 class TestDbusConnect(TestDbus): 5504 def __init__(self, bus): 5505 TestDbus.__init__(self, bus) 5506 self.started = False 5507 self.sta_added = False 5508 self.sta_removed = False 5509 self.authorized = False 5510 self.deauthorized = False 5511 self.stations = False 5512 5513 def __enter__(self): 5514 gobject.timeout_add(1, self.run_connect) 5515 gobject.timeout_add(15000, self.timeout) 5516 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 5517 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 5518 "NetworkSelected") 5519 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5520 "PropertiesChanged") 5521 self.add_signal(self.stationAdded, WPAS_DBUS_IFACE, "StationAdded") 5522 self.add_signal(self.stationRemoved, WPAS_DBUS_IFACE, 5523 "StationRemoved") 5524 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 5525 "StaAuthorized") 5526 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 5527 "StaDeauthorized") 5528 self.loop.run() 5529 return self 5530 5531 def networkAdded(self, network, properties): 5532 logger.debug("networkAdded: %s" % str(network)) 5533 logger.debug(str(properties)) 5534 5535 def networkSelected(self, network): 5536 logger.debug("networkSelected: %s" % str(network)) 5537 self.network_selected = True 5538 5539 def propertiesChanged(self, properties): 5540 logger.debug("propertiesChanged: %s" % str(properties)) 5541 if 'State' in properties and properties['State'] == "completed": 5542 self.started = True 5543 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 5544 dev1.connect(ssid, psk=passphrase, scan_freq="2412") 5545 5546 def stationAdded(self, station, properties): 5547 logger.debug("stationAdded: %s" % str(station)) 5548 logger.debug(str(properties)) 5549 self.sta_added = True 5550 res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations', 5551 dbus_interface=dbus.PROPERTIES_IFACE) 5552 logger.info("Stations: " + str(res)) 5553 if len(res) == 1: 5554 self.stations = True 5555 else: 5556 raise Exception("Missing Stations entry: " + str(res)) 5557 5558 def stationRemoved(self, station): 5559 logger.debug("stationRemoved: %s" % str(station)) 5560 self.sta_removed = True 5561 res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations', 5562 dbus_interface=dbus.PROPERTIES_IFACE) 5563 logger.info("Stations: " + str(res)) 5564 if len(res) != 0: 5565 self.stations = False 5566 raise Exception("Unexpected Stations entry: " + str(res)) 5567 self.loop.quit() 5568 5569 def staAuthorized(self, name): 5570 logger.debug("staAuthorized: " + name) 5571 self.authorized = True 5572 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 5573 dev1.request("DISCONNECT") 5574 5575 def staDeauthorized(self, name): 5576 logger.debug("staDeauthorized: " + name) 5577 self.deauthorized = True 5578 5579 def run_connect(self, *args): 5580 logger.debug("run_connect") 5581 args = dbus.Dictionary({'ssid': ssid, 5582 'key_mgmt': 'WPA-PSK', 5583 'psk': passphrase, 5584 'mode': 2, 5585 'frequency': 2412, 5586 'scan_freq': 2412}, 5587 signature='sv') 5588 self.netw = iface.AddNetwork(args) 5589 iface.SelectNetwork(self.netw) 5590 return False 5591 5592 def success(self): 5593 return self.started and self.sta_added and self.sta_removed and \ 5594 self.authorized and self.deauthorized 5595 5596 with TestDbusConnect(bus) as t: 5597 if not t.success(): 5598 raise Exception("Expected signals not seen") 5599 5600def test_dbus_ap_scan(dev, apdev): 5601 """D-Bus AddNetwork for AP mode and scan""" 5602 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5603 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5604 5605 ssid = "test-wpa2-psk" 5606 passphrase = 'qwertyuiop' 5607 5608 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 5609 bssid = hapd.own_addr() 5610 5611 class TestDbusConnect(TestDbus): 5612 def __init__(self, bus): 5613 TestDbus.__init__(self, bus) 5614 self.started = False 5615 self.scan_completed = False 5616 5617 def __enter__(self): 5618 gobject.timeout_add(1, self.run_connect) 5619 gobject.timeout_add(15000, self.timeout) 5620 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5621 "PropertiesChanged") 5622 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone") 5623 self.loop.run() 5624 return self 5625 5626 def propertiesChanged(self, properties): 5627 logger.debug("propertiesChanged: %s" % str(properties)) 5628 if 'State' in properties and properties['State'] == "completed": 5629 self.started = True 5630 logger.info("Try to scan in AP mode") 5631 iface.Scan({'Type': 'active', 5632 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 5633 logger.info("Scan() returned") 5634 5635 def scanDone(self, success): 5636 logger.debug("scanDone: success=%s" % success) 5637 if self.started: 5638 self.scan_completed = True 5639 self.loop.quit() 5640 5641 def run_connect(self, *args): 5642 logger.debug("run_connect") 5643 args = dbus.Dictionary({'ssid': ssid, 5644 'key_mgmt': 'WPA-PSK', 5645 'psk': passphrase, 5646 'mode': 2, 5647 'frequency': 2412, 5648 'scan_freq': 2412}, 5649 signature='sv') 5650 self.netw = iface.AddNetwork(args) 5651 iface.SelectNetwork(self.netw) 5652 return False 5653 5654 def success(self): 5655 return self.started and self.scan_completed 5656 5657 with TestDbusConnect(bus) as t: 5658 if not t.success(): 5659 raise Exception("Expected signals not seen") 5660 5661def test_dbus_connect_wpa_eap(dev, apdev): 5662 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP""" 5663 skip_without_tkip(dev[0]) 5664 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5665 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5666 5667 ssid = "test-wpa-eap" 5668 params = hostapd.wpa_eap_params(ssid=ssid) 5669 params["wpa"] = "3" 5670 params["rsn_pairwise"] = "CCMP" 5671 hapd = hostapd.add_ap(apdev[0], params) 5672 5673 class TestDbusConnect(TestDbus): 5674 def __init__(self, bus): 5675 TestDbus.__init__(self, bus) 5676 self.done = False 5677 5678 def __enter__(self): 5679 gobject.timeout_add(1, self.run_connect) 5680 gobject.timeout_add(15000, self.timeout) 5681 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5682 "PropertiesChanged") 5683 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP") 5684 self.loop.run() 5685 return self 5686 5687 def propertiesChanged(self, properties): 5688 logger.debug("propertiesChanged: %s" % str(properties)) 5689 if 'State' in properties and properties['State'] == "completed": 5690 self.done = True 5691 self.loop.quit() 5692 5693 def eap(self, status, parameter): 5694 logger.debug("EAP: status=%s parameter=%s" % (status, parameter)) 5695 5696 def run_connect(self, *args): 5697 logger.debug("run_connect") 5698 args = dbus.Dictionary({'ssid': ssid, 5699 'key_mgmt': 'WPA-EAP', 5700 'eap': 'PEAP', 5701 'identity': 'user', 5702 'password': 'password', 5703 'ca_cert': 'auth_serv/ca.pem', 5704 'phase2': 'auth=MSCHAPV2', 5705 'scan_freq': 2412}, 5706 signature='sv') 5707 self.netw = iface.AddNetwork(args) 5708 iface.SelectNetwork(self.netw) 5709 return False 5710 5711 def success(self): 5712 return self.done 5713 5714 with TestDbusConnect(bus) as t: 5715 if not t.success(): 5716 raise Exception("Expected signals not seen") 5717 5718def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev): 5719 """AP_SCAN 2 AP mode and D-Bus Scan()""" 5720 try: 5721 _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev) 5722 finally: 5723 dev[0].request("AP_SCAN 1") 5724 5725def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev): 5726 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5727 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5728 5729 if "OK" not in dev[0].request("AP_SCAN 2"): 5730 raise Exception("Failed to set AP_SCAN 2") 5731 5732 id = dev[0].add_network() 5733 dev[0].set_network(id, "mode", "2") 5734 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open") 5735 dev[0].set_network(id, "key_mgmt", "NONE") 5736 dev[0].set_network(id, "frequency", "2412") 5737 dev[0].set_network(id, "scan_freq", "2412") 5738 dev[0].set_network(id, "disabled", "0") 5739 dev[0].select_network(id) 5740 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) 5741 if ev is None: 5742 raise Exception("AP failed to start") 5743 5744 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"): 5745 iface.Scan({'Type': 'active', 5746 'AllowRoam': True, 5747 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 5748 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED", 5749 "AP-DISABLED"], timeout=5) 5750 if ev is None: 5751 raise Exception("CTRL-EVENT-SCAN-FAILED not seen") 5752 if "AP-DISABLED" in ev: 5753 raise Exception("Unexpected AP-DISABLED event") 5754 if "retry=1" in ev: 5755 # Wait for the retry to scan happen 5756 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED", 5757 "AP-DISABLED"], timeout=5) 5758 if ev is None: 5759 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry") 5760 if "AP-DISABLED" in ev: 5761 raise Exception("Unexpected AP-DISABLED event - retry") 5762 5763 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412") 5764 dev[1].request("DISCONNECT") 5765 dev[1].wait_disconnected() 5766 dev[0].request("DISCONNECT") 5767 dev[0].wait_disconnected() 5768 5769def test_dbus_expectdisconnect(dev, apdev): 5770 """D-Bus ExpectDisconnect""" 5771 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5772 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 5773 5774 params = {"ssid": "test-open"} 5775 hapd = hostapd.add_ap(apdev[0], params) 5776 dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412") 5777 5778 # This does not really verify the behavior other than by going through the 5779 # code path for additional coverage. 5780 wpas.ExpectDisconnect() 5781 dev[0].request("DISCONNECT") 5782 dev[0].wait_disconnected() 5783 5784def test_dbus_save_config(dev, apdev): 5785 """D-Bus SaveConfig""" 5786 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5787 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5788 try: 5789 iface.SaveConfig() 5790 raise Exception("SaveConfig() accepted unexpectedly") 5791 except dbus.exceptions.DBusException as e: 5792 if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"): 5793 raise Exception("Unexpected error message for SaveConfig(): " + str(e)) 5794 5795def test_dbus_vendor_elem(dev, apdev): 5796 """D-Bus vendor element operations""" 5797 try: 5798 _test_dbus_vendor_elem(dev, apdev) 5799 finally: 5800 dev[0].request("VENDOR_ELEM_REMOVE 1 *") 5801 5802def _test_dbus_vendor_elem(dev, apdev): 5803 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5804 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5805 5806 dev[0].request("VENDOR_ELEM_REMOVE 1 *") 5807 5808 try: 5809 ie = dbus.ByteArray(b"\x00\x00") 5810 iface.VendorElemAdd(-1, ie) 5811 raise Exception("Invalid VendorElemAdd() accepted") 5812 except dbus.exceptions.DBusException as e: 5813 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5814 raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e)) 5815 5816 try: 5817 ie = dbus.ByteArray(b'') 5818 iface.VendorElemAdd(1, ie) 5819 raise Exception("Invalid VendorElemAdd() accepted") 5820 except dbus.exceptions.DBusException as e: 5821 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e): 5822 raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e)) 5823 5824 try: 5825 ie = dbus.ByteArray(b"\x00\x01") 5826 iface.VendorElemAdd(1, ie) 5827 raise Exception("Invalid VendorElemAdd() accepted") 5828 except dbus.exceptions.DBusException as e: 5829 if "InvalidArgs" not in str(e) or "Parse error" not in str(e): 5830 raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e)) 5831 5832 try: 5833 iface.VendorElemGet(-1) 5834 raise Exception("Invalid VendorElemGet() accepted") 5835 except dbus.exceptions.DBusException as e: 5836 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5837 raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e)) 5838 5839 try: 5840 iface.VendorElemGet(1) 5841 raise Exception("Invalid VendorElemGet() accepted") 5842 except dbus.exceptions.DBusException as e: 5843 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e): 5844 raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e)) 5845 5846 try: 5847 ie = dbus.ByteArray(b"\x00\x00") 5848 iface.VendorElemRem(-1, ie) 5849 raise Exception("Invalid VendorElemRemove() accepted") 5850 except dbus.exceptions.DBusException as e: 5851 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5852 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5853 5854 try: 5855 ie = dbus.ByteArray(b'') 5856 iface.VendorElemRem(1, ie) 5857 raise Exception("Invalid VendorElemRemove() accepted") 5858 except dbus.exceptions.DBusException as e: 5859 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e): 5860 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5861 5862 iface.VendorElemRem(1, b"*") 5863 5864 ie = dbus.ByteArray(b"\x00\x01\x00") 5865 iface.VendorElemAdd(1, ie) 5866 5867 val = iface.VendorElemGet(1) 5868 if len(val) != len(ie): 5869 raise Exception("Unexpected VendorElemGet length") 5870 for i in range(len(val)): 5871 if val[i] != dbus.Byte(ie[i]): 5872 raise Exception("Unexpected VendorElemGet data") 5873 5874 ie2 = dbus.ByteArray(b"\xe0\x00") 5875 iface.VendorElemAdd(1, ie2) 5876 5877 ies = ie + ie2 5878 val = iface.VendorElemGet(1) 5879 if len(val) != len(ies): 5880 raise Exception("Unexpected VendorElemGet length[2]") 5881 for i in range(len(val)): 5882 if val[i] != dbus.Byte(ies[i]): 5883 raise Exception("Unexpected VendorElemGet data[2]") 5884 5885 try: 5886 test_ie = dbus.ByteArray(b"\x01\x01") 5887 iface.VendorElemRem(1, test_ie) 5888 raise Exception("Invalid VendorElemRemove() accepted") 5889 except dbus.exceptions.DBusException as e: 5890 if "InvalidArgs" not in str(e) or "Parse error" not in str(e): 5891 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5892 5893 iface.VendorElemRem(1, ie) 5894 val = iface.VendorElemGet(1) 5895 if len(val) != len(ie2): 5896 raise Exception("Unexpected VendorElemGet length[3]") 5897 5898 iface.VendorElemRem(1, b"*") 5899 try: 5900 iface.VendorElemGet(1) 5901 raise Exception("Invalid VendorElemGet() accepted after removal") 5902 except dbus.exceptions.DBusException as e: 5903 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e): 5904 raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e)) 5905 5906def test_dbus_assoc_reject(dev, apdev): 5907 """D-Bus AssocStatusCode""" 5908 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5909 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5910 5911 ssid = "test-open" 5912 params = {"ssid": ssid, 5913 "max_listen_interval": "1"} 5914 hapd = hostapd.add_ap(apdev[0], params) 5915 5916 class TestDbusConnect(TestDbus): 5917 def __init__(self, bus): 5918 TestDbus.__init__(self, bus) 5919 self.assoc_status_seen = False 5920 self.state = 0 5921 5922 def __enter__(self): 5923 gobject.timeout_add(1, self.run_connect) 5924 gobject.timeout_add(15000, self.timeout) 5925 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5926 "PropertiesChanged") 5927 self.loop.run() 5928 return self 5929 5930 def propertiesChanged(self, properties): 5931 logger.debug("propertiesChanged: %s" % str(properties)) 5932 if 'AssocStatusCode' in properties: 5933 status = properties['AssocStatusCode'] 5934 if status != 51: 5935 logger.info("Unexpected status code: " + str(status)) 5936 else: 5937 self.assoc_status_seen = True 5938 iface.Disconnect() 5939 self.loop.quit() 5940 5941 def run_connect(self, *args): 5942 args = dbus.Dictionary({'ssid': ssid, 5943 'key_mgmt': 'NONE', 5944 'scan_freq': 2412}, 5945 signature='sv') 5946 self.netw = iface.AddNetwork(args) 5947 iface.SelectNetwork(self.netw) 5948 return False 5949 5950 def success(self): 5951 return self.assoc_status_seen 5952 5953 with TestDbusConnect(bus) as t: 5954 if not t.success(): 5955 raise Exception("Expected signals not seen") 5956 5957def test_dbus_mesh(dev, apdev): 5958 """D-Bus mesh""" 5959 check_mesh_support(dev[0]) 5960 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5961 mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH) 5962 5963 add_open_mesh_network(dev[1]) 5964 addr1 = dev[1].own_addr() 5965 5966 class TestDbusMesh(TestDbus): 5967 def __init__(self, bus): 5968 TestDbus.__init__(self, bus) 5969 self.done = False 5970 5971 def __enter__(self): 5972 gobject.timeout_add(1, self.run_test) 5973 gobject.timeout_add(15000, self.timeout) 5974 self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH, 5975 "MeshGroupStarted") 5976 self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH, 5977 "MeshGroupRemoved") 5978 self.add_signal(self.meshPeerConnected, WPAS_DBUS_IFACE_MESH, 5979 "MeshPeerConnected") 5980 self.add_signal(self.meshPeerDisconnected, WPAS_DBUS_IFACE_MESH, 5981 "MeshPeerDisconnected") 5982 self.loop.run() 5983 return self 5984 5985 def meshGroupStarted(self, args): 5986 logger.debug("MeshGroupStarted: " + str(args)) 5987 5988 def meshGroupRemoved(self, args): 5989 logger.debug("MeshGroupRemoved: " + str(args)) 5990 self.done = True 5991 self.loop.quit() 5992 5993 def meshPeerConnected(self, args): 5994 logger.debug("MeshPeerConnected: " + str(args)) 5995 5996 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers', 5997 dbus_interface=dbus.PROPERTIES_IFACE, 5998 byte_arrays=True) 5999 logger.debug("MeshPeers: " + str(res)) 6000 if len(res) != 1: 6001 raise Exception("Unexpected number of MeshPeer values") 6002 if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''): 6003 raise Exception("Unexpected peer address") 6004 6005 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup', 6006 dbus_interface=dbus.PROPERTIES_IFACE, 6007 byte_arrays=True) 6008 logger.debug("MeshGroup: " + str(res)) 6009 if res != b"wpas-mesh-open": 6010 raise Exception("Unexpected MeshGroup") 6011 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 6012 dev1.mesh_group_remove() 6013 6014 def meshPeerDisconnected(self, args): 6015 logger.debug("MeshPeerDisconnected: " + str(args)) 6016 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0') 6017 dev0.mesh_group_remove() 6018 6019 def run_test(self, *args): 6020 logger.debug("run_test") 6021 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0') 6022 add_open_mesh_network(dev0) 6023 return False 6024 6025 def success(self): 6026 return self.done 6027 6028 with TestDbusMesh(bus) as t: 6029 if not t.success(): 6030 raise Exception("Expected signals not seen") 6031 6032def test_dbus_roam(dev, apdev): 6033 """D-Bus Roam""" 6034 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 6035 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6036 6037 ssid = "test-wpa2-psk" 6038 passphrase = 'qwertyuiop' 6039 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 6040 hapd = hostapd.add_ap(apdev[0], params) 6041 hapd2 = hostapd.add_ap(apdev[1], params) 6042 bssid = apdev[0]['bssid'] 6043 dev[0].scan_for_bss(bssid, freq=2412) 6044 bssid2 = apdev[1]['bssid'] 6045 dev[0].scan_for_bss(bssid2, freq=2412) 6046 6047 class TestDbusConnect(TestDbus): 6048 def __init__(self, bus): 6049 TestDbus.__init__(self, bus) 6050 self.state = 0 6051 6052 def __enter__(self): 6053 gobject.timeout_add(1, self.run_connect) 6054 gobject.timeout_add(15000, self.timeout) 6055 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 6056 "PropertiesChanged") 6057 self.loop.run() 6058 return self 6059 6060 def propertiesChanged(self, properties): 6061 logger.debug("propertiesChanged: %s" % str(properties)) 6062 if 'State' in properties and properties['State'] == "completed": 6063 if self.state == 0: 6064 self.state = 1 6065 cur = properties["CurrentBSS"] 6066 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, cur) 6067 res = bss_obj.Get(WPAS_DBUS_BSS, 'BSSID', 6068 dbus_interface=dbus.PROPERTIES_IFACE) 6069 bssid_str = '' 6070 for item in res: 6071 if len(bssid_str) > 0: 6072 bssid_str += ':' 6073 bssid_str += '%02x' % item 6074 dst = bssid if bssid_str == bssid2 else bssid2 6075 iface.Roam(dst) 6076 elif self.state == 1: 6077 if "RoamComplete" in properties and \ 6078 properties["RoamComplete"]: 6079 self.state = 2 6080 self.loop.quit() 6081 6082 def run_connect(self, *args): 6083 logger.debug("run_connect") 6084 args = dbus.Dictionary({'ssid': ssid, 6085 'key_mgmt': 'WPA-PSK', 6086 'psk': passphrase, 6087 'scan_freq': 2412}, 6088 signature='sv') 6089 self.netw = iface.AddNetwork(args) 6090 iface.SelectNetwork(self.netw) 6091 return False 6092 6093 def success(self): 6094 return self.state == 2 6095 6096 with TestDbusConnect(bus) as t: 6097 if not t.success(): 6098 raise Exception("Expected signals not seen") 6099 6100def test_dbus_creds(dev, apdev): 6101 "D-Bus interworking credentials" 6102 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 6103 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6104 6105 args = {'domain': 'server.w1.fi', 6106 'realm': 'server.w1.fi', 6107 'roaming_consortium': '50a9bf', 6108 'required_roaming_consortium': '23bf50', 6109 'eap': 'TTLS', 6110 'phase2': 'auth=MSCHAPV2', 6111 'username': 'user', 6112 'password': 'password', 6113 'domain_suffix_match': 'server.w1.fi', 6114 'ca_cert': 'auth_serv/ca.pem'} 6115 6116 path = iface.AddCred(dbus.Dictionary(args, signature='sv')) 6117 for k, v in args.items(): 6118 if k == 'password': 6119 continue 6120 prop = dev[0].get_cred(0, k) 6121 if prop != v: 6122 raise Exception('Credential add failed: %s does not match %s' % (prop, v)) 6123 6124 iface.RemoveCred(path) 6125 if not "FAIL" in dev[0].get_cred(0, 'domain'): 6126 raise Exception("Credential remove failed") 6127 6128 # Removal of multiple credentials 6129 cred1 = {'domain': 'server1.w1.fi','realm': 'server1.w1.fi','eap': 'TTLS'} 6130 iface.AddCred(dbus.Dictionary(cred1, signature='sv')) 6131 if "FAIL" in dev[0].get_cred(0, 'domain'): 6132 raise Exception("Failed to add credential") 6133 6134 cred2 = {'domain': 'server2.w1.fi','realm': 'server2.w1.fi','eap': 'TTLS'} 6135 iface.AddCred(dbus.Dictionary(cred2, signature='sv')) 6136 if "FAIL" in dev[0].get_cred(1, 'domain'): 6137 raise Exception("Failed to add credential") 6138 6139 iface.RemoveAllCreds() 6140 if not "FAIL" in dev[0].get_cred(0, 'domain'): 6141 raise Exception("Credential remove failed") 6142 if not "FAIL" in dev[0].get_cred(1, 'domain'): 6143 raise Exception("Credential remove failed") 6144 6145def test_dbus_interworking(dev, apdev): 6146 "D-Bus interworking selection" 6147 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 6148 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6149 6150 params = {"ssid": "test-interworking", "wpa": "2", 6151 "wpa_key_mgmt": "WPA-EAP", "rsn_pairwise": "CCMP", 6152 "ieee8021x": "1", "eapol_version": "2", 6153 "eap_server": "1", "eap_user_file": "auth_serv/eap_user.conf", 6154 "ca_cert": "auth_serv/ca.pem", 6155 "server_cert": "auth_serv/server.pem", 6156 "private_key": "auth_serv/server.key", 6157 "interworking": "1", 6158 "domain_name": "server.w1.fi", 6159 "nai_realm": "0,server.w1.fi,21[2:4][5:7]", 6160 "roaming_consortium": "2233445566", 6161 "hs20": "1", "anqp_domain_id": "1234"} 6162 6163 hapd = hostapd.add_ap(apdev[0], params) 6164 6165 class TestDbusInterworking(TestDbus): 6166 def __init__(self, bus): 6167 TestDbus.__init__(self, bus) 6168 self.interworking_ap_seen = False 6169 self.interworking_select_done = False 6170 6171 def __enter__(self): 6172 gobject.timeout_add(1, self.run_select) 6173 gobject.timeout_add(15000, self.timeout) 6174 self.add_signal(self.interworkingAPAdded, WPAS_DBUS_IFACE, 6175 "InterworkingAPAdded") 6176 self.add_signal(self.interworkingSelectDone, WPAS_DBUS_IFACE, 6177 "InterworkingSelectDone") 6178 self.loop.run() 6179 return self 6180 6181 def interworkingAPAdded(self, bss, cred, properties): 6182 logger.debug("interworkingAPAdded: bss=%s cred=%s %s" % (bss, cred, str(properties))) 6183 if self.cred == cred: 6184 self.interworking_ap_seen = True 6185 6186 def interworkingSelectDone(self): 6187 logger.debug("interworkingSelectDone") 6188 self.interworking_select_done = True 6189 self.loop.quit() 6190 6191 def run_select(self, *args): 6192 args = {"domain": "server.w1.fi", 6193 "realm": "server.w1.fi", 6194 "eap": "TTLS", 6195 "phase2": "auth=MSCHAPV2", 6196 "username": "user", 6197 "password": "password", 6198 "domain_suffix_match": "server.w1.fi", 6199 "ca_cert": "auth_serv/ca.pem"} 6200 self.cred = iface.AddCred(dbus.Dictionary(args, signature='sv')) 6201 iface.InterworkingSelect() 6202 return False 6203 6204 def success(self): 6205 return self.interworking_ap_seen and self.interworking_select_done 6206 6207 with TestDbusInterworking(bus) as t: 6208 if not t.success(): 6209 raise Exception("Expected signals not seen") 6210