1# wpa_supplicant D-Bus interface tests
2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12import shutil
13import struct
14import sys
15from test_ap_hs20 import hs20_ap_params
16
17try:
18    if sys.version_info[0] > 2:
19        from gi.repository import GObject as gobject
20    else:
21        import gobject
22    import dbus
23    dbus_imported = True
24except ImportError:
25    dbus_imported = False
26
27import hostapd
28from wpasupplicant import WpaSupplicant
29from utils import *
30from p2p_utils import *
31from test_ap_tdls import connect_2sta_open
32from test_ap_eap import check_altsubject_match_support, check_eap_capa
33from test_nfc_p2p import set_ip_addr_info
34from test_wpas_mesh import check_mesh_support, add_open_mesh_network
35
36WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
37WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
38WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
39WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
40WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
41WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
42WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
43WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
44WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
45WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
46WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh"
47
48def prepare_dbus(dev):
49    if not dbus_imported:
50        logger.info("No dbus module available")
51        raise HwsimSkip("No dbus module available")
52    try:
53        from dbus.mainloop.glib import DBusGMainLoop
54        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
55        bus = dbus.SystemBus()
56        wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
57        wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
58        path = wpas.GetInterface(dev.ifname)
59        if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
60        return (bus, wpas_obj, path, if_obj)
61    except Exception as e:
62        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
63
64class TestDbus(object):
65    def __init__(self, bus):
66        self.loop = gobject.MainLoop()
67        self.signals = []
68        self.bus = bus
69
70    def __exit__(self, type, value, traceback):
71        for s in self.signals:
72            s.remove()
73
74    def add_signal(self, handler, interface, name, byte_arrays=False):
75        s = self.bus.add_signal_receiver(handler, dbus_interface=interface,
76                                         signal_name=name,
77                                         byte_arrays=byte_arrays)
78        self.signals.append(s)
79
80    def timeout(self, *args):
81        logger.debug("timeout")
82        self.loop.quit()
83        return False
84
85class alloc_fail_dbus(object):
86    def __init__(self, dev, count, funcs, operation="Operation",
87                 expected="NoMemory"):
88        self._dev = dev
89        self._count = count
90        self._funcs = funcs
91        self._operation = operation
92        self._expected = expected
93    def __enter__(self):
94        cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
95        if "OK" not in self._dev.request(cmd):
96            raise HwsimSkip("TEST_ALLOC_FAIL not supported")
97    def __exit__(self, type, value, traceback):
98        if type is None:
99            raise Exception("%s succeeded during out-of-memory" % self._operation)
100        if type == dbus.exceptions.DBusException and self._expected in str(value):
101            return True
102        if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
103            raise Exception("%s did not trigger allocation failure" % self._operation)
104        return False
105
106def start_ap(ap, ssid="test-wps",
107             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
108    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
109              "wpa_passphrase": "12345678", "wpa": "2",
110              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
111              "ap_pin": "12345670", "uuid": ap_uuid}
112    return hostapd.add_ap(ap, params)
113
114def test_dbus_getall(dev, apdev):
115    """D-Bus GetAll"""
116    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
117
118    dev[0].flush_scan_cache()
119
120    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
121                            dbus_interface=dbus.PROPERTIES_IFACE)
122    logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))
123
124    props = if_obj.GetAll(WPAS_DBUS_IFACE,
125                          dbus_interface=dbus.PROPERTIES_IFACE)
126    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))
127
128    props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
129                          dbus_interface=dbus.PROPERTIES_IFACE)
130    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))
131
132    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
133                     dbus_interface=dbus.PROPERTIES_IFACE)
134    if len(res) != 0:
135        raise Exception("Unexpected BSSs entry: " + str(res))
136
137    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
138                     dbus_interface=dbus.PROPERTIES_IFACE)
139    if len(res) != 0:
140        raise Exception("Unexpected Networks entry: " + str(res))
141
142    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
143    bssid = apdev[0]['bssid']
144    dev[0].scan_for_bss(bssid, freq=2412)
145    id = dev[0].add_network()
146    dev[0].set_network(id, "disabled", "0")
147    dev[0].set_network_quoted(id, "ssid", "test")
148
149    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
150                     dbus_interface=dbus.PROPERTIES_IFACE)
151    if len(res) < 1:
152        raise Exception("Missing BSSs entry: " + str(res))
153    if len(res) > 1:
154        raise Exception("Too manu BSSs entries: " + str(res))
155    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
156    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
157    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
158    bssid_str = ''
159    for item in props['BSSID']:
160        if len(bssid_str) > 0:
161            bssid_str += ':'
162        bssid_str += '%02x' % item
163    if bssid_str != bssid:
164        raise Exception("Unexpected BSSID in BSSs entry")
165
166    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
167                     dbus_interface=dbus.PROPERTIES_IFACE)
168    if len(res) != 1:
169        raise Exception("Missing Networks entry: " + str(res))
170    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
171    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
172                           dbus_interface=dbus.PROPERTIES_IFACE)
173    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
174    ssid = props['Properties']['ssid']
175    if ssid != '"test"':
176        raise Exception("Unexpected SSID in network entry")
177
178def test_dbus_getall_oom(dev, apdev):
179    """D-Bus GetAll wpa_config_get_all() OOM"""
180    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
181
182    id = dev[0].add_network()
183    dev[0].set_network(id, "disabled", "0")
184    dev[0].set_network_quoted(id, "ssid", "test")
185
186    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
187                     dbus_interface=dbus.PROPERTIES_IFACE)
188    if len(res) != 1:
189        raise Exception("Missing Networks entry: " + str(res))
190    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
191    for i in range(1, 50):
192        with alloc_fail(dev[0], i, "wpa_config_get_all"):
193            try:
194                props = net_obj.GetAll(WPAS_DBUS_NETWORK,
195                                       dbus_interface=dbus.PROPERTIES_IFACE)
196            except dbus.exceptions.DBusException as e:
197                pass
198
199def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
200    val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
201                       dbus_interface=dbus.PROPERTIES_IFACE,
202                       byte_arrays=byte_arrays)
203    if expect is not None and val != expect:
204        raise Exception("Unexpected %s: %s (expected: %s)" %
205                        (prop, str(val), str(expect)))
206    return val
207
208def dbus_set(dbus, wpas_obj, prop, val):
209    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
210                 dbus_interface=dbus.PROPERTIES_IFACE)
211
212def test_dbus_properties(dev, apdev):
213    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
214    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
215
216    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
217    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
218    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
219    for (val, err) in [(3, "Error.Failed: wrong property type"),
220                       ("foo", "Error.Failed: wrong debug level value")]:
221        try:
222            dbus_set(dbus, wpas_obj, "DebugLevel", val)
223            raise Exception("Invalid DebugLevel value accepted: " + str(val))
224        except dbus.exceptions.DBusException as e:
225            if err not in str(e):
226                raise Exception("Unexpected error message: " + str(e))
227    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
228    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
229
230    dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
231    dbus_set(dbus, wpas_obj, "DebugTimestamp", False)
232    dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False)
233    try:
234        dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo")
235        raise Exception("Invalid DebugTimestamp value accepted")
236    except dbus.exceptions.DBusException as e:
237        if "Error.Failed: wrong property type" not in str(e):
238            raise Exception("Unexpected error message: " + str(e))
239    dbus_set(dbus, wpas_obj, "DebugTimestamp", True)
240    dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
241
242    dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
243    dbus_set(dbus, wpas_obj, "DebugShowKeys", False)
244    dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False)
245    try:
246        dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo")
247        raise Exception("Invalid DebugShowKeys value accepted")
248    except dbus.exceptions.DBusException as e:
249        if "Error.Failed: wrong property type" not in str(e):
250            raise Exception("Unexpected error message: " + str(e))
251    dbus_set(dbus, wpas_obj, "DebugShowKeys", True)
252    dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
253
254    res = dbus_get(dbus, wpas_obj, "Interfaces")
255    if len(res) != 1:
256        raise Exception("Unexpected Interfaces value: " + str(res))
257
258    res = dbus_get(dbus, wpas_obj, "EapMethods")
259    if len(res) < 5 or "TTLS" not in res:
260        raise Exception("Unexpected EapMethods value: " + str(res))
261
262    res = dbus_get(dbus, wpas_obj, "Capabilities")
263    if len(res) < 2 or "p2p" not in res:
264        raise Exception("Unexpected Capabilities value: " + str(res))
265
266    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
267    val = binascii.unhexlify("010006020304050608")
268    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
269    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
270    if val != res:
271        raise Exception("WFDIEs value changed")
272    try:
273        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00'))
274        raise Exception("Invalid WFDIEs value accepted")
275    except dbus.exceptions.DBusException as e:
276        if "InvalidArgs" not in str(e):
277            raise Exception("Unexpected error message: " + str(e))
278    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
279    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
280    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
281    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
282    if len(res) != 0:
283        raise Exception("WFDIEs not cleared properly")
284
285    res = dbus_get(dbus, wpas_obj, "EapMethods")
286    try:
287        dbus_set(dbus, wpas_obj, "EapMethods", res)
288        raise Exception("Invalid Set accepted")
289    except dbus.exceptions.DBusException as e:
290        if "InvalidArgs: Property is read-only" not in str(e):
291            raise Exception("Unexpected error message: " + str(e))
292
293    try:
294        wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
295                        dbus_interface=dbus.PROPERTIES_IFACE)
296        raise Exception("Unknown method accepted")
297    except dbus.exceptions.DBusException as e:
298        if "UnknownMethod" not in str(e):
299            raise Exception("Unexpected error message: " + str(e))
300
301    try:
302        wpas_obj.Get("foo", "DebugShowKeys",
303                     dbus_interface=dbus.PROPERTIES_IFACE)
304        raise Exception("Invalid Get accepted")
305    except dbus.exceptions.DBusException as e:
306        if "InvalidArgs: No such property" not in str(e):
307            raise Exception("Unexpected error message: " + str(e))
308
309    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
310                              introspect=False)
311    try:
312        test_obj.Get(123, "DebugShowKeys",
313                     dbus_interface=dbus.PROPERTIES_IFACE)
314        raise Exception("Invalid Get accepted")
315    except dbus.exceptions.DBusException as e:
316        if "InvalidArgs: Invalid arguments" not in str(e):
317            raise Exception("Unexpected error message: " + str(e))
318    try:
319        test_obj.Get(WPAS_DBUS_SERVICE, 123,
320                     dbus_interface=dbus.PROPERTIES_IFACE)
321        raise Exception("Invalid Get accepted")
322    except dbus.exceptions.DBusException as e:
323        if "InvalidArgs: Invalid arguments" not in str(e):
324            raise Exception("Unexpected error message: " + str(e))
325
326    try:
327        wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
328                     dbus.ByteArray(b'', variant_level=2),
329                     dbus_interface=dbus.PROPERTIES_IFACE)
330        raise Exception("Invalid Set accepted")
331    except dbus.exceptions.DBusException as e:
332        if "InvalidArgs: invalid message format" not in str(e):
333            raise Exception("Unexpected error message: " + str(e))
334
335def test_dbus_set_global_properties(dev, apdev):
336    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
337    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
338
339    dev[0].set("model_name", "")
340    props = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')]
341
342    for p in props:
343        res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
344                         dbus_interface=dbus.PROPERTIES_IFACE)
345        if res != p[1]:
346            raise Exception("Unexpected " + p[0] + " value: " + str(res))
347
348        if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2],
349                   dbus_interface=dbus.PROPERTIES_IFACE)
350
351        res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
352                         dbus_interface=dbus.PROPERTIES_IFACE)
353        if res != p[2]:
354            raise Exception("Unexpected " + p[0] + " value after set: " + str(res))
355    dev[0].set("model_name", "")
356
357def test_dbus_invalid_method(dev, apdev):
358    """D-Bus invalid method"""
359    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
360    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
361
362    try:
363        wps.Foo()
364        raise Exception("Unknown method accepted")
365    except dbus.exceptions.DBusException as e:
366        if "UnknownMethod" not in str(e):
367            raise Exception("Unexpected error message: " + str(e))
368
369    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
370    test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
371    try:
372        test_wps.Start(123)
373        raise Exception("WPS.Start with incorrect signature accepted")
374    except dbus.exceptions.DBusException as e:
375        if "InvalidArgs: Invalid arg" not in str(e):
376            raise Exception("Unexpected error message: " + str(e))
377
378def test_dbus_get_set_wps(dev, apdev):
379    """D-Bus Get/Set for WPS properties"""
380    try:
381        _test_dbus_get_set_wps(dev, apdev)
382    finally:
383        dev[0].request("SET wps_cred_processing 0")
384        dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
385        dev[0].set("device_name", "Device A")
386        dev[0].set("manufacturer", "")
387        dev[0].set("model_name", "")
388        dev[0].set("model_number", "")
389        dev[0].set("serial_number", "")
390        dev[0].set("device_type", "0-00000000-0")
391
392def _test_dbus_get_set_wps(dev, apdev):
393    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
394
395    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
396               dbus_interface=dbus.PROPERTIES_IFACE)
397
398    val = "display keypad virtual_display nfc_interface"
399    dev[0].request("SET config_methods " + val)
400
401    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
402                        dbus_interface=dbus.PROPERTIES_IFACE)
403    if config != val:
404        raise Exception("Unexpected Get(ConfigMethods) result: " + config)
405
406    val2 = "push_button display"
407    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
408               dbus_interface=dbus.PROPERTIES_IFACE)
409    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
410                        dbus_interface=dbus.PROPERTIES_IFACE)
411    if config != val2:
412        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)
413
414    dev[0].request("SET config_methods " + val)
415
416    for i in range(3):
417        dev[0].request("SET wps_cred_processing " + str(i))
418        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
419                         dbus_interface=dbus.PROPERTIES_IFACE)
420        expected_val = False if i == 1 else True
421        if val != expected_val:
422            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))
423
424    tests = [("device_name", "DeviceName"),
425             ("manufacturer", "Manufacturer"),
426             ("model_name", "ModelName"),
427             ("model_number", "ModelNumber"),
428             ("serial_number", "SerialNumber")]
429
430    for f1, f2 in tests:
431        val2 = "test-value-test"
432        dev[0].set(f1, val2)
433        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
434                         dbus_interface=dbus.PROPERTIES_IFACE)
435        if val != val2:
436            raise Exception("Get(%s) returned unexpected value" % f2)
437        val2 = "TEST-value"
438        if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2,
439                   dbus_interface=dbus.PROPERTIES_IFACE)
440        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
441                         dbus_interface=dbus.PROPERTIES_IFACE)
442        if val != val2:
443            raise Exception("Get(%s) returned unexpected value after Set" % f2)
444
445    dev[0].set("device_type", "5-0050F204-1")
446    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
447                     dbus_interface=dbus.PROPERTIES_IFACE)
448    if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
449        raise Exception("DeviceType mismatch")
450    if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val,
451               dbus_interface=dbus.PROPERTIES_IFACE)
452    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
453                     dbus_interface=dbus.PROPERTIES_IFACE)
454    if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
455        raise Exception("DeviceType mismatch after Set")
456
457    val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08'
458    if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2),
459               dbus_interface=dbus.PROPERTIES_IFACE)
460    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
461                     dbus_interface=dbus.PROPERTIES_IFACE,
462                     byte_arrays=True)
463    if val != val2:
464        raise Exception("DeviceType mismatch after Set (2)")
465
466    class TestDbusGetSet(TestDbus):
467        def __init__(self, bus):
468            TestDbus.__init__(self, bus)
469            self.signal_received = False
470            self.signal_received_deprecated = False
471            self.sets_done = False
472
473        def __enter__(self):
474            gobject.timeout_add(1, self.run_sets)
475            gobject.timeout_add(1000, self.timeout)
476            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
477                            "PropertiesChanged")
478            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
479                            "PropertiesChanged")
480            self.loop.run()
481            return self
482
483        def propertiesChanged(self, properties):
484            logger.debug("PropertiesChanged: " + str(properties))
485            if "ProcessCredentials" in properties:
486                self.signal_received_deprecated = True
487                if self.sets_done and self.signal_received:
488                    self.loop.quit()
489
490        def propertiesChanged2(self, interface_name, changed_properties,
491                               invalidated_properties):
492            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
493            if interface_name != WPAS_DBUS_IFACE_WPS:
494                return
495            if "ProcessCredentials" in changed_properties:
496                self.signal_received = True
497                if self.sets_done and self.signal_received_deprecated:
498                    self.loop.quit()
499
500        def run_sets(self, *args):
501            logger.debug("run_sets")
502            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
503                       dbus.Boolean(1),
504                       dbus_interface=dbus.PROPERTIES_IFACE)
505            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
506                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
507                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
508            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
509                       dbus.Boolean(0),
510                       dbus_interface=dbus.PROPERTIES_IFACE)
511            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
512                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
513                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
514
515            self.dbus_sets_done = True
516            return False
517
518        def success(self):
519            return self.signal_received and self.signal_received_deprecated
520
521    with TestDbusGetSet(bus) as t:
522        if not t.success():
523            raise Exception("No signal received for ProcessCredentials change")
524
525def test_dbus_wps_invalid(dev, apdev):
526    """D-Bus invaldi WPS operation"""
527    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
528    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
529
530    failures = [{'Role': 'foo', 'Type': 'pbc'},
531                {'Role': 123, 'Type': 'pbc'},
532                {'Type': 'pbc'},
533                {'Role': 'enrollee'},
534                {'Role': 'registrar'},
535                {'Role': 'enrollee', 'Type': 123},
536                {'Role': 'enrollee', 'Type': 'foo'},
537                {'Role': 'enrollee', 'Type': 'pbc',
538                 'Bssid': '02:33:44:55:66:77'},
539                {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
540                {'Role': 'enrollee', 'Type': 'pbc',
541                 'Bssid': dbus.ByteArray(b'12345')},
542                {'Role': 'enrollee', 'Type': 'pbc',
543                 'P2PDeviceAddress': 12345},
544                {'Role': 'enrollee', 'Type': 'pbc',
545                 'P2PDeviceAddress': dbus.ByteArray(b'12345')},
546                {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'}]
547    for args in failures:
548        try:
549            wps.Start(args)
550            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
551        except dbus.exceptions.DBusException as e:
552            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
553                raise Exception("Unexpected error message: " + str(e))
554
555def test_dbus_wps_oom(dev, apdev):
556    """D-Bus WPS operation (OOM)"""
557    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
558    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
559
560    with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
561        if_obj.Get(WPAS_DBUS_IFACE, "State",
562                   dbus_interface=dbus.PROPERTIES_IFACE)
563
564    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
565    bssid = apdev[0]['bssid']
566    dev[0].scan_for_bss(bssid, freq=2412)
567
568    time.sleep(0.05)
569    for i in range(1, 3):
570        with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
571            if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
572                       dbus_interface=dbus.PROPERTIES_IFACE)
573
574    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
575                     dbus_interface=dbus.PROPERTIES_IFACE)
576    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
577    with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
578        bss_obj.Get(WPAS_DBUS_BSS, "Rates",
579                    dbus_interface=dbus.PROPERTIES_IFACE)
580    with alloc_fail(dev[0], 1,
581                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
582        try:
583            bss_obj.Get(WPAS_DBUS_BSS, "Rates",
584                        dbus_interface=dbus.PROPERTIES_IFACE)
585        except dbus.exceptions.DBusException as e:
586            pass
587
588    id = dev[0].add_network()
589    dev[0].set_network(id, "disabled", "0")
590    dev[0].set_network_quoted(id, "ssid", "test")
591
592    for i in range(1, 3):
593        with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
594            if_obj.Get(WPAS_DBUS_IFACE, "Networks",
595                       dbus_interface=dbus.PROPERTIES_IFACE)
596
597    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
598        dbus_get(dbus, wpas_obj, "Interfaces")
599
600    for i in range(1, 6):
601        with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
602            dbus_get(dbus, wpas_obj, "EapMethods")
603
604    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
605                         expected="Error.Failed: Failed to set property"):
606        val2 = "push_button display"
607        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
608                   dbus_interface=dbus.PROPERTIES_IFACE)
609
610    with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
611                         "WPS.Start",
612                         expected="UnknownError: WPS start failed"):
613        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
614
615def test_dbus_wps_pbc(dev, apdev):
616    """D-Bus WPS/PBC operation and signals"""
617    try:
618        _test_dbus_wps_pbc(dev, apdev)
619    finally:
620        dev[0].request("SET wps_cred_processing 0")
621
622def _test_dbus_wps_pbc(dev, apdev):
623    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
624    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
625
626    hapd = start_ap(apdev[0])
627    hapd.request("WPS_PBC")
628    bssid = apdev[0]['bssid']
629    dev[0].flush_scan_cache()
630    dev[0].scan_for_bss(bssid, freq="2412")
631    dev[0].request("SET wps_cred_processing 2")
632
633    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
634                     dbus_interface=dbus.PROPERTIES_IFACE)
635    if len(res) != 1:
636        raise Exception("Missing BSSs entry: " + str(res))
637    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
638    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
639    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
640    if 'WPS' not in props:
641        raise Exception("No WPS information in the BSS entry")
642    if 'Type' not in props['WPS']:
643        raise Exception("No Type field in the WPS dictionary")
644    if props['WPS']['Type'] != 'pbc':
645        raise Exception("Unexpected WPS Type: " + props['WPS']['Type'])
646
647    class TestDbusWps(TestDbus):
648        def __init__(self, bus, wps):
649            TestDbus.__init__(self, bus)
650            self.success_seen = False
651            self.credentials_received = False
652            self.wps = wps
653
654        def __enter__(self):
655            gobject.timeout_add(1, self.start_pbc)
656            gobject.timeout_add(15000, self.timeout)
657            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
658            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
659                            "Credentials")
660            self.loop.run()
661            return self
662
663        def wpsEvent(self, name, args):
664            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
665            if name == "success":
666                self.success_seen = True
667                if self.credentials_received:
668                    self.loop.quit()
669
670        def credentials(self, args):
671            logger.debug("credentials: " + str(args))
672            self.credentials_received = True
673            if self.success_seen:
674                self.loop.quit()
675
676        def start_pbc(self, *args):
677            logger.debug("start_pbc")
678            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
679            return False
680
681        def success(self):
682            return self.success_seen and self.credentials_received
683
684    with TestDbusWps(bus, wps) as t:
685        if not t.success():
686            raise Exception("Failure in D-Bus operations")
687
688    dev[0].wait_connected(timeout=10)
689    dev[0].request("DISCONNECT")
690    hapd.disable()
691    dev[0].flush_scan_cache()
692
693def test_dbus_wps_pbc_overlap(dev, apdev):
694    """D-Bus WPS/PBC operation and signal for PBC overlap"""
695    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
696    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
697
698    hapd = start_ap(apdev[0])
699    hapd2 = start_ap(apdev[1], ssid="test-wps2",
700                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
701    hapd.request("WPS_PBC")
702    hapd2.request("WPS_PBC")
703    bssid = apdev[0]['bssid']
704    dev[0].scan_for_bss(bssid, freq="2412")
705    bssid2 = apdev[1]['bssid']
706    dev[0].scan_for_bss(bssid2, freq="2412")
707
708    class TestDbusWps(TestDbus):
709        def __init__(self, bus, wps):
710            TestDbus.__init__(self, bus)
711            self.overlap_seen = False
712            self.wps = wps
713
714        def __enter__(self):
715            gobject.timeout_add(1, self.start_pbc)
716            gobject.timeout_add(15000, self.timeout)
717            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
718            self.loop.run()
719            return self
720
721        def wpsEvent(self, name, args):
722            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
723            if name == "pbc-overlap":
724                self.overlap_seen = True
725                self.loop.quit()
726
727        def start_pbc(self, *args):
728            logger.debug("start_pbc")
729            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
730            return False
731
732        def success(self):
733            return self.overlap_seen
734
735    with TestDbusWps(bus, wps) as t:
736        if not t.success():
737            raise Exception("Failure in D-Bus operations")
738
739    dev[0].request("WPS_CANCEL")
740    dev[0].request("DISCONNECT")
741    hapd.disable()
742    hapd2.disable()
743    dev[0].flush_scan_cache()
744
745def test_dbus_wps_pin(dev, apdev):
746    """D-Bus WPS/PIN operation and signals"""
747    try:
748        _test_dbus_wps_pin(dev, apdev)
749    finally:
750        dev[0].request("SET wps_cred_processing 0")
751
752def _test_dbus_wps_pin(dev, apdev):
753    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
754    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
755
756    hapd = start_ap(apdev[0])
757    hapd.request("WPS_PIN any 12345670")
758    bssid = apdev[0]['bssid']
759    dev[0].scan_for_bss(bssid, freq="2412")
760    dev[0].request("SET wps_cred_processing 2")
761
762    class TestDbusWps(TestDbus):
763        def __init__(self, bus):
764            TestDbus.__init__(self, bus)
765            self.success_seen = False
766            self.credentials_received = False
767
768        def __enter__(self):
769            gobject.timeout_add(1, self.start_pin)
770            gobject.timeout_add(15000, self.timeout)
771            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
772            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
773                            "Credentials")
774            self.loop.run()
775            return self
776
777        def wpsEvent(self, name, args):
778            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
779            if name == "success":
780                self.success_seen = True
781                if self.credentials_received:
782                    self.loop.quit()
783
784        def credentials(self, args):
785            logger.debug("credentials: " + str(args))
786            self.credentials_received = True
787            if self.success_seen:
788                self.loop.quit()
789
790        def start_pin(self, *args):
791            logger.debug("start_pin")
792            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode()))
793            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
794                       'Bssid': bssid_ay})
795            return False
796
797        def success(self):
798            return self.success_seen and self.credentials_received
799
800    with TestDbusWps(bus) as t:
801        if not t.success():
802            raise Exception("Failure in D-Bus operations")
803
804    dev[0].wait_connected(timeout=10)
805
806def test_dbus_wps_pin2(dev, apdev):
807    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
808    try:
809        _test_dbus_wps_pin2(dev, apdev)
810    finally:
811        dev[0].request("SET wps_cred_processing 0")
812
813def _test_dbus_wps_pin2(dev, apdev):
814    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
815    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
816
817    hapd = start_ap(apdev[0])
818    bssid = apdev[0]['bssid']
819    dev[0].scan_for_bss(bssid, freq="2412")
820    dev[0].request("SET wps_cred_processing 2")
821
822    class TestDbusWps(TestDbus):
823        def __init__(self, bus):
824            TestDbus.__init__(self, bus)
825            self.success_seen = False
826            self.failed = False
827
828        def __enter__(self):
829            gobject.timeout_add(1, self.start_pin)
830            gobject.timeout_add(15000, self.timeout)
831            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
832            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
833                            "Credentials")
834            self.loop.run()
835            return self
836
837        def wpsEvent(self, name, args):
838            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
839            if name == "success":
840                self.success_seen = True
841                if self.credentials_received:
842                    self.loop.quit()
843
844        def credentials(self, args):
845            logger.debug("credentials: " + str(args))
846            self.credentials_received = True
847            if self.success_seen:
848                self.loop.quit()
849
850        def start_pin(self, *args):
851            logger.debug("start_pin")
852            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode()))
853            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
854                             'Bssid': bssid_ay})
855            pin = res['Pin']
856            h = hostapd.Hostapd(apdev[0]['ifname'])
857            h.request("WPS_PIN any " + pin)
858            return False
859
860        def success(self):
861            return self.success_seen and self.credentials_received
862
863    with TestDbusWps(bus) as t:
864        if not t.success():
865            raise Exception("Failure in D-Bus operations")
866
867    dev[0].wait_connected(timeout=10)
868
869def test_dbus_wps_pin_m2d(dev, apdev):
870    """D-Bus WPS/PIN operation and signals with M2D"""
871    try:
872        _test_dbus_wps_pin_m2d(dev, apdev)
873    finally:
874        dev[0].request("SET wps_cred_processing 0")
875
876def _test_dbus_wps_pin_m2d(dev, apdev):
877    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
878    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
879
880    hapd = start_ap(apdev[0])
881    bssid = apdev[0]['bssid']
882    dev[0].scan_for_bss(bssid, freq="2412")
883    dev[0].request("SET wps_cred_processing 2")
884
885    class TestDbusWps(TestDbus):
886        def __init__(self, bus):
887            TestDbus.__init__(self, bus)
888            self.success_seen = False
889            self.credentials_received = False
890
891        def __enter__(self):
892            gobject.timeout_add(1, self.start_pin)
893            gobject.timeout_add(15000, self.timeout)
894            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
895            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
896                            "Credentials")
897            self.loop.run()
898            return self
899
900        def wpsEvent(self, name, args):
901            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
902            if name == "success":
903                self.success_seen = True
904                if self.credentials_received:
905                    self.loop.quit()
906            elif name == "m2d":
907                h = hostapd.Hostapd(apdev[0]['ifname'])
908                h.request("WPS_PIN any 12345670")
909
910        def credentials(self, args):
911            logger.debug("credentials: " + str(args))
912            self.credentials_received = True
913            if self.success_seen:
914                self.loop.quit()
915
916        def start_pin(self, *args):
917            logger.debug("start_pin")
918            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode()))
919            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
920                       'Bssid': bssid_ay})
921            return False
922
923        def success(self):
924            return self.success_seen and self.credentials_received
925
926    with TestDbusWps(bus) as t:
927        if not t.success():
928            raise Exception("Failure in D-Bus operations")
929
930    dev[0].wait_connected(timeout=10)
931
932def test_dbus_wps_reg(dev, apdev):
933    """D-Bus WPS/Registrar operation and signals"""
934    try:
935        _test_dbus_wps_reg(dev, apdev)
936    finally:
937        dev[0].request("SET wps_cred_processing 0")
938
939def _test_dbus_wps_reg(dev, apdev):
940    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
941    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
942
943    hapd = start_ap(apdev[0])
944    hapd.request("WPS_PIN any 12345670")
945    bssid = apdev[0]['bssid']
946    dev[0].scan_for_bss(bssid, freq="2412")
947    dev[0].request("SET wps_cred_processing 2")
948
949    class TestDbusWps(TestDbus):
950        def __init__(self, bus):
951            TestDbus.__init__(self, bus)
952            self.credentials_received = False
953
954        def __enter__(self):
955            gobject.timeout_add(100, self.start_reg)
956            gobject.timeout_add(15000, self.timeout)
957            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
958            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
959                            "Credentials")
960            self.loop.run()
961            return self
962
963        def wpsEvent(self, name, args):
964            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
965
966        def credentials(self, args):
967            logger.debug("credentials: " + str(args))
968            self.credentials_received = True
969            self.loop.quit()
970
971        def start_reg(self, *args):
972            logger.debug("start_reg")
973            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode()))
974            wps.Start({'Role': 'registrar', 'Type': 'pin',
975                       'Pin': '12345670', 'Bssid': bssid_ay})
976            return False
977
978        def success(self):
979            return self.credentials_received
980
981    with TestDbusWps(bus) as t:
982        if not t.success():
983            raise Exception("Failure in D-Bus operations")
984
985    dev[0].wait_connected(timeout=10)
986
987def test_dbus_wps_cancel(dev, apdev):
988    """D-Bus WPS Cancel operation"""
989    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
990    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
991
992    hapd = start_ap(apdev[0])
993    bssid = apdev[0]['bssid']
994
995    wps.Cancel()
996    dev[0].scan_for_bss(bssid, freq="2412")
997    bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode()))
998    wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
999               'Bssid': bssid_ay})
1000    wps.Cancel()
1001    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
1002
1003def test_dbus_scan_invalid(dev, apdev):
1004    """D-Bus invalid scan method"""
1005    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1006    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1007
1008    tests = [({}, "InvalidArgs"),
1009             ({'Type': 123}, "InvalidArgs"),
1010             ({'Type': 'foo'}, "InvalidArgs"),
1011             ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
1012             ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
1013             ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
1014             ({'Type': 'active',
1015               'SSIDs': [dbus.ByteArray(b"1"), dbus.ByteArray(b"2"),
1016                         dbus.ByteArray(b"3"), dbus.ByteArray(b"4"),
1017                         dbus.ByteArray(b"5"), dbus.ByteArray(b"6"),
1018                         dbus.ByteArray(b"7"), dbus.ByteArray(b"8"),
1019                         dbus.ByteArray(b"9"), dbus.ByteArray(b"10"),
1020                         dbus.ByteArray(b"11"), dbus.ByteArray(b"12"),
1021                         dbus.ByteArray(b"13"), dbus.ByteArray(b"14"),
1022                         dbus.ByteArray(b"15"), dbus.ByteArray(b"16"),
1023                         dbus.ByteArray(b"17")]},
1024              "InvalidArgs"),
1025             ({'Type': 'active',
1026               'SSIDs': [dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")]},
1027              "InvalidArgs"),
1028             ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
1029             ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
1030             ({'Type': 'active', 'Channels': 2412}, "InvalidArgs"),
1031             ({'Type': 'active', 'Channels': [2412]}, "InvalidArgs"),
1032             ({'Type': 'active',
1033               'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]},
1034              "InvalidArgs"),
1035             ({'Type': 'active',
1036               'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]},
1037              "InvalidArgs"),
1038             ({'Type': 'active', 'AllowRoam': "yes"}, "InvalidArgs"),
1039             ({'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]},
1040              "InvalidArgs"),
1041             ({'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]},
1042              "InvalidArgs")]
1043    for (t, err) in tests:
1044        try:
1045            iface.Scan(t)
1046            raise Exception("Invalid Scan() arguments accepted: " + str(t))
1047        except dbus.exceptions.DBusException as e:
1048            if err not in str(e):
1049                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
1050
1051def test_dbus_scan_oom(dev, apdev):
1052    """D-Bus scan method and OOM"""
1053    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1054    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1055
1056    with alloc_fail_dbus(dev[0], 1,
1057                         "wpa_scan_clone_params;wpas_dbus_handler_scan",
1058                         "Scan", expected="ScanError: Scan request rejected"):
1059        iface.Scan({'Type': 'passive',
1060                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1061
1062    with alloc_fail_dbus(dev[0], 1,
1063                         "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
1064                         "Scan"):
1065        iface.Scan({'Type': 'passive',
1066                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1067
1068    with alloc_fail_dbus(dev[0], 1,
1069                         "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
1070                         "Scan"):
1071        iface.Scan({'Type': 'active',
1072                    'IEs': [dbus.ByteArray(b"\xdd\x00")],
1073                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1074
1075    with alloc_fail_dbus(dev[0], 1,
1076                         "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
1077                         "Scan"):
1078        iface.Scan({'Type': 'active',
1079                    'SSIDs': [dbus.ByteArray(b"open"),
1080                              dbus.ByteArray()],
1081                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1082
1083def test_dbus_scan(dev, apdev):
1084    """D-Bus scan and related signals"""
1085    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1086    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1087
1088    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
1089
1090    class TestDbusScan(TestDbus):
1091        def __init__(self, bus):
1092            TestDbus.__init__(self, bus)
1093            self.scan_completed = 0
1094            self.bss_added = False
1095            self.fail_reason = None
1096
1097        def __enter__(self):
1098            gobject.timeout_add(1, self.run_scan)
1099            gobject.timeout_add(15000, self.timeout)
1100            self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
1101            self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
1102            self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
1103            self.loop.run()
1104            return self
1105
1106        def scanDone(self, success):
1107            logger.debug("scanDone: success=%s" % success)
1108            self.scan_completed += 1
1109            if self.scan_completed == 1:
1110                iface.Scan({'Type': 'passive',
1111                            'AllowRoam': True,
1112                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1113            elif self.scan_completed == 2:
1114                iface.Scan({'Type': 'passive',
1115                            'AllowRoam': False})
1116            elif self.bss_added and self.scan_completed == 3:
1117                self.loop.quit()
1118
1119        def bssAdded(self, bss, properties):
1120            logger.debug("bssAdded: %s" % bss)
1121            logger.debug(str(properties))
1122            if 'WPS' in properties:
1123                if 'Type' in properties['WPS']:
1124                    self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
1125                    self.loop.quit()
1126            self.bss_added = True
1127            if self.scan_completed == 3:
1128                self.loop.quit()
1129
1130        def bssRemoved(self, bss):
1131            logger.debug("bssRemoved: %s" % bss)
1132
1133        def run_scan(self, *args):
1134            logger.debug("run_scan")
1135            iface.Scan({'Type': 'active',
1136                        'SSIDs': [dbus.ByteArray(b"open"),
1137                                  dbus.ByteArray()],
1138                        'IEs': [dbus.ByteArray(b"\xdd\x00"),
1139                                dbus.ByteArray()],
1140                        'AllowRoam': False,
1141                        'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1142            return False
1143
1144        def success(self):
1145            return self.scan_completed == 3 and self.bss_added
1146
1147    with TestDbusScan(bus) as t:
1148        if t.fail_reason:
1149            raise Exception(t.fail_reason)
1150        if not t.success():
1151            raise Exception("Expected signals not seen")
1152
1153    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1154                     dbus_interface=dbus.PROPERTIES_IFACE)
1155    if len(res) < 1:
1156        raise Exception("Scan result not in BSSs property")
1157    iface.FlushBSS(0)
1158    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1159                     dbus_interface=dbus.PROPERTIES_IFACE)
1160    if len(res) != 0:
1161        raise Exception("FlushBSS() did not remove scan results from BSSs property")
1162    iface.FlushBSS(1)
1163
1164def test_dbus_scan_rand(dev, apdev):
1165    """D-Bus MACAddressRandomizationMask property Get/Set"""
1166    try:
1167        run_dbus_scan_rand(dev, apdev)
1168    finally:
1169        dev[0].request("MAC_RAND_SCAN all enable=0")
1170
1171def run_dbus_scan_rand(dev, apdev):
1172    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1173    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1174
1175    res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1176                     dbus_interface=dbus.PROPERTIES_IFACE)
1177    if len(res) != 0:
1178        logger.info(str(res))
1179        raise Exception("Unexpected initial MACAddressRandomizationMask value")
1180
1181    try:
1182        if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", "foo",
1183                   dbus_interface=dbus.PROPERTIES_IFACE)
1184        raise Exception("Invalid Set accepted")
1185    except dbus.exceptions.DBusException as e:
1186        if "InvalidArgs: invalid message format" not in str(e):
1187            raise Exception("Unexpected error message: " + str(e))
1188
1189    try:
1190        if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1191                   {"foo": "bar"},
1192                   dbus_interface=dbus.PROPERTIES_IFACE)
1193        raise Exception("Invalid Set accepted")
1194    except dbus.exceptions.DBusException as e:
1195        if "wpas_dbus_setter_mac_address_randomization_mask: mask was not a byte array" not in str(e):
1196            raise Exception("Unexpected error message: " + str(e))
1197
1198    try:
1199        if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1200                   {"foo": dbus.ByteArray(b'123456')},
1201                   dbus_interface=dbus.PROPERTIES_IFACE)
1202        raise Exception("Invalid Set accepted")
1203    except dbus.exceptions.DBusException as e:
1204        if 'wpas_dbus_setter_mac_address_randomization_mask: bad scan type "foo"' not in str(e):
1205            raise Exception("Unexpected error message: " + str(e))
1206
1207    try:
1208        if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1209                   {"scan": dbus.ByteArray(b'12345')},
1210                   dbus_interface=dbus.PROPERTIES_IFACE)
1211        raise Exception("Invalid Set accepted")
1212    except dbus.exceptions.DBusException as e:
1213        if 'wpas_dbus_setter_mac_address_randomization_mask: malformed MAC mask given' not in str(e):
1214            raise Exception("Unexpected error message: " + str(e))
1215
1216    if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1217               {"scan": dbus.ByteArray(b'123456')},
1218               dbus_interface=dbus.PROPERTIES_IFACE)
1219    res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1220                     dbus_interface=dbus.PROPERTIES_IFACE)
1221    if len(res) != 1:
1222        logger.info(str(res))
1223        raise Exception("Unexpected MACAddressRandomizationMask value")
1224
1225    try:
1226        if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1227                   {"scan": dbus.ByteArray(b'123456'),
1228                    "sched_scan": dbus.ByteArray(b'987654')},
1229                   dbus_interface=dbus.PROPERTIES_IFACE)
1230    except dbus.exceptions.DBusException as e:
1231        # sched_scan is unlikely to be supported
1232        pass
1233
1234    if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1235               dbus.Dictionary({}, signature='sv'),
1236               dbus_interface=dbus.PROPERTIES_IFACE)
1237    res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask",
1238                     dbus_interface=dbus.PROPERTIES_IFACE)
1239    if len(res) != 0:
1240        logger.info(str(res))
1241        raise Exception("Unexpected MACAddressRandomizationMask value")
1242
1243def test_dbus_scan_busy(dev, apdev):
1244    """D-Bus scan trigger rejection when busy with previous scan"""
1245    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1246    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1247
1248    if "OK" not in dev[0].request("SCAN freq=2412-2462"):
1249        raise Exception("Failed to start scan")
1250    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1251    if ev is None:
1252        raise Exception("Scan start timed out")
1253
1254    try:
1255        iface.Scan({'Type': 'active', 'AllowRoam': False})
1256        raise Exception("Scan() accepted when busy")
1257    except dbus.exceptions.DBusException as e:
1258        if "ScanError: Scan request reject" not in str(e):
1259            raise Exception("Unexpected error message: " + str(e))
1260
1261    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1262    if ev is None:
1263        raise Exception("Scan timed out")
1264
1265def test_dbus_scan_abort(dev, apdev):
1266    """D-Bus scan trigger and abort"""
1267    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1268    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1269
1270    iface.Scan({'Type': 'active', 'AllowRoam': False})
1271    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1272    if ev is None:
1273        raise Exception("Scan start timed out")
1274
1275    iface.AbortScan()
1276    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1277    if ev is None:
1278        raise Exception("Scan abort result timed out")
1279    dev[0].dump_monitor()
1280    iface.Scan({'Type': 'active', 'AllowRoam': False})
1281    iface.AbortScan()
1282
1283    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1284    if ev is None:
1285        raise Exception("Scan timed out")
1286
1287def test_dbus_connect(dev, apdev):
1288    """D-Bus AddNetwork and connect"""
1289    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1290    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1291
1292    ssid = "test-wpa2-psk"
1293    passphrase = 'qwertyuiop'
1294    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1295    hapd = hostapd.add_ap(apdev[0], params)
1296
1297    class TestDbusConnect(TestDbus):
1298        def __init__(self, bus):
1299            TestDbus.__init__(self, bus)
1300            self.network_added = False
1301            self.network_selected = False
1302            self.network_removed = False
1303            self.state = 0
1304
1305        def __enter__(self):
1306            gobject.timeout_add(1, self.run_connect)
1307            gobject.timeout_add(15000, self.timeout)
1308            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1309            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1310                            "NetworkRemoved")
1311            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1312                            "NetworkSelected")
1313            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1314                            "PropertiesChanged")
1315            self.loop.run()
1316            return self
1317
1318        def networkAdded(self, network, properties):
1319            logger.debug("networkAdded: %s" % str(network))
1320            logger.debug(str(properties))
1321            self.network_added = True
1322
1323        def networkRemoved(self, network):
1324            logger.debug("networkRemoved: %s" % str(network))
1325            self.network_removed = True
1326
1327        def networkSelected(self, network):
1328            logger.debug("networkSelected: %s" % str(network))
1329            self.network_selected = True
1330
1331        def propertiesChanged(self, properties):
1332            logger.debug("propertiesChanged: %s" % str(properties))
1333            if 'State' in properties and properties['State'] == "completed":
1334                if self.state == 0:
1335                    self.state = 1
1336                    iface.Disconnect()
1337                elif self.state == 2:
1338                    self.state = 3
1339                    iface.Disconnect()
1340                elif self.state == 4:
1341                    self.state = 5
1342                    iface.Reattach()
1343                elif self.state == 5:
1344                    self.state = 6
1345                    iface.Disconnect()
1346                elif self.state == 7:
1347                    self.state = 8
1348                    res = iface.SignalPoll()
1349                    logger.debug("SignalPoll: " + str(res))
1350                    if 'frequency' not in res or res['frequency'] != 2412:
1351                        self.state = -1
1352                        logger.info("Unexpected SignalPoll result")
1353                    iface.RemoveNetwork(self.netw)
1354            if 'State' in properties and properties['State'] == "disconnected":
1355                if self.state == 1:
1356                    self.state = 2
1357                    iface.SelectNetwork(self.netw)
1358                elif self.state == 3:
1359                    self.state = 4
1360                    iface.Reassociate()
1361                elif self.state == 6:
1362                    self.state = 7
1363                    iface.Reconnect()
1364                elif self.state == 8:
1365                    self.state = 9
1366                    self.loop.quit()
1367
1368        def run_connect(self, *args):
1369            logger.debug("run_connect")
1370            args = dbus.Dictionary({'ssid': ssid,
1371                                    'key_mgmt': 'WPA-PSK',
1372                                    'psk': passphrase,
1373                                    'scan_freq': 2412},
1374                                   signature='sv')
1375            self.netw = iface.AddNetwork(args)
1376            iface.SelectNetwork(self.netw)
1377            return False
1378
1379        def success(self):
1380            if not self.network_added or \
1381               not self.network_removed or \
1382               not self.network_selected:
1383                return False
1384            return self.state == 9
1385
1386    with TestDbusConnect(bus) as t:
1387        if not t.success():
1388            raise Exception("Expected signals not seen")
1389
1390def test_dbus_remove_connected(dev, apdev):
1391    """D-Bus RemoveAllNetworks while connected"""
1392    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1393    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1394
1395    ssid = "test-open"
1396    hapd = hostapd.add_ap(apdev[0], {"ssid": ssid})
1397
1398    class TestDbusConnect(TestDbus):
1399        def __init__(self, bus):
1400            TestDbus.__init__(self, bus)
1401            self.network_added = False
1402            self.network_selected = False
1403            self.network_removed = False
1404            self.state = 0
1405
1406        def __enter__(self):
1407            gobject.timeout_add(1, self.run_connect)
1408            gobject.timeout_add(15000, self.timeout)
1409            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1410            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1411                            "NetworkRemoved")
1412            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1413                            "NetworkSelected")
1414            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1415                            "PropertiesChanged")
1416            self.loop.run()
1417            return self
1418
1419        def networkAdded(self, network, properties):
1420            logger.debug("networkAdded: %s" % str(network))
1421            logger.debug(str(properties))
1422            self.network_added = True
1423
1424        def networkRemoved(self, network):
1425            logger.debug("networkRemoved: %s" % str(network))
1426            self.network_removed = True
1427
1428        def networkSelected(self, network):
1429            logger.debug("networkSelected: %s" % str(network))
1430            self.network_selected = True
1431
1432        def propertiesChanged(self, properties):
1433            logger.debug("propertiesChanged: %s" % str(properties))
1434            if 'State' in properties and properties['State'] == "completed":
1435                if self.state == 0:
1436                    self.state = 1
1437                    iface.Disconnect()
1438                elif self.state == 2:
1439                    self.state = 3
1440                    iface.Disconnect()
1441                elif self.state == 4:
1442                    self.state = 5
1443                    iface.Reattach()
1444                elif self.state == 5:
1445                    self.state = 6
1446                    iface.Disconnect()
1447                elif self.state == 7:
1448                    self.state = 8
1449                    res = iface.SignalPoll()
1450                    logger.debug("SignalPoll: " + str(res))
1451                    if 'frequency' not in res or res['frequency'] != 2412:
1452                        self.state = -1
1453                        logger.info("Unexpected SignalPoll result")
1454                    iface.RemoveAllNetworks()
1455            if 'State' in properties and properties['State'] == "disconnected":
1456                if self.state == 1:
1457                    self.state = 2
1458                    iface.SelectNetwork(self.netw)
1459                elif self.state == 3:
1460                    self.state = 4
1461                    iface.Reassociate()
1462                elif self.state == 6:
1463                    self.state = 7
1464                    iface.Reconnect()
1465                elif self.state == 8:
1466                    self.state = 9
1467                    self.loop.quit()
1468
1469        def run_connect(self, *args):
1470            logger.debug("run_connect")
1471            args = dbus.Dictionary({'ssid': ssid,
1472                                    'key_mgmt': 'NONE',
1473                                    'scan_freq': 2412},
1474                                   signature='sv')
1475            self.netw = iface.AddNetwork(args)
1476            iface.SelectNetwork(self.netw)
1477            return False
1478
1479        def success(self):
1480            if not self.network_added or \
1481               not self.network_removed or \
1482               not self.network_selected:
1483                return False
1484            return self.state == 9
1485
1486    with TestDbusConnect(bus) as t:
1487        if not t.success():
1488            raise Exception("Expected signals not seen")
1489
1490def test_dbus_connect_psk_mem(dev, apdev):
1491    """D-Bus AddNetwork and connect with memory-only PSK"""
1492    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1493    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1494
1495    ssid = "test-wpa2-psk"
1496    passphrase = 'qwertyuiop'
1497    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1498    hapd = hostapd.add_ap(apdev[0], params)
1499
1500    class TestDbusConnect(TestDbus):
1501        def __init__(self, bus):
1502            TestDbus.__init__(self, bus)
1503            self.connected = False
1504
1505        def __enter__(self):
1506            gobject.timeout_add(1, self.run_connect)
1507            gobject.timeout_add(15000, self.timeout)
1508            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1509                            "PropertiesChanged")
1510            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1511                            "NetworkRequest")
1512            self.loop.run()
1513            return self
1514
1515        def propertiesChanged(self, properties):
1516            logger.debug("propertiesChanged: %s" % str(properties))
1517            if 'State' in properties and properties['State'] == "completed":
1518                self.connected = True
1519                self.loop.quit()
1520
1521        def networkRequest(self, path, field, txt):
1522            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1523            if field == "PSK_PASSPHRASE":
1524                iface.NetworkReply(path, field, '"' + passphrase + '"')
1525
1526        def run_connect(self, *args):
1527            logger.debug("run_connect")
1528            args = dbus.Dictionary({'ssid': ssid,
1529                                    'key_mgmt': 'WPA-PSK',
1530                                    'mem_only_psk': 1,
1531                                    'scan_freq': 2412},
1532                                   signature='sv')
1533            self.netw = iface.AddNetwork(args)
1534            iface.SelectNetwork(self.netw)
1535            return False
1536
1537        def success(self):
1538            return self.connected
1539
1540    with TestDbusConnect(bus) as t:
1541        if not t.success():
1542            raise Exception("Expected signals not seen")
1543
1544def test_dbus_connect_oom(dev, apdev):
1545    """D-Bus AddNetwork and connect when out-of-memory"""
1546    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1547    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1548
1549    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
1550        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1551
1552    ssid = "test-wpa2-psk"
1553    passphrase = 'qwertyuiop'
1554    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1555    hapd = hostapd.add_ap(apdev[0], params)
1556
1557    class TestDbusConnect(TestDbus):
1558        def __init__(self, bus):
1559            TestDbus.__init__(self, bus)
1560            self.network_added = False
1561            self.network_selected = False
1562            self.network_removed = False
1563            self.state = 0
1564
1565        def __enter__(self):
1566            gobject.timeout_add(1, self.run_connect)
1567            gobject.timeout_add(1500, self.timeout)
1568            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1569            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1570                            "NetworkRemoved")
1571            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1572                            "NetworkSelected")
1573            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1574                            "PropertiesChanged")
1575            self.loop.run()
1576            return self
1577
1578        def networkAdded(self, network, properties):
1579            logger.debug("networkAdded: %s" % str(network))
1580            logger.debug(str(properties))
1581            self.network_added = True
1582
1583        def networkRemoved(self, network):
1584            logger.debug("networkRemoved: %s" % str(network))
1585            self.network_removed = True
1586
1587        def networkSelected(self, network):
1588            logger.debug("networkSelected: %s" % str(network))
1589            self.network_selected = True
1590
1591        def propertiesChanged(self, properties):
1592            logger.debug("propertiesChanged: %s" % str(properties))
1593            if 'State' in properties and properties['State'] == "completed":
1594                if self.state == 0:
1595                    self.state = 1
1596                    iface.Disconnect()
1597                elif self.state == 2:
1598                    self.state = 3
1599                    iface.Disconnect()
1600                elif self.state == 4:
1601                    self.state = 5
1602                    iface.Reattach()
1603                elif self.state == 5:
1604                    self.state = 6
1605                    res = iface.SignalPoll()
1606                    logger.debug("SignalPoll: " + str(res))
1607                    if 'frequency' not in res or res['frequency'] != 2412:
1608                        self.state = -1
1609                        logger.info("Unexpected SignalPoll result")
1610                    iface.RemoveNetwork(self.netw)
1611            if 'State' in properties and properties['State'] == "disconnected":
1612                if self.state == 1:
1613                    self.state = 2
1614                    iface.SelectNetwork(self.netw)
1615                elif self.state == 3:
1616                    self.state = 4
1617                    iface.Reassociate()
1618                elif self.state == 6:
1619                    self.state = 7
1620                    self.loop.quit()
1621
1622        def run_connect(self, *args):
1623            logger.debug("run_connect")
1624            args = dbus.Dictionary({'ssid': ssid,
1625                                    'key_mgmt': 'WPA-PSK',
1626                                    'psk': passphrase,
1627                                    'scan_freq': 2412},
1628                                   signature='sv')
1629            try:
1630                self.netw = iface.AddNetwork(args)
1631            except Exception as e:
1632                logger.info("Exception on AddNetwork: " + str(e))
1633                self.loop.quit()
1634                return False
1635            try:
1636                iface.SelectNetwork(self.netw)
1637            except Exception as e:
1638                logger.info("Exception on SelectNetwork: " + str(e))
1639                self.loop.quit()
1640
1641            return False
1642
1643        def success(self):
1644            if not self.network_added or \
1645               not self.network_removed or \
1646               not self.network_selected:
1647                return False
1648            return self.state == 7
1649
1650    count = 0
1651    for i in range(1, 1000):
1652        for j in range(3):
1653            dev[j].dump_monitor()
1654        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
1655        try:
1656            with TestDbusConnect(bus) as t:
1657                if not t.success():
1658                    logger.info("Iteration %d - Expected signals not seen" % i)
1659                else:
1660                    logger.info("Iteration %d - success" % i)
1661
1662            state = dev[0].request('GET_ALLOC_FAIL')
1663            logger.info("GET_ALLOC_FAIL: " + state)
1664            dev[0].dump_monitor()
1665            dev[0].request("TEST_ALLOC_FAIL 0:")
1666            if i < 3:
1667                raise Exception("Connection succeeded during out-of-memory")
1668            if not state.startswith('0:'):
1669                count += 1
1670                if count == 5:
1671                    break
1672        except:
1673            pass
1674
1675    # Force regulatory update to re-fetch hw capabilities for the following
1676    # test cases.
1677    try:
1678        dev[0].dump_monitor()
1679        subprocess.call(['iw', 'reg', 'set', 'US'])
1680        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1681    finally:
1682        dev[0].dump_monitor()
1683        subprocess.call(['iw', 'reg', 'set', '00'])
1684        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1685
1686def test_dbus_while_not_connected(dev, apdev):
1687    """D-Bus invalid operations while not connected"""
1688    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1689    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1690
1691    try:
1692        iface.Disconnect()
1693        raise Exception("Disconnect() accepted when not connected")
1694    except dbus.exceptions.DBusException as e:
1695        if "NotConnected" not in str(e):
1696            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
1697
1698    try:
1699        iface.Reattach()
1700        raise Exception("Reattach() accepted when not connected")
1701    except dbus.exceptions.DBusException as e:
1702        if "NotConnected" not in str(e):
1703            raise Exception("Unexpected error message for invalid Reattach: " + str(e))
1704
1705def test_dbus_connect_eap(dev, apdev):
1706    """D-Bus AddNetwork and connect to EAP network"""
1707    check_altsubject_match_support(dev[0])
1708    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1709    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1710
1711    ssid = "ieee8021x-open"
1712    params = hostapd.radius_params()
1713    params["ssid"] = ssid
1714    params["ieee8021x"] = "1"
1715    hapd = hostapd.add_ap(apdev[0], params)
1716
1717    class TestDbusConnect(TestDbus):
1718        def __init__(self, bus):
1719            TestDbus.__init__(self, bus)
1720            self.certification_received = False
1721            self.eap_status = False
1722            self.state = 0
1723
1724        def __enter__(self):
1725            gobject.timeout_add(1, self.run_connect)
1726            gobject.timeout_add(15000, self.timeout)
1727            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1728                            "PropertiesChanged")
1729            self.add_signal(self.certification, WPAS_DBUS_IFACE,
1730                            "Certification", byte_arrays=True)
1731            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1732                            "NetworkRequest")
1733            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
1734            self.loop.run()
1735            return self
1736
1737        def propertiesChanged(self, properties):
1738            logger.debug("propertiesChanged: %s" % str(properties))
1739            if 'State' in properties and properties['State'] == "completed":
1740                if self.state == 0:
1741                    self.state = 1
1742                    iface.EAPLogoff()
1743                    logger.info("Set dNSName constraint")
1744                    net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1745                    args = dbus.Dictionary({'altsubject_match':
1746                                            self.server_dnsname},
1747                                           signature='sv')
1748                    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1749                                dbus_interface=dbus.PROPERTIES_IFACE)
1750                elif self.state == 2:
1751                    self.state = 3
1752                    iface.Disconnect()
1753                    logger.info("Set non-matching dNSName constraint")
1754                    net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1755                    args = dbus.Dictionary({'altsubject_match':
1756                                            self.server_dnsname + "FOO"},
1757                                           signature='sv')
1758                    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1759                                dbus_interface=dbus.PROPERTIES_IFACE)
1760            if 'State' in properties and properties['State'] == "disconnected":
1761                if self.state == 1:
1762                    self.state = 2
1763                    iface.EAPLogon()
1764                    iface.SelectNetwork(self.netw)
1765                if self.state == 3:
1766                    self.state = 4
1767                    iface.SelectNetwork(self.netw)
1768
1769        def certification(self, args):
1770            logger.debug("certification: %s" % str(args))
1771            self.certification_received = True
1772            if args['depth'] == 0:
1773                # The test server certificate is supposed to have dNSName
1774                if len(args['altsubject']) < 1:
1775                    raise Exception("Missing dNSName")
1776                dnsname = args['altsubject'][0]
1777                if not dnsname.startswith("DNS:"):
1778                    raise Exception("Expected dNSName not found: " + dnsname)
1779                logger.info("altsubject: " + dnsname)
1780                self.server_dnsname = dnsname
1781
1782        def eap(self, status, parameter):
1783            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
1784            if status == 'completion' and parameter == 'success':
1785                self.eap_status = True
1786            if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch':
1787                self.state = 5
1788                self.loop.quit()
1789
1790        def networkRequest(self, path, field, txt):
1791            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1792            if field == "PASSWORD":
1793                iface.NetworkReply(path, field, "password")
1794
1795        def run_connect(self, *args):
1796            logger.debug("run_connect")
1797            args = dbus.Dictionary({'ssid': ssid,
1798                                    'key_mgmt': 'IEEE8021X',
1799                                    'eapol_flags': 0,
1800                                    'eap': 'TTLS',
1801                                    'anonymous_identity': 'ttls',
1802                                    'identity': 'pap user',
1803                                    'ca_cert': 'auth_serv/ca.pem',
1804                                    'phase2': 'auth=PAP',
1805                                    'scan_freq': 2412},
1806                                   signature='sv')
1807            self.netw = iface.AddNetwork(args)
1808            iface.SelectNetwork(self.netw)
1809            return False
1810
1811        def success(self):
1812            if not self.eap_status or not self.certification_received:
1813                return False
1814            return self.state == 5
1815
1816    with TestDbusConnect(bus) as t:
1817        if not t.success():
1818            raise Exception("Expected signals not seen")
1819
1820def test_dbus_network(dev, apdev):
1821    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
1822    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1823    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1824
1825    args = dbus.Dictionary({'ssid': "foo",
1826                            'key_mgmt': 'WPA-PSK',
1827                            'psk': "12345678",
1828                            'identity': dbus.ByteArray([1, 2]),
1829                            'priority': dbus.Int32(0),
1830                            'scan_freq': dbus.UInt32(2412)},
1831                           signature='sv')
1832    netw = iface.AddNetwork(args)
1833    id = int(dev[0].list_networks()[0]['id'])
1834    val = dev[0].get_network(id, "scan_freq")
1835    if val != "2412":
1836        raise Exception("Invalid scan_freq value: " + str(val))
1837    iface.RemoveNetwork(netw)
1838
1839    args = dbus.Dictionary({'ssid': "foo",
1840                            'key_mgmt': 'NONE',
1841                            'scan_freq': "2412 2432",
1842                            'freq_list': "2412 2417 2432"},
1843                           signature='sv')
1844    netw = iface.AddNetwork(args)
1845    id = int(dev[0].list_networks()[0]['id'])
1846    val = dev[0].get_network(id, "scan_freq")
1847    if val != "2412 2432":
1848        raise Exception("Invalid scan_freq value (2): " + str(val))
1849    val = dev[0].get_network(id, "freq_list")
1850    if val != "2412 2417 2432":
1851        raise Exception("Invalid freq_list value: " + str(val))
1852    iface.RemoveNetwork(netw)
1853    try:
1854        iface.RemoveNetwork(netw)
1855        raise Exception("Invalid RemoveNetwork() accepted")
1856    except dbus.exceptions.DBusException as e:
1857        if "NetworkUnknown" not in str(e):
1858            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1859    try:
1860        iface.SelectNetwork(netw)
1861        raise Exception("Invalid SelectNetwork() accepted")
1862    except dbus.exceptions.DBusException as e:
1863        if "NetworkUnknown" not in str(e):
1864            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1865
1866    args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
1867                            'identity': "testuser", 'scan_freq': '2412'},
1868                           signature='sv')
1869    netw1 = iface.AddNetwork(args)
1870    args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
1871                           signature='sv')
1872    netw2 = iface.AddNetwork(args)
1873    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1874                     dbus_interface=dbus.PROPERTIES_IFACE)
1875    if len(res) != 2:
1876        raise Exception("Unexpected number of networks")
1877
1878    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1879    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1880                      dbus_interface=dbus.PROPERTIES_IFACE)
1881    if res != False:
1882        raise Exception("Added network was unexpectedly enabled by default")
1883    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
1884                dbus_interface=dbus.PROPERTIES_IFACE)
1885    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1886                      dbus_interface=dbus.PROPERTIES_IFACE)
1887    if res != True:
1888        raise Exception("Set(Enabled,True) did not seem to change property value")
1889    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
1890                dbus_interface=dbus.PROPERTIES_IFACE)
1891    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1892                      dbus_interface=dbus.PROPERTIES_IFACE)
1893    if res != False:
1894        raise Exception("Set(Enabled,False) did not seem to change property value")
1895    try:
1896        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
1897                    dbus_interface=dbus.PROPERTIES_IFACE)
1898        raise Exception("Invalid Set(Enabled,1) accepted")
1899    except dbus.exceptions.DBusException as e:
1900        if "Error.Failed: wrong property type" not in str(e):
1901            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
1902
1903    args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv')
1904    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1905                dbus_interface=dbus.PROPERTIES_IFACE)
1906    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1907                      dbus_interface=dbus.PROPERTIES_IFACE)
1908    if res['ssid'] != '"foo1new"':
1909        raise Exception("Set(Properties) failed to update ssid")
1910    if res['identity'] != '"testuser"':
1911        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1912
1913    iface.RemoveAllNetworks()
1914    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1915                     dbus_interface=dbus.PROPERTIES_IFACE)
1916    if len(res) != 0:
1917        raise Exception("Unexpected number of networks")
1918    iface.RemoveAllNetworks()
1919
1920    tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'),
1921             dbus.Dictionary({'identity': dbus.ByteArray()},
1922                             signature='sv'),
1923             dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv')]
1924    for args in tests:
1925        try:
1926            iface.AddNetwork(args)
1927            raise Exception("Invalid AddNetwork args accepted: " + str(args))
1928        except dbus.exceptions.DBusException as e:
1929            if "InvalidArgs" not in str(e):
1930                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1931
1932def test_dbus_network_oom(dev, apdev):
1933    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1934    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
1935    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1936
1937    args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
1938                            'identity': "testuser", 'scan_freq': '2412'},
1939                           signature='sv')
1940    netw1 = iface.AddNetwork(args)
1941    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1942
1943    with alloc_fail_dbus(dev[0], 1,
1944                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
1945                         "Get"):
1946        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1947                    dbus_interface=dbus.PROPERTIES_IFACE)
1948
1949    iface.RemoveAllNetworks()
1950
1951    with alloc_fail_dbus(dev[0], 1,
1952                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1953                         "RemoveNetwork", "InvalidArgs"):
1954        iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1955
1956    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1957        args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
1958                               signature='sv')
1959        try:
1960            netw = iface.AddNetwork(args)
1961            # Currently, AddNetwork() succeeds even if os_strdup() for path
1962            # fails, so remove the network if that occurs.
1963            iface.RemoveNetwork(netw)
1964        except dbus.exceptions.DBusException as e:
1965            pass
1966
1967    for i in range(1, 3):
1968        with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
1969            try:
1970                netw = iface.AddNetwork(args)
1971                # Currently, AddNetwork() succeeds even if network registration
1972                # fails, so remove the network if that occurs.
1973                iface.RemoveNetwork(netw)
1974            except dbus.exceptions.DBusException as e:
1975                pass
1976
1977    with alloc_fail_dbus(dev[0], 1,
1978                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
1979                         "AddNetwork",
1980                         "UnknownError: wpa_supplicant could not add a network"):
1981        args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
1982                               signature='sv')
1983        netw = iface.AddNetwork(args)
1984
1985    tests = [(1,
1986              'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1987              dbus.Dictionary({'ssid': dbus.ByteArray(b' ')},
1988                              signature='sv')),
1989             (1, '=set_network_properties;wpas_dbus_handler_add_network',
1990              dbus.Dictionary({'ssid': 'foo'}, signature='sv')),
1991             (1, '=set_network_properties;wpas_dbus_handler_add_network',
1992              dbus.Dictionary({'eap': 'foo'}, signature='sv')),
1993             (1, '=set_network_properties;wpas_dbus_handler_add_network',
1994              dbus.Dictionary({'priority': dbus.UInt32(1)},
1995                              signature='sv')),
1996             (1, '=set_network_properties;wpas_dbus_handler_add_network',
1997              dbus.Dictionary({'priority': dbus.Int32(1)},
1998                              signature='sv')),
1999             (1, '=set_network_properties;wpas_dbus_handler_add_network',
2000              dbus.Dictionary({'ssid': dbus.ByteArray(b' ')},
2001                              signature='sv'))]
2002    for (count, funcs, args) in tests:
2003        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
2004            netw = iface.AddNetwork(args)
2005
2006    if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
2007                      dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
2008        raise Exception("Unexpected network block added")
2009    if len(dev[0].list_networks()) > 0:
2010        raise Exception("Unexpected network block visible")
2011
2012def test_dbus_interface(dev, apdev):
2013    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
2014    try:
2015        _test_dbus_interface(dev, apdev)
2016    finally:
2017        # Need to force P2P channel list update since the 'lo' interface
2018        # with driver=none ends up configuring default dualband channels.
2019        dev[0].request("SET country US")
2020        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2021        if ev is None:
2022            ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
2023                                          timeout=1)
2024        dev[0].request("SET country 00")
2025        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2026        if ev is None:
2027            ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
2028                                          timeout=1)
2029        subprocess.call(['iw', 'reg', 'set', '00'])
2030
2031def _test_dbus_interface(dev, apdev):
2032    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2033    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
2034
2035    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none', 'Type': 'sta',
2036                              'Address': '02:03:11:22:33:44'},
2037                             signature='sv')
2038    path = wpas.CreateInterface(params)
2039    logger.debug("New interface path: " + str(path))
2040    path2 = wpas.GetInterface("lo")
2041    if path != path2:
2042        raise Exception("Interface object mismatch")
2043
2044    params = dbus.Dictionary({'Ifname': 'lo',
2045                              'Driver': 'none',
2046                              'ConfigFile': 'foo',
2047                              'BridgeIfname': 'foo',},
2048                             signature='sv')
2049    try:
2050        wpas.CreateInterface(params)
2051        raise Exception("Invalid CreateInterface() accepted")
2052    except dbus.exceptions.DBusException as e:
2053        if "InterfaceExists" not in str(e):
2054            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
2055
2056    wpas.RemoveInterface(path)
2057    try:
2058        wpas.RemoveInterface(path)
2059        raise Exception("Invalid RemoveInterface() accepted")
2060    except dbus.exceptions.DBusException as e:
2061        if "InterfaceUnknown" not in str(e):
2062            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
2063
2064    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none',
2065                              'Foo': 123},
2066                             signature='sv')
2067    try:
2068        wpas.CreateInterface(params)
2069        raise Exception("Invalid CreateInterface() accepted")
2070    except dbus.exceptions.DBusException as e:
2071        if "InvalidArgs" not in str(e):
2072            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
2073
2074    params = dbus.Dictionary({'Driver': 'none'}, signature='sv')
2075    try:
2076        wpas.CreateInterface(params)
2077        raise Exception("Invalid CreateInterface() accepted")
2078    except dbus.exceptions.DBusException as e:
2079        if "InvalidArgs" not in str(e):
2080            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
2081
2082    try:
2083        wpas.GetInterface("lo")
2084        raise Exception("Invalid GetInterface() accepted")
2085    except dbus.exceptions.DBusException as e:
2086        if "InterfaceUnknown" not in str(e):
2087            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
2088
2089    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none',
2090                              'Type': 'foo'}, signature='sv')
2091    try:
2092        wpas.CreateInterface(params)
2093        raise Exception("Invalid CreateInterface() accepted")
2094    except dbus.exceptions.DBusException as e:
2095        if "InvalidArgs" not in str(e):
2096            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
2097
2098    try:
2099        wpas.GetInterface("lo")
2100        raise Exception("Invalid GetInterface() accepted")
2101    except dbus.exceptions.DBusException as e:
2102        if "InterfaceUnknown" not in str(e):
2103            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
2104
2105    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none',
2106                              'Address': 'foo'}, signature='sv')
2107    try:
2108        wpas.CreateInterface(params)
2109        raise Exception("Invalid CreateInterface() accepted")
2110    except dbus.exceptions.DBusException as e:
2111        if "InvalidArgs" not in str(e):
2112            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
2113
2114    try:
2115        wpas.GetInterface("lo")
2116        raise Exception("Invalid GetInterface() accepted")
2117    except dbus.exceptions.DBusException as e:
2118        if "InterfaceUnknown" not in str(e):
2119            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
2120
2121def test_dbus_interface_oom(dev, apdev):
2122    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
2123    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2124    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
2125
2126    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
2127        params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
2128                                 signature='sv')
2129        wpas.CreateInterface(params)
2130
2131    for i in range(1, 1000):
2132        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
2133        params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
2134                                 signature='sv')
2135        try:
2136            npath = wpas.CreateInterface(params)
2137            wpas.RemoveInterface(npath)
2138            logger.info("CreateInterface succeeds after %d allocation failures" % i)
2139            state = dev[0].request('GET_ALLOC_FAIL')
2140            logger.info("GET_ALLOC_FAIL: " + state)
2141            dev[0].dump_monitor()
2142            dev[0].request("TEST_ALLOC_FAIL 0:")
2143            if i < 5:
2144                raise Exception("CreateInterface succeeded during out-of-memory")
2145            if not state.startswith('0:'):
2146                break
2147        except dbus.exceptions.DBusException as e:
2148            pass
2149
2150    for arg in ['Driver', 'Ifname', 'ConfigFile', 'BridgeIfname']:
2151        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
2152                             "CreateInterface"):
2153            params = dbus.Dictionary({arg: 'foo'}, signature='sv')
2154            wpas.CreateInterface(params)
2155
2156def test_dbus_blob(dev, apdev):
2157    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
2158    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2159    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2160
2161    blob = dbus.ByteArray(b"\x01\x02\x03")
2162    iface.AddBlob('blob1', blob)
2163    try:
2164        iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04"))
2165        raise Exception("Invalid AddBlob() accepted")
2166    except dbus.exceptions.DBusException as e:
2167        if "BlobExists" not in str(e):
2168            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
2169    res = iface.GetBlob('blob1')
2170    if len(res) != len(blob):
2171        raise Exception("Unexpected blob data length")
2172    for i in range(len(res)):
2173        if res[i] != dbus.Byte(blob[i]):
2174            raise Exception("Unexpected blob data")
2175    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
2176                     dbus_interface=dbus.PROPERTIES_IFACE)
2177    if 'blob1' not in res:
2178        raise Exception("Added blob missing from Blobs property")
2179    iface.RemoveBlob('blob1')
2180    try:
2181        iface.RemoveBlob('blob1')
2182        raise Exception("Invalid RemoveBlob() accepted")
2183    except dbus.exceptions.DBusException as e:
2184        if "BlobUnknown" not in str(e):
2185            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
2186    try:
2187        iface.GetBlob('blob1')
2188        raise Exception("Invalid GetBlob() accepted")
2189    except dbus.exceptions.DBusException as e:
2190        if "BlobUnknown" not in str(e):
2191            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
2192
2193    class TestDbusBlob(TestDbus):
2194        def __init__(self, bus):
2195            TestDbus.__init__(self, bus)
2196            self.blob_added = False
2197            self.blob_removed = False
2198
2199        def __enter__(self):
2200            gobject.timeout_add(1, self.run_blob)
2201            gobject.timeout_add(15000, self.timeout)
2202            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
2203            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
2204            self.loop.run()
2205            return self
2206
2207        def blobAdded(self, blobName):
2208            logger.debug("blobAdded: %s" % blobName)
2209            if blobName == 'blob2':
2210                self.blob_added = True
2211
2212        def blobRemoved(self, blobName):
2213            logger.debug("blobRemoved: %s" % blobName)
2214            if blobName == 'blob2':
2215                self.blob_removed = True
2216                self.loop.quit()
2217
2218        def run_blob(self, *args):
2219            logger.debug("run_blob")
2220            iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04"))
2221            iface.RemoveBlob('blob2')
2222            return False
2223
2224        def success(self):
2225            return self.blob_added and self.blob_removed
2226
2227    with TestDbusBlob(bus) as t:
2228        if not t.success():
2229            raise Exception("Expected signals not seen")
2230
2231def test_dbus_blob_oom(dev, apdev):
2232    """D-Bus AddNetwork/RemoveNetwork OOM error cases"""
2233    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2234    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2235
2236    for i in range(1, 4):
2237        with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
2238                             "AddBlob"):
2239            iface.AddBlob('blob_no_mem', dbus.ByteArray(b"\x01\x02\x03\x04"))
2240
2241def test_dbus_autoscan(dev, apdev):
2242    """D-Bus Autoscan()"""
2243    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2244    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2245
2246    iface.AutoScan("foo")
2247    iface.AutoScan("periodic:1")
2248    iface.AutoScan("")
2249    dev[0].request("AUTOSCAN ")
2250
2251def test_dbus_autoscan_oom(dev, apdev):
2252    """D-Bus Autoscan() OOM"""
2253    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2254    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2255
2256    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
2257        iface.AutoScan("foo")
2258    dev[0].request("AUTOSCAN ")
2259
2260def test_dbus_tdls_invalid(dev, apdev):
2261    """D-Bus invalid TDLS operations"""
2262    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2263    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2264
2265    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
2266    connect_2sta_open(dev, hapd)
2267    addr1 = dev[1].p2p_interface_addr()
2268
2269    try:
2270        iface.TDLSDiscover("foo")
2271        raise Exception("Invalid TDLSDiscover() accepted")
2272    except dbus.exceptions.DBusException as e:
2273        if "InvalidArgs" not in str(e):
2274            raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e))
2275
2276    try:
2277        iface.TDLSStatus("foo")
2278        raise Exception("Invalid TDLSStatus() accepted")
2279    except dbus.exceptions.DBusException as e:
2280        if "InvalidArgs" not in str(e):
2281            raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e))
2282
2283    res = iface.TDLSStatus(addr1)
2284    if res != "peer does not exist":
2285        raise Exception("Unexpected TDLSStatus response")
2286
2287    try:
2288        iface.TDLSSetup("foo")
2289        raise Exception("Invalid TDLSSetup() accepted")
2290    except dbus.exceptions.DBusException as e:
2291        if "InvalidArgs" not in str(e):
2292            raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e))
2293
2294    try:
2295        iface.TDLSTeardown("foo")
2296        raise Exception("Invalid TDLSTeardown() accepted")
2297    except dbus.exceptions.DBusException as e:
2298        if "InvalidArgs" not in str(e):
2299            raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e))
2300
2301    try:
2302        iface.TDLSTeardown("00:11:22:33:44:55")
2303        raise Exception("TDLSTeardown accepted for unknown peer")
2304    except dbus.exceptions.DBusException as e:
2305        if "UnknownError: error performing TDLS teardown" not in str(e):
2306            raise Exception("Unexpected error message: " + str(e))
2307
2308    try:
2309        iface.TDLSChannelSwitch({})
2310        raise Exception("Invalid TDLSChannelSwitch() accepted")
2311    except dbus.exceptions.DBusException as e:
2312        if "InvalidArgs" not in str(e):
2313            raise Exception("Unexpected error message for invalid TDLSChannelSwitch: " + str(e))
2314
2315    try:
2316        iface.TDLSCancelChannelSwitch("foo")
2317        raise Exception("Invalid TDLSCancelChannelSwitch() accepted")
2318    except dbus.exceptions.DBusException as e:
2319        if "InvalidArgs" not in str(e):
2320            raise Exception("Unexpected error message for invalid TDLSCancelChannelSwitch: " + str(e))
2321
2322def test_dbus_tdls_oom(dev, apdev):
2323    """D-Bus TDLS operations during OOM"""
2324    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2325    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2326
2327    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
2328                         "UnknownError: error performing TDLS setup"):
2329        iface.TDLSSetup("00:11:22:33:44:55")
2330
2331def test_dbus_tdls(dev, apdev):
2332    """D-Bus TDLS"""
2333    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2334    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2335
2336    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
2337    connect_2sta_open(dev, hapd)
2338
2339    addr1 = dev[1].p2p_interface_addr()
2340
2341    class TestDbusTdls(TestDbus):
2342        def __init__(self, bus):
2343            TestDbus.__init__(self, bus)
2344            self.tdls_setup = False
2345            self.tdls_teardown = False
2346
2347        def __enter__(self):
2348            gobject.timeout_add(1, self.run_tdls)
2349            gobject.timeout_add(15000, self.timeout)
2350            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2351                            "PropertiesChanged")
2352            self.loop.run()
2353            return self
2354
2355        def propertiesChanged(self, properties):
2356            logger.debug("propertiesChanged: %s" % str(properties))
2357
2358        def run_tdls(self, *args):
2359            logger.debug("run_tdls")
2360            iface.TDLSDiscover(addr1)
2361            gobject.timeout_add(100, self.run_tdls2)
2362            return False
2363
2364        def run_tdls2(self, *args):
2365            logger.debug("run_tdls2")
2366            iface.TDLSSetup(addr1)
2367            gobject.timeout_add(500, self.run_tdls3)
2368            return False
2369
2370        def run_tdls3(self, *args):
2371            logger.debug("run_tdls3")
2372            res = iface.TDLSStatus(addr1)
2373            if res == "connected":
2374                self.tdls_setup = True
2375            else:
2376                logger.info("Unexpected TDLSStatus: " + res)
2377            iface.TDLSTeardown(addr1)
2378            gobject.timeout_add(200, self.run_tdls4)
2379            return False
2380
2381        def run_tdls4(self, *args):
2382            logger.debug("run_tdls4")
2383            res = iface.TDLSStatus(addr1)
2384            if res == "peer does not exist":
2385                self.tdls_teardown = True
2386            else:
2387                logger.info("Unexpected TDLSStatus: " + res)
2388            self.loop.quit()
2389            return False
2390
2391        def success(self):
2392            return self.tdls_setup and self.tdls_teardown
2393
2394    with TestDbusTdls(bus) as t:
2395        if not t.success():
2396            raise Exception("Expected signals not seen")
2397
2398def test_dbus_tdls_channel_switch(dev, apdev):
2399    """D-Bus TDLS channel switch configuration"""
2400    flags = int(dev[0].get_driver_status_field('capa.flags'), 16)
2401    if flags & 0x800000000 == 0:
2402        raise HwsimSkip("Driver does not support TDLS channel switching")
2403
2404    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2405    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2406
2407    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
2408    connect_2sta_open(dev, hapd)
2409
2410    addr1 = dev[1].p2p_interface_addr()
2411
2412    class TestDbusTdls(TestDbus):
2413        def __init__(self, bus):
2414            TestDbus.__init__(self, bus)
2415            self.tdls_setup = False
2416            self.tdls_done = False
2417
2418        def __enter__(self):
2419            gobject.timeout_add(1, self.run_tdls)
2420            gobject.timeout_add(15000, self.timeout)
2421            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2422                            "PropertiesChanged")
2423            self.loop.run()
2424            return self
2425
2426        def propertiesChanged(self, properties):
2427            logger.debug("propertiesChanged: %s" % str(properties))
2428
2429        def run_tdls(self, *args):
2430            logger.debug("run_tdls")
2431            iface.TDLSDiscover(addr1)
2432            gobject.timeout_add(100, self.run_tdls2)
2433            return False
2434
2435        def run_tdls2(self, *args):
2436            logger.debug("run_tdls2")
2437            iface.TDLSSetup(addr1)
2438            gobject.timeout_add(500, self.run_tdls3)
2439            return False
2440
2441        def run_tdls3(self, *args):
2442            logger.debug("run_tdls3")
2443            res = iface.TDLSStatus(addr1)
2444            if res == "connected":
2445                self.tdls_setup = True
2446            else:
2447                logger.info("Unexpected TDLSStatus: " + res)
2448
2449            # Unknown dict entry
2450            args = dbus.Dictionary({'Foobar': dbus.Byte(1)},
2451                                   signature='sv')
2452            try:
2453                iface.TDLSChannelSwitch(args)
2454            except Exception as e:
2455                if "InvalidArgs" not in str(e):
2456                    raise Exception("Unexpected exception")
2457
2458            # Missing OperClass
2459            args = dbus.Dictionary({}, signature='sv')
2460            try:
2461                iface.TDLSChannelSwitch(args)
2462            except Exception as e:
2463                if "InvalidArgs" not in str(e):
2464                    raise Exception("Unexpected exception")
2465
2466            # Missing Frequency
2467            args = dbus.Dictionary({'OperClass': dbus.Byte(1)},
2468                                   signature='sv')
2469            try:
2470                iface.TDLSChannelSwitch(args)
2471            except Exception as e:
2472                if "InvalidArgs" not in str(e):
2473                    raise Exception("Unexpected exception")
2474
2475            # Missing PeerAddress
2476            args = dbus.Dictionary({'OperClass': dbus.Byte(1),
2477                                     'Frequency': dbus.UInt32(2417)},
2478                                   signature='sv')
2479            try:
2480                iface.TDLSChannelSwitch(args)
2481            except Exception as e:
2482                if "InvalidArgs" not in str(e):
2483                    raise Exception("Unexpected exception")
2484
2485            # Valid parameters
2486            args = dbus.Dictionary({'OperClass': dbus.Byte(1),
2487                                    'Frequency': dbus.UInt32(2417),
2488                                    'PeerAddress': addr1,
2489                                    'SecChannelOffset': dbus.UInt32(0),
2490                                    'CenterFrequency1': dbus.UInt32(0),
2491                                    'CenterFrequency2': dbus.UInt32(0),
2492                                    'Bandwidth': dbus.UInt32(20),
2493                                    'HT': dbus.Boolean(False),
2494                                    'VHT': dbus.Boolean(False)},
2495                                   signature='sv')
2496            iface.TDLSChannelSwitch(args)
2497
2498            gobject.timeout_add(200, self.run_tdls4)
2499            return False
2500
2501        def run_tdls4(self, *args):
2502            logger.debug("run_tdls4")
2503            iface.TDLSCancelChannelSwitch(addr1)
2504            self.tdls_done = True
2505            self.loop.quit()
2506            return False
2507
2508        def success(self):
2509            return self.tdls_setup and self.tdls_done
2510
2511    with TestDbusTdls(bus) as t:
2512        if not t.success():
2513            raise Exception("Expected signals not seen")
2514
2515def test_dbus_pkcs11(dev, apdev):
2516    """D-Bus SetPKCS11EngineAndModulePath()"""
2517    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2518    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2519
2520    try:
2521        iface.SetPKCS11EngineAndModulePath("foo", "bar")
2522    except dbus.exceptions.DBusException as e:
2523        if "Error.Failed: Reinit of the EAPOL" not in str(e):
2524            raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2525
2526    try:
2527        iface.SetPKCS11EngineAndModulePath("foo", "")
2528    except dbus.exceptions.DBusException as e:
2529        if "Error.Failed: Reinit of the EAPOL" not in str(e):
2530            raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2531
2532    iface.SetPKCS11EngineAndModulePath("", "bar")
2533    res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2534                     dbus_interface=dbus.PROPERTIES_IFACE)
2535    if res != "":
2536        raise Exception("Unexpected PKCS11EnginePath value: " + res)
2537    res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2538                     dbus_interface=dbus.PROPERTIES_IFACE)
2539    if res != "bar":
2540        raise Exception("Unexpected PKCS11ModulePath value: " + res)
2541
2542    iface.SetPKCS11EngineAndModulePath("", "")
2543    res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2544                     dbus_interface=dbus.PROPERTIES_IFACE)
2545    if res != "":
2546        raise Exception("Unexpected PKCS11EnginePath value: " + res)
2547    res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2548                     dbus_interface=dbus.PROPERTIES_IFACE)
2549    if res != "":
2550        raise Exception("Unexpected PKCS11ModulePath value: " + res)
2551
2552def test_dbus_apscan(dev, apdev):
2553    """D-Bus Get/Set ApScan"""
2554    try:
2555        _test_dbus_apscan(dev, apdev)
2556    finally:
2557        dev[0].request("AP_SCAN 1")
2558
2559def _test_dbus_apscan(dev, apdev):
2560    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2561
2562    res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2563                     dbus_interface=dbus.PROPERTIES_IFACE)
2564    if res != 1:
2565        raise Exception("Unexpected initial ApScan value: %d" % res)
2566
2567    for i in range(3):
2568        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i),
2569                     dbus_interface=dbus.PROPERTIES_IFACE)
2570        res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2571                         dbus_interface=dbus.PROPERTIES_IFACE)
2572        if res != i:
2573            raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i))
2574
2575    try:
2576        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
2577                   dbus_interface=dbus.PROPERTIES_IFACE)
2578        raise Exception("Invalid Set(ApScan,-1) accepted")
2579    except dbus.exceptions.DBusException as e:
2580        if "Error.Failed: wrong property type" not in str(e):
2581            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2582
2583    try:
2584        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
2585                   dbus_interface=dbus.PROPERTIES_IFACE)
2586        raise Exception("Invalid Set(ApScan,123) accepted")
2587    except dbus.exceptions.DBusException as e:
2588        if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
2589            raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
2590
2591    if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
2592               dbus_interface=dbus.PROPERTIES_IFACE)
2593
2594def test_dbus_pmf(dev, apdev):
2595    """D-Bus Get/Set Pmf"""
2596    try:
2597        _test_dbus_pmf(dev, apdev)
2598    finally:
2599        dev[0].request("SET pmf 0")
2600
2601def _test_dbus_pmf(dev, apdev):
2602    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2603
2604    dev[0].set("pmf", "0")
2605    res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
2606                     dbus_interface=dbus.PROPERTIES_IFACE)
2607    if res != "0":
2608        raise Exception("Unexpected initial Pmf value: %s" % res)
2609
2610    for i in range(3):
2611        if_obj.Set(WPAS_DBUS_IFACE, "Pmf", str(i),
2612                   dbus_interface=dbus.PROPERTIES_IFACE)
2613        res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
2614                         dbus_interface=dbus.PROPERTIES_IFACE)
2615        if res != str(i):
2616            raise Exception("Unexpected Pmf value %s (expected %d)" % (res, i))
2617
2618    if_obj.Set(WPAS_DBUS_IFACE, "Pmf", "1",
2619               dbus_interface=dbus.PROPERTIES_IFACE)
2620
2621def test_dbus_fastreauth(dev, apdev):
2622    """D-Bus Get/Set FastReauth"""
2623    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2624
2625    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2626                     dbus_interface=dbus.PROPERTIES_IFACE)
2627    if res != True:
2628        raise Exception("Unexpected initial FastReauth value: " + str(res))
2629
2630    for i in [False, True]:
2631        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
2632                     dbus_interface=dbus.PROPERTIES_IFACE)
2633        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2634                         dbus_interface=dbus.PROPERTIES_IFACE)
2635        if res != i:
2636            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))
2637
2638    try:
2639        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
2640                   dbus_interface=dbus.PROPERTIES_IFACE)
2641        raise Exception("Invalid Set(FastReauth,-1) accepted")
2642    except dbus.exceptions.DBusException as e:
2643        if "Error.Failed: wrong property type" not in str(e):
2644            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2645
2646    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
2647               dbus_interface=dbus.PROPERTIES_IFACE)
2648
2649def test_dbus_bss_expire(dev, apdev):
2650    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
2651    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2652
2653    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179),
2654               dbus_interface=dbus.PROPERTIES_IFACE)
2655    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
2656                     dbus_interface=dbus.PROPERTIES_IFACE)
2657    if res != 179:
2658        raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i))
2659
2660    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3),
2661               dbus_interface=dbus.PROPERTIES_IFACE)
2662    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
2663                     dbus_interface=dbus.PROPERTIES_IFACE)
2664    if res != 3:
2665        raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i))
2666
2667    try:
2668        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
2669                   dbus_interface=dbus.PROPERTIES_IFACE)
2670        raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
2671    except dbus.exceptions.DBusException as e:
2672        if "Error.Failed: wrong property type" not in str(e):
2673            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))
2674
2675    try:
2676        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
2677                   dbus_interface=dbus.PROPERTIES_IFACE)
2678        raise Exception("Invalid Set(BSSExpireAge,9) accepted")
2679    except dbus.exceptions.DBusException as e:
2680        if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
2681            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))
2682
2683    try:
2684        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
2685                   dbus_interface=dbus.PROPERTIES_IFACE)
2686        raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
2687    except dbus.exceptions.DBusException as e:
2688        if "Error.Failed: wrong property type" not in str(e):
2689            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))
2690
2691    try:
2692        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
2693                   dbus_interface=dbus.PROPERTIES_IFACE)
2694        raise Exception("Invalid Set(BSSExpireCount,0) accepted")
2695    except dbus.exceptions.DBusException as e:
2696        if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
2697            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))
2698
2699    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
2700               dbus_interface=dbus.PROPERTIES_IFACE)
2701    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
2702               dbus_interface=dbus.PROPERTIES_IFACE)
2703
2704def test_dbus_country(dev, apdev):
2705    """D-Bus Get/Set Country"""
2706    try:
2707        _test_dbus_country(dev, apdev)
2708    finally:
2709        dev[0].request("SET country 00")
2710        subprocess.call(['iw', 'reg', 'set', '00'])
2711
2712def _test_dbus_country(dev, apdev):
2713    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2714
2715    # work around issues with possible pending regdom event from the end of
2716    # the previous test case
2717    time.sleep(0.2)
2718    dev[0].dump_monitor()
2719
2720    if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
2721               dbus_interface=dbus.PROPERTIES_IFACE)
2722    res = if_obj.Get(WPAS_DBUS_IFACE, "Country",
2723                     dbus_interface=dbus.PROPERTIES_IFACE)
2724    if res != "FI":
2725        raise Exception("Unexpected Country value %s (expected FI)" % res)
2726
2727    ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2728    if ev is None:
2729        # For now, work around separate P2P Device interface event delivery
2730        ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2731        if ev is None:
2732            raise Exception("regdom change event not seen")
2733    if "init=USER type=COUNTRY alpha2=FI" not in ev:
2734        raise Exception("Unexpected event contents: " + ev)
2735
2736    try:
2737        if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
2738                   dbus_interface=dbus.PROPERTIES_IFACE)
2739        raise Exception("Invalid Set(Country,-1) accepted")
2740    except dbus.exceptions.DBusException as e:
2741        if "Error.Failed: wrong property type" not in str(e):
2742            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
2743
2744    try:
2745        if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
2746                   dbus_interface=dbus.PROPERTIES_IFACE)
2747        raise Exception("Invalid Set(Country,F) accepted")
2748    except dbus.exceptions.DBusException as e:
2749        if "Error.Failed: invalid country code" not in str(e):
2750            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
2751
2752    if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
2753               dbus_interface=dbus.PROPERTIES_IFACE)
2754
2755    ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2756    if ev is None:
2757        # For now, work around separate P2P Device interface event delivery
2758        ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2759        if ev is None:
2760            raise Exception("regdom change event not seen")
2761    # init=CORE was previously used due to invalid db.txt data for 00. For
2762    # now, allow both it and the new init=USER after fixed db.txt.
2763    if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev:
2764        raise Exception("Unexpected event contents: " + ev)
2765
2766def test_dbus_scan_interval(dev, apdev):
2767    """D-Bus Get/Set ScanInterval"""
2768    try:
2769        _test_dbus_scan_interval(dev, apdev)
2770    finally:
2771        dev[0].request("SCAN_INTERVAL 5")
2772
2773def _test_dbus_scan_interval(dev, apdev):
2774    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2775
2776    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3),
2777               dbus_interface=dbus.PROPERTIES_IFACE)
2778    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
2779                     dbus_interface=dbus.PROPERTIES_IFACE)
2780    if res != 3:
2781        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i))
2782
2783    try:
2784        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
2785                   dbus_interface=dbus.PROPERTIES_IFACE)
2786        raise Exception("Invalid Set(ScanInterval,100) accepted")
2787    except dbus.exceptions.DBusException as e:
2788        if "Error.Failed: wrong property type" not in str(e):
2789            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
2790
2791    try:
2792        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
2793                   dbus_interface=dbus.PROPERTIES_IFACE)
2794        raise Exception("Invalid Set(ScanInterval,-1) accepted")
2795    except dbus.exceptions.DBusException as e:
2796        if "Error.Failed: scan_interval must be >= 0" not in str(e):
2797            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
2798
2799    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
2800               dbus_interface=dbus.PROPERTIES_IFACE)
2801
2802def test_dbus_probe_req_reporting(dev, apdev):
2803    """D-Bus Probe Request reporting"""
2804    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2805
2806    dev[1].p2p_find(social=True)
2807
2808    class TestDbusProbe(TestDbus):
2809        def __init__(self, bus):
2810            TestDbus.__init__(self, bus)
2811            self.reported = False
2812
2813        def __enter__(self):
2814            gobject.timeout_add(1, self.run_test)
2815            gobject.timeout_add(15000, self.timeout)
2816            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
2817                            "GroupStarted")
2818            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
2819                            byte_arrays=True)
2820            self.loop.run()
2821            return self
2822
2823        def groupStarted(self, properties):
2824            logger.debug("groupStarted: " + str(properties))
2825            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
2826                                      properties['interface_object'])
2827            self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
2828            self.iface.SubscribeProbeReq()
2829            self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2830
2831        def probeRequest(self, args):
2832            logger.debug("probeRequest: args=%s" % str(args))
2833            self.reported = True
2834            self.loop.quit()
2835
2836        def run_test(self, *args):
2837            logger.debug("run_test")
2838            p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2839            params = dbus.Dictionary({'frequency': 2412})
2840            p2p.GroupAdd(params)
2841            return False
2842
2843        def success(self):
2844            return self.reported
2845
2846    with TestDbusProbe(bus) as t:
2847        if not t.success():
2848            raise Exception("Expected signals not seen")
2849        t.iface.UnsubscribeProbeReq()
2850        try:
2851            t.iface.UnsubscribeProbeReq()
2852            raise Exception("Invalid UnsubscribeProbeReq() accepted")
2853        except dbus.exceptions.DBusException as e:
2854            if "NoSubscription" not in str(e):
2855                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
2856        t.group_p2p.Disconnect()
2857
2858    with TestDbusProbe(bus) as t:
2859        if not t.success():
2860            raise Exception("Expected signals not seen")
2861        # On purpose, leave ProbeReq subscription in place to test automatic
2862        # cleanup.
2863
2864    dev[1].p2p_stop_find()
2865
2866def test_dbus_probe_req_reporting_oom(dev, apdev):
2867    """D-Bus Probe Request reporting (OOM)"""
2868    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2869    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2870
2871    # Need to make sure this process has not already subscribed to avoid false
2872    # failures due to the operation succeeding due to os_strdup() not even
2873    # getting called.
2874    try:
2875        iface.UnsubscribeProbeReq()
2876        was_subscribed = True
2877    except dbus.exceptions.DBusException as e:
2878        was_subscribed = False
2879        pass
2880
2881    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
2882                         "SubscribeProbeReq"):
2883        iface.SubscribeProbeReq()
2884
2885    if was_subscribed:
2886        # On purpose, leave ProbeReq subscription in place to test automatic
2887        # cleanup.
2888        iface.SubscribeProbeReq()
2889
2890def test_dbus_p2p_invalid(dev, apdev):
2891    """D-Bus invalid P2P operations"""
2892    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
2893    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2894
2895    try:
2896        p2p.RejectPeer(path + "/Peers/00112233445566")
2897        raise Exception("Invalid RejectPeer accepted")
2898    except dbus.exceptions.DBusException as e:
2899        if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2900            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2901
2902    try:
2903        p2p.RejectPeer("/foo")
2904        raise Exception("Invalid RejectPeer accepted")
2905    except dbus.exceptions.DBusException as e:
2906        if "InvalidArgs" not in str(e):
2907            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2908
2909    tests = [{},
2910             {'peer': 'foo'},
2911             {'foo': "bar"},
2912             {'iface': "abc"},
2913             {'iface': 123}]
2914    for t in tests:
2915        try:
2916            p2p.RemoveClient(t)
2917            raise Exception("Invalid RemoveClient accepted")
2918        except dbus.exceptions.DBusException as e:
2919            if "InvalidArgs" not in str(e):
2920                raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
2921
2922    tests = [{'DiscoveryType': 'foo'},
2923             {'RequestedDeviceTypes': 'foo'},
2924             {'RequestedDeviceTypes': ['foo']},
2925             {'RequestedDeviceTypes': ['1', '2', '3', '4', '5', '6', '7', '8',
2926                                       '9', '10', '11', '12', '13', '14', '15',
2927                                       '16', '17']},
2928             {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2929             {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2930             {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2931             {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'),
2932                                       dbus.ByteArray(b'1234567')]},
2933             {'Foo': dbus.Int16(1)},
2934             {'Foo': dbus.UInt16(1)},
2935             {'Foo': dbus.Int64(1)},
2936             {'Foo': dbus.UInt64(1)},
2937             {'Foo': dbus.Double(1.23)},
2938             {'Foo': dbus.Signature('s')},
2939             {'Foo': 'bar'}]
2940    for t in tests:
2941        try:
2942            p2p.Find(dbus.Dictionary(t))
2943            raise Exception("Invalid Find accepted")
2944        except dbus.exceptions.DBusException as e:
2945            if "InvalidArgs" not in str(e):
2946                raise Exception("Unexpected error message for invalid Find(): " + str(e))
2947
2948    for p in ["/foo",
2949              "/fi/w1/wpa_supplicant1/Interfaces/1234",
2950              "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"]:
2951        try:
2952            p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2953            raise Exception("Invalid RemovePersistentGroup accepted")
2954        except dbus.exceptions.DBusException as e:
2955            if "InvalidArgs" not in str(e):
2956                raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
2957
2958    try:
2959        dev[0].request("P2P_SET disabled 1")
2960        p2p.Listen(5)
2961        raise Exception("Invalid Listen accepted")
2962    except dbus.exceptions.DBusException as e:
2963        if "UnknownError: Could not start P2P listen" not in str(e):
2964            raise Exception("Unexpected error message for invalid Listen: " + str(e))
2965    finally:
2966        dev[0].request("P2P_SET disabled 0")
2967
2968    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2969    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2970    try:
2971        test_p2p.Listen("foo")
2972        raise Exception("Invalid Listen accepted")
2973    except dbus.exceptions.DBusException as e:
2974        if "InvalidArgs" not in str(e):
2975            raise Exception("Unexpected error message for invalid Listen: " + str(e))
2976
2977    try:
2978        dev[0].request("P2P_SET disabled 1")
2979        p2p.ExtendedListen(dbus.Dictionary({}))
2980        raise Exception("Invalid ExtendedListen accepted")
2981    except dbus.exceptions.DBusException as e:
2982        if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2983            raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2984    finally:
2985        dev[0].request("P2P_SET disabled 0")
2986
2987    try:
2988        dev[0].request("P2P_SET disabled 1")
2989        args = {'duration1': 30000, 'interval1': 102400,
2990                'duration2': 20000, 'interval2': 102400}
2991        p2p.PresenceRequest(args)
2992        raise Exception("Invalid PresenceRequest accepted")
2993    except dbus.exceptions.DBusException as e:
2994        if "UnknownError: Failed to invoke presence request" not in str(e):
2995            raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2996    finally:
2997        dev[0].request("P2P_SET disabled 0")
2998
2999    try:
3000        params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
3001        p2p.GroupAdd(params)
3002        raise Exception("Invalid GroupAdd accepted")
3003    except dbus.exceptions.DBusException as e:
3004        if "InvalidArgs" not in str(e):
3005            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
3006
3007    try:
3008        params = dbus.Dictionary({'persistent_group_object':
3009                                  dbus.ObjectPath(path),
3010                                  'frequency': 2412})
3011        p2p.GroupAdd(params)
3012        raise Exception("Invalid GroupAdd accepted")
3013    except dbus.exceptions.DBusException as e:
3014        if "InvalidArgs" not in str(e):
3015            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
3016
3017    try:
3018        p2p.Disconnect()
3019        raise Exception("Invalid Disconnect accepted")
3020    except dbus.exceptions.DBusException as e:
3021        if "UnknownError: failed to disconnect" not in str(e):
3022            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
3023
3024    try:
3025        dev[0].request("P2P_SET disabled 1")
3026        p2p.Flush()
3027        raise Exception("Invalid Flush accepted")
3028    except dbus.exceptions.DBusException as e:
3029        if "Error.Failed: P2P is not available for this interface" not in str(e):
3030            raise Exception("Unexpected error message for invalid Flush: " + str(e))
3031    finally:
3032        dev[0].request("P2P_SET disabled 0")
3033
3034    try:
3035        dev[0].request("P2P_SET disabled 1")
3036        args = {'peer': path,
3037                'join': True,
3038                'wps_method': 'pbc',
3039                'frequency': 2412}
3040        pin = p2p.Connect(args)
3041        raise Exception("Invalid Connect accepted")
3042    except dbus.exceptions.DBusException as e:
3043        if "Error.Failed: P2P is not available for this interface" not in str(e):
3044            raise Exception("Unexpected error message for invalid Connect: " + str(e))
3045    finally:
3046        dev[0].request("P2P_SET disabled 0")
3047
3048    tests = [{'frequency': dbus.Int32(-1)},
3049             {'wps_method': 'pbc'},
3050             {'wps_method': 'foo'}]
3051    for args in tests:
3052        try:
3053            pin = p2p.Connect(args)
3054            raise Exception("Invalid Connect accepted")
3055        except dbus.exceptions.DBusException as e:
3056            if "InvalidArgs" not in str(e):
3057                raise Exception("Unexpected error message for invalid Connect: " + str(e))
3058
3059    try:
3060        dev[0].request("P2P_SET disabled 1")
3061        args = {'peer': path}
3062        pin = p2p.Invite(args)
3063        raise Exception("Invalid Invite accepted")
3064    except dbus.exceptions.DBusException as e:
3065        if "Error.Failed: P2P is not available for this interface" not in str(e):
3066            raise Exception("Unexpected error message for invalid Invite: " + str(e))
3067    finally:
3068        dev[0].request("P2P_SET disabled 0")
3069
3070    try:
3071        args = {'foo': 'bar'}
3072        pin = p2p.Invite(args)
3073        raise Exception("Invalid Invite accepted")
3074    except dbus.exceptions.DBusException as e:
3075        if "InvalidArgs" not in str(e):
3076            raise Exception("Unexpected error message for invalid Connect: " + str(e))
3077
3078    tests = [(path, 'display', "InvalidArgs"),
3079             (dbus.ObjectPath(path + "/Peers/00112233445566"),
3080              'display',
3081              "UnknownError: Failed to send provision discovery request"),
3082             (dbus.ObjectPath(path + "/Peers/00112233445566"),
3083              'keypad',
3084              "UnknownError: Failed to send provision discovery request"),
3085             (dbus.ObjectPath(path + "/Peers/00112233445566"),
3086              'pbc',
3087              "UnknownError: Failed to send provision discovery request"),
3088             (dbus.ObjectPath(path + "/Peers/00112233445566"),
3089              'pushbutton',
3090              "UnknownError: Failed to send provision discovery request"),
3091             (dbus.ObjectPath(path + "/Peers/00112233445566"),
3092              'foo', "InvalidArgs")]
3093    for (p, method, err) in tests:
3094        try:
3095            p2p.ProvisionDiscoveryRequest(p, method)
3096            raise Exception("Invalid ProvisionDiscoveryRequest accepted")
3097        except dbus.exceptions.DBusException as e:
3098            if err not in str(e):
3099                raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
3100
3101    try:
3102        dev[0].request("P2P_SET disabled 1")
3103        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
3104                   dbus_interface=dbus.PROPERTIES_IFACE)
3105        raise Exception("Invalid Get(Peers) accepted")
3106    except dbus.exceptions.DBusException as e:
3107        if "Error.Failed: P2P is not available for this interface" not in str(e):
3108            raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
3109    finally:
3110        dev[0].request("P2P_SET disabled 0")
3111
3112def test_dbus_p2p_oom(dev, apdev):
3113    """D-Bus P2P operations and OOM"""
3114    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
3115    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3116
3117    with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array",
3118                         "Find", "InvalidArgs"):
3119        p2p.Find(dbus.Dictionary({'Foo': ['bar']}))
3120
3121    with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array",
3122                         "Find", "InvalidArgs"):
3123        p2p.Find(dbus.Dictionary({'Foo': ['bar']}))
3124
3125    with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array",
3126                         "Find", "InvalidArgs"):
3127        p2p.Find(dbus.Dictionary({'Foo': ['1', '2', '3', '4', '5', '6', '7',
3128                                          '8', '9']}))
3129
3130    with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
3131                         "Find", "InvalidArgs"):
3132        p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]}))
3133
3134    with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
3135                         "Find", "InvalidArgs"):
3136        p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]}))
3137
3138    with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
3139                         "Find", "InvalidArgs"):
3140        p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123'),
3141                                          dbus.ByteArray(b'123'),
3142                                          dbus.ByteArray(b'123'),
3143                                          dbus.ByteArray(b'123'),
3144                                          dbus.ByteArray(b'123'),
3145                                          dbus.ByteArray(b'123'),
3146                                          dbus.ByteArray(b'123'),
3147                                          dbus.ByteArray(b'123'),
3148                                          dbus.ByteArray(b'123'),
3149                                          dbus.ByteArray(b'123'),
3150                                          dbus.ByteArray(b'123')]}))
3151
3152    with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
3153                         "Find", "InvalidArgs"):
3154        p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]}))
3155
3156    with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
3157                         "Find", "InvalidArgs"):
3158        p2p.Find(dbus.Dictionary({'Foo': path}))
3159
3160    with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
3161                         "AddService", "InvalidArgs"):
3162        args = {'service_type': 'bonjour',
3163                'response': dbus.ByteArray(500*b'b')}
3164        p2p.AddService(args)
3165
3166    with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
3167                         "AddService", "InvalidArgs"):
3168        p2p.AddService(args)
3169
3170def test_dbus_p2p_discovery(dev, apdev):
3171    """D-Bus P2P discovery"""
3172    try:
3173        run_dbus_p2p_discovery(dev, apdev)
3174    finally:
3175        dev[1].request("VENDOR_ELEM_REMOVE 1 *")
3176
3177def run_dbus_p2p_discovery(dev, apdev):
3178    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
3179    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3180
3181    addr0 = dev[0].p2p_dev_addr()
3182
3183    dev[1].request("SET sec_device_type 1-0050F204-2")
3184    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
3185    dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566")
3186    dev[1].p2p_listen()
3187    addr1 = dev[1].p2p_dev_addr()
3188    a1 = binascii.unhexlify(addr1.replace(':', ''))
3189
3190    wfd_devinfo = "00001c440028"
3191    dev[2].request("SET wifi_display 1")
3192    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
3193    wfd = binascii.unhexlify('000006' + wfd_devinfo)
3194    dev[2].p2p_listen()
3195    addr2 = dev[2].p2p_dev_addr()
3196    a2 = binascii.unhexlify(addr2.replace(':', ''))
3197
3198    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
3199                        dbus_interface=dbus.PROPERTIES_IFACE)
3200    if 'Peers' not in res:
3201        raise Exception("GetAll result missing Peers")
3202    if len(res['Peers']) != 0:
3203        raise Exception("Unexpected peer(s) in the list")
3204
3205    args = {'DiscoveryType': 'social',
3206            'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')],
3207            'Timeout': dbus.Int32(1)}
3208    p2p.Find(dbus.Dictionary(args))
3209    p2p.StopFind()
3210
3211    class TestDbusP2p(TestDbus):
3212        def __init__(self, bus):
3213            TestDbus.__init__(self, bus)
3214            self.found = False
3215            self.found2 = False
3216            self.found_prop = False
3217            self.lost = False
3218            self.find_stopped = False
3219
3220        def __enter__(self):
3221            gobject.timeout_add(1, self.run_test)
3222            gobject.timeout_add(15000, self.timeout)
3223            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3224                            "DeviceFound")
3225            self.add_signal(self.deviceFoundProperties,
3226                            WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
3227            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
3228                            "DeviceLost")
3229            self.add_signal(self.provisionDiscoveryResponseEnterPin,
3230                            WPAS_DBUS_IFACE_P2PDEVICE,
3231                            "ProvisionDiscoveryResponseEnterPin")
3232            self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
3233                            "FindStopped")
3234            self.loop.run()
3235            return self
3236
3237        def deviceFound(self, path):
3238            logger.debug("deviceFound: path=%s" % path)
3239            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
3240                             dbus_interface=dbus.PROPERTIES_IFACE)
3241            if len(res) < 1:
3242                raise Exception("Unexpected number of peers")
3243            if path not in res:
3244                raise Exception("Mismatch in peer object path")
3245            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3246            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3247                                  dbus_interface=dbus.PROPERTIES_IFACE,
3248                                  byte_arrays=True)
3249            logger.debug("peer properties: " + str(res))
3250
3251            if res['DeviceAddress'] == a1:
3252                if 'SecondaryDeviceTypes' not in res:
3253                    raise Exception("Missing SecondaryDeviceTypes")
3254                sec = res['SecondaryDeviceTypes']
3255                if len(sec) < 1:
3256                    raise Exception("Secondary device type missing")
3257                if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
3258                    raise Exception("Secondary device type mismatch")
3259
3260                if 'VendorExtension' not in res:
3261                    raise Exception("Missing VendorExtension")
3262                vendor = res['VendorExtension']
3263                if len(vendor) < 1:
3264                    raise Exception("Vendor extension missing")
3265                if b"\x11\x22\x33\x44" not in vendor:
3266                    raise Exception("Secondary device type mismatch")
3267
3268                if 'VSIE' not in res:
3269                    raise Exception("Missing VSIE")
3270                vendor = res['VSIE']
3271                if len(vendor) < 1:
3272                    raise Exception("VSIE missing")
3273                if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66":
3274                    raise Exception("VSIE mismatch")
3275
3276                self.found = True
3277            elif res['DeviceAddress'] == a2:
3278                if 'IEs' not in res:
3279                    raise Exception("IEs missing")
3280                if res['IEs'] != wfd:
3281                    raise Exception("IEs mismatch")
3282                self.found2 = True
3283            else:
3284                raise Exception("Unexpected peer device address")
3285
3286            if self.found and self.found2:
3287                p2p.StopFind()
3288                p2p.RejectPeer(path)
3289                p2p.ProvisionDiscoveryRequest(path, 'display')
3290
3291        def deviceLost(self, path):
3292            logger.debug("deviceLost: path=%s" % path)
3293            if not self.found or not self.found2:
3294                # This may happen if a previous test case ended up scheduling
3295                # deviceLost event and that event did not get delivered before
3296                # starting the next test execution.
3297                logger.debug("Ignore deviceLost before the deviceFound events")
3298                return
3299            self.lost = True
3300            try:
3301                p2p.RejectPeer(path)
3302                raise Exception("Invalid RejectPeer accepted")
3303            except dbus.exceptions.DBusException as e:
3304                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
3305                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
3306            self.loop.quit()
3307
3308        def deviceFoundProperties(self, path, properties):
3309            logger.debug("deviceFoundProperties: path=%s" % path)
3310            logger.debug("peer properties: " + str(properties))
3311            if properties['DeviceAddress'] == a1:
3312                self.found_prop = True
3313
3314        def provisionDiscoveryResponseEnterPin(self, peer_object):
3315            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
3316            p2p.Flush()
3317
3318        def findStopped(self):
3319            logger.debug("findStopped")
3320            self.find_stopped = True
3321
3322        def run_test(self, *args):
3323            logger.debug("run_test")
3324            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3325                                      'Timeout': dbus.Int32(10)}))
3326            return False
3327
3328        def success(self):
3329            return self.found and self.lost and self.found2 and self.find_stopped
3330
3331    with TestDbusP2p(bus) as t:
3332        if not t.success():
3333            raise Exception("Expected signals not seen")
3334
3335    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
3336    dev[1].p2p_stop_find()
3337
3338    p2p.Listen(1)
3339    dev[2].p2p_stop_find()
3340    dev[2].request("P2P_FLUSH")
3341    if not dev[2].discover_peer(addr0):
3342        raise Exception("Peer not found")
3343    p2p.StopFind()
3344    dev[2].p2p_stop_find()
3345
3346    try:
3347        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
3348        raise Exception("Invalid ExtendedListen accepted")
3349    except dbus.exceptions.DBusException as e:
3350        if "InvalidArgs" not in str(e):
3351            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
3352
3353    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
3354    p2p.ExtendedListen(dbus.Dictionary({}))
3355    dev[0].global_request("P2P_EXT_LISTEN")
3356
3357def test_dbus_p2p_discovery_freq(dev, apdev):
3358    """D-Bus P2P discovery on a specific non-social channel"""
3359    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
3360    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3361
3362    addr1 = dev[1].p2p_dev_addr()
3363    autogo(dev[1], freq=2422)
3364
3365    class TestDbusP2p(TestDbus):
3366        def __init__(self, bus):
3367            TestDbus.__init__(self, bus)
3368            self.found = False
3369
3370        def __enter__(self):
3371            gobject.timeout_add(1, self.run_test)
3372            gobject.timeout_add(5000, self.timeout)
3373            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3374                            "DeviceFound")
3375            self.loop.run()
3376            return self
3377
3378        def deviceFound(self, path):
3379            logger.debug("deviceFound: path=%s" % path)
3380            self.found = True
3381            self.loop.quit()
3382
3383        def run_test(self, *args):
3384            logger.debug("run_test")
3385            p2p.Find(dbus.Dictionary({'freq': 2422}))
3386            return False
3387
3388        def success(self):
3389            return self.found
3390
3391    with TestDbusP2p(bus) as t:
3392        if not t.success():
3393            raise Exception("Expected signals not seen")
3394
3395    dev[1].remove_group()
3396    p2p.StopFind()
3397
3398def test_dbus_p2p_service_discovery(dev, apdev):
3399    """D-Bus P2P service discovery"""
3400    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
3401    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3402
3403    addr0 = dev[0].p2p_dev_addr()
3404    addr1 = dev[1].p2p_dev_addr()
3405
3406    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
3407    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
3408
3409    tests = [{'service_type': 'bonjour',
3410              'query': bonjour_query,
3411              'response': bonjour_response},
3412             {'service_type': 'upnp',
3413              'version': 0x10,
3414              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice',
3415              'query': bonjour_query,
3416              'response': bonjour_response}]
3417    for args in tests:
3418        p2p.AddService(args)
3419        p2p.FlushService()
3420
3421    args = {'service_type': 'bonjour',
3422            'query': bonjour_query,
3423            'response': bonjour_response}
3424    p2p.AddService(args)
3425
3426    try:
3427        p2p.DeleteService(args)
3428        raise Exception("Invalid DeleteService() accepted")
3429    except dbus.exceptions.DBusException as e:
3430        if "InvalidArgs" not in str(e):
3431            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3432
3433    args = {'service_type': 'bonjour',
3434            'query': bonjour_query}
3435    p2p.DeleteService(args)
3436    try:
3437        p2p.DeleteService(args)
3438        raise Exception("Invalid DeleteService() accepted")
3439    except dbus.exceptions.DBusException as e:
3440        if "InvalidArgs" not in str(e):
3441            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3442
3443    args = {'service_type': 'upnp',
3444            'version': 0x10,
3445            'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}
3446    p2p.AddService(args)
3447    p2p.DeleteService(args)
3448    try:
3449        p2p.DeleteService(args)
3450        raise Exception("Invalid DeleteService() accepted")
3451    except dbus.exceptions.DBusException as e:
3452        if "InvalidArgs" not in str(e):
3453            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3454
3455    tests = [{'service_type': 'foo'},
3456             {'service_type': 'foo', 'query': bonjour_query},
3457             {'service_type': 'upnp'},
3458             {'service_type': 'upnp', 'version': 0x10},
3459             {'service_type': 'upnp',
3460              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
3461             {'version': 0x10,
3462              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
3463             {'service_type': 'upnp', 'foo': 'bar'},
3464             {'service_type': 'bonjour'},
3465             {'service_type': 'bonjour', 'query': 'foo'},
3466             {'service_type': 'bonjour', 'foo': 'bar'}]
3467    for args in tests:
3468        try:
3469            p2p.DeleteService(args)
3470            raise Exception("Invalid DeleteService() accepted")
3471        except dbus.exceptions.DBusException as e:
3472            if "InvalidArgs" not in str(e):
3473                raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3474
3475    tests = [{'service_type': 'foo'},
3476             {'service_type': 'upnp'},
3477             {'service_type': 'upnp', 'version': 0x10},
3478             {'service_type': 'upnp',
3479              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
3480             {'version': 0x10,
3481              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
3482             {'service_type': 'upnp', 'foo': 'bar'},
3483             {'service_type': 'bonjour'},
3484             {'service_type': 'bonjour', 'query': 'foo'},
3485             {'service_type': 'bonjour', 'response': 'foo'},
3486             {'service_type': 'bonjour', 'query': bonjour_query},
3487             {'service_type': 'bonjour', 'response': bonjour_response},
3488             {'service_type': 'bonjour', 'query': dbus.ByteArray(500*b'a')},
3489             {'service_type': 'bonjour', 'foo': 'bar'}]
3490    for args in tests:
3491        try:
3492            p2p.AddService(args)
3493            raise Exception("Invalid AddService() accepted")
3494        except dbus.exceptions.DBusException as e:
3495            if "InvalidArgs" not in str(e):
3496                raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3497
3498    args = {'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}
3499    ref = p2p.ServiceDiscoveryRequest(args)
3500    p2p.ServiceDiscoveryCancelRequest(ref)
3501    try:
3502        p2p.ServiceDiscoveryCancelRequest(ref)
3503        raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
3504    except dbus.exceptions.DBusException as e:
3505        if "InvalidArgs" not in str(e):
3506            raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3507    try:
3508        p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
3509        raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
3510    except dbus.exceptions.DBusException as e:
3511        if "InvalidArgs" not in str(e):
3512            raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3513
3514    tests= [{'service_type': 'upnp',
3515             'version': 0x10,
3516             'service': 'ssdp:foo'},
3517            {'service_type': 'upnp',
3518             'version': 0x10,
3519             'service': 'ssdp:bar',
3520             'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}]
3521    for args in tests:
3522        ref = p2p.ServiceDiscoveryRequest(args)
3523        p2p.ServiceDiscoveryCancelRequest(ref)
3524
3525    tests = [{'service_type': 'foo'},
3526             {'foo': 'bar'},
3527             {'tlv': 'foo'},
3528             {},
3529             {'version': 0},
3530             {'service_type': 'upnp',
3531              'service': 'ssdp:foo'},
3532             {'service_type': 'upnp',
3533              'version': 0x10},
3534             {'service_type': 'upnp',
3535              'version': 0x10,
3536              'service': 'ssdp:foo',
3537              'peer_object': dbus.ObjectPath(path + "/Peers")},
3538             {'service_type': 'upnp',
3539              'version': 0x10,
3540              'service': 'ssdp:foo',
3541              'peer_object': path + "/Peers"},
3542             {'service_type': 'upnp',
3543              'version': 0x10,
3544              'service': 'ssdp:foo',
3545              'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566")}]
3546    for args in tests:
3547        try:
3548            p2p.ServiceDiscoveryRequest(args)
3549            raise Exception("Invalid ServiceDiscoveryRequest accepted")
3550        except dbus.exceptions.DBusException as e:
3551            if "InvalidArgs" not in str(e):
3552                raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))
3553
3554    tests = [{'foo': 'bar'},
3555             {'tlvs': dbus.ByteArray(b"\x02\x00\x00\x01"),
3556              'bar': 'foo'}]
3557    for args in tests:
3558        try:
3559            p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3560            raise Exception("Invalid ServiceDiscoveryResponse accepted")
3561        except dbus.exceptions.DBusException as e:
3562            if "InvalidArgs" not in str(e):
3563                raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
3564
3565def test_dbus_p2p_service_discovery_query(dev, apdev):
3566    """D-Bus P2P service discovery query"""
3567    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
3568    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3569
3570    addr0 = dev[0].p2p_dev_addr()
3571    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
3572    dev[1].p2p_listen()
3573    addr1 = dev[1].p2p_dev_addr()
3574
3575    class TestDbusP2p(TestDbus):
3576        def __init__(self, bus):
3577            TestDbus.__init__(self, bus)
3578            self.done = False
3579
3580        def __enter__(self):
3581            gobject.timeout_add(1, self.run_test)
3582            gobject.timeout_add(15000, self.timeout)
3583            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3584                            "DeviceFound")
3585            self.add_signal(self.serviceDiscoveryResponse,
3586                            WPAS_DBUS_IFACE_P2PDEVICE,
3587                            "ServiceDiscoveryResponse", byte_arrays=True)
3588            self.loop.run()
3589            return self
3590
3591        def deviceFound(self, path):
3592            logger.debug("deviceFound: path=%s" % path)
3593            args = {'peer_object': path,
3594                    'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}
3595            p2p.ServiceDiscoveryRequest(args)
3596
3597        def serviceDiscoveryResponse(self, sd_request):
3598            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
3599            self.done = True
3600            self.loop.quit()
3601
3602        def run_test(self, *args):
3603            logger.debug("run_test")
3604            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3605                                      'Timeout': dbus.Int32(10)}))
3606            return False
3607
3608        def success(self):
3609            return self.done
3610
3611    with TestDbusP2p(bus) as t:
3612        if not t.success():
3613            raise Exception("Expected signals not seen")
3614
3615    dev[1].p2p_stop_find()
3616
3617def test_dbus_p2p_service_discovery_external(dev, apdev):
3618    """D-Bus P2P service discovery with external response"""
3619    try:
3620        _test_dbus_p2p_service_discovery_external(dev, apdev)
3621    finally:
3622        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3623
3624def _test_dbus_p2p_service_discovery_external(dev, apdev):
3625    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
3626    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3627
3628    addr0 = dev[0].p2p_dev_addr()
3629    addr1 = dev[1].p2p_dev_addr()
3630    resp = "0300000101"
3631
3632    dev[1].request("P2P_FLUSH")
3633    dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
3634    dev[1].p2p_find(social=True)
3635
3636    class TestDbusP2p(TestDbus):
3637        def __init__(self, bus):
3638            TestDbus.__init__(self, bus)
3639            self.sd = False
3640
3641        def __enter__(self):
3642            gobject.timeout_add(1, self.run_test)
3643            gobject.timeout_add(15000, self.timeout)
3644            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3645                            "DeviceFound")
3646            self.add_signal(self.serviceDiscoveryRequest,
3647                            WPAS_DBUS_IFACE_P2PDEVICE,
3648                            "ServiceDiscoveryRequest")
3649            self.loop.run()
3650            return self
3651
3652        def deviceFound(self, path):
3653            logger.debug("deviceFound: path=%s" % path)
3654
3655        def serviceDiscoveryRequest(self, sd_request):
3656            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
3657            self.sd = True
3658            args = {'peer_object': sd_request['peer_object'],
3659                    'frequency': sd_request['frequency'],
3660                    'dialog_token': sd_request['dialog_token'],
3661                    'tlvs': dbus.ByteArray(binascii.unhexlify(resp))}
3662            p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3663            self.loop.quit()
3664
3665        def run_test(self, *args):
3666            logger.debug("run_test")
3667            p2p.ServiceDiscoveryExternal(1)
3668            p2p.ServiceUpdate()
3669            p2p.Listen(15)
3670            return False
3671
3672        def success(self):
3673            return self.sd
3674
3675    with TestDbusP2p(bus) as t:
3676        if not t.success():
3677            raise Exception("Expected signals not seen")
3678
3679    ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
3680    if ev is None:
3681        raise Exception("Service discovery timed out")
3682    if addr0 not in ev:
3683        raise Exception("Unexpected address in SD Response: " + ev)
3684    if ev.split(' ')[4] != resp:
3685        raise Exception("Unexpected response data SD Response: " + ev)
3686    dev[1].p2p_stop_find()
3687
3688    p2p.StopFind()
3689    p2p.ServiceDiscoveryExternal(0)
3690
3691def test_dbus_p2p_autogo(dev, apdev):
3692    """D-Bus P2P autonomous GO"""
3693    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
3694    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3695
3696    addr0 = dev[0].p2p_dev_addr()
3697
3698    class TestDbusP2p(TestDbus):
3699        def __init__(self, bus):
3700            TestDbus.__init__(self, bus)
3701            self.first = True
3702            self.waiting_end = False
3703            self.exceptions = False
3704            self.deauthorized = False
3705            self.done = False
3706
3707        def __enter__(self):
3708            gobject.timeout_add(1, self.run_test)
3709            gobject.timeout_add(15000, self.timeout)
3710            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3711                            "DeviceFound")
3712            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3713                            "GroupStarted")
3714            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3715                            "GroupFinished")
3716            self.add_signal(self.persistentGroupAdded,
3717                            WPAS_DBUS_IFACE_P2PDEVICE,
3718                            "PersistentGroupAdded")
3719            self.add_signal(self.persistentGroupRemoved,
3720                            WPAS_DBUS_IFACE_P2PDEVICE,
3721                            "PersistentGroupRemoved")
3722            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3723                            WPAS_DBUS_IFACE_P2PDEVICE,
3724                            "ProvisionDiscoveryRequestDisplayPin")
3725            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3726                            "StaAuthorized")
3727            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
3728                            "StaDeauthorized")
3729            self.loop.run()
3730            return self
3731
3732        def groupStarted(self, properties):
3733            logger.debug("groupStarted: " + str(properties))
3734            self.group = properties['group_object']
3735            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3736                                           properties['interface_object'])
3737            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3738                                     dbus_interface=dbus.PROPERTIES_IFACE)
3739            if role != "GO":
3740                self.exceptions = True
3741                raise Exception("Unexpected role reported: " + role)
3742            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3743                                      dbus_interface=dbus.PROPERTIES_IFACE)
3744            if group != properties['group_object']:
3745                self.exceptions = True
3746                raise Exception("Unexpected Group reported: " + str(group))
3747            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3748                                   dbus_interface=dbus.PROPERTIES_IFACE)
3749            if go != '/':
3750                self.exceptions = True
3751                raise Exception("Unexpected PeerGO value: " + str(go))
3752            if self.first:
3753                self.first = False
3754                logger.info("Remove persistent group instance")
3755                group_p2p = dbus.Interface(self.g_if_obj,
3756                                           WPAS_DBUS_IFACE_P2PDEVICE)
3757                group_p2p.Disconnect()
3758            else:
3759                dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 join")
3760
3761        def groupFinished(self, properties):
3762            logger.debug("groupFinished: " + str(properties))
3763            if self.waiting_end:
3764                logger.info("Remove persistent group")
3765                p2p.RemovePersistentGroup(self.persistent)
3766            else:
3767                logger.info("Re-start persistent group")
3768                params = dbus.Dictionary({'persistent_group_object':
3769                                          self.persistent,
3770                                          'frequency': 2412})
3771                p2p.GroupAdd(params)
3772
3773        def persistentGroupAdded(self, path, properties):
3774            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3775            self.persistent = path
3776
3777        def persistentGroupRemoved(self, path):
3778            logger.debug("persistentGroupRemoved: %s" % path)
3779            self.done = True
3780            self.loop.quit()
3781
3782        def deviceFound(self, path):
3783            logger.debug("deviceFound: path=%s" % path)
3784            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3785            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3786                                        dbus_interface=dbus.PROPERTIES_IFACE,
3787                                        byte_arrays=True)
3788            logger.debug('peer properties: ' + str(self.peer))
3789
3790        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3791            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3792            self.peer_path = peer_object
3793            peer = binascii.unhexlify(peer_object.split('/')[-1])
3794            addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)])
3795
3796            params = {'Role': 'registrar',
3797                      'P2PDeviceAddress': self.peer['DeviceAddress'],
3798                      'Bssid': self.peer['DeviceAddress'],
3799                      'Type': 'pin'}
3800            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
3801            try:
3802                wps.Start(params)
3803                self.exceptions = True
3804                raise Exception("Invalid WPS.Start() accepted")
3805            except dbus.exceptions.DBusException as e:
3806                if "InvalidArgs" not in str(e):
3807                    self.exceptions = True
3808                    raise Exception("Unexpected error message: " + str(e))
3809            params = {'Role': 'registrar',
3810                      'P2PDeviceAddress': self.peer['DeviceAddress'],
3811                      'Type': 'pin',
3812                      'Pin': '12345670'}
3813            logger.info("Authorize peer to connect to the group")
3814            wps.Start(params)
3815
3816        def staAuthorized(self, name):
3817            logger.debug("staAuthorized: " + name)
3818            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
3819            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3820                                  dbus_interface=dbus.PROPERTIES_IFACE,
3821                                  byte_arrays=True)
3822            logger.debug("Peer properties: " + str(res))
3823            if 'Groups' not in res or len(res['Groups']) != 1:
3824                self.exceptions = True
3825                raise Exception("Unexpected number of peer Groups entries")
3826            if res['Groups'][0] != self.group:
3827                self.exceptions = True
3828                raise Exception("Unexpected peer Groups[0] value")
3829
3830            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
3831            res = g_obj.GetAll(WPAS_DBUS_GROUP,
3832                               dbus_interface=dbus.PROPERTIES_IFACE,
3833                               byte_arrays=True)
3834            logger.debug("Group properties: " + str(res))
3835            if 'Members' not in res or len(res['Members']) != 1:
3836                self.exceptions = True
3837                raise Exception("Unexpected number of group members")
3838
3839            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
3840            # Earlier implementation of this interface was a bit strange. The
3841            # property is defined to have aay signature and that is what the
3842            # getter returned. However, the setter expected there to be a
3843            # dictionary with 'WPSVendorExtensions' as the key surrounding these
3844            # values.. The current implementations maintains support for that
3845            # for backwards compability reasons. Verify that encoding first.
3846            vals = dbus.Dictionary({'WPSVendorExtensions': [ext]},
3847                                   signature='sv')
3848            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3849                      dbus_interface=dbus.PROPERTIES_IFACE)
3850            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3851                               dbus_interface=dbus.PROPERTIES_IFACE,
3852                               byte_arrays=True)
3853            if len(res) != 1:
3854                self.exceptions = True
3855                raise Exception("Unexpected number of vendor extensions")
3856            if res[0] != ext:
3857                self.exceptions = True
3858                raise Exception("Vendor extension value changed")
3859
3860            # And now verify that the more appropriate encoding is accepted as
3861            # well.
3862            res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff'))
3863            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3864                      dbus_interface=dbus.PROPERTIES_IFACE)
3865            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3866                             dbus_interface=dbus.PROPERTIES_IFACE,
3867                             byte_arrays=True)
3868            if len(res) != 2:
3869                self.exceptions = True
3870                raise Exception("Unexpected number of vendor extensions")
3871            if res[0] != res2[0] or res[1] != res2[1]:
3872                self.exceptions = True
3873                raise Exception("Vendor extension value changed")
3874
3875            for i in range(10):
3876                res.append(dbus.ByteArray(b'\xaa\xbb'))
3877            try:
3878                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3879                          dbus_interface=dbus.PROPERTIES_IFACE)
3880                self.exceptions = True
3881                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3882            except dbus.exceptions.DBusException as e:
3883                if "Error.Failed" not in str(e):
3884                    self.exceptions = True
3885                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3886
3887            vals = dbus.Dictionary({'Foo': [ext]}, signature='sv')
3888            try:
3889                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3890                          dbus_interface=dbus.PROPERTIES_IFACE)
3891                self.exceptions = True
3892                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3893            except dbus.exceptions.DBusException as e:
3894                if "InvalidArgs" not in str(e):
3895                    self.exceptions = True
3896                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3897
3898            vals = ["foo"]
3899            try:
3900                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3901                          dbus_interface=dbus.PROPERTIES_IFACE)
3902                self.exceptions = True
3903                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3904            except dbus.exceptions.DBusException as e:
3905                if "Error.Failed" not in str(e):
3906                    self.exceptions = True
3907                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3908
3909            vals = [["foo"]]
3910            try:
3911                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3912                          dbus_interface=dbus.PROPERTIES_IFACE)
3913                self.exceptions = True
3914                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3915            except dbus.exceptions.DBusException as e:
3916                if "Error.Failed" not in str(e):
3917                    self.exceptions = True
3918                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3919
3920            p2p.RemoveClient({'peer': self.peer_path})
3921
3922            self.waiting_end = True
3923
3924            # wait for client to be fully connected
3925            dev[1].wait_connected()
3926            # so we can cleanly disconnect it now
3927            group_p2p = dbus.Interface(self.g_if_obj,
3928                                       WPAS_DBUS_IFACE_P2PDEVICE)
3929            group_p2p.Disconnect()
3930
3931        def staDeauthorized(self, name):
3932            logger.debug("staDeauthorized: " + name)
3933            self.deauthorized = True
3934
3935        def run_test(self, *args):
3936            logger.debug("run_test")
3937            params = dbus.Dictionary({'persistent': True,
3938                                      'frequency': 2412})
3939            logger.info("Add a persistent group")
3940            p2p.GroupAdd(params)
3941            return False
3942
3943        def success(self):
3944            return self.done and self.deauthorized and not self.exceptions
3945
3946    with TestDbusP2p(bus) as t:
3947        if not t.success():
3948            raise Exception("Expected signals not seen")
3949
3950    dev[1].wait_go_ending_session()
3951
3952def test_dbus_p2p_autogo_pbc(dev, apdev):
3953    """D-Bus P2P autonomous GO and PBC"""
3954    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
3955    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3956
3957    addr0 = dev[0].p2p_dev_addr()
3958
3959    class TestDbusP2p(TestDbus):
3960        def __init__(self, bus):
3961            TestDbus.__init__(self, bus)
3962            self.first = True
3963            self.waiting_end = False
3964            self.done = False
3965
3966        def __enter__(self):
3967            gobject.timeout_add(1, self.run_test)
3968            gobject.timeout_add(15000, self.timeout)
3969            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3970                            "DeviceFound")
3971            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3972                            "GroupStarted")
3973            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3974                            "GroupFinished")
3975            self.add_signal(self.provisionDiscoveryPBCRequest,
3976                            WPAS_DBUS_IFACE_P2PDEVICE,
3977                            "ProvisionDiscoveryPBCRequest")
3978            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3979                            "StaAuthorized")
3980            self.loop.run()
3981            return self
3982
3983        def groupStarted(self, properties):
3984            logger.debug("groupStarted: " + str(properties))
3985            self.group = properties['group_object']
3986            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3987                                           properties['interface_object'])
3988            dev[1].global_request("P2P_CONNECT " + addr0 + " pbc join")
3989
3990        def groupFinished(self, properties):
3991            logger.debug("groupFinished: " + str(properties))
3992            self.done = True
3993            self.loop.quit()
3994
3995        def deviceFound(self, path):
3996            logger.debug("deviceFound: path=%s" % path)
3997            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3998            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3999                                        dbus_interface=dbus.PROPERTIES_IFACE,
4000                                        byte_arrays=True)
4001            logger.debug('peer properties: ' + str(self.peer))
4002
4003        def provisionDiscoveryPBCRequest(self, peer_object):
4004            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
4005            self.peer_path = peer_object
4006            peer = binascii.unhexlify(peer_object.split('/')[-1])
4007            addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)])
4008            params = {'Role': 'registrar',
4009                      'P2PDeviceAddress': self.peer['DeviceAddress'],
4010                      'Type': 'pbc'}
4011            logger.info("Authorize peer to connect to the group")
4012            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
4013            wps.Start(params)
4014
4015        def staAuthorized(self, name):
4016            logger.debug("staAuthorized: " + name)
4017            # wait for client to be fully connected
4018            dev[1].wait_connected()
4019            # so we can cleanly disconnect it now
4020            group_p2p = dbus.Interface(self.g_if_obj,
4021                                       WPAS_DBUS_IFACE_P2PDEVICE)
4022            group_p2p.Disconnect()
4023
4024        def run_test(self, *args):
4025            logger.debug("run_test")
4026            params = dbus.Dictionary({'frequency': 2412})
4027            p2p.GroupAdd(params)
4028            return False
4029
4030        def success(self):
4031            return self.done
4032
4033    with TestDbusP2p(bus) as t:
4034        if not t.success():
4035            raise Exception("Expected signals not seen")
4036
4037    dev[1].wait_go_ending_session()
4038    dev[1].flush_scan_cache()
4039
4040def test_dbus_p2p_autogo_legacy(dev, apdev):
4041    """D-Bus P2P autonomous GO and legacy STA"""
4042    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4043    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4044
4045    addr0 = dev[0].p2p_dev_addr()
4046
4047    class TestDbusP2p(TestDbus):
4048        def __init__(self, bus):
4049            TestDbus.__init__(self, bus)
4050            self.done = False
4051
4052        def __enter__(self):
4053            gobject.timeout_add(1, self.run_test)
4054            gobject.timeout_add(15000, self.timeout)
4055            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4056                            "GroupStarted")
4057            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4058                            "GroupFinished")
4059            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
4060                            "StaAuthorized")
4061            self.loop.run()
4062            return self
4063
4064        def groupStarted(self, properties):
4065            logger.debug("groupStarted: " + str(properties))
4066            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4067                                   properties['group_object'])
4068            res = g_obj.GetAll(WPAS_DBUS_GROUP,
4069                               dbus_interface=dbus.PROPERTIES_IFACE,
4070                               byte_arrays=True)
4071            bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])])
4072
4073            pin = '12345670'
4074            params = {'Role': 'enrollee',
4075                      'Type': 'pin',
4076                      'Pin': pin}
4077            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4078                                      properties['interface_object'])
4079            wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
4080            wps.Start(params)
4081            dev[1].scan_for_bss(bssid, freq=2412)
4082            dev[1].request("WPS_PIN " + bssid + " " + pin)
4083            self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4084
4085        def groupFinished(self, properties):
4086            logger.debug("groupFinished: " + str(properties))
4087            self.done = True
4088            self.loop.quit()
4089
4090        def staAuthorized(self, name):
4091            logger.debug("staAuthorized: " + name)
4092            dev[1].request("DISCONNECT")
4093            self.group_p2p.Disconnect()
4094
4095        def run_test(self, *args):
4096            logger.debug("run_test")
4097            params = dbus.Dictionary({'frequency': 2412})
4098            p2p.GroupAdd(params)
4099            return False
4100
4101        def success(self):
4102            return self.done
4103
4104    with TestDbusP2p(bus) as t:
4105        if not t.success():
4106            raise Exception("Expected signals not seen")
4107
4108def test_dbus_p2p_join(dev, apdev):
4109    """D-Bus P2P join an autonomous GO"""
4110    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4111    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4112
4113    addr1 = dev[1].p2p_dev_addr()
4114    addr2 = dev[2].p2p_dev_addr()
4115    dev[1].p2p_start_go(freq=2412)
4116    dev[2].p2p_listen()
4117
4118    class TestDbusP2p(TestDbus):
4119        def __init__(self, bus):
4120            TestDbus.__init__(self, bus)
4121            self.done = False
4122            self.peer = None
4123            self.go = None
4124
4125        def __enter__(self):
4126            gobject.timeout_add(1, self.run_test)
4127            gobject.timeout_add(15000, self.timeout)
4128            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4129                            "DeviceFound")
4130            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4131                            "GroupStarted")
4132            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4133                            "GroupFinished")
4134            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
4135                            "InvitationResult")
4136            self.loop.run()
4137            return self
4138
4139        def deviceFound(self, path):
4140            logger.debug("deviceFound: path=%s" % path)
4141            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
4142            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
4143                                  dbus_interface=dbus.PROPERTIES_IFACE,
4144                                  byte_arrays=True)
4145            logger.debug('peer properties: ' + str(res))
4146            if addr2.replace(':', '') in path:
4147                self.peer = path
4148            elif addr1.replace(':', '') in path:
4149                self.go = path
4150            if self.peer and self.go:
4151                logger.info("Join the group")
4152                p2p.StopFind()
4153                args = {'peer': self.go,
4154                        'join': True,
4155                        'wps_method': 'pin',
4156                        'frequency': 2412}
4157                pin = p2p.Connect(args)
4158
4159                dev[1].group_request("WPS_PIN any " + pin)
4160
4161        def groupStarted(self, properties):
4162            logger.debug("groupStarted: " + str(properties))
4163            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4164                                      properties['interface_object'])
4165            role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
4166                                dbus_interface=dbus.PROPERTIES_IFACE)
4167            if role != "client":
4168                raise Exception("Unexpected role reported: " + role)
4169            group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
4170                                 dbus_interface=dbus.PROPERTIES_IFACE)
4171            if group != properties['group_object']:
4172                raise Exception("Unexpected Group reported: " + str(group))
4173            go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
4174                              dbus_interface=dbus.PROPERTIES_IFACE)
4175            if go != self.go:
4176                raise Exception("Unexpected PeerGO value: " + str(go))
4177
4178            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4179                                   properties['group_object'])
4180            res = g_obj.GetAll(WPAS_DBUS_GROUP,
4181                               dbus_interface=dbus.PROPERTIES_IFACE,
4182                               byte_arrays=True)
4183            logger.debug("Group properties: " + str(res))
4184
4185            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
4186            try:
4187                # Set(WPSVendorExtensions) not allowed for P2P Client
4188                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
4189                          dbus_interface=dbus.PROPERTIES_IFACE)
4190                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
4191            except dbus.exceptions.DBusException as e:
4192                if "Error.Failed: Failed to set property" not in str(e):
4193                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
4194
4195            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4196            args = {'duration1': 30000, 'interval1': 102400,
4197                    'duration2': 20000, 'interval2': 102400}
4198            group_p2p.PresenceRequest(args)
4199
4200            args = {'peer': self.peer}
4201            group_p2p.Invite(args)
4202
4203        def groupFinished(self, properties):
4204            logger.debug("groupFinished: " + str(properties))
4205            self.done = True
4206            self.loop.quit()
4207
4208        def invitationResult(self, result):
4209            logger.debug("invitationResult: " + str(result))
4210            if result['status'] != 1:
4211                raise Exception("Unexpected invitation result: " + str(result))
4212            dev[1].remove_group()
4213
4214        def run_test(self, *args):
4215            logger.debug("run_test")
4216            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4217            return False
4218
4219        def success(self):
4220            return self.done
4221
4222    with TestDbusP2p(bus) as t:
4223        if not t.success():
4224            raise Exception("Expected signals not seen")
4225
4226    dev[2].p2p_stop_find()
4227
4228def test_dbus_p2p_invitation_received(dev, apdev):
4229    """D-Bus P2P and InvitationReceived"""
4230    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4231    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4232
4233    form(dev[0], dev[1])
4234    addr0 = dev[0].p2p_dev_addr()
4235    dev[0].p2p_listen()
4236    dev[0].global_request("SET persistent_reconnect 0")
4237
4238    if not dev[1].discover_peer(addr0, social=True):
4239        raise Exception("Peer " + addr0 + " not found")
4240    peer = dev[1].get_peer(addr0)
4241
4242    class TestDbusP2p(TestDbus):
4243        def __init__(self, bus):
4244            TestDbus.__init__(self, bus)
4245            self.done = False
4246
4247        def __enter__(self):
4248            gobject.timeout_add(1, self.run_test)
4249            gobject.timeout_add(15000, self.timeout)
4250            self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
4251                            "InvitationReceived")
4252            self.loop.run()
4253            return self
4254
4255        def invitationReceived(self, result):
4256            logger.debug("invitationReceived: " + str(result))
4257            self.done = True
4258            self.loop.quit()
4259
4260        def run_test(self, *args):
4261            logger.debug("run_test")
4262            cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
4263            dev[1].global_request(cmd)
4264            return False
4265
4266        def success(self):
4267            return self.done
4268
4269    with TestDbusP2p(bus) as t:
4270        if not t.success():
4271            raise Exception("Expected signals not seen")
4272
4273    dev[0].p2p_stop_find()
4274    dev[1].p2p_stop_find()
4275
4276def test_dbus_p2p_config(dev, apdev):
4277    """D-Bus Get/Set P2PDeviceConfig"""
4278    try:
4279        _test_dbus_p2p_config(dev, apdev)
4280    finally:
4281        dev[0].request("P2P_SET ssid_postfix ")
4282
4283def _test_dbus_p2p_config(dev, apdev):
4284    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4285    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4286
4287    res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4288                     dbus_interface=dbus.PROPERTIES_IFACE,
4289                     byte_arrays=True)
4290    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
4291               dbus_interface=dbus.PROPERTIES_IFACE)
4292    res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4293                      dbus_interface=dbus.PROPERTIES_IFACE,
4294                      byte_arrays=True)
4295
4296    if len(res) != len(res2):
4297        raise Exception("Different number of parameters")
4298    for k in res:
4299        if res[k] != res2[k]:
4300            raise Exception("Parameter %s value changes" % k)
4301
4302    changes = {'SsidPostfix': 'foo',
4303               'VendorExtension': [dbus.ByteArray(b'\x11\x22\x33\x44')],
4304               'SecondaryDeviceTypes': [dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88')]}
4305    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4306               dbus.Dictionary(changes, signature='sv'),
4307               dbus_interface=dbus.PROPERTIES_IFACE)
4308
4309    res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4310                      dbus_interface=dbus.PROPERTIES_IFACE,
4311                      byte_arrays=True)
4312    logger.debug("P2PDeviceConfig: " + str(res2))
4313    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
4314        raise Exception("VendorExtension does not match")
4315    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
4316        raise Exception("SecondaryDeviceType does not match")
4317
4318    changes = {'SsidPostfix': '',
4319               'VendorExtension': dbus.Array([], signature="ay"),
4320               'SecondaryDeviceTypes': dbus.Array([], signature="ay")}
4321    if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4322               dbus.Dictionary(changes, signature='sv'),
4323               dbus_interface=dbus.PROPERTIES_IFACE)
4324
4325    res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4326                      dbus_interface=dbus.PROPERTIES_IFACE,
4327                      byte_arrays=True)
4328    logger.debug("P2PDeviceConfig: " + str(res3))
4329    if 'VendorExtension' in res3:
4330        raise Exception("VendorExtension not removed")
4331    if 'SecondaryDeviceTypes' in res3:
4332        raise Exception("SecondaryDeviceType not removed")
4333
4334    try:
4335        dev[0].request("P2P_SET disabled 1")
4336        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4337                   dbus_interface=dbus.PROPERTIES_IFACE,
4338                   byte_arrays=True)
4339        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
4340    except dbus.exceptions.DBusException as e:
4341        if "Error.Failed: P2P is not available for this interface" not in str(e):
4342            raise Exception("Unexpected error message for invalid Invite: " + str(e))
4343    finally:
4344        dev[0].request("P2P_SET disabled 0")
4345
4346    try:
4347        dev[0].request("P2P_SET disabled 1")
4348        changes = {'SsidPostfix': 'foo'}
4349        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4350                   dbus.Dictionary(changes, signature='sv'),
4351                   dbus_interface=dbus.PROPERTIES_IFACE)
4352        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
4353    except dbus.exceptions.DBusException as e:
4354        if "Error.Failed: P2P is not available for this interface" not in str(e):
4355            raise Exception("Unexpected error message for invalid Invite: " + str(e))
4356    finally:
4357        dev[0].request("P2P_SET disabled 0")
4358
4359    tests = [{'DeviceName': 123},
4360             {'SsidPostfix': 123},
4361             {'Foo': 'Bar'}]
4362    for changes in tests:
4363        try:
4364            if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4365                       dbus.Dictionary(changes, signature='sv'),
4366                       dbus_interface=dbus.PROPERTIES_IFACE)
4367            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
4368        except dbus.exceptions.DBusException as e:
4369            if "InvalidArgs" not in str(e):
4370                raise Exception("Unexpected error message for invalid Invite: " + str(e))
4371
4372def test_dbus_p2p_persistent(dev, apdev):
4373    """D-Bus P2P persistent group"""
4374    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4375    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4376
4377    class TestDbusP2p(TestDbus):
4378        def __init__(self, bus):
4379            TestDbus.__init__(self, bus)
4380
4381        def __enter__(self):
4382            gobject.timeout_add(1, self.run_test)
4383            gobject.timeout_add(15000, self.timeout)
4384            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4385                            "GroupStarted")
4386            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4387                            "GroupFinished")
4388            self.add_signal(self.persistentGroupAdded,
4389                            WPAS_DBUS_IFACE_P2PDEVICE,
4390                            "PersistentGroupAdded")
4391            self.loop.run()
4392            return self
4393
4394        def groupStarted(self, properties):
4395            logger.debug("groupStarted: " + str(properties))
4396            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4397                                      properties['interface_object'])
4398            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4399            group_p2p.Disconnect()
4400
4401        def groupFinished(self, properties):
4402            logger.debug("groupFinished: " + str(properties))
4403            self.loop.quit()
4404
4405        def persistentGroupAdded(self, path, properties):
4406            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
4407            self.persistent = path
4408
4409        def run_test(self, *args):
4410            logger.debug("run_test")
4411            params = dbus.Dictionary({'persistent': True,
4412                                      'frequency': 2412})
4413            logger.info("Add a persistent group")
4414            p2p.GroupAdd(params)
4415            return False
4416
4417        def success(self):
4418            return True
4419
4420    with TestDbusP2p(bus) as t:
4421        if not t.success():
4422            raise Exception("Expected signals not seen")
4423        persistent = t.persistent
4424
4425    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
4426    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
4427                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
4428    logger.info("Persistent group Properties: " + str(res))
4429    vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv')
4430    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
4431              dbus_interface=dbus.PROPERTIES_IFACE)
4432    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
4433                     dbus_interface=dbus.PROPERTIES_IFACE)
4434    if len(res) != len(res2):
4435        raise Exception("Different number of parameters")
4436    for k in res:
4437        if k != 'ssid' and res[k] != res2[k]:
4438            raise Exception("Parameter %s value changes" % k)
4439    if res2['ssid'] != '"DIRECT-foo"':
4440        raise Exception("Unexpected ssid")
4441
4442    args = dbus.Dictionary({'ssid': 'DIRECT-testing',
4443                            'psk': '1234567890'}, signature='sv')
4444    group = p2p.AddPersistentGroup(args)
4445
4446    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
4447                        dbus_interface=dbus.PROPERTIES_IFACE)
4448    if len(groups) != 2:
4449        raise Exception("Unexpected number of persistent groups: " + str(groups))
4450
4451    p2p.RemoveAllPersistentGroups()
4452
4453    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
4454                        dbus_interface=dbus.PROPERTIES_IFACE)
4455    if len(groups) != 0:
4456        raise Exception("Unexpected number of persistent groups: " + str(groups))
4457
4458    try:
4459        p2p.RemovePersistentGroup(persistent)
4460        raise Exception("Invalid RemovePersistentGroup accepted")
4461    except dbus.exceptions.DBusException as e:
4462        if "NetworkUnknown: There is no such persistent group" not in str(e):
4463            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
4464
4465def test_dbus_p2p_reinvoke_persistent(dev, apdev):
4466    """D-Bus P2P reinvoke persistent group"""
4467    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4468    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4469
4470    addr0 = dev[0].p2p_dev_addr()
4471
4472    class TestDbusP2p(TestDbus):
4473        def __init__(self, bus):
4474            TestDbus.__init__(self, bus)
4475            self.first = True
4476            self.waiting_end = False
4477            self.done = False
4478            self.invited = False
4479
4480        def __enter__(self):
4481            gobject.timeout_add(1, self.run_test)
4482            gobject.timeout_add(15000, self.timeout)
4483            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4484                            "DeviceFound")
4485            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4486                            "GroupStarted")
4487            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4488                            "GroupFinished")
4489            self.add_signal(self.persistentGroupAdded,
4490                            WPAS_DBUS_IFACE_P2PDEVICE,
4491                            "PersistentGroupAdded")
4492            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
4493                            WPAS_DBUS_IFACE_P2PDEVICE,
4494                            "ProvisionDiscoveryRequestDisplayPin")
4495            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
4496                            "StaAuthorized")
4497            self.loop.run()
4498            return self
4499
4500        def groupStarted(self, properties):
4501            logger.debug("groupStarted: " + str(properties))
4502            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4503                                           properties['interface_object'])
4504            if not self.invited:
4505                g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4506                                       properties['group_object'])
4507                res = g_obj.GetAll(WPAS_DBUS_GROUP,
4508                                   dbus_interface=dbus.PROPERTIES_IFACE,
4509                                   byte_arrays=True)
4510                bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])])
4511                dev[1].scan_for_bss(bssid, freq=2412)
4512                dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 join")
4513
4514        def groupFinished(self, properties):
4515            logger.debug("groupFinished: " + str(properties))
4516            if self.invited:
4517                self.done = True
4518                self.loop.quit()
4519            else:
4520                dev[1].global_request("SET persistent_reconnect 1")
4521                dev[1].p2p_listen()
4522
4523                args = {'persistent_group_object': dbus.ObjectPath(path),
4524                        'peer': self.peer_path}
4525                try:
4526                    pin = p2p.Invite(args)
4527                    raise Exception("Invalid Invite accepted")
4528                except dbus.exceptions.DBusException as e:
4529                    if "InvalidArgs" not in str(e):
4530                        raise Exception("Unexpected error message for invalid Invite: " + str(e))
4531
4532                args = {'persistent_group_object': self.persistent,
4533                        'peer': self.peer_path}
4534                pin = p2p.Invite(args)
4535                self.invited = True
4536
4537                self.sta_group_ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"],
4538                                                             timeout=15)
4539                if self.sta_group_ev is None:
4540                    raise Exception("P2P-GROUP-STARTED event not seen")
4541
4542        def persistentGroupAdded(self, path, properties):
4543            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
4544            self.persistent = path
4545
4546        def deviceFound(self, path):
4547            logger.debug("deviceFound: path=%s" % path)
4548            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
4549            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
4550                                        dbus_interface=dbus.PROPERTIES_IFACE,
4551                                        byte_arrays=True)
4552
4553        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
4554            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
4555            self.peer_path = peer_object
4556            peer = binascii.unhexlify(peer_object.split('/')[-1])
4557            addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)])
4558            params = {'Role': 'registrar',
4559                      'P2PDeviceAddress': self.peer['DeviceAddress'],
4560                      'Bssid': self.peer['DeviceAddress'],
4561                      'Type': 'pin',
4562                      'Pin': '12345670'}
4563            logger.info("Authorize peer to connect to the group")
4564            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
4565            wps.Start(params)
4566            self.sta_group_ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"],
4567                                                         timeout=15)
4568            if self.sta_group_ev is None:
4569                raise Exception("P2P-GROUP-STARTED event not seen")
4570
4571        def staAuthorized(self, name):
4572            logger.debug("staAuthorized: " + name)
4573            dev[1].group_form_result(self.sta_group_ev)
4574            dev[1].remove_group()
4575            ev = dev[1].wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
4576            if ev is None:
4577                raise Exception("Group removal timed out")
4578            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4579            group_p2p.Disconnect()
4580
4581        def run_test(self, *args):
4582            logger.debug("run_test")
4583            params = dbus.Dictionary({'persistent': True,
4584                                      'frequency': 2412})
4585            logger.info("Add a persistent group")
4586            p2p.GroupAdd(params)
4587            return False
4588
4589        def success(self):
4590            return self.done
4591
4592    with TestDbusP2p(bus) as t:
4593        if not t.success():
4594            raise Exception("Expected signals not seen")
4595
4596def test_dbus_p2p_go_neg_rx(dev, apdev):
4597    """D-Bus P2P GO Negotiation receive"""
4598    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4599    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4600    addr0 = dev[0].p2p_dev_addr()
4601
4602    class TestDbusP2p(TestDbus):
4603        def __init__(self, bus):
4604            TestDbus.__init__(self, bus)
4605            self.done = False
4606
4607        def __enter__(self):
4608            gobject.timeout_add(1, self.run_test)
4609            gobject.timeout_add(15000, self.timeout)
4610            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4611                            "DeviceFound")
4612            self.add_signal(self.goNegotiationRequest,
4613                            WPAS_DBUS_IFACE_P2PDEVICE,
4614                            "GONegotiationRequest",
4615                            byte_arrays=True)
4616            self.add_signal(self.goNegotiationSuccess,
4617                            WPAS_DBUS_IFACE_P2PDEVICE,
4618                            "GONegotiationSuccess",
4619                            byte_arrays=True)
4620            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4621                            "GroupStarted")
4622            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4623                            "GroupFinished")
4624            self.loop.run()
4625            return self
4626
4627        def deviceFound(self, path):
4628            logger.debug("deviceFound: path=%s" % path)
4629
4630        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4631            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
4632            if dev_passwd_id != 1:
4633                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4634            args = {'peer': path, 'wps_method': 'display', 'pin': '12345670',
4635                    'go_intent': 15, 'persistent': False, 'frequency': 5175}
4636            try:
4637                p2p.Connect(args)
4638                raise Exception("Invalid Connect accepted")
4639            except dbus.exceptions.DBusException as e:
4640                if "ConnectChannelUnsupported" not in str(e):
4641                    raise Exception("Unexpected error message for invalid Connect: " + str(e))
4642
4643            args = {'peer': path, 'wps_method': 'display', 'pin': '12345670',
4644                    'go_intent': 15, 'persistent': False}
4645            p2p.Connect(args)
4646
4647        def goNegotiationSuccess(self, properties):
4648            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4649
4650        def groupStarted(self, properties):
4651            logger.debug("groupStarted: " + str(properties))
4652            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4653                                      properties['interface_object'])
4654            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4655            group_p2p.Disconnect()
4656
4657        def groupFinished(self, properties):
4658            logger.debug("groupFinished: " + str(properties))
4659            self.done = True
4660            self.loop.quit()
4661
4662        def run_test(self, *args):
4663            logger.debug("run_test")
4664            p2p.Listen(10)
4665            if not dev[1].discover_peer(addr0):
4666                raise Exception("Peer not found")
4667            dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
4668            return False
4669
4670        def success(self):
4671            return self.done
4672
4673    with TestDbusP2p(bus) as t:
4674        if not t.success():
4675            raise Exception("Expected signals not seen")
4676
4677def test_dbus_p2p_go_neg_auth(dev, apdev):
4678    """D-Bus P2P GO Negotiation authorized"""
4679    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4680    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4681    addr0 = dev[0].p2p_dev_addr()
4682    dev[1].p2p_listen()
4683
4684    class TestDbusP2p(TestDbus):
4685        def __init__(self, bus):
4686            TestDbus.__init__(self, bus)
4687            self.done = False
4688            self.peer_joined = False
4689            self.peer_disconnected = False
4690
4691        def __enter__(self):
4692            gobject.timeout_add(1, self.run_test)
4693            gobject.timeout_add(15000, self.timeout)
4694            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4695                            "DeviceFound")
4696            self.add_signal(self.goNegotiationSuccess,
4697                            WPAS_DBUS_IFACE_P2PDEVICE,
4698                            "GONegotiationSuccess",
4699                            byte_arrays=True)
4700            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4701                            "GroupStarted")
4702            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4703                            "GroupFinished")
4704            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
4705                            "StaDeauthorized")
4706            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4707                            "PeerJoined")
4708            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
4709                            "PeerDisconnected")
4710            self.loop.run()
4711            return self
4712
4713        def deviceFound(self, path):
4714            logger.debug("deviceFound: path=%s" % path)
4715            args = {'peer': path, 'wps_method': 'keypad',
4716                    'go_intent': 15, 'authorize_only': True}
4717            try:
4718                p2p.Connect(args)
4719                raise Exception("Invalid Connect accepted")
4720            except dbus.exceptions.DBusException as e:
4721                if "InvalidArgs" not in str(e):
4722                    raise Exception("Unexpected error message for invalid Connect: " + str(e))
4723
4724            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4725                    'go_intent': 15, 'authorize_only': True}
4726            p2p.Connect(args)
4727            p2p.Listen(10)
4728            if not dev[1].discover_peer(addr0):
4729                raise Exception("Peer not found")
4730            dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
4731            ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
4732            if ev is None:
4733                raise Exception("Group formation timed out")
4734            self.sta_group_ev = ev
4735
4736        def goNegotiationSuccess(self, properties):
4737            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4738
4739        def groupStarted(self, properties):
4740            logger.debug("groupStarted: " + str(properties))
4741            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4742                                           properties['interface_object'])
4743            dev[1].group_form_result(self.sta_group_ev)
4744            dev[1].remove_group()
4745
4746        def staDeauthorized(self, name):
4747            logger.debug("staDeuthorized: " + name)
4748            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4749            group_p2p.Disconnect()
4750
4751        def peerJoined(self, peer):
4752            logger.debug("peerJoined: " + peer)
4753            self.peer_joined = True
4754
4755        def peerDisconnected(self, peer):
4756            logger.debug("peerDisconnected: " + peer)
4757            self.peer_disconnected = True
4758
4759        def groupFinished(self, properties):
4760            logger.debug("groupFinished: " + str(properties))
4761            self.done = True
4762            self.loop.quit()
4763
4764        def run_test(self, *args):
4765            logger.debug("run_test")
4766            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4767            return False
4768
4769        def success(self):
4770            return self.done and self.peer_joined and self.peer_disconnected
4771
4772    with TestDbusP2p(bus) as t:
4773        if not t.success():
4774            raise Exception("Expected signals not seen")
4775
4776def test_dbus_p2p_go_neg_init(dev, apdev):
4777    """D-Bus P2P GO Negotiation initiation"""
4778    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4779    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4780    addr0 = dev[0].p2p_dev_addr()
4781    dev[1].p2p_listen()
4782
4783    class TestDbusP2p(TestDbus):
4784        def __init__(self, bus):
4785            TestDbus.__init__(self, bus)
4786            self.done = False
4787            self.peer_group_added = False
4788            self.peer_group_removed = False
4789
4790        def __enter__(self):
4791            gobject.timeout_add(1, self.run_test)
4792            gobject.timeout_add(15000, self.timeout)
4793            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4794                            "DeviceFound")
4795            self.add_signal(self.goNegotiationSuccess,
4796                            WPAS_DBUS_IFACE_P2PDEVICE,
4797                            "GONegotiationSuccess",
4798                            byte_arrays=True)
4799            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4800                            "GroupStarted")
4801            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4802                            "GroupFinished")
4803            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4804                            "PropertiesChanged")
4805            self.loop.run()
4806            return self
4807
4808        def deviceFound(self, path):
4809            logger.debug("deviceFound: path=%s" % path)
4810            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4811                    'go_intent': 0}
4812            p2p.Connect(args)
4813
4814            ev = dev[1].wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4815            if ev is None:
4816                raise Exception("Timeout while waiting for GO Neg Request")
4817            dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4818            ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
4819            if ev is None:
4820                raise Exception("Group formation timed out")
4821            self.sta_group_ev = ev
4822
4823        def goNegotiationSuccess(self, properties):
4824            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4825
4826        def groupStarted(self, properties):
4827            logger.debug("groupStarted: " + str(properties))
4828            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4829                                      properties['interface_object'])
4830            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4831            group_p2p.Disconnect()
4832            dev[1].group_form_result(self.sta_group_ev)
4833            dev[1].remove_group()
4834
4835        def groupFinished(self, properties):
4836            logger.debug("groupFinished: " + str(properties))
4837            self.done = True
4838
4839        def propertiesChanged(self, interface_name, changed_properties,
4840                              invalidated_properties):
4841            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4842            if interface_name != WPAS_DBUS_P2P_PEER:
4843                return
4844            if "Groups" not in changed_properties:
4845                return
4846            if len(changed_properties["Groups"]) > 0:
4847                self.peer_group_added = True
4848            if len(changed_properties["Groups"]) == 0:
4849                if not self.peer_group_added:
4850                    # This is likely a leftover event from an earlier test case,
4851                    # ignore it to allow this test case to go through its steps.
4852                    logger.info("Ignore propertiesChanged indicating group removal before group has been added")
4853                    return
4854                self.peer_group_removed = True
4855                self.loop.quit()
4856
4857        def run_test(self, *args):
4858            logger.debug("run_test")
4859            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4860            return False
4861
4862        def success(self):
4863            return self.done and self.peer_group_added and self.peer_group_removed
4864
4865    with TestDbusP2p(bus) as t:
4866        if not t.success():
4867            raise Exception("Expected signals not seen")
4868
4869def test_dbus_p2p_group_termination_by_go(dev, apdev):
4870    """D-Bus P2P group removal on GO terminating the group"""
4871    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4872    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4873    addr0 = dev[0].p2p_dev_addr()
4874    dev[1].p2p_listen()
4875
4876    class TestDbusP2p(TestDbus):
4877        def __init__(self, bus):
4878            TestDbus.__init__(self, bus)
4879            self.done = False
4880            self.peer_group_added = False
4881            self.peer_group_removed = False
4882
4883        def __enter__(self):
4884            gobject.timeout_add(1, self.run_test)
4885            gobject.timeout_add(15000, self.timeout)
4886            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4887                            "DeviceFound")
4888            self.add_signal(self.goNegotiationSuccess,
4889                            WPAS_DBUS_IFACE_P2PDEVICE,
4890                            "GONegotiationSuccess",
4891                            byte_arrays=True)
4892            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4893                            "GroupStarted")
4894            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4895                            "GroupFinished")
4896            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4897                            "PropertiesChanged")
4898            self.loop.run()
4899            return self
4900
4901        def deviceFound(self, path):
4902            logger.debug("deviceFound: path=%s" % path)
4903            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4904                    'go_intent': 0}
4905            p2p.Connect(args)
4906
4907            ev = dev[1].wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4908            if ev is None:
4909                raise Exception("Timeout while waiting for GO Neg Request")
4910            dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4911            ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
4912            if ev is None:
4913                raise Exception("Group formation timed out")
4914            self.sta_group_ev = ev
4915
4916        def goNegotiationSuccess(self, properties):
4917            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4918
4919        def groupStarted(self, properties):
4920            logger.debug("groupStarted: " + str(properties))
4921            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4922                                      properties['interface_object'])
4923            dev[1].group_form_result(self.sta_group_ev)
4924            dev[1].remove_group()
4925
4926        def groupFinished(self, properties):
4927            logger.debug("groupFinished: " + str(properties))
4928            self.done = True
4929
4930        def propertiesChanged(self, interface_name, changed_properties,
4931                              invalidated_properties):
4932            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4933            if interface_name != WPAS_DBUS_P2P_PEER:
4934                return
4935            if "Groups" not in changed_properties:
4936                return
4937            if len(changed_properties["Groups"]) > 0:
4938                self.peer_group_added = True
4939            if len(changed_properties["Groups"]) == 0 and self.peer_group_added:
4940                self.peer_group_removed = True
4941                self.loop.quit()
4942
4943        def run_test(self, *args):
4944            logger.debug("run_test")
4945            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4946            return False
4947
4948        def success(self):
4949            return self.done and self.peer_group_added and self.peer_group_removed
4950
4951    with TestDbusP2p(bus) as t:
4952        if not t.success():
4953            raise Exception("Expected signals not seen")
4954
4955def test_dbus_p2p_group_idle_timeout(dev, apdev):
4956    """D-Bus P2P group removal on idle timeout"""
4957    try:
4958        dev[0].global_request("SET p2p_group_idle 1")
4959        _test_dbus_p2p_group_idle_timeout(dev, apdev)
4960    finally:
4961        dev[0].global_request("SET p2p_group_idle 0")
4962
4963def _test_dbus_p2p_group_idle_timeout(dev, apdev):
4964    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
4965    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4966    addr0 = dev[0].p2p_dev_addr()
4967    dev[1].p2p_listen()
4968
4969    class TestDbusP2p(TestDbus):
4970        def __init__(self, bus):
4971            TestDbus.__init__(self, bus)
4972            self.done = False
4973            self.group_started = False
4974            self.peer_group_added = False
4975            self.peer_group_removed = False
4976
4977        def __enter__(self):
4978            gobject.timeout_add(1, self.run_test)
4979            gobject.timeout_add(15000, self.timeout)
4980            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4981                            "DeviceFound")
4982            self.add_signal(self.goNegotiationSuccess,
4983                            WPAS_DBUS_IFACE_P2PDEVICE,
4984                            "GONegotiationSuccess",
4985                            byte_arrays=True)
4986            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4987                            "GroupStarted")
4988            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4989                            "GroupFinished")
4990            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4991                            "PropertiesChanged")
4992            self.loop.run()
4993            return self
4994
4995        def deviceFound(self, path):
4996            logger.debug("deviceFound: path=%s" % path)
4997            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4998                    'go_intent': 0}
4999            p2p.Connect(args)
5000
5001            ev = dev[1].wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
5002            if ev is None:
5003                raise Exception("Timeout while waiting for GO Neg Request")
5004            dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
5005            ev = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
5006            if ev is None:
5007                raise Exception("Group formation timed out")
5008            self.sta_group_ev = ev
5009
5010        def goNegotiationSuccess(self, properties):
5011            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
5012
5013        def groupStarted(self, properties):
5014            logger.debug("groupStarted: " + str(properties))
5015            self.group_started = True
5016            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
5017                                      properties['interface_object'])
5018            dev[1].group_form_result(self.sta_group_ev)
5019            ifaddr = dev[1].group_request("STA-FIRST").splitlines()[0]
5020            # Force disassociation with different reason code so that the
5021            # P2P Client using D-Bus does not get normal group termination event
5022            # from the GO.
5023            dev[1].group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
5024            dev[1].remove_group()
5025
5026        def groupFinished(self, properties):
5027            logger.debug("groupFinished: " + str(properties))
5028            self.done = True
5029
5030        def propertiesChanged(self, interface_name, changed_properties,
5031                              invalidated_properties):
5032            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
5033            if interface_name != WPAS_DBUS_P2P_PEER:
5034                return
5035            if not self.group_started:
5036                return
5037            if "Groups" not in changed_properties:
5038                return
5039            if len(changed_properties["Groups"]) > 0:
5040                self.peer_group_added = True
5041            if len(changed_properties["Groups"]) == 0:
5042                self.peer_group_removed = True
5043                self.loop.quit()
5044
5045        def run_test(self, *args):
5046            logger.debug("run_test")
5047            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5048            return False
5049
5050        def success(self):
5051            return self.done and self.peer_group_added and self.peer_group_removed
5052
5053    with TestDbusP2p(bus) as t:
5054        if not t.success():
5055            raise Exception("Expected signals not seen")
5056
5057def test_dbus_p2p_wps_failure(dev, apdev):
5058    """D-Bus P2P WPS failure"""
5059    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5060    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5061    addr0 = dev[0].p2p_dev_addr()
5062
5063    class TestDbusP2p(TestDbus):
5064        def __init__(self, bus):
5065            TestDbus.__init__(self, bus)
5066            self.wps_failed = False
5067            self.formation_failure = False
5068
5069        def __enter__(self):
5070            gobject.timeout_add(1, self.run_test)
5071            gobject.timeout_add(15000, self.timeout)
5072            self.add_signal(self.goNegotiationRequest,
5073                            WPAS_DBUS_IFACE_P2PDEVICE,
5074                            "GONegotiationRequest",
5075                            byte_arrays=True)
5076            self.add_signal(self.goNegotiationSuccess,
5077                            WPAS_DBUS_IFACE_P2PDEVICE,
5078                            "GONegotiationSuccess",
5079                            byte_arrays=True)
5080            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
5081                            "GroupStarted")
5082            self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
5083                            "WpsFailed")
5084            self.add_signal(self.groupFormationFailure,
5085                            WPAS_DBUS_IFACE_P2PDEVICE,
5086                            "GroupFormationFailure")
5087            self.loop.run()
5088            return self
5089
5090        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
5091            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
5092            if dev_passwd_id != 1:
5093                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
5094            args = {'peer': path, 'wps_method': 'display', 'pin': '12345670',
5095                    'go_intent': 15}
5096            p2p.Connect(args)
5097
5098        def goNegotiationSuccess(self, properties):
5099            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
5100
5101        def groupStarted(self, properties):
5102            logger.debug("groupStarted: " + str(properties))
5103            raise Exception("Unexpected GroupStarted")
5104
5105        def wpsFailed(self, name, args):
5106            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
5107            self.wps_failed = True
5108            if self.formation_failure:
5109                self.loop.quit()
5110
5111        def groupFormationFailure(self, reason):
5112            logger.debug("groupFormationFailure - reason=%s" % reason)
5113            self.formation_failure = True
5114            if self.wps_failed:
5115                self.loop.quit()
5116
5117        def run_test(self, *args):
5118            logger.debug("run_test")
5119            p2p.Listen(10)
5120            if not dev[1].discover_peer(addr0):
5121                raise Exception("Peer not found")
5122            dev[1].global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
5123            return False
5124
5125        def success(self):
5126            return self.wps_failed and self.formation_failure
5127
5128    with TestDbusP2p(bus) as t:
5129        if not t.success():
5130            raise Exception("Expected signals not seen")
5131
5132def test_dbus_p2p_two_groups(dev, apdev):
5133    """D-Bus P2P with two concurrent groups"""
5134    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5135    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5136
5137    dev[0].request("SET p2p_no_group_iface 0")
5138    addr0 = dev[0].p2p_dev_addr()
5139    addr1 = dev[1].p2p_dev_addr()
5140    addr2 = dev[2].p2p_dev_addr()
5141    dev[1].p2p_start_go(freq=2412)
5142
5143    class TestDbusP2p(TestDbus):
5144        def __init__(self, bus):
5145            TestDbus.__init__(self, bus)
5146            self.done = False
5147            self.peer = None
5148            self.go = None
5149            self.group1 = None
5150            self.group2 = None
5151            self.groups_removed = False
5152
5153        def __enter__(self):
5154            gobject.timeout_add(1, self.run_test)
5155            gobject.timeout_add(15000, self.timeout)
5156            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
5157                            "PropertiesChanged", byte_arrays=True)
5158            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
5159                            "DeviceFound")
5160            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
5161                            "GroupStarted")
5162            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
5163                            "GroupFinished")
5164            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
5165                            "PeerJoined")
5166            self.loop.run()
5167            return self
5168
5169        def propertiesChanged(self, interface_name, changed_properties,
5170                              invalidated_properties):
5171            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
5172
5173        def deviceFound(self, path):
5174            logger.debug("deviceFound: path=%s" % path)
5175            if addr2.replace(':', '') in path:
5176                self.peer = path
5177            elif addr1.replace(':', '') in path:
5178                self.go = path
5179            if self.go and not self.group1:
5180                logger.info("Join the group")
5181                p2p.StopFind()
5182                pin = '12345670'
5183                dev[1].group_request("WPS_PIN any " + pin)
5184                args = {'peer': self.go,
5185                        'join': True,
5186                        'wps_method': 'pin',
5187                        'pin': pin,
5188                        'frequency': 2412}
5189                p2p.Connect(args)
5190
5191        def groupStarted(self, properties):
5192            logger.debug("groupStarted: " + str(properties))
5193            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
5194                                 dbus_interface=dbus.PROPERTIES_IFACE)
5195            logger.debug("p2pdevice properties: " + str(prop))
5196
5197            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
5198                                   properties['group_object'])
5199            res = g_obj.GetAll(WPAS_DBUS_GROUP,
5200                               dbus_interface=dbus.PROPERTIES_IFACE,
5201                               byte_arrays=True)
5202            logger.debug("Group properties: " + str(res))
5203
5204            if not self.group1:
5205                self.group1 = properties['group_object']
5206                self.group1iface = properties['interface_object']
5207                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
5208                                                self.group1iface)
5209
5210                logger.info("Start autonomous GO")
5211                params = dbus.Dictionary({'frequency': 2412})
5212                p2p.GroupAdd(params)
5213            elif not self.group2:
5214                self.group2 = properties['group_object']
5215                self.group2iface = properties['interface_object']
5216                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
5217                                                self.group2iface)
5218                self.g2_bssid = res['BSSID']
5219
5220            if self.group1 and self.group2:
5221                logger.info("Authorize peer to join the group")
5222                a2 = binascii.unhexlify(addr2.replace(':', ''))
5223                params = {'Role': 'enrollee',
5224                          'P2PDeviceAddress': dbus.ByteArray(a2),
5225                          'Bssid': dbus.ByteArray(a2),
5226                          'Type': 'pin',
5227                          'Pin': '12345670'}
5228                g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
5229                g_wps.Start(params)
5230
5231                bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', self.g2_bssid)])
5232                dev[2].scan_for_bss(bssid, freq=2412)
5233                dev[2].global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
5234                ev = dev[2].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
5235                if ev is None:
5236                    raise Exception("Group join timed out")
5237                dev[2].group_form_result(ev)
5238
5239        def groupFinished(self, properties):
5240            logger.debug("groupFinished: " + str(properties))
5241
5242            if self.group1 == properties['group_object']:
5243                self.group1 = None
5244            elif self.group2 == properties['group_object']:
5245                self.group2 = None
5246
5247            if not self.group1 and not self.group2:
5248                self.done = True
5249                self.loop.quit()
5250
5251        def peerJoined(self, peer):
5252            logger.debug("peerJoined: " + peer)
5253            if self.groups_removed:
5254                return
5255            self.check_results()
5256
5257            dev[2].remove_group()
5258
5259            logger.info("Disconnect group2")
5260            group_p2p = dbus.Interface(self.g2_if_obj,
5261                                       WPAS_DBUS_IFACE_P2PDEVICE)
5262            group_p2p.Disconnect()
5263
5264            logger.info("Disconnect group1")
5265            group_p2p = dbus.Interface(self.g1_if_obj,
5266                                       WPAS_DBUS_IFACE_P2PDEVICE)
5267            group_p2p.Disconnect()
5268            self.groups_removed = True
5269
5270        def check_results(self):
5271            logger.info("Check results with two concurrent groups in operation")
5272
5273            g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
5274            res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
5275                                 dbus_interface=dbus.PROPERTIES_IFACE,
5276                                 byte_arrays=True)
5277
5278            g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
5279            res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
5280                                 dbus_interface=dbus.PROPERTIES_IFACE,
5281                                 byte_arrays=True)
5282
5283            logger.info("group1 = " + self.group1)
5284            logger.debug("Group properties: " + str(res1))
5285
5286            logger.info("group2 = " + self.group2)
5287            logger.debug("Group properties: " + str(res2))
5288
5289            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
5290                                 dbus_interface=dbus.PROPERTIES_IFACE)
5291            logger.debug("p2pdevice properties: " + str(prop))
5292
5293            if res1['Role'] != 'client':
5294                raise Exception("Group1 role reported incorrectly: " + res1['Role'])
5295            if res2['Role'] != 'GO':
5296                raise Exception("Group2 role reported incorrectly: " + res2['Role'])
5297            if prop['Role'] != 'device':
5298                raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])
5299
5300            if len(res2['Members']) != 1:
5301                   raise Exception("Unexpected Members value for group 2")
5302
5303        def run_test(self, *args):
5304            logger.debug("run_test")
5305            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5306            return False
5307
5308        def success(self):
5309            return self.done
5310
5311    with TestDbusP2p(bus) as t:
5312        if not t.success():
5313            raise Exception("Expected signals not seen")
5314
5315    dev[1].remove_group()
5316
5317def test_dbus_p2p_cancel(dev, apdev):
5318    """D-Bus P2P Cancel"""
5319    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5320    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5321    try:
5322        p2p.Cancel()
5323        raise Exception("Unexpected p2p.Cancel() success")
5324    except dbus.exceptions.DBusException as e:
5325        pass
5326
5327    addr0 = dev[0].p2p_dev_addr()
5328    dev[1].p2p_listen()
5329
5330    class TestDbusP2p(TestDbus):
5331        def __init__(self, bus):
5332            TestDbus.__init__(self, bus)
5333            self.done = False
5334
5335        def __enter__(self):
5336            gobject.timeout_add(1, self.run_test)
5337            gobject.timeout_add(15000, self.timeout)
5338            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
5339                            "DeviceFound")
5340            self.loop.run()
5341            return self
5342
5343        def deviceFound(self, path):
5344            logger.debug("deviceFound: path=%s" % path)
5345            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
5346                    'go_intent': 0}
5347            p2p.Connect(args)
5348            p2p.Cancel()
5349            self.done = True
5350            self.loop.quit()
5351
5352        def run_test(self, *args):
5353            logger.debug("run_test")
5354            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5355            return False
5356
5357        def success(self):
5358            return self.done
5359
5360    with TestDbusP2p(bus) as t:
5361        if not t.success():
5362            raise Exception("Expected signals not seen")
5363
5364def test_dbus_p2p_ip_addr(dev, apdev):
5365    """D-Bus P2P and IP address parameters"""
5366    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5367    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5368
5369    vals = [("IpAddrGo", "192.168.43.1"),
5370            ("IpAddrMask", "255.255.255.0"),
5371            ("IpAddrStart", "192.168.43.100"),
5372            ("IpAddrEnd", "192.168.43.199")]
5373    for field, value in vals:
5374        if_obj.Set(WPAS_DBUS_IFACE, field, value,
5375                   dbus_interface=dbus.PROPERTIES_IFACE)
5376        val = if_obj.Get(WPAS_DBUS_IFACE, field,
5377                         dbus_interface=dbus.PROPERTIES_IFACE)
5378        if val != value:
5379            raise Exception("Unexpected %s value: %s" % (field, val))
5380
5381    set_ip_addr_info(dev[1])
5382
5383    dev[0].global_request("SET p2p_go_intent 0")
5384
5385    req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
5386    if "FAIL" in req:
5387        raise Exception("Failed to generate NFC connection handover request")
5388    sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
5389    if "FAIL" in sel:
5390        raise Exception("Failed to generate NFC connection handover select")
5391    dev[0].dump_monitor()
5392    dev[1].dump_monitor()
5393    res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel)
5394    if "FAIL" in res:
5395        raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
5396    res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel)
5397    if "FAIL" in res:
5398        raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")
5399
5400    class TestDbusP2p(TestDbus):
5401        def __init__(self, bus):
5402            TestDbus.__init__(self, bus)
5403            self.done = False
5404
5405        def __enter__(self):
5406            gobject.timeout_add(1, self.run_test)
5407            gobject.timeout_add(15000, self.timeout)
5408            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
5409                            "GroupStarted")
5410            self.loop.run()
5411            return self
5412
5413        def groupStarted(self, properties):
5414            logger.debug("groupStarted: " + str(properties))
5415            self.loop.quit()
5416
5417            if 'IpAddrGo' not in properties:
5418                logger.info("IpAddrGo missing from GroupStarted")
5419            ip_addr_go = properties['IpAddrGo']
5420            addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3])
5421            if addr != "192.168.42.1":
5422                logger.info("Unexpected IpAddrGo value: " + addr)
5423            self.done = True
5424
5425        def run_test(self, *args):
5426            logger.debug("run_test")
5427            return False
5428
5429        def success(self):
5430            return self.done
5431
5432    with TestDbusP2p(bus) as t:
5433        if not t.success():
5434            raise Exception("Expected signals not seen")
5435
5436def test_dbus_introspect(dev, apdev):
5437    """D-Bus introspection"""
5438    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5439
5440    res = if_obj.Introspect(WPAS_DBUS_IFACE,
5441                            dbus_interface=dbus.INTROSPECTABLE_IFACE)
5442    logger.info("Initial Introspect: " + str(res))
5443    if res is None or "Introspectable" not in res or "GroupStarted" not in res:
5444        raise Exception("Unexpected initial Introspect response: " + str(res))
5445    if "FastReauth" not in res or "PassiveScan" not in res:
5446        raise Exception("Unexpected initial Introspect response: " + str(res))
5447
5448    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
5449        res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5450                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5451        logger.info("Introspect: " + str(res2))
5452        if res2 is not None:
5453            raise Exception("Unexpected Introspect response")
5454
5455    with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"):
5456        res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5457                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5458        logger.info("Introspect: " + str(res2))
5459        if res2 is None:
5460            raise Exception("No Introspect response")
5461        if len(res2) >= len(res):
5462            raise Exception("Unexpected Introspect response")
5463
5464    with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
5465        res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5466                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5467        logger.info("Introspect: " + str(res2))
5468        if res2 is None:
5469            raise Exception("No Introspect response")
5470        if len(res2) >= len(res):
5471            raise Exception("Unexpected Introspect response")
5472
5473    with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"):
5474        res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5475                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5476        logger.info("Introspect: " + str(res2))
5477        if res2 is None:
5478            raise Exception("No Introspect response")
5479        if len(res2) >= len(res):
5480            raise Exception("Unexpected Introspect response")
5481
5482def run_busctl(service, obj):
5483    if not shutil.which("busctl"):
5484        raise HwsimSkip("No busctl available")
5485    logger.info("busctl introspect %s %s" % (service, obj))
5486    cmd = subprocess.Popen(['busctl', 'introspect', service, obj],
5487                           stdout=subprocess.PIPE,
5488                           stderr=subprocess.PIPE)
5489    out = cmd.communicate()
5490    cmd.wait()
5491    logger.info("busctl stdout:\n%s" % out[0].strip())
5492    if len(out[1]) > 0:
5493        logger.info("busctl stderr: %s" % out[1].decode().strip())
5494    if "Duplicate property" in out[1].decode():
5495        raise Exception("Duplicate property")
5496
5497def test_dbus_introspect_busctl(dev, apdev):
5498    """D-Bus introspection with busctl"""
5499    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5500    ifaces = dbus_get(dbus, wpas_obj, "Interfaces")
5501    run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
5502    run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH + "/Interfaces")
5503    run_busctl(WPAS_DBUS_SERVICE, ifaces[0])
5504
5505    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
5506    bssid = apdev[0]['bssid']
5507    dev[0].scan_for_bss(bssid, freq=2412)
5508    id = dev[0].add_network()
5509    dev[0].set_network(id, "disabled", "0")
5510    dev[0].set_network_quoted(id, "ssid", "test")
5511
5512    run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/BSSs/0")
5513    run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/Networks/0")
5514
5515def test_dbus_ap(dev, apdev):
5516    """D-Bus AddNetwork for AP mode"""
5517    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5518    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5519
5520    ssid = "test-wpa2-psk"
5521    passphrase = 'qwertyuiop'
5522
5523    class TestDbusConnect(TestDbus):
5524        def __init__(self, bus):
5525            TestDbus.__init__(self, bus)
5526            self.started = False
5527            self.sta_added = False
5528            self.sta_removed = False
5529            self.authorized = False
5530            self.deauthorized = False
5531            self.stations = False
5532
5533        def __enter__(self):
5534            gobject.timeout_add(1, self.run_connect)
5535            gobject.timeout_add(15000, self.timeout)
5536            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
5537            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
5538                            "NetworkSelected")
5539            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5540                            "PropertiesChanged")
5541            self.add_signal(self.stationAdded, WPAS_DBUS_IFACE, "StationAdded")
5542            self.add_signal(self.stationRemoved, WPAS_DBUS_IFACE,
5543                            "StationRemoved")
5544            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
5545                            "StaAuthorized")
5546            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
5547                            "StaDeauthorized")
5548            self.loop.run()
5549            return self
5550
5551        def networkAdded(self, network, properties):
5552            logger.debug("networkAdded: %s" % str(network))
5553            logger.debug(str(properties))
5554
5555        def networkSelected(self, network):
5556            logger.debug("networkSelected: %s" % str(network))
5557            self.network_selected = True
5558
5559        def propertiesChanged(self, properties):
5560            logger.debug("propertiesChanged: %s" % str(properties))
5561            if 'State' in properties and properties['State'] == "completed":
5562                self.started = True
5563                dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
5564
5565        def stationAdded(self, station, properties):
5566            logger.debug("stationAdded: %s" % str(station))
5567            logger.debug(str(properties))
5568            self.sta_added = True
5569            res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
5570                             dbus_interface=dbus.PROPERTIES_IFACE)
5571            logger.info("Stations: " + str(res))
5572            if len(res) == 1:
5573                self.stations = True
5574            else:
5575                raise Exception("Missing Stations entry: " + str(res))
5576
5577        def stationRemoved(self, station):
5578            logger.debug("stationRemoved: %s" % str(station))
5579            self.sta_removed = True
5580            res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
5581                             dbus_interface=dbus.PROPERTIES_IFACE)
5582            logger.info("Stations: " + str(res))
5583            if len(res) != 0:
5584                self.stations = False
5585                raise Exception("Unexpected Stations entry: " + str(res))
5586            self.loop.quit()
5587
5588        def staAuthorized(self, name):
5589            logger.debug("staAuthorized: " + name)
5590            self.authorized = True
5591            dev[1].request("DISCONNECT")
5592
5593        def staDeauthorized(self, name):
5594            logger.debug("staDeauthorized: " + name)
5595            self.deauthorized = True
5596
5597        def run_connect(self, *args):
5598            logger.debug("run_connect")
5599            args = dbus.Dictionary({'ssid': ssid,
5600                                    'key_mgmt': 'WPA-PSK',
5601                                    'psk': passphrase,
5602                                    'mode': 2,
5603                                    'frequency': 2412,
5604                                    'scan_freq': 2412},
5605                                   signature='sv')
5606            self.netw = iface.AddNetwork(args)
5607            iface.SelectNetwork(self.netw)
5608            return False
5609
5610        def success(self):
5611            return self.started and self.sta_added and self.sta_removed and \
5612                self.authorized and self.deauthorized
5613
5614    with TestDbusConnect(bus) as t:
5615        if not t.success():
5616            raise Exception("Expected signals not seen")
5617
5618def test_dbus_ap_scan(dev, apdev):
5619    """D-Bus AddNetwork for AP mode and scan"""
5620    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5621    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5622
5623    ssid = "test-wpa2-psk"
5624    passphrase = 'qwertyuiop'
5625
5626    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
5627    bssid = hapd.own_addr()
5628
5629    class TestDbusConnect(TestDbus):
5630        def __init__(self, bus):
5631            TestDbus.__init__(self, bus)
5632            self.started = False
5633            self.scan_completed = False
5634
5635        def __enter__(self):
5636            gobject.timeout_add(1, self.run_connect)
5637            gobject.timeout_add(15000, self.timeout)
5638            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5639                            "PropertiesChanged")
5640            self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
5641            self.loop.run()
5642            return self
5643
5644        def propertiesChanged(self, properties):
5645            logger.debug("propertiesChanged: %s" % str(properties))
5646            if 'State' in properties and properties['State'] == "completed":
5647                self.started = True
5648                logger.info("Try to scan in AP mode")
5649                iface.Scan({'Type': 'active',
5650                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
5651                logger.info("Scan() returned")
5652
5653        def scanDone(self, success):
5654            logger.debug("scanDone: success=%s" % success)
5655            if self.started:
5656                self.scan_completed = True
5657                self.loop.quit()
5658
5659        def run_connect(self, *args):
5660            logger.debug("run_connect")
5661            args = dbus.Dictionary({'ssid': ssid,
5662                                    'key_mgmt': 'WPA-PSK',
5663                                    'psk': passphrase,
5664                                    'mode': 2,
5665                                    'frequency': 2412,
5666                                    'scan_freq': 2412},
5667                                   signature='sv')
5668            self.netw = iface.AddNetwork(args)
5669            iface.SelectNetwork(self.netw)
5670            return False
5671
5672        def success(self):
5673            return self.started and self.scan_completed
5674
5675    with TestDbusConnect(bus) as t:
5676        if not t.success():
5677            raise Exception("Expected signals not seen")
5678
5679def test_dbus_connect_wpa_eap(dev, apdev):
5680    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
5681    skip_without_tkip(dev[0])
5682    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5683    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5684
5685    ssid = "test-wpa-eap"
5686    params = hostapd.wpa_eap_params(ssid=ssid)
5687    params["wpa"] = "3"
5688    params["rsn_pairwise"] = "CCMP"
5689    hapd = hostapd.add_ap(apdev[0], params)
5690
5691    class TestDbusConnect(TestDbus):
5692        def __init__(self, bus):
5693            TestDbus.__init__(self, bus)
5694            self.done = False
5695
5696        def __enter__(self):
5697            gobject.timeout_add(1, self.run_connect)
5698            gobject.timeout_add(15000, self.timeout)
5699            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5700                            "PropertiesChanged")
5701            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
5702            self.loop.run()
5703            return self
5704
5705        def propertiesChanged(self, properties):
5706            logger.debug("propertiesChanged: %s" % str(properties))
5707            if 'State' in properties and properties['State'] == "completed":
5708                self.done = True
5709                self.loop.quit()
5710
5711        def eap(self, status, parameter):
5712            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
5713
5714        def run_connect(self, *args):
5715            logger.debug("run_connect")
5716            args = dbus.Dictionary({'ssid': ssid,
5717                                    'key_mgmt': 'WPA-EAP',
5718                                    'eap': 'PEAP',
5719                                    'identity': 'user',
5720                                    'password': 'password',
5721                                    'ca_cert': 'auth_serv/ca.pem',
5722                                    'phase2': 'auth=MSCHAPV2',
5723                                    'scan_freq': 2412},
5724                                   signature='sv')
5725            self.netw = iface.AddNetwork(args)
5726            iface.SelectNetwork(self.netw)
5727            return False
5728
5729        def success(self):
5730            return self.done
5731
5732    with TestDbusConnect(bus) as t:
5733        if not t.success():
5734            raise Exception("Expected signals not seen")
5735
5736def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
5737    """AP_SCAN 2 AP mode and D-Bus Scan()"""
5738    try:
5739        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
5740    finally:
5741        dev[0].request("AP_SCAN 1")
5742
5743def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
5744    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5745    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5746
5747    if "OK" not in dev[0].request("AP_SCAN 2"):
5748        raise Exception("Failed to set AP_SCAN 2")
5749
5750    id = dev[0].add_network()
5751    dev[0].set_network(id, "mode", "2")
5752    dev[0].set_network_quoted(id, "ssid", "wpas-ap-open")
5753    dev[0].set_network(id, "key_mgmt", "NONE")
5754    dev[0].set_network(id, "frequency", "2412")
5755    dev[0].set_network(id, "scan_freq", "2412")
5756    dev[0].set_network(id, "disabled", "0")
5757    dev[0].select_network(id)
5758    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5)
5759    if ev is None:
5760        raise Exception("AP failed to start")
5761
5762    with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
5763        iface.Scan({'Type': 'active',
5764                    'AllowRoam': True,
5765                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
5766        ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5767                                "AP-DISABLED"], timeout=5)
5768        if ev is None:
5769            raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
5770        if "AP-DISABLED" in ev:
5771            raise Exception("Unexpected AP-DISABLED event")
5772        if "retry=1" in ev:
5773            # Wait for the retry to scan happen
5774            ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5775                                    "AP-DISABLED"], timeout=5)
5776            if ev is None:
5777                raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5778            if "AP-DISABLED" in ev:
5779                raise Exception("Unexpected AP-DISABLED event - retry")
5780
5781    dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
5782    dev[1].request("DISCONNECT")
5783    dev[1].wait_disconnected()
5784    dev[0].request("DISCONNECT")
5785    dev[0].wait_disconnected()
5786
5787def test_dbus_expectdisconnect(dev, apdev):
5788    """D-Bus ExpectDisconnect"""
5789    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5790    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
5791
5792    params = {"ssid": "test-open"}
5793    hapd = hostapd.add_ap(apdev[0], params)
5794    dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")
5795
5796    # This does not really verify the behavior other than by going through the
5797    # code path for additional coverage.
5798    wpas.ExpectDisconnect()
5799    dev[0].request("DISCONNECT")
5800    dev[0].wait_disconnected()
5801
5802def test_dbus_save_config(dev, apdev):
5803    """D-Bus SaveConfig"""
5804    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5805    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5806    try:
5807        iface.SaveConfig()
5808        raise Exception("SaveConfig() accepted unexpectedly")
5809    except dbus.exceptions.DBusException as e:
5810        if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
5811            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
5812
5813def test_dbus_vendor_elem(dev, apdev):
5814    """D-Bus vendor element operations"""
5815    try:
5816        _test_dbus_vendor_elem(dev, apdev)
5817    finally:
5818        dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5819
5820def _test_dbus_vendor_elem(dev, apdev):
5821    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5822    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5823
5824    dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5825
5826    try:
5827        ie = dbus.ByteArray(b"\x00\x00")
5828        iface.VendorElemAdd(-1, ie)
5829        raise Exception("Invalid VendorElemAdd() accepted")
5830    except dbus.exceptions.DBusException as e:
5831        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5832            raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))
5833
5834    try:
5835        ie = dbus.ByteArray(b'')
5836        iface.VendorElemAdd(1, ie)
5837        raise Exception("Invalid VendorElemAdd() accepted")
5838    except dbus.exceptions.DBusException as e:
5839        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5840            raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))
5841
5842    try:
5843        ie = dbus.ByteArray(b"\x00\x01")
5844        iface.VendorElemAdd(1, ie)
5845        raise Exception("Invalid VendorElemAdd() accepted")
5846    except dbus.exceptions.DBusException as e:
5847        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5848            raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))
5849
5850    try:
5851        iface.VendorElemGet(-1)
5852        raise Exception("Invalid VendorElemGet() accepted")
5853    except dbus.exceptions.DBusException as e:
5854        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5855            raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))
5856
5857    try:
5858        iface.VendorElemGet(1)
5859        raise Exception("Invalid VendorElemGet() accepted")
5860    except dbus.exceptions.DBusException as e:
5861        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5862            raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))
5863
5864    try:
5865        ie = dbus.ByteArray(b"\x00\x00")
5866        iface.VendorElemRem(-1, ie)
5867        raise Exception("Invalid VendorElemRemove() accepted")
5868    except dbus.exceptions.DBusException as e:
5869        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5870            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5871
5872    try:
5873        ie = dbus.ByteArray(b'')
5874        iface.VendorElemRem(1, ie)
5875        raise Exception("Invalid VendorElemRemove() accepted")
5876    except dbus.exceptions.DBusException as e:
5877        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5878            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5879
5880    iface.VendorElemRem(1, b"*")
5881
5882    ie = dbus.ByteArray(b"\x00\x01\x00")
5883    iface.VendorElemAdd(1, ie)
5884
5885    val = iface.VendorElemGet(1)
5886    if len(val) != len(ie):
5887        raise Exception("Unexpected VendorElemGet length")
5888    for i in range(len(val)):
5889        if val[i] != dbus.Byte(ie[i]):
5890            raise Exception("Unexpected VendorElemGet data")
5891
5892    ie2 = dbus.ByteArray(b"\xe0\x00")
5893    iface.VendorElemAdd(1, ie2)
5894
5895    ies = ie + ie2
5896    val = iface.VendorElemGet(1)
5897    if len(val) != len(ies):
5898        raise Exception("Unexpected VendorElemGet length[2]")
5899    for i in range(len(val)):
5900        if val[i] != dbus.Byte(ies[i]):
5901            raise Exception("Unexpected VendorElemGet data[2]")
5902
5903    try:
5904        test_ie = dbus.ByteArray(b"\x01\x01")
5905        iface.VendorElemRem(1, test_ie)
5906        raise Exception("Invalid VendorElemRemove() accepted")
5907    except dbus.exceptions.DBusException as e:
5908        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5909            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5910
5911    iface.VendorElemRem(1, ie)
5912    val = iface.VendorElemGet(1)
5913    if len(val) != len(ie2):
5914        raise Exception("Unexpected VendorElemGet length[3]")
5915
5916    iface.VendorElemRem(1, b"*")
5917    try:
5918        iface.VendorElemGet(1)
5919        raise Exception("Invalid VendorElemGet() accepted after removal")
5920    except dbus.exceptions.DBusException as e:
5921        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5922            raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))
5923
5924def test_dbus_assoc_reject(dev, apdev):
5925    """D-Bus AssocStatusCode"""
5926    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5927    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5928
5929    ssid = "test-open"
5930    params = {"ssid": ssid,
5931              "max_listen_interval": "1"}
5932    hapd = hostapd.add_ap(apdev[0], params)
5933
5934    class TestDbusConnect(TestDbus):
5935        def __init__(self, bus):
5936            TestDbus.__init__(self, bus)
5937            self.assoc_status_seen = False
5938            self.state = 0
5939
5940        def __enter__(self):
5941            gobject.timeout_add(1, self.run_connect)
5942            gobject.timeout_add(15000, self.timeout)
5943            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5944                            "PropertiesChanged")
5945            self.loop.run()
5946            return self
5947
5948        def propertiesChanged(self, properties):
5949            logger.debug("propertiesChanged: %s" % str(properties))
5950            if 'AssocStatusCode' in properties:
5951                status = properties['AssocStatusCode']
5952                if status != 51:
5953                    logger.info("Unexpected status code: " + str(status))
5954                else:
5955                    self.assoc_status_seen = True
5956                iface.Disconnect()
5957                self.loop.quit()
5958
5959        def run_connect(self, *args):
5960            args = dbus.Dictionary({'ssid': ssid,
5961                                    'key_mgmt': 'NONE',
5962                                    'scan_freq': 2412},
5963                                   signature='sv')
5964            self.netw = iface.AddNetwork(args)
5965            iface.SelectNetwork(self.netw)
5966            return False
5967
5968        def success(self):
5969            return self.assoc_status_seen
5970
5971    with TestDbusConnect(bus) as t:
5972        if not t.success():
5973            raise Exception("Expected signals not seen")
5974
5975def test_dbus_mesh(dev, apdev):
5976    """D-Bus mesh"""
5977    check_mesh_support(dev[0])
5978    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
5979    mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH)
5980
5981    add_open_mesh_network(dev[1])
5982    addr1 = dev[1].own_addr()
5983
5984    class TestDbusMesh(TestDbus):
5985        def __init__(self, bus):
5986            TestDbus.__init__(self, bus)
5987            self.done = False
5988
5989        def __enter__(self):
5990            gobject.timeout_add(1, self.run_test)
5991            gobject.timeout_add(15000, self.timeout)
5992            self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH,
5993                            "MeshGroupStarted")
5994            self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH,
5995                            "MeshGroupRemoved")
5996            self.add_signal(self.meshPeerConnected, WPAS_DBUS_IFACE_MESH,
5997                            "MeshPeerConnected")
5998            self.add_signal(self.meshPeerDisconnected, WPAS_DBUS_IFACE_MESH,
5999                            "MeshPeerDisconnected")
6000            self.loop.run()
6001            return self
6002
6003        def meshGroupStarted(self, args):
6004            logger.debug("MeshGroupStarted: " + str(args))
6005
6006        def meshGroupRemoved(self, args):
6007            logger.debug("MeshGroupRemoved: " + str(args))
6008            self.done = True
6009            self.loop.quit()
6010
6011        def meshPeerConnected(self, args):
6012            logger.debug("MeshPeerConnected: " + str(args))
6013
6014            res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers',
6015                             dbus_interface=dbus.PROPERTIES_IFACE,
6016                             byte_arrays=True)
6017            logger.debug("MeshPeers: " + str(res))
6018            if len(res) != 1:
6019                raise Exception("Unexpected number of MeshPeer values")
6020            if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''):
6021                raise Exception("Unexpected peer address")
6022
6023            res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup',
6024                             dbus_interface=dbus.PROPERTIES_IFACE,
6025                             byte_arrays=True)
6026            logger.debug("MeshGroup: " + str(res))
6027            if res != b"wpas-mesh-open":
6028                raise Exception("Unexpected MeshGroup")
6029            dev[1].mesh_group_remove()
6030
6031        def meshPeerDisconnected(self, args):
6032            logger.debug("MeshPeerDisconnected: " + str(args))
6033            dev[0].mesh_group_remove()
6034
6035        def run_test(self, *args):
6036            logger.debug("run_test")
6037            add_open_mesh_network(dev[0])
6038            return False
6039
6040        def success(self):
6041            return self.done
6042
6043    with TestDbusMesh(bus) as t:
6044        if not t.success():
6045            raise Exception("Expected signals not seen")
6046
6047def test_dbus_roam(dev, apdev):
6048    """D-Bus Roam"""
6049    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
6050    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
6051
6052    ssid = "test-wpa2-psk"
6053    passphrase = 'qwertyuiop'
6054    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
6055    hapd = hostapd.add_ap(apdev[0], params)
6056    hapd2 = hostapd.add_ap(apdev[1], params)
6057    bssid = apdev[0]['bssid']
6058    dev[0].scan_for_bss(bssid, freq=2412)
6059    bssid2 = apdev[1]['bssid']
6060    dev[0].scan_for_bss(bssid2, freq=2412)
6061
6062    class TestDbusConnect(TestDbus):
6063        def __init__(self, bus):
6064            TestDbus.__init__(self, bus)
6065            self.state = 0
6066
6067        def __enter__(self):
6068            gobject.timeout_add(1, self.run_connect)
6069            gobject.timeout_add(15000, self.timeout)
6070            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
6071                            "PropertiesChanged")
6072            self.loop.run()
6073            return self
6074
6075        def propertiesChanged(self, properties):
6076            logger.debug("propertiesChanged: %s" % str(properties))
6077            if 'State' in properties and properties['State'] == "completed":
6078                if self.state == 0:
6079                    self.state = 1
6080                    cur = properties["CurrentBSS"]
6081                    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, cur)
6082                    res = bss_obj.Get(WPAS_DBUS_BSS, 'BSSID',
6083                                      dbus_interface=dbus.PROPERTIES_IFACE)
6084                    bssid_str = ''
6085                    for item in res:
6086                        if len(bssid_str) > 0:
6087                            bssid_str += ':'
6088                        bssid_str += '%02x' % item
6089                    dst = bssid if bssid_str == bssid2 else bssid2
6090                    iface.Roam(dst)
6091                elif self.state == 1:
6092                    if "RoamComplete" in properties and \
6093                       properties["RoamComplete"]:
6094                        self.state = 2
6095                        self.loop.quit()
6096
6097        def run_connect(self, *args):
6098            logger.debug("run_connect")
6099            args = dbus.Dictionary({'ssid': ssid,
6100                                    'key_mgmt': 'WPA-PSK',
6101                                    'psk': passphrase,
6102                                    'scan_freq': 2412},
6103                                   signature='sv')
6104            self.netw = iface.AddNetwork(args)
6105            iface.SelectNetwork(self.netw)
6106            return False
6107
6108        def success(self):
6109            return self.state == 2
6110
6111    with TestDbusConnect(bus) as t:
6112        if not t.success():
6113            raise Exception("Expected signals not seen")
6114
6115def test_dbus_creds(dev, apdev):
6116    """D-Bus interworking credentials"""
6117    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
6118    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
6119
6120    args = {'domain': ['server.w1.fi','server2.w1.fi'],
6121            'realm': 'server.w1.fi',
6122            'home_ois': '50a9bf',
6123            'required_home_ois': '23bf50',
6124            'eap': 'TTLS',
6125            'phase2': 'auth=MSCHAPV2',
6126            'username': 'user',
6127            'password': 'password',
6128            'domain_suffix_match': 'server.w1.fi',
6129            'ca_cert': 'auth_serv/ca.pem'}
6130
6131    path = iface.AddCred(dbus.Dictionary(args, signature='sv'))
6132    for k, v in args.items():
6133        if k == 'password':
6134            continue
6135        prop = dev[0].get_cred(0, k)
6136        if isinstance(v, list):
6137            v = '\n'.join(v)
6138        if prop != v:
6139            raise Exception('Credential add failed: %s does not match %s' % (prop, v))
6140
6141    iface.RemoveCred(path)
6142    if not "FAIL" in dev[0].get_cred(0, 'domain'):
6143        raise Exception("Credential remove failed")
6144
6145    # Removal of multiple credentials
6146    cred1 = {'domain': 'server1.w1.fi','realm': 'server1.w1.fi','eap': 'TTLS'}
6147    iface.AddCred(dbus.Dictionary(cred1, signature='sv'))
6148    if "FAIL" in dev[0].get_cred(0, 'domain'):
6149        raise Exception("Failed to add credential")
6150
6151    cred2 = {'domain': 'server2.w1.fi','realm': 'server2.w1.fi','eap': 'TTLS'}
6152    iface.AddCred(dbus.Dictionary(cred2, signature='sv'))
6153    if "FAIL" in dev[0].get_cred(1, 'domain'):
6154        raise Exception("Failed to add credential")
6155
6156    iface.RemoveAllCreds()
6157    if not "FAIL" in dev[0].get_cred(0, 'domain'):
6158        raise Exception("Credential remove failed")
6159    if not "FAIL" in dev[0].get_cred(1, 'domain'):
6160        raise Exception("Credential remove failed")
6161
6162def test_dbus_interworking(dev, apdev):
6163    """D-Bus interworking selection"""
6164    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
6165    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
6166
6167    params = {"ssid": "test-interworking", "wpa": "2",
6168              "wpa_key_mgmt": "WPA-EAP", "rsn_pairwise": "CCMP",
6169              "ieee8021x": "1", "eapol_version": "2",
6170              "eap_server": "1", "eap_user_file": "auth_serv/eap_user.conf",
6171              "ca_cert": "auth_serv/ca.pem",
6172              "server_cert": "auth_serv/server.pem",
6173              "private_key": "auth_serv/server.key",
6174              "interworking": "1",
6175              "domain_name": "server.w1.fi",
6176              "nai_realm": "0,server.w1.fi,21[2:4][5:7]",
6177              "roaming_consortium": "2233445566",
6178              "hs20": "1", "anqp_domain_id": "1234"}
6179
6180    hapd = hostapd.add_ap(apdev[0], params)
6181
6182    class TestDbusInterworking(TestDbus):
6183        def __init__(self, bus):
6184            TestDbus.__init__(self, bus)
6185            self.interworking_ap_seen = False
6186            self.interworking_select_done = False
6187
6188        def __enter__(self):
6189            gobject.timeout_add(1, self.run_select)
6190            gobject.timeout_add(15000, self.timeout)
6191            self.add_signal(self.interworkingAPAdded, WPAS_DBUS_IFACE,
6192                            "InterworkingAPAdded")
6193            self.add_signal(self.interworkingSelectDone, WPAS_DBUS_IFACE,
6194                            "InterworkingSelectDone")
6195            self.loop.run()
6196            return self
6197
6198        def interworkingAPAdded(self, bss, cred, properties):
6199            logger.debug("interworkingAPAdded: bss=%s cred=%s %s" % (bss, cred, str(properties)))
6200            if self.cred == cred:
6201                self.interworking_ap_seen = True
6202
6203        def interworkingSelectDone(self):
6204            logger.debug("interworkingSelectDone")
6205            self.interworking_select_done = True
6206            self.loop.quit()
6207
6208        def run_select(self, *args):
6209            args = {"domain": "server.w1.fi",
6210                    "realm": "server.w1.fi",
6211                    "eap": "TTLS",
6212                    "phase2": "auth=MSCHAPV2",
6213                    "username": "user",
6214                    "password": "password",
6215                    "domain_suffix_match": "server.w1.fi",
6216                    "ca_cert": "auth_serv/ca.pem"}
6217            self.cred = iface.AddCred(dbus.Dictionary(args, signature='sv'))
6218            iface.InterworkingSelect()
6219            return False
6220
6221        def success(self):
6222            return self.interworking_ap_seen and self.interworking_select_done
6223
6224    with TestDbusInterworking(bus) as t:
6225        if not t.success():
6226            raise Exception("Expected signals not seen")
6227
6228def test_dbus_hs20_terms_and_conditions(dev, apdev):
6229    """D-Bus HS2.0 Terms and Conditions acceptance"""
6230    check_eap_capa(dev[0], "MSCHAPV2")
6231
6232    (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
6233    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
6234
6235    bssid = apdev[0]['bssid']
6236    params = {"ssid": "test-hs20", "hessid": bssid, "wpa": "2",
6237              "rsn_pairwise": "CCMP", "wpa_key_mgmt": "WPA-EAP",
6238              "ieee80211w": "1", "ieee8021x": "1",
6239              "auth_server_addr": "127.0.0.1", "auth_server_port": "1812",
6240              "auth_server_shared_secret": "radius", "interworking": "1",
6241              "access_network_type": "14", "internet": "1", "asra": "0",
6242              "esr": "0", "uesa": "0", "venue_group": "7", "venue_type": "1",
6243              "venue_name": ["eng:Example venue", "fin:Esimerkkipaikka"],
6244              "roaming_consortium": ["112233", "1020304050", "010203040506",
6245              "fedcba"], "domain_name": "example.com,another.example.com",
6246              "nai_realm": ["0,example.com,13[5:6],21[2:4][5:7]",
6247              "0,another.example.com"], "hs20": "1",
6248              "hs20_wan_metrics": "01:8000:1000:80:240:3000",
6249              "hs20_conn_capab": ["1:0:2", "6:22:1", "17:5060:0"],
6250              "hs20_operating_class": "5173", "anqp_3gpp_cell_net": "244,91",
6251              "hs20_t_c_filename": "terms-and-conditions",
6252              "hs20_t_c_timestamp": "123456789"}
6253
6254    hapd = hostapd.add_ap(apdev[0], params)
6255
6256    class TestDbusInterworking(TestDbus):
6257        def __init__(self, bus):
6258            TestDbus.__init__(self, bus)
6259            self.hs20_t_and_c_seen = False
6260
6261        def __enter__(self):
6262            gobject.timeout_add(1, self.run_connect)
6263            gobject.timeout_add(15000, self.timeout)
6264            self.add_signal(self.hs20TermsAndConditions, WPAS_DBUS_IFACE,
6265                            "HS20TermsAndConditions")
6266            self.loop.run()
6267            return self
6268
6269        def hs20TermsAndConditions(self, t_c_url):
6270            logger.debug("hs20TermsAndConditions: url=%s" % (t_c_url))
6271            url = "https://example.com/t_and_c?addr=%s&ap=123" % dev[0].own_addr()
6272            if url in t_c_url:
6273                self.hs20_t_and_c_seen = True
6274
6275        def run_connect(self, *args):
6276            dev[0].hs20_enable()
6277            dev[0].connect("test-hs20", proto="RSN", key_mgmt="WPA-EAP", eap="TTLS",
6278                           identity="hs20-t-c-test", password="password",
6279                           ca_cert="auth_serv/ca.pem", phase2="auth=MSCHAPV2",
6280                           ieee80211w='2', scan_freq="2412")
6281            return False
6282
6283        def success(self):
6284            return self.hs20_t_and_c_seen
6285
6286    with TestDbusInterworking(bus) as t:
6287        if not t.success():
6288            raise Exception("Expected signals not seen")
6289
6290def test_dbus_anqp_get(dev, apdev):
6291    """D-Bus ANQP get test"""
6292    (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
6293    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
6294
6295    bssid = apdev[0]['bssid']
6296    params = hs20_ap_params(ssid="test-anqp")
6297    params["hessid"] = bssid
6298    params['mbo'] = '1'
6299    params['mbo_cell_data_conn_pref'] = '1'
6300    params['hs20_oper_friendly_name'] = ["eng:Example operator",
6301                                         "fin:Esimerkkioperaattori"]
6302    hapd = hostapd.add_ap(apdev[0], params)
6303
6304    dev[0].flush_scan_cache()
6305    dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
6306    iface.ANQPGet({"addr": bssid,
6307                   "ids": dbus.Array([257], dbus.Signature("q")),
6308                   "mbo_ids": dbus.Array([2], dbus.Signature("y")),
6309                   "hs20_ids": dbus.Array([3, 4], dbus.Signature("y"))})
6310
6311    ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
6312    if ev is None:
6313        raise Exception("GAS query timed out")
6314
6315    ev = dev[0].wait_event(["RX-ANQP"], timeout=1)
6316    if ev is None or "ANQP Capability list" not in ev:
6317        raise Exception("Did not receive Capability list")
6318
6319    ev = dev[0].wait_event(["RX-HS20-ANQP"], timeout=1)
6320    if ev is None or "Operator Friendly Name" not in ev:
6321        raise Exception("Did not receive Operator Friendly Name")
6322
6323    ev = dev[0].wait_event(["RX-MBO-ANQP"], timeout=1)
6324    if ev is None or "cell_conn_pref" not in ev:
6325        raise Exception("Did not receive MBO Cellular Data Connection Preference")
6326
6327    bss = dev[0].get_bss(bssid)
6328    if 'anqp_capability_list' not in bss:
6329        raise Exception("Capability List ANQP-element not seen")
6330
6331def test_dbus_anqp_query_done(dev, apdev):
6332    """D-Bus ANQP get test"""
6333    (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
6334    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
6335
6336    bssid = apdev[0]['bssid']
6337    params = hs20_ap_params(ssid="test-anqp")
6338    params["hessid"] = bssid
6339    params['mbo'] = '1'
6340    params['mbo_cell_data_conn_pref'] = '1'
6341    params['hs20_oper_friendly_name'] = ["eng:Example operator",
6342                                         "fin:Esimerkkioperaattori"]
6343    hapd = hostapd.add_ap(apdev[0], params)
6344
6345    class TestDbusANQPGet(TestDbus):
6346        def __init__(self, bus):
6347            TestDbus.__init__(self, bus)
6348            self.anqp_query_done = False
6349
6350        def __enter__(self):
6351            gobject.timeout_add(1, self.run_query)
6352            gobject.timeout_add(15000, self.timeout)
6353            self.add_signal(self.anqpQueryDone, WPAS_DBUS_IFACE,
6354                            "ANQPQueryDone")
6355            self.loop.run()
6356            return self
6357
6358        def anqpQueryDone(self, addr, result):
6359            logger.debug("anqpQueryDone: addr=%s result=%s" % (addr, result))
6360            if addr == bssid and "SUCCESS" in result:
6361                self.anqp_query_done = True
6362
6363        def run_query(self, *args):
6364            dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
6365            iface.ANQPGet({"addr": bssid,
6366                           "ids": dbus.Array([257], dbus.Signature("q"))})
6367            return False
6368
6369        def success(self):
6370            return self.anqp_query_done
6371
6372    with TestDbusANQPGet(bus) as t:
6373        if not t.success():
6374            raise Exception("Expected signals not seen")
6375
6376def test_dbus_bss_anqp_properties(dev, apdev):
6377    """D-Bus ANQP BSS properties changed"""
6378    (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
6379    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
6380
6381    bssid = apdev[0]['bssid']
6382    params = hs20_ap_params(ssid="test-anqp")
6383    params["hessid"] = bssid
6384    params['mbo'] = '1'
6385    params['mbo_cell_data_conn_pref'] = '1'
6386    params['hs20_oper_friendly_name'] = ["eng:Example operator",
6387                                         "fin:Esimerkkioperaattori"]
6388    hapd = hostapd.add_ap(apdev[0], params)
6389
6390    class TestDbusANQPBSSPropertiesChanged(TestDbus):
6391        def __init__(self, bus):
6392            TestDbus.__init__(self, bus)
6393            self.capability_list = False
6394            self.venue_name = False
6395            self.roaming_consortium = False
6396            self.nai_realm = False
6397
6398        def __enter__(self):
6399            gobject.timeout_add(1, self.run_query)
6400            gobject.timeout_add(15000, self.timeout)
6401            self.add_signal(self.propertiesChanged, WPAS_DBUS_BSS,
6402                            "PropertiesChanged")
6403            self.loop.run()
6404            return self
6405
6406        def propertiesChanged(self, properties):
6407            logger.debug("propertiesChanged: %s" % str(properties))
6408            if 'ANQP' in properties:
6409                anqp_properties = properties['ANQP']
6410                self.capability_list = 'CapabilityList' in anqp_properties
6411                self.venue_name = 'VenueName' in anqp_properties
6412                self.roaming_consortium = 'RoamingConsortium' in anqp_properties
6413                self.nai_realm = 'NAIRealm' in anqp_properties
6414
6415        def run_query(self, *args):
6416            dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
6417            iface.ANQPGet({"addr": bssid,
6418                           "ids": dbus.Array([257,258,261,263], dbus.Signature("q"))})
6419            return False
6420
6421        def success(self):
6422            return self.capability_list and self.venue_name and self.roaming_consortium and self.nai_realm
6423
6424    with TestDbusANQPBSSPropertiesChanged(bus) as t:
6425        if not t.success():
6426            raise Exception("Expected signals not seen")
6427