1# wpa_supplicant D-Bus interface tests
2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12import shutil
13import struct
14import sys
15
16try:
17    if sys.version_info[0] > 2:
18        from gi.repository import GObject as gobject
19    else:
20        import gobject
21    import dbus
22    dbus_imported = True
23except ImportError:
24    dbus_imported = False
25
26import hostapd
27from wpasupplicant import WpaSupplicant
28from utils import *
29from p2p_utils import *
30from test_ap_tdls import connect_2sta_open
31from test_ap_eap import check_altsubject_match_support
32from test_nfc_p2p import set_ip_addr_info
33from test_wpas_mesh import check_mesh_support, add_open_mesh_network
34
# Well-known bus name of the wpa_supplicant D-Bus service and the object path
# of its root object (the bus name with '.' replaced by '/').
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/" + WPAS_DBUS_SERVICE.replace(".", "/")

# D-Bus interface names; each one is the service name plus a suffix.
WPAS_DBUS_IFACE = "%s.Interface" % WPAS_DBUS_SERVICE
WPAS_DBUS_IFACE_WPS = "%s.WPS" % WPAS_DBUS_IFACE
WPAS_DBUS_NETWORK = "%s.Network" % WPAS_DBUS_SERVICE
WPAS_DBUS_BSS = "%s.BSS" % WPAS_DBUS_SERVICE
WPAS_DBUS_IFACE_P2PDEVICE = "%s.P2PDevice" % WPAS_DBUS_IFACE
WPAS_DBUS_P2P_PEER = "%s.Peer" % WPAS_DBUS_SERVICE
WPAS_DBUS_GROUP = "%s.Group" % WPAS_DBUS_SERVICE
WPAS_DBUS_PERSISTENT_GROUP = "%s.PersistentGroup" % WPAS_DBUS_SERVICE
WPAS_DBUS_IFACE_MESH = "%s.Mesh" % WPAS_DBUS_IFACE
46
def prepare_dbus(dev):
    """Set up D-Bus access to wpa_supplicant for the given test device.

    Returns a tuple (bus, wpas_obj, path, if_obj): the system bus connection,
    the proxy for the wpa_supplicant root object, the object path returned by
    GetInterface() for dev.ifname, and the proxy object for that path.

    Raises HwsimSkip if the dbus module could not be imported or if any step
    of the connection setup raises an exception.
    """
    if not dbus_imported:
        logger.info("No dbus module available")
        raise HwsimSkip("No dbus module available")
    try:
        from dbus.mainloop.glib import DBusGMainLoop
        # Register the GLib main loop as the default before opening the bus
        DBusGMainLoop(set_as_default=True)
        system_bus = dbus.SystemBus()
        root_obj = system_bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        root_iface = dbus.Interface(root_obj, WPAS_DBUS_SERVICE)
        iface_path = root_iface.GetInterface(dev.ifname)
        iface_obj = system_bus.get_object(WPAS_DBUS_SERVICE, iface_path)
    except Exception as e:
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
    return (system_bus, root_obj, iface_path, iface_obj)
62
class TestDbus(object):
    """Common base for test helpers that wait for D-Bus signals.

    Holds a gobject main loop, the bus connection and the list of signal
    receivers registered through add_signal(). Only __exit__() is provided
    here; subclasses are expected to supply __enter__() so that instances can
    be used in a with statement.
    """
    def __init__(self, bus):
        self.loop = gobject.MainLoop()
        self.bus = bus
        self.signals = []

    def __exit__(self, type, value, traceback):
        # Unregister all receivers; nothing is returned, so an exception
        # raised inside the with block is not suppressed.
        for receiver in self.signals:
            receiver.remove()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal name on interface and remember it."""
        receiver = self.bus.add_signal_receiver(handler,
                                                dbus_interface=interface,
                                                signal_name=name,
                                                byte_arrays=byte_arrays)
        self.signals.append(receiver)

    def timeout(self, *args):
        """Stop the main loop; always returns False.

        Subclasses pass this method to gobject.timeout_add() (presumably the
        False return value keeps the timer from being rescheduled).
        """
        logger.debug("timeout")
        self.loop.quit()
        return False
83
class alloc_fail_dbus(object):
    """Context manager for D-Bus operations that must fail on allocation.

    On entry, the TEST_ALLOC_FAIL command is sent to dev with the given
    count and function list. On exit, the with block must have raised:
    a dbus.exceptions.DBusException whose text contains expected is
    suppressed; any other exception propagates after verifying through
    GET_ALLOC_FAIL that the allocation failure was triggered.
    """
    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):
        self._dev, self._count, self._funcs = dev, count, funcs
        self._operation, self._expected = operation, expected

    def __enter__(self):
        reply = self._dev.request("TEST_ALLOC_FAIL %d:%s" %
                                  (self._count, self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        if type is None:
            # The block completed without an exception
            raise Exception("%s succeeded during out-of-memory" %
                            self._operation)
        got_expected_error = (type == dbus.exceptions.DBusException and
                              self._expected in str(value))
        if got_expected_error:
            return True
        remaining = self._dev.request("GET_ALLOC_FAIL")
        if remaining != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" %
                            self._operation)
        return False
104
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Add an AP with the WPS/WPA2-PSK parameters used by the WPS tests.

    ssid and ap_uuid are placed into the parameter set as-is; the return
    value is whatever hostapd.add_ap() returns.
    """
    params = dict([("ssid", ssid),
                   ("eap_server", "1"),
                   ("wps_state", "2"),
                   ("wpa_passphrase", "12345678"),
                   ("wpa", "2"),
                   ("wpa_key_mgmt", "WPA-PSK"),
                   ("rsn_pairwise", "CCMP"),
                   ("ap_pin", "12345670"),
                   ("uuid", ap_uuid)])
    return hostapd.add_ap(ap, params)
112
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    dev[0].flush_scan_cache()

    # GetAll on the root object; log with the real interface name and path
    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                                         str(props)))

    # GetAll on the interface object for both interface-level D-Bus interfaces
    for iface in (WPAS_DBUS_IFACE, WPAS_DBUS_IFACE_WPS):
        props = if_obj.GetAll(iface, dbus_interface=dbus.PROPERTIES_IFACE)
        logger.debug("GetAll(%s, %s): %s" % (iface, path, str(props)))

    # Before scanning and adding a network both lists have to be empty
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected BSSs entry: " + str(res))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected Networks entry: " + str(res))

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    netid = dev[0].add_network()
    dev[0].set_network(netid, "disabled", "0")
    dev[0].set_network_quoted(netid, "ssid", "test")

    # Exactly one BSS entry is expected and it has to match the AP's BSSID
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) < 1:
        raise Exception("Missing BSSs entry: " + str(res))
    if len(res) > 1:
        raise Exception("Too many BSSs entries: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    # Format the BSSID octets as colon-separated lowercase hex for comparison
    bssid_str = ':'.join('%02x' % octet for octet in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    # Exactly one network entry is expected with the configured (quoted) SSID
    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
    ssid = props['Properties']['ssid']
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
176
def test_dbus_getall_oom(dev, apdev):
    """D-Bus GetAll wpa_config_get_all() OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    netid = dev[0].add_network()
    dev[0].set_network(netid, "disabled", "0")
    dev[0].set_network_quoted(netid, "ssid", "test")

    networks = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(networks) != 1:
        raise Exception("Missing Networks entry: " + str(networks))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, networks[0])

    # Repeat GetAll with the allocation failure count set to 1..49 for
    # wpa_config_get_all; a D-Bus error from the call itself is ignored.
    for attempt in range(1, 50):
        with alloc_fail(dev[0], attempt, "wpa_config_get_all"):
            try:
                net_obj.GetAll(WPAS_DBUS_NETWORK,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException:
                pass
197
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read property prop of the fi.w1.wpa_supplicant1 interface.

    dbus is the dbus module (used for PROPERTIES_IFACE) and wpas_obj the
    proxy object to query. When expect is not None, an Exception is raised
    if the value read differs from it. Returns the value read.
    """
    value = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE,
                         byte_arrays=byte_arrays)
    if expect is None:
        return value
    if value != expect:
        raise Exception("Unexpected %s: %s (expected: %s)" %
                        (prop, str(value), str(expect)))
    return value
206
def dbus_set(dbus, wpas_obj, prop, val):
    """Write val to property prop of the fi.w1.wpa_supplicant1 interface.

    dbus is the dbus module (used for PROPERTIES_IFACE) and wpas_obj the
    proxy object on which Set() is invoked.
    """
    props_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, dbus_interface=props_iface)
210
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def expect_dbus_error(call, err, accepted_msg):
        # Invoke call() and require a DBusException whose text contains err.
        # If call() returns normally, fail with accepted_msg.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception(accepted_msg)

    # DebugLevel: valid change, then a non-string and an unknown level name
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    bad_levels = [(3, "Error.Failed: wrong property type"),
                  ("foo", "Error.Failed: wrong debug level value")]
    for bad, err in bad_levels:
        expect_dbus_error(lambda: dbus_set(dbus, wpas_obj, "DebugLevel", bad),
                          err,
                          "Invalid DebugLevel value accepted: " + str(bad))
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # Boolean debug flags: toggle, try a string value, restore
    for flag in ("DebugTimestamp", "DebugShowKeys"):
        dbus_get(dbus, wpas_obj, flag, expect=True)
        dbus_set(dbus, wpas_obj, flag, False)
        dbus_get(dbus, wpas_obj, flag, expect=False)
        expect_dbus_error(lambda: dbus_set(dbus, wpas_obj, flag, "foo"),
                          "Error.Failed: wrong property type",
                          "Invalid %s value accepted" % flag)
        dbus_set(dbus, wpas_obj, flag, True)
        dbus_get(dbus, wpas_obj, flag, expect=True)

    # Read-only list properties
    interfaces = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(interfaces) != 1:
        raise Exception("Unexpected Interfaces value: " + str(interfaces))

    eap_methods = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(eap_methods) < 5 or "TTLS" not in eap_methods:
        raise Exception("Unexpected EapMethods value: " + str(eap_methods))

    capabilities = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(capabilities) < 2 or "p2p" not in capabilities:
        raise Exception("Unexpected Capabilities value: " + str(capabilities))

    # WFDIEs: set/read back, reject a one-octet value, then clear
    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    wfd_ies = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(wfd_ies))
    readback = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if wfd_ies != readback:
        raise Exception("WFDIEs value changed")
    expect_dbus_error(lambda: dbus_set(dbus, wpas_obj, "WFDIEs",
                                       dbus.ByteArray(b'\x00')),
                      "InvalidArgs",
                      "Invalid WFDIEs value accepted")
    for ies in (b'', wfd_ies, b''):
        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(ies))
    readback = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(readback) != 0:
        raise Exception("WFDIEs not cleared properly")

    # Set on a read-only property
    eap_methods = dbus_get(dbus, wpas_obj, "EapMethods")
    expect_dbus_error(lambda: dbus_set(dbus, wpas_obj, "EapMethods",
                                       eap_methods),
                      "InvalidArgs: Property is read-only",
                      "Invalid Set accepted")

    # Method that does not exist on the properties interface
    expect_dbus_error(lambda: wpas_obj.SetFoo(
                          WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                          dbus_interface=dbus.PROPERTIES_IFACE),
                      "UnknownMethod",
                      "Unknown method accepted")

    # Get with an unknown interface name
    expect_dbus_error(lambda: wpas_obj.Get(
                          "foo", "DebugShowKeys",
                          dbus_interface=dbus.PROPERTIES_IFACE),
                      "InvalidArgs: No such property",
                      "Invalid Get accepted")

    # Get with integer arguments through a proxy created with
    # introspect=False
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    for get_args in ((123, "DebugShowKeys"), (WPAS_DBUS_SERVICE, 123)):
        expect_dbus_error(lambda: test_obj.Get(
                              get_args[0], get_args[1],
                              dbus_interface=dbus.PROPERTIES_IFACE),
                          "InvalidArgs: Invalid arguments",
                          "Invalid Get accepted")

    # Set with a value carrying variant_level=2
    expect_dbus_error(lambda: wpas_obj.Set(
                          WPAS_DBUS_SERVICE, "WFDIEs",
                          dbus.ByteArray(b'', variant_level=2),
                          dbus_interface=dbus.PROPERTIES_IFACE),
                      "InvalidArgs: invalid message format",
                      "Invalid Set accepted")
333
def test_dbus_set_global_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    dev[0].set("model_name", "")
    # (property name, value expected before Set, value to Set and read back)
    props = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')]

    try:
        for name, initial, updated in props:
            res = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if res != initial:
                raise Exception("Unexpected " + name + " value: " + str(res))

            if_obj.Set(WPAS_DBUS_IFACE, name, updated,
                       dbus_interface=dbus.PROPERTIES_IFACE)

            res = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if res != updated:
                raise Exception("Unexpected " + name + " value after set: " +
                                str(res))
    finally:
        # Clear the model name even when a check above fails so that the
        # modified value does not remain set for whatever runs next.
        dev[0].set("model_name", "")
355
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Call a method name that is not part of the WPS interface
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    try:
        wps.Foo()
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Unknown method accepted")

    # Call WPS.Start with an integer argument through a proxy created with
    # introspect=False
    raw_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    raw_wps = dbus.Interface(raw_obj, WPAS_DBUS_IFACE_WPS)
    try:
        raw_wps.Start(123)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("WPS.Start with incorrect signature accepted")
376
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        # Put back the settings that the test body modifies, regardless of
        # whether it passed.
        sta = dev[0]
        for cmd in ("SET wps_cred_processing 0",
                    "SET config_methods display keypad virtual_display nfc_interface p2ps"):
            sta.request(cmd)
        restore = [("device_name", "Device A"),
                   ("manufacturer", ""),
                   ("model_name", ""),
                   ("model_number", ""),
                   ("serial_number", ""),
                   ("device_type", "0-00000000-0")]
        for field, value in restore:
            sta.set(field, value)
390
def _test_dbus_get_set_wps(dev, apdev):
    """Exercise Get/Set of the WPS interface properties on dev[0].

    Covers ConfigMethods, ProcessCredentials, the device description
    strings and DeviceType, and finally verifies that changing
    ProcessCredentials emits PropertiesChanged both on the WPS interface and
    on the standard properties interface. Raises Exception on any mismatch.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def wps_get(prop, **kwargs):
        # Read a property of the WPS interface from the interface object
        return if_obj.Get(WPAS_DBUS_IFACE_WPS, prop,
                          dbus_interface=dbus.PROPERTIES_IFACE, **kwargs)

    def wps_set(prop, value):
        # Write a property of the WPS interface on the interface object
        if_obj.Set(WPAS_DBUS_IFACE_WPS, prop, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    wps_get("ConfigMethods")

    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = wps_get("ConfigMethods")
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    wps_set("ConfigMethods", val2)
    config = wps_get("ConfigMethods")
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    # ProcessCredentials is expected to read False only for
    # wps_cred_processing=1
    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        val = wps_get("ProcessCredentials")
        expected_val = i != 1
        if val != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))

    # (control interface field, D-Bus property) pairs for the same value
    tests = [("device_name", "DeviceName"),
             ("manufacturer", "Manufacturer"),
             ("model_name", "ModelName"),
             ("model_number", "ModelNumber"),
             ("serial_number", "SerialNumber")]

    for f1, f2 in tests:
        val2 = "test-value-test"
        dev[0].set(f1, val2)
        val = wps_get(f2)
        if val != val2:
            raise Exception("Get(%s) returned unexpected value" % f2)
        val2 = "TEST-value"
        wps_set(f2, val2)
        val = wps_get(f2)
        if val != val2:
            raise Exception("Get(%s) returned unexpected value after Set" % f2)

    # Octets expected from DeviceType after setting device_type to
    # 5-0050F204-1. Each octet is compared on its own; the previous chained
    # comparison (a != 0x05 != b != 0x00) could not detect a wrong second
    # octet when the third one was 0x00.
    expected_dev_type = [0x00, 0x05, 0x00, 0x50, 0xf2, 0x04, 0x00, 0x01]

    def dev_type_mismatch(octets):
        return any(octets[i] != exp
                   for i, exp in enumerate(expected_dev_type))

    dev[0].set("device_type", "5-0050F204-1")
    val = wps_get("DeviceType")
    if dev_type_mismatch(val):
        raise Exception("DeviceType mismatch")
    wps_set("DeviceType", val)
    val = wps_get("DeviceType")
    if dev_type_mismatch(val):
        raise Exception("DeviceType mismatch after Set")

    val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08'
    wps_set("DeviceType", dbus.ByteArray(val2))
    val = wps_get("DeviceType", byte_arrays=True)
    if val != val2:
        raise Exception("DeviceType mismatch after Set (2)")

    class TestDbusGetSet(TestDbus):
        """Toggle ProcessCredentials and wait for both change signals."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # PropertiesChanged signal on the WPS interface itself
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            # PropertiesChanged signal on the standard properties interface
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            wps_set("ProcessCredentials", dbus.Boolean(1))
            if wps_get("ProcessCredentials") != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            wps_set("ProcessCredentials", dbus.Boolean(0))
            if wps_get("ProcessCredentials") != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # This has to be the attribute that the signal handlers test;
            # otherwise the loop can only end through the timeout.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
523
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Argument dictionaries that WPS.Start() has to reject with InvalidArgs
    pbc_enrollee = {'Role': 'enrollee', 'Type': 'pbc'}
    failures = [{'Role': 'foo', 'Type': 'pbc'},
                {'Role': 123, 'Type': 'pbc'},
                {'Type': 'pbc'},
                {'Role': 'enrollee'},
                {'Role': 'registrar'},
                {'Role': 'enrollee', 'Type': 123},
                {'Role': 'enrollee', 'Type': 'foo'},
                dict(pbc_enrollee, Bssid='02:33:44:55:66:77'),
                {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
                dict(pbc_enrollee, Bssid=dbus.ByteArray(b'12345')),
                dict(pbc_enrollee, P2PDeviceAddress=12345),
                dict(pbc_enrollee, P2PDeviceAddress=dbus.ByteArray(b'12345')),
                dict(pbc_enrollee, Foo='bar')]
    for bad_args in failures:
        try:
            wps.Start(bad_args)
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid WPS.Start() arguments accepted: " +
                            str(bad_args))
553
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    sta = dev[0]

    def get_if_prop(name):
        # Read a property of the Interface interface from the interface object
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_state", "Get"):
        get_if_prop("State")

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq=2412)

    time.sleep(0.05)
    for count in (1, 2):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_bsss", "Get"):
            get_if_prop("BSSs")

    # Getter failures on the BSS object found by the scan
    bss_paths = get_if_prop('BSSs')
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                    dbus_interface=dbus.PROPERTIES_IFACE)
    with alloc_fail(sta, 1,
                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
        # A D-Bus error from this call is ignored
        try:
            bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                        dbus_interface=dbus.PROPERTIES_IFACE)
        except dbus.exceptions.DBusException:
            pass

    netid = sta.add_network()
    sta.set_network(netid, "disabled", "0")
    sta.set_network_quoted(netid, "ssid", "test")

    for count in (1, 2):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_networks", "Get"):
            get_if_prop("Networks")

    # Getters on the root object
    with alloc_fail_dbus(sta, 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    for count in range(1, 6):
        with alloc_fail_dbus(sta, count, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    # Setter and method call that report different error strings
    with alloc_fail_dbus(sta, 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        methods = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", methods,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(sta, 1,
                         "=wpa_config_add_network;wpas_dbus_handler_wps_start",
                         "WPS.Start",
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
613
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # The test body changes wps_cred_processing; set it to 0 again
        sta.request("SET wps_cred_processing 0")
620
def _test_dbus_wps_pbc(dev, apdev):
    """Run WPS PBC over D-Bus and require both success and credentials.

    Starts a WPS AP in PBC mode, checks the WPS information of the BSS entry
    and then starts PBC through WPS.Start(), waiting for the "success" Event
    signal and the Credentials signal. Raises Exception on any failure.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].flush_scan_cache()
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    # The single BSS entry has to advertise WPS with Type "pbc"
    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_paths) != 1:
        raise Exception("Missing BSSs entry: " + str(bss_paths))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_paths[0],
                                         str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    wps_info = props['WPS']
    if 'Type' not in wps_info:
        raise Exception("No Type field in the WPS dictionary")
    if wps_info['Type'] != 'pbc':
        raise Exception("Unexpected WPS Type: " + wps_info['Type'])

    class TestDbusWps(TestDbus):
        """Start PBC and wait for the success event and the credentials."""
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            # Stop only once the other signal has been seen as well
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if not self.success_seen:
                return
            self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
691
def test_dbus_wps_pbc_overlap(dev, apdev):
    """D-Bus WPS/PBC operation and signal for PBC overlap"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Two APs with PBC active at the same time
    hapd = start_ap(apdev[0])
    hapd2 = start_ap(apdev[1], ssid="test-wps2",
                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
    hapd.request("WPS_PBC")
    hapd2.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq="2412")

    class TestDbusWps(TestDbus):
        """Start PBC and wait for the pbc-overlap event."""
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.overlap_seen = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "pbc-overlap":
                return
            self.overlap_seen = True
            self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def success(self):
            return self.overlap_seen

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    for cmd in ("WPS_CANCEL", "DISCONNECT"):
        dev[0].request(cmd)
    hapd.disable()
    dev[0].flush_scan_cache()
742
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    # _test_dbus_wps_pin() switches wps_cred_processing to 2 on dev[0];
    # the finally clause sets it back to 0 even when the helper raises.
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
749
def _test_dbus_wps_pin(dev, apdev):
    """Run WPS PIN enrollment over D-Bus with a fixed PIN known to the AP.

    The run is considered successful once both the WPS "success" event and
    the Credentials signal have been seen and dev[0] reports a connection.
    """
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Collect the WPS Event/Credentials signals for one PIN run."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False
            self.success_seen = False

        def __enter__(self):
            # Start the PIN operation from inside the main loop; the second
            # timer bounds the whole run to 15 seconds.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            # Stop only when the other expected signal has arrived as well.
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # "aa:bb:..." -> raw bytes wrapped in a D-Bus byte array
            bssid_hex = bssid.replace(':', '').encode()
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid_hex))
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670', 'Bssid': bssid_ay}
            wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        completed = t.success()
        if not completed:
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
803
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    # _test_dbus_wps_pin2() switches wps_cred_processing to 2 on dev[0];
    # the finally clause sets it back to 0 even when the helper raises.
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
810
def _test_dbus_wps_pin2(dev, apdev):
    """Run WPS PIN enrollment over D-Bus with a PIN returned by Start().

    Start() is called without a 'Pin' entry; the 'Pin' value from its reply
    is then configured on the AP with WPS_PIN. The run is successful once
    both the WPS "success" event and the Credentials signal have been seen
    and dev[0] reports a connection.

    Raises:
        Exception: if the expected signals are not both seen before the
            main loop is stopped.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Collect the WPS Event/Credentials signals for one PIN run."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Must exist before any signal handler or success() reads it;
            # without this, a "success" event arriving before Credentials
            # (or a timeout) raised AttributeError instead of failing cleanly.
            self.credentials_received = False
            self.failed = False

        def __enter__(self):
            # Start the PIN operation from inside the main loop; the second
            # timer bounds the whole run to 15 seconds.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                # Stop only when Credentials has been received as well.
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode()))
            # No 'Pin' entry here: the PIN is taken from the Start() reply.
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            pin = res['Pin']
            # Configure that PIN on the AP so the enrollment can complete.
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            # Returning False makes this a one-shot timeout callback.
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
866
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    # _test_dbus_wps_pin_m2d() switches wps_cred_processing to 2 on dev[0];
    # the finally clause sets it back to 0 even when the helper raises.
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
873
def _test_dbus_wps_pin_m2d(dev, apdev):
    """Run WPS PIN enrollment over D-Bus where the AP learns the PIN late.

    The AP is started without a PIN. When the "m2d" WPS event is reported,
    the PIN is configured on the AP, after which the "success" event and the
    Credentials signal are expected.
    """
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Collect WPS signals and react to the "m2d" event."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False
            self.success_seen = False

        def __enter__(self):
            # Start the PIN operation from inside the main loop; the second
            # timer bounds the whole run to 15 seconds.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "m2d":
                # Only now tell the AP which PIN to accept.
                ap_ctrl = hostapd.Hostapd(apdev[0]['ifname'])
                ap_ctrl.request("WPS_PIN any 12345670")
                return
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # "aa:bb:..." -> raw bytes wrapped in a D-Bus byte array
            bssid_hex = bssid.replace(':', '').encode()
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid_hex))
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670', 'Bssid': bssid_ay}
            wps.Start(start_args)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        completed = t.success()
        if not completed:
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
929
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    # _test_dbus_wps_reg() switches wps_cred_processing to 2 on dev[0];
    # the finally clause sets it back to 0 even when the helper raises.
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
936
def _test_dbus_wps_reg(dev, apdev):
    """Run a WPS operation over D-Bus with dev[0] in the registrar role.

    The run is successful once the Credentials signal has been received and
    dev[0] reports a connection.
    """
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        """Wait for the Credentials signal; WPS events are only logged."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            # The registrar operation is started 100 ms into the main loop;
            # the second timer bounds the whole run to 15 seconds.
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            # "aa:bb:..." -> raw bytes wrapped in a D-Bus byte array
            bssid_hex = bssid.replace(':', '').encode()
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid_hex))
            start_args = {'Role': 'registrar', 'Type': 'pin',
                          'Pin': '12345670', 'Bssid': bssid_ay}
            wps.Start(start_args)
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as t:
        got_credentials = t.success()
        if not got_credentials:
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
984
def test_dbus_wps_cancel(dev, apdev):
    """D-Bus WPS Cancel operation"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    ap_bssid = apdev[0]['bssid']

    # First Cancel() is issued before any WPS operation has been started.
    wps.Cancel()
    dev[0].scan_for_bss(ap_bssid, freq="2412")

    # Second Cancel() follows directly after starting a PIN enrollment.
    raw_bssid = binascii.unhexlify(ap_bssid.replace(':', '').encode())
    start_args = {'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                  'Bssid': dbus.ByteArray(raw_bssid)}
    wps.Start(start_args)
    wps.Cancel()
    # The result of this wait is intentionally not checked.
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
1000
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # 17 single SSID entries: b"1", b"2", ..., b"17"
    ssid_list_17 = [dbus.ByteArray(str(i).encode()) for i in range(1, 18)]
    # One SSID of 33 octets
    ssid_33_octets = dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")

    # Each argument dictionary below must be rejected by Scan().
    invalid_args = [
        {},
        {'Type': 123},
        {'Type': 'foo'},
        {'Type': 'active', 'Foo': 'bar'},
        {'Type': 'active', 'SSIDs': 'foo'},
        {'Type': 'active', 'SSIDs': ['foo']},
        {'Type': 'active', 'SSIDs': ssid_list_17},
        {'Type': 'active', 'SSIDs': [ssid_33_octets]},
        {'Type': 'active', 'IEs': 'foo'},
        {'Type': 'active', 'IEs': ['foo']},
        {'Type': 'active', 'Channels': 2412},
        {'Type': 'active', 'Channels': [2412]},
        {'Type': 'active',
         'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]},
        {'Type': 'active',
         'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]},
        {'Type': 'active', 'AllowRoam': "yes"},
        {'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]},
        {'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]},
    ]
    expected_error = "InvalidArgs"

    for scan_args in invalid_args:
        try:
            iface.Scan(scan_args)
        except dbus.exceptions.DBusException as e:
            if expected_error not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(scan_args), str(e)))
        else:
            # No D-Bus error at all means the bad arguments were accepted.
            raise Exception("Invalid Scan() arguments accepted: " + str(scan_args))
1048
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def with_channels(scan_args):
        # Append the common single-channel list as the last dictionary entry.
        scan_args['Channels'] = [(dbus.UInt32(2412), dbus.UInt32(20))]
        return scan_args

    # This case names the error string it expects explicitly.
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_scan_clone_params;wpas_dbus_handler_scan",
                         "Scan", expected="ScanError: Scan request rejected"):
        iface.Scan(with_channels({'Type': 'passive'}))

    # The remaining cases rely on alloc_fail_dbus()'s default expectation.
    # Each entry: (failing function spec, builder for the Scan() arguments).
    cases = [
        ("=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
         lambda: {'Type': 'passive'}),
        ("=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
         lambda: {'Type': 'active',
                  'IEs': [dbus.ByteArray(b"\xdd\x00")]}),
        ("=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
         lambda: {'Type': 'active',
                  'SSIDs': [dbus.ByteArray(b"open"),
                            dbus.ByteArray()]}),
    ]
    for func_spec, build_args in cases:
        with alloc_fail_dbus(dev[0], 1, func_spec, "Scan"):
            iface.Scan(with_channels(build_args()))
1080
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})

    class TestDbusScan(TestDbus):
        """Run three scans back to back and track ScanDone/BSSAdded."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.fail_reason = None
            self.bss_added = False
            self.scan_completed = 0

        def __enter__(self):
            # The first scan is started from inside the main loop; the
            # second timer bounds the whole run to 15 seconds.
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in ((self.scanDone, "ScanDone"),
                                    (self.bssAdded, "BSSAdded"),
                                    (self.bssRemoved, "BSSRemoved")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            completed = self.scan_completed
            if completed == 1:
                # Second scan: passive, single channel, roaming allowed
                iface.Scan({'Type': 'passive',
                            'AllowRoam': True,
                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
                return
            if completed == 2:
                # Third scan: passive, no channel list, roaming not allowed
                iface.Scan({'Type': 'passive',
                            'AllowRoam': False})
                return
            if completed == 3 and self.bss_added:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            if 'WPS' in properties and 'Type' in properties['WPS']:
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            # First scan: active, with SSID and IE lists on a single channel
            scan_args = {'Type': 'active',
                         'SSIDs': [dbus.ByteArray(b"open"),
                                   dbus.ByteArray()],
                         'IEs': [dbus.ByteArray(b"\xdd\x00"),
                                 dbus.ByteArray()],
                         'AllowRoam': False,
                         'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(scan_args)
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as t:
        if t.fail_reason:
            raise Exception(t.fail_reason)
        if not t.success():
            raise Exception("Expected signals not seen")

    def read_bss_list():
        return if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    # The scans above must have left at least one entry in BSSs.
    if len(read_bss_list()) < 1:
        raise Exception("Scan result not in BSSs property")
    # FlushBSS(0) is expected to leave the BSSs property empty.
    iface.FlushBSS(0)
    if len(read_bss_list()) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
1161
def test_dbus_scan_rand(dev, apdev):
    """D-Bus MACAddressRandomizationMask property Get/Set"""
    # run_dbus_scan_rand() changes the MACAddressRandomizationMask property;
    # the finally clause issues "MAC_RAND_SCAN all enable=0" on dev[0] even
    # when the helper raises.
    try:
        run_dbus_scan_rand(dev, apdev)
    finally:
        dev[0].request("MAC_RAND_SCAN all enable=0")
1168
def run_dbus_scan_rand(dev, apdev):
    """Exercise Get/Set of the MACAddressRandomizationMask property.

    Checks the initial (empty) value, a series of invalid Set() values and
    their error messages, a valid single-entry value, and finally clearing
    the property with an empty dictionary.
    """
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    prop = "MACAddressRandomizationMask"

    def get_mask():
        return if_obj.Get(WPAS_DBUS_IFACE, prop,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def set_mask(value):
        if_obj.Set(WPAS_DBUS_IFACE, prop, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def expect_rejected(value, expected_error):
        # Set() must fail with a D-Bus error containing expected_error.
        try:
            set_mask(value)
        except dbus.exceptions.DBusException as e:
            if expected_error not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid Set accepted")

    initial = get_mask()
    if len(initial) != 0:
        logger.info(str(initial))
        raise Exception("Unexpected initial MACAddressRandomizationMask value")

    expect_rejected("foo", "InvalidArgs: invalid message format")
    expect_rejected({"foo": "bar"},
                    "wpas_dbus_setter_mac_address_randomization_mask: mask was not a byte array")
    expect_rejected({"foo": dbus.ByteArray(b'123456')},
                    'wpas_dbus_setter_mac_address_randomization_mask: bad scan type "foo"')
    expect_rejected({"scan": dbus.ByteArray(b'12345')},
                    'wpas_dbus_setter_mac_address_randomization_mask: malformed MAC mask given')

    # A six-octet mask for "scan" is accepted and shows up as one entry.
    set_mask({"scan": dbus.ByteArray(b'123456')})
    current = get_mask()
    if len(current) != 1:
        logger.info(str(current))
        raise Exception("Unexpected MACAddressRandomizationMask value")

    try:
        set_mask({"scan": dbus.ByteArray(b'123456'),
                  "sched_scan": dbus.ByteArray(b'987654')})
    except dbus.exceptions.DBusException as e:
        # sched_scan is unlikely to be supported
        pass

    # An empty dictionary clears the property again.
    set_mask(dbus.Dictionary({}, signature='sv'))
    current = get_mask()
    if len(current) != 0:
        logger.info(str(current))
        raise Exception("Unexpected MACAddressRandomizationMask value")
1240
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan with dev[0].request() and wait until it has begun.
    scan_reply = dev[0].request("SCAN freq=2412-2462")
    if "OK" not in scan_reply:
        raise Exception("Failed to start scan")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    # A D-Bus Scan() issued now must be rejected with a ScanError.
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    # Let the original scan finish before returning.
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1262
def test_dbus_scan_abort(dev, apdev):
    """D-Bus scan trigger and abort"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    scan_args = {'Type': 'active', 'AllowRoam': False}

    def expect_event(event, failure):
        # Wait up to 15 seconds for the event on dev[0].
        if dev[0].wait_event([event], 15) is None:
            raise Exception(failure)

    # First round: abort only after the scan is known to have started.
    iface.Scan(scan_args)
    expect_event("CTRL-EVENT-SCAN-STARTED", "Scan start timed out")
    iface.AbortScan()
    expect_event("CTRL-EVENT-SCAN-RESULTS", "Scan abort result timed out")
    dev[0].dump_monitor()

    # Second round: abort immediately after triggering the scan.
    iface.Scan(scan_args)
    iface.AbortScan()
    expect_event("CTRL-EVENT-SCAN-RESULTS", "Scan timed out")
1284
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # WPA2-PSK AP that the network added over D-Bus will connect to
    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Drives a connect/disconnect sequence from PropertiesChanged signals.
        # self.state advances 0 -> 9 as listed in propertiesChanged(); the
        # run is successful only when state 9 is reached and the
        # NetworkAdded, NetworkSelected and NetworkRemoved signals were seen.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # run_connect is scheduled to start from inside the main loop.
            # self.timeout comes from TestDbus (not shown here); presumably
            # it ends the loop after 15 s if the sequence stalls.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            # State transitions (trigger -> new state, action):
            #   0 + completed    -> 1, Disconnect()
            #   1 + disconnected -> 2, SelectNetwork()
            #   2 + completed    -> 3, Disconnect()
            #   3 + disconnected -> 4, Reassociate()
            #   4 + completed    -> 5, Reattach()
            #   5 + completed    -> 6, Disconnect()
            #   6 + disconnected -> 7, Reconnect()
            #   7 + completed    -> 8, SignalPoll() check, RemoveNetwork()
            #   8 + disconnected -> 9, quit the main loop
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    iface.Disconnect()
                elif self.state == 7:
                    self.state = 8
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        # -1 matches no later branch, so state 9 is never
                        # reached and success() returns False.
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    iface.Reconnect()
                elif self.state == 8:
                    self.state = 9
                    self.loop.quit()

        def run_connect(self, *args):
            # Add the WPA-PSK network and select it to start the first
            # connection; the returned network path is kept in self.netw for
            # the later SelectNetwork()/RemoveNetwork() calls.
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'scan_freq': 2412},
                                   signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # Returning False makes this a one-shot timeout callback.
            return False

        def success(self):
            # All three network signals must have been seen and the state
            # machine must have run to its final state.
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1387
def test_dbus_remove_connected(dev, apdev):
    """D-Bus RemoveAllNetworks while connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Open AP that the network added over D-Bus will connect to
    ssid = "test-open"
    hapd = hostapd.add_ap(apdev[0], {"ssid": ssid})

    class TestDbusConnect(TestDbus):
        # Drives a connect/disconnect sequence from PropertiesChanged signals.
        # self.state advances 0 -> 9 as listed in propertiesChanged(); the
        # run is successful only when state 9 is reached and the
        # NetworkAdded, NetworkSelected and NetworkRemoved signals were seen.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # run_connect is scheduled to start from inside the main loop.
            # self.timeout comes from TestDbus (not shown here); presumably
            # it ends the loop after 15 s if the sequence stalls.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            # State transitions (trigger -> new state, action):
            #   0 + completed    -> 1, Disconnect()
            #   1 + disconnected -> 2, SelectNetwork()
            #   2 + completed    -> 3, Disconnect()
            #   3 + disconnected -> 4, Reassociate()
            #   4 + completed    -> 5, Reattach()
            #   5 + completed    -> 6, Disconnect()
            #   6 + disconnected -> 7, Reconnect()
            #   7 + completed    -> 8, SignalPoll() check, RemoveAllNetworks()
            #   8 + disconnected -> 9, quit the main loop
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    iface.Disconnect()
                elif self.state == 7:
                    self.state = 8
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        # -1 matches no later branch, so state 9 is never
                        # reached and success() returns False.
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    # Issued while the state is still "completed", i.e. the
                    # networks are removed during an active connection.
                    iface.RemoveAllNetworks()
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    iface.Reconnect()
                elif self.state == 8:
                    self.state = 9
                    self.loop.quit()

        def run_connect(self, *args):
            # Add the open (key_mgmt NONE) network and select it to start the
            # first connection; the returned network path is kept in
            # self.netw for the later SelectNetwork() call.
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'NONE',
                                    'scan_freq': 2412},
                                   signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # Returning False makes this a one-shot timeout callback.
            return False

        def success(self):
            # All three network signals must have been seen and the state
            # machine must have run to its final state.
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1487
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    # Bring up the WPA2-PSK AP that the station is expected to join.
    hapd = hostapd.add_ap(apdev[0],
                          hostapd.wpa2_params(ssid=ssid,
                                              passphrase=passphrase))

    class TestDbusConnect(TestDbus):
        # Adds a network with mem_only_psk set, answers the PSK_PASSPHRASE
        # NetworkRequest signal and waits for State "completed".
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            # Schedule the connection attempt and the self.timeout callback
            # (15000 ms), register the signal handlers and then run the main
            # loop until one of the callbacks stops it.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.propertiesChanged, "PropertiesChanged"),
                        (self.networkRequest, "NetworkRequest"))
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            # Connection completed - record it and stop the main loop.
            self.connected = True
            self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PSK_PASSPHRASE":
                return
            # Reply with the passphrase wrapped in double quotation marks.
            iface.NetworkReply(path, field, '"%s"' % passphrase)

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-PSK',
                       'mem_only_psk': 1,
                       'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # Returning False makes this a one-shot timeout callback.
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        connected = t.success()
        if not connected:
            raise Exception("Expected signals not seen")
1541
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Skip the test case if the TEST_ALLOC_FAIL command is not accepted.
    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Drives a connect/disconnect/reassociate/reattach sequence through
        # D-Bus signals. self.state tracks the progress (0..7); -1 marks an
        # unexpected SignalPoll() result.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # Schedule the connection attempt and the self.timeout callback
            # (1500 ms), register the signal handlers and run the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            # State "completed":
            #   0 -> 1 Disconnect(), 2 -> 3 Disconnect(), 4 -> 5 Reattach(),
            #   5 -> 6 SignalPoll() check followed by RemoveNetwork()
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            # State "disconnected":
            #   1 -> 2 SelectNetwork(), 3 -> 4 Reassociate(),
            #   6 -> 7 sequence completed, stop the main loop
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'scan_freq': 2412},
                                   signature='sv')
            # Either D-Bus call may fail while allocation failures are being
            # injected; stop the main loop immediately in that case.
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            # Returning False makes this a one-shot timeout callback.
            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Repeat the connection sequence with an increasing TEST_ALLOC_FAIL
    # count. The loop stops once five iterations have reported a
    # GET_ALLOC_FAIL state that does not start with "0:".
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            # NOTE: this exception is caught by the handler below, so it
            # never fails the test case; that best-effort behavior is kept.
            if i < 3:
                raise Exception("Connection succeeded during out-of-memory")
            if not state.startswith('0:'):
                count += 1
                if count == 5:
                    break
        except Exception as e:
            # Errors are expected while allocations fail, so move on to the
            # next iteration. Only Exception is caught so that
            # KeyboardInterrupt and SystemExit can still stop the loop.
            logger.debug("Iteration %d - ignored exception: %s" % (i, str(e)))

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1683
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def expect_not_connected(method, accepted_msg, unexpected_msg):
        # The call has to be rejected with a D-Bus error that mentions
        # NotConnected; anything else fails the test case.
        try:
            method()
        except dbus.exceptions.DBusException as e:
            if "NotConnected" not in str(e):
                raise Exception(unexpected_msg + str(e))
        else:
            raise Exception(accepted_msg)

    expect_not_connected(iface.Disconnect,
                         "Disconnect() accepted when not connected",
                         "Unexpected error message for invalid Disconnect: ")
    expect_not_connected(iface.Reattach,
                         "Reattach() accepted when not connected",
                         "Unexpected error message for invalid Reattach: ")
1702
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Sequence tracked in self.state:
        #   0: initial connection
        #   1: EAPLogoff() issued and matching altsubject_match configured
        #   2: reconnecting with the matching constraint
        #   3: Disconnect() issued and non-matching altsubject_match set
        #   4: reconnecting with the non-matching constraint
        #   5: "AltSubject mismatch" reported through the EAP signal
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            self.state = 0

        def __enter__(self):
            # Schedule the connection attempt and the self.timeout callback
            # (15000 ms), register the signal handlers and run the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def setAltsubjectMatch(self, suffix=None):
            # Set altsubject_match of the added network to the dNSName seen
            # in the Certification signal, optionally followed by suffix.
            net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
            match = self.server_dnsname
            if suffix is not None:
                match = match + suffix
            props = dbus.Dictionary({'altsubject_match': match},
                                    signature='sv')
            net_obj.Set(WPAS_DBUS_NETWORK, "Properties", props,
                        dbus_interface=dbus.PROPERTIES_IFACE)

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')
            if new_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    self.setAltsubjectMatch()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    self.setAltsubjectMatch("FOO")
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            altsubject = args['altsubject']
            if len(altsubject) < 1:
                raise Exception("Missing dNSName")
            dnsname = altsubject[0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            mismatch = (status == 'remote certificate verification' and
                        parameter == 'AltSubject mismatch')
            if self.state == 4 and mismatch:
                # The non-matching constraint was rejected as expected.
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PASSWORD":
                return
            iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'IEEE8021X',
                       'eapol_flags': 0,
                       'eap': 'TTLS',
                       'anonymous_identity': 'ttls',
                       'identity': 'pap user',
                       'ca_cert': 'auth_serv/ca.pem',
                       'phase2': 'auth=PAP',
                       'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # Returning False makes this a one-shot timeout callback.
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 5

    with TestDbusConnect(bus) as t:
        completed = t.success()
        if not completed:
            raise Exception("Expected signals not seen")
1817
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Add a network using explicitly typed D-Bus values and verify through
    # the control interface that scan_freq was stored.
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'WPA-PSK',
                            'psk': "12345678",
                            'identity': dbus.ByteArray([1, 2]),
                            'priority': dbus.Int32(0),
                            'scan_freq': dbus.UInt32(2412)},
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # Frequency lists given as space separated strings
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'NONE',
                            'scan_freq': "2412 2432",
                            'freq_list': "2412 2417 2432"},
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(net_id, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # Operations on the already removed network have to be rejected.
    try:
        iface.RemoveNetwork(netw)
        raise Exception("Invalid RemoveNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    try:
        iface.SelectNetwork(netw)
        raise Exception("Invalid SelectNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))

    args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
                            'identity': "testuser", 'scan_freq': '2412'},
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # The Enabled property: disabled by default, can be toggled with a
    # Boolean value and rejects a value of another type.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if not res:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(Enabled,1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))

    # Updating one parameter through Properties must not change the others.
    args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # A second call with no networks left is expected not to raise.
    iface.RemoveAllNetworks()

    # Parameter sets that AddNetwork() has to reject with InvalidArgs
    tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'),
             dbus.Dictionary({'identity': dbus.ByteArray()},
                             signature='sv'),
             dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv')]
    for args in tests:
        try:
            iface.AddNetwork(args)
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1929
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def sv_dict(values):
        # Wrap a plain dict into a D-Bus a{sv} dictionary.
        return dbus.Dictionary(values, signature='sv')

    def add_and_remove(net_args):
        # AddNetwork() may either fail or succeed here; remove the network
        # again if it was added.
        try:
            added = iface.AddNetwork(net_args)
            iface.RemoveNetwork(added)
        except dbus.exceptions.DBusException:
            pass

    netw1 = iface.AddNetwork(sv_dict({'ssid': "foo1", 'key_mgmt': 'NONE',
                                      'identity': "testuser",
                                      'scan_freq': '2412'}))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)

    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    unknown_network = dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234")
    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        iface.RemoveNetwork(unknown_network)

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        open_net = sv_dict({'ssid': "foo2", 'key_mgmt': 'NONE'})
        # Currently, AddNetwork() succeeds even if os_strdup() for path
        # fails, so remove the network if that occurs.
        add_and_remove(open_net)

    for fail_count in range(1, 3):
        with alloc_fail(dev[0], fail_count, "=wpas_dbus_register_network"):
            # Currently, AddNetwork() succeeds even if network registration
            # fails, so remove the network if that occurs.
            add_and_remove(open_net)

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        netw = iface.AddNetwork(sv_dict({'ssid': "foo2",
                                         'key_mgmt': 'NONE'}))

    # (allocation failure count, function pattern, AddNetwork parameters)
    set_props = '=set_network_properties;wpas_dbus_handler_add_network'
    tests = [(1,
              'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
              sv_dict({'ssid': dbus.ByteArray(b' ')}))]
    tests += [(1, set_props, sv_dict(values))
              for values in ({'ssid': 'foo'},
                             {'eap': 'foo'},
                             {'priority': dbus.UInt32(1)},
                             {'priority': dbus.Int32(1)},
                             {'ssid': dbus.ByteArray(b' ')})]
    for (count, funcs, args) in tests:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            netw = iface.AddNetwork(args)

    # None of the failed attempts may have left a network block behind.
    remaining = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(remaining) != 0:
        raise Exception("Unexpected network block added")
    visible = dev[0].list_networks()
    if len(visible) != 0:
        raise Exception("Unexpected network block visible")
2009
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    regdom_event = ["CTRL-EVENT-REGDOM-CHANGE"]

    def set_country(cmd):
        # Issue the SET country command and wait up to one second for the
        # regulatory domain change event, first on the interface and then,
        # if nothing was seen there, on the global control interface.
        dev[0].request(cmd)
        if dev[0].wait_event(regdom_event, timeout=1) is None:
            dev[0].wait_global_event(regdom_event, timeout=1)

    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        set_country("SET country US")
        set_country("SET country 00")
        subprocess.call(['iw', 'reg', 'set', '00'])
2028
def _test_dbus_interface(dev, apdev):
    # Body of test_dbus_interface(): exercises CreateInterface(),
    # GetInterface() and RemoveInterface() with valid and invalid arguments
    # using the 'lo' interface with driver 'none'.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # Creating the same interface again has to fail with InterfaceExists.
    params = dbus.Dictionary({'Ifname': 'lo',
                              'Driver': 'none',
                              'ConfigFile': 'foo',
                              'BridgeIfname': 'foo',},
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Removing the interface twice: the second attempt has to fail.
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
        raise Exception("Invalid RemoveInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))

    # Unknown dictionary entry
    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none',
                              'Foo': 123},
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Missing Ifname
    params = dbus.Dictionary({'Driver': 'none'}, signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # The 'lo' interface was removed above, so it must not be found anymore.
    try:
        wpas.GetInterface("lo")
        raise Exception("Invalid GetInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
2085
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    def lo_params():
        # Parameters for creating the 'lo' interface with driver 'none'
        return dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                               signature='sv')

    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        wpas.CreateInterface(lo_params())

    # Retry CreateInterface() with an increasing TEST_ALLOC_FAIL count until
    # an attempt succeeds and the reported GET_ALLOC_FAIL state does not
    # start with "0:".
    for i in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
        params = lo_params()
        try:
            npath = wpas.CreateInterface(params)
            wpas.RemoveInterface(npath)
        except dbus.exceptions.DBusException:
            # Failure while allocations are failing - try the next count.
            pass
        else:
            logger.info("CreateInterface succeeds after %d allocation failures" % i)
            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if i < 5:
                raise Exception("CreateInterface succeeded during out-of-memory")
            if not state.startswith('0:'):
                break

    # Allocation failure in the handler for each supported dictionary entry
    for arg in ('Driver', 'Ifname', 'ConfigFile', 'BridgeIfname'):
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            wpas.CreateInterface(dbus.Dictionary({arg: 'foo'},
                                                 signature='sv'))
2120
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/GetBlob/RemoveBlob parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Add a blob and verify that the same name cannot be added a second time.
    blob = dbus.ByteArray(b"\x01\x02\x03")
    iface.AddBlob('blob1', blob)
    try:
        iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04"))
        raise Exception("Invalid AddBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))

    # The data read back has to match the first AddBlob() byte for byte.
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for got, expected in zip(res, blob):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected blob data")

    # The blob name has to be visible through the Blobs property as well.
    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal, both RemoveBlob() and GetBlob() must report BlobUnknown.
    iface.RemoveBlob('blob1')
    try:
        iface.RemoveBlob('blob1')
        raise Exception("Invalid RemoveBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    try:
        iface.GetBlob('blob1')
        raise Exception("Invalid GetBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))

    class TestDbusBlob(TestDbus):
        # Main-loop helper that adds and removes 'blob2' and records whether
        # the BlobAdded and BlobRemoved signals were delivered for it.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            # The main loop is run from here; it ends when blobRemoved()
            # quits it (self.timeout is registered as a 15 second timer).
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # Returning False makes this a one-shot timer callback.
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2195
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    data = dbus.ByteArray(b"\x01\x02\x03\x04")

    # alloc_fail_dbus() is defined elsewhere in this file; presumably it
    # makes the count-th allocation in the named handler fail and expects the
    # wrapped D-Bus call to report an error - confirm against its definition.
    for count in range(1, 4):
        with alloc_fail_dbus(dev[0], count, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', data)
2205
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    autoscan = dbus.Interface(if_obj, WPAS_DBUS_IFACE).AutoScan

    # Each of these settings is expected to be accepted without a D-Bus
    # error; the empty string is sent last.
    for setting in ("foo", "periodic:1", ""):
        autoscan(setting)

    # Issue an empty AUTOSCAN command over the control interface at the end.
    dev[0].request("AUTOSCAN ")
2215
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    wpas_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Run AutoScan() while an allocation failure is armed for the handler.
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan",
                         "AutoScan"):
        wpas_iface.AutoScan("foo")

    # Issue an empty AUTOSCAN command over the control interface at the end.
    dev[0].request("AUTOSCAN ")
2224
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)
    peer = dev[1].p2p_interface_addr()

    def expect_invalid_args(method, arg):
        # Call iface.<method>(arg) and require a D-Bus error that mentions
        # InvalidArgs; success or any other D-Bus error fails the test.
        try:
            getattr(iface, method)(arg)
        except dbus.exceptions.DBusException as err:
            if "InvalidArgs" not in str(err):
                raise Exception("Unexpected error message for invalid " +
                                method + ": " + str(err))
        else:
            raise Exception("Invalid " + method + "() accepted")

    expect_invalid_args("TDLSDiscover", "foo")
    expect_invalid_args("TDLSStatus", "foo")

    # A well-formed address without a TDLS link is reported as a status
    # string instead of a D-Bus error.
    status = iface.TDLSStatus(peer)
    if status != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    expect_invalid_args("TDLSSetup", "foo")
    expect_invalid_args("TDLSTeardown", "foo")

    # Teardown with a well-formed but unknown peer address has to fail with
    # UnknownError rather than InvalidArgs.
    try:
        iface.TDLSTeardown("00:11:22:33:44:55")
    except dbus.exceptions.DBusException as err:
        if "UnknownError: error performing TDLS teardown" not in str(err):
            raise Exception("Unexpected error message: " + str(err))
    else:
        raise Exception("TDLSTeardown accepted for unknown peer")

    expect_invalid_args("TDLSChannelSwitch", {})
    expect_invalid_args("TDLSCancelChannelSwitch", "foo")
2286
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    wpas_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    expected_error = "UnknownError: error performing TDLS setup"

    # Run TDLSSetup() while an allocation failure is armed for
    # wpa_tdls_add_peer; the error text is passed to alloc_fail_dbus().
    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected_error):
        wpas_iface.TDLSSetup("00:11:22:33:44:55")
2295
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)

    peer = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Timer-driven sequence: discover -> setup -> status check and
        # teardown -> status check. Each step schedules the next one.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Only logged; the result is decided by the TDLSStatus() checks.
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(peer)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(peer)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(peer)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            # Teardown is requested regardless of the status seen above.
            iface.TDLSTeardown(peer)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(peer)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2362
def test_dbus_tdls_channel_switch(dev, apdev):
    """D-Bus TDLS channel switch configuration"""
    capa_flags = int(dev[0].get_driver_status_field('capa.flags'), 16)
    if not capa_flags & 0x800000000:
        raise HwsimSkip("Driver does not support TDLS channel switching")

    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)

    peer = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Timer-driven sequence: discover -> setup -> channel switch
        # requests -> cancel channel switch.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(peer)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(peer)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(peer)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True

            incomplete_requests = [
                # Unknown dict entry
                dbus.Dictionary({'Foobar': dbus.Byte(1)},
                                signature='sv'),
                # Missing OperClass
                dbus.Dictionary({}, signature='sv'),
                # Missing Frequency
                dbus.Dictionary({'OperClass': dbus.Byte(1)},
                                signature='sv'),
                # Missing PeerAddress
                dbus.Dictionary({'OperClass': dbus.Byte(1),
                                 'Frequency': dbus.UInt32(2417)},
                                signature='sv'),
            ]
            # A request that is accepted is not treated as a failure here;
            # only an error other than InvalidArgs is.
            for request in incomplete_requests:
                try:
                    iface.TDLSChannelSwitch(request)
                except Exception as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected exception")

            # Valid parameters
            request = dbus.Dictionary({'OperClass': dbus.Byte(1),
                                       'Frequency': dbus.UInt32(2417),
                                       'PeerAddress': peer,
                                       'SecChannelOffset': dbus.UInt32(0),
                                       'CenterFrequency1': dbus.UInt32(0),
                                       'CenterFrequency2': dbus.UInt32(0),
                                       'Bandwidth': dbus.UInt32(20),
                                       'HT': dbus.Boolean(False),
                                       'VHT': dbus.Boolean(False)},
                                      signature='sv')
            iface.TDLSChannelSwitch(request)

            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            iface.TDLSCancelChannelSwitch(peer)
            self.tdls_done = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_done

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2479
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # With a non-empty engine path the call may either succeed or fail; a
    # failure is accepted only if it is the EAPOL reinit error.
    for engine, module in (("foo", "bar"), ("foo", "")):
        try:
            iface.SetPKCS11EngineAndModulePath(engine, module)
        except dbus.exceptions.DBusException as err:
            if "Error.Failed: Reinit of the EAPOL" not in str(err):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(err))

    # With an empty engine path the call has to succeed and both properties
    # have to reflect the values that were set.
    for module in ("bar", ""):
        iface.SetPKCS11EngineAndModulePath("", module)
        expectations = (("PKCS11EnginePath", ""),
                        ("PKCS11ModulePath", module))
        for prop, expected in expectations:
            value = if_obj.Get(WPAS_DBUS_IFACE, prop,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            if value != expected:
                raise Exception("Unexpected " + prop + " value: " + value)
2516
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    sta = dev[0]
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        # Set AP_SCAN back to 1 whether or not the test body passed.
        sta.request("AP_SCAN 1")
2523
def _test_dbus_apscan(dev, apdev):
    # Body of test_dbus_apscan(): read, write, and mis-write the ApScan
    # property through the D-Bus properties interface.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def read_ap_scan():
        return if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_ap_scan(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    initial = read_ap_scan()
    if initial != 1:
        raise Exception("Unexpected initial ApScan value: %d" % initial)

    # Values 0, 1, and 2 have to be accepted and read back unchanged.
    for mode in range(3):
        write_ap_scan(dbus.UInt32(mode))
        current = read_ap_scan()
        if current != mode:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (current, mode))

    # (value, label used in messages, required error text)
    rejected = ((dbus.Int16(-1), "-1",
                 "Error.Failed: wrong property type"),
                (dbus.UInt32(123), "123",
                 "Error.Failed: ap_scan must be 0, 1, or 2"))
    for value, label, reason in rejected:
        try:
            write_ap_scan(value)
        except dbus.exceptions.DBusException as err:
            if reason not in str(err):
                raise Exception("Unexpected error message for invalid Set(ApScan," + label + "): " + str(err))
        else:
            raise Exception("Invalid Set(ApScan," + label + ") accepted")

    write_ap_scan(dbus.UInt32(1))
2558
def test_dbus_pmf(dev, apdev):
    """D-Bus Get/Set Pmf"""
    sta = dev[0]
    try:
        _test_dbus_pmf(dev, apdev)
    finally:
        # Set pmf back to 0 whether or not the test body passed.
        sta.request("SET pmf 0")
2565
def _test_dbus_pmf(dev, apdev):
    # Body of test_dbus_pmf(): the Pmf property is exchanged as a string.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def read_pmf():
        return if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_pmf(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Pmf", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Start from a known value set through the control interface.
    dev[0].set("pmf", "0")
    initial = read_pmf()
    if initial != "0":
        raise Exception("Unexpected initial Pmf value: %s" % initial)

    # "0", "1", and "2" have to be accepted and read back unchanged.
    for level in range(3):
        wanted = str(level)
        write_pmf(wanted)
        current = read_pmf()
        if current != wanted:
            raise Exception("Unexpected Pmf value %s (expected %d)" % (current, level))

    write_pmf("1")
2585
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # The property is expected to be enabled when the test starts.
    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values have to be accepted and read back unchanged.
    for i in [False, True]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A non-boolean value has to be rejected as a wrong property type.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(FastReauth,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            # This message used to name ApScan (copy-paste), which pointed
            # at the wrong property when this check failed.
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))

    # Leave the property enabled, matching the value required at the start.
    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
2613
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Values written and expected back. The failure messages below used to
    # format an undefined name (i), which turned a value mismatch into a
    # NameError instead of the intended diagnostic.
    expire_age = 179
    expire_count = 3

    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(expire_age),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expire_age:
        raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, expire_age))

    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(expire_count),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expire_count:
        raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, expire_count))

    # Wrong D-Bus type for BSSExpireAge
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))

    # BSSExpireAge below the minimum named in the expected error text
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireAge,9) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))

    # Wrong D-Bus type for BSSExpireCount
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))

    # BSSExpireCount of zero
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireCount,0) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))

    # Leave both properties at fixed values (180 and 2) for later tests;
    # presumably these are the defaults - confirm against wpa_supplicant.
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
2668
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    sta = dev[0]
    try:
        _test_dbus_country(dev, apdev)
    finally:
        # Set the country back to 00 in wpa_supplicant and through iw,
        # whether or not the test body passed.
        sta.request("SET country 00")
        subprocess.call(['iw', 'reg', 'set', '00'])
2676
def _test_dbus_country(dev, apdev):
    # Body of test_dbus_country(): set the Country property and check the
    # resulting regulatory domain change events.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def write_country(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Country", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def wait_regdom_change():
        # Return the regdom change event, looking at the per-interface
        # monitor first and the global one second.
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if ev is not None:
            return ev
        # For now, work around separate P2P Device interface event delivery
        ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
        if ev is None:
            raise Exception("regdom change event not seen")
        return ev

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    dev[0].dump_monitor()

    write_country("FI")
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    ev = wait_regdom_change()
    if "init=USER type=COUNTRY alpha2=FI" not in ev:
        raise Exception("Unexpected event contents: " + ev)

    # (value, label used in messages, required error text)
    rejected = ((dbus.Int16(-1), "-1", "Error.Failed: wrong property type"),
                ("F", "F", "Error.Failed: invalid country code"))
    for value, label, reason in rejected:
        try:
            write_country(value)
        except dbus.exceptions.DBusException as err:
            if reason not in str(err):
                raise Exception("Unexpected error message for invalid Set(Country," + label + "): " + str(err))
        else:
            raise Exception("Invalid Set(Country," + label + ") accepted")

    write_country("00")

    ev = wait_regdom_change()
    # init=CORE was previously used due to invalid db.txt data for 00. For
    # now, allow both it and the new init=USER after fixed db.txt.
    if not ("init=CORE type=WORLD" in ev or "init=USER type=WORLD" in ev):
        raise Exception("Unexpected event contents: " + ev)
2730
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    sta = dev[0]
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        # Set the scan interval back to 5 whether or not the body passed.
        sta.request("SCAN_INTERVAL 5")
2737
def _test_dbus_scan_interval(dev, apdev):
    # Body of test_dbus_scan_interval(): write, read back, and mis-write the
    # ScanInterval property.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Value written and expected back. The failure message below used to
    # format an undefined name (i), which turned a value mismatch into a
    # NameError instead of the intended diagnostic.
    interval = 3

    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(interval),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != interval:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, interval))

    # A UInt16 value has to be rejected as a wrong property type.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,100) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))

    # A negative Int32 value has the right type but has to be rejected.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))

    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
2766
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # dev[1] runs a social-channel P2P find while the test is in progress
    # (presumably the source of the reported Probe Request frames).
    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        # Starts a P2P group, subscribes to Probe Request reporting on the
        # group interface, and waits for the first ProbeRequest signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            # Both interface proxies are kept for use after the main loop.
            self.iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE)
            self.iface.SubscribeProbeReq()
            self.group_p2p = dbus.Interface(group_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            return False

        def success(self):
            return self.reported

    with TestDbusProbe(bus) as first:
        if not first.success():
            raise Exception("Expected signals not seen")
        first.iface.UnsubscribeProbeReq()
        # A second unsubscribe has to fail with NoSubscription.
        try:
            first.iface.UnsubscribeProbeReq()
        except dbus.exceptions.DBusException as err:
            if "NoSubscription" not in str(err):
                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(err))
        else:
            raise Exception("Invalid UnsubscribeProbeReq() accepted")
        first.group_p2p.Disconnect()

    with TestDbusProbe(bus) as second:
        if not second.success():
            raise Exception("Expected signals not seen")
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
2830
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    wpas_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Need to make sure this process has not already subscribed to avoid false
    # failures due to the operation succeeding due to os_strdup() not even
    # getting called.
    try:
        wpas_iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException:
        was_subscribed = False
    else:
        was_subscribed = True

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
                         "SubscribeProbeReq"):
        wpas_iface.SubscribeProbeReq()

    if not was_subscribed:
        return

    # On purpose, leave ProbeReq subscription in place to test automatic
    # cleanup.
    wpas_iface.SubscribeProbeReq()
2854
def test_dbus_p2p_invalid(dev, apdev):
    """D-Bus invalid P2P operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Well-formed peer object path used for operations that must be rejected
    unknown_peer = path + "/Peers/00112233445566"
    # Error texts that are expected by more than one check below
    no_p2p = "Error.Failed: P2P is not available for this interface"
    pd_failed = "UnknownError: Failed to send provision discovery request"

    def expect_error(call, name, expected, label=None):
        # Invoke call() and require it to fail with a DBusException whose
        # text contains expected. name is used when the call is unexpectedly
        # accepted; label (defaults to name) is used when the error text does
        # not match. Any other exception type is propagated unchanged.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if expected not in str(e):
                raise Exception("Unexpected error message for invalid %s: %s" %
                                (label or name, str(e)))
            return
        raise Exception("Invalid %s accepted" % name)

    def expect_error_p2p_disabled(call, name, expected):
        # Same as expect_error(), but with "P2P_SET disabled 1" issued on
        # dev[0] first. The finally clause re-enables P2P even if the check
        # fails so that the setting is not left behind.
        try:
            dev[0].request("P2P_SET disabled 1")
            expect_error(call, name, expected)
        finally:
            dev[0].request("P2P_SET disabled 0")

    # RejectPeer: unknown peer and malformed object path
    expect_error(lambda: p2p.RejectPeer(unknown_peer), "RejectPeer",
                 "UnknownError: Failed to call wpas_p2p_reject",
                 label="RejectPeer()")
    expect_error(lambda: p2p.RejectPeer("/foo"), "RejectPeer", "InvalidArgs",
                 label="RejectPeer()")

    # RemoveClient: missing, unknown, or wrongly typed dictionary entries
    tests = [{},
             {'peer': 'foo'},
             {'foo': "bar"},
             {'iface': "abc"},
             {'iface': 123}]
    for t in tests:
        expect_error(lambda: p2p.RemoveClient(t), "RemoveClient",
                     "InvalidArgs", label="RemoveClient()")

    # Find: invalid values/types for known keys and unsupported entries
    tests = [{'DiscoveryType': 'foo'},
             {'RequestedDeviceTypes': 'foo'},
             {'RequestedDeviceTypes': ['foo']},
             {'RequestedDeviceTypes': ['1', '2', '3', '4', '5', '6', '7', '8',
                                       '9', '10', '11', '12', '13', '14', '15',
                                       '16', '17']},
             {'RequestedDeviceTypes': dbus.Array([], signature="s")},
             {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
             {'RequestedDeviceTypes': dbus.Array([], signature="i")},
             {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'),
                                       dbus.ByteArray(b'1234567')]},
             {'Foo': dbus.Int16(1)},
             {'Foo': dbus.UInt16(1)},
             {'Foo': dbus.Int64(1)},
             {'Foo': dbus.UInt64(1)},
             {'Foo': dbus.Double(1.23)},
             {'Foo': dbus.Signature('s')},
             {'Foo': 'bar'}]
    for t in tests:
        expect_error(lambda: p2p.Find(dbus.Dictionary(t)), "Find",
                     "InvalidArgs", label="Find()")

    # RemovePersistentGroup: object paths that do not identify a persistent
    # group of this interface
    for p in ["/foo",
              "/fi/w1/wpa_supplicant1/Interfaces/1234",
              "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"]:
        expect_error(lambda: p2p.RemovePersistentGroup(dbus.ObjectPath(p)),
                     "RemovePersistentGroup", "InvalidArgs")

    expect_error_p2p_disabled(lambda: p2p.Listen(5), "Listen",
                              "UnknownError: Could not start P2P listen")

    # Use a proxy object without introspection for the wrongly typed Listen
    # argument.
    # NOTE(review): presumably needed so that dbus-python does not reject the
    # string argument locally before it is sent - confirm.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    expect_error(lambda: test_p2p.Listen("foo"), "Listen", "InvalidArgs")

    expect_error_p2p_disabled(
        lambda: p2p.ExtendedListen(dbus.Dictionary({})), "ExtendedListen",
        "UnknownError: failed to initiate a p2p_ext_listen")

    args = {'duration1': 30000, 'interval1': 102400,
            'duration2': 20000, 'interval2': 102400}
    expect_error_p2p_disabled(
        lambda: p2p.PresenceRequest(args), "PresenceRequest",
        "UnknownError: Failed to invoke presence request")

    # GroupAdd: negative frequency and an object path that is not a
    # persistent group
    expect_error(
        lambda: p2p.GroupAdd(dbus.Dictionary({'frequency': dbus.Int32(-1)})),
        "GroupAdd", "InvalidArgs")
    expect_error(
        lambda: p2p.GroupAdd(dbus.Dictionary({'persistent_group_object':
                                              dbus.ObjectPath(path),
                                              'frequency': 2412})),
        "GroupAdd", "InvalidArgs")

    expect_error(lambda: p2p.Disconnect(), "Disconnect",
                 "UnknownError: failed to disconnect")

    expect_error_p2p_disabled(lambda: p2p.Flush(), "Flush", no_p2p)

    args = {'peer': path,
            'join': True,
            'wps_method': 'pbc',
            'frequency': 2412}
    expect_error_p2p_disabled(lambda: p2p.Connect(args), "Connect", no_p2p)

    tests = [{'frequency': dbus.Int32(-1)},
             {'wps_method': 'pbc'},
             {'wps_method': 'foo'}]
    for args in tests:
        expect_error(lambda: p2p.Connect(args), "Connect", "InvalidArgs")

    expect_error_p2p_disabled(lambda: p2p.Invite({'peer': path}), "Invite",
                              no_p2p)
    # The mismatch report for this check used to name Connect instead of
    # Invite.
    expect_error(lambda: p2p.Invite({'foo': 'bar'}), "Invite", "InvalidArgs")

    # ProvisionDiscoveryRequest: (peer object path, config method, expected
    # error text)
    tests = [(path, 'display', "InvalidArgs"),
             (dbus.ObjectPath(unknown_peer), 'display', pd_failed),
             (dbus.ObjectPath(unknown_peer), 'keypad', pd_failed),
             (dbus.ObjectPath(unknown_peer), 'pbc', pd_failed),
             (dbus.ObjectPath(unknown_peer), 'pushbutton', pd_failed),
             (dbus.ObjectPath(unknown_peer), 'foo', "InvalidArgs")]
    for (p, method, err) in tests:
        expect_error(lambda: p2p.ProvisionDiscoveryRequest(p, method),
                     "ProvisionDiscoveryRequest", err)

    expect_error_p2p_disabled(
        lambda: if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                           dbus_interface=dbus.PROPERTIES_IFACE),
        "Get(Peers)", no_p2p)
3076
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    str_array = "_wpa_dbus_dict_entry_get_string_array"
    one_blob = [dbus.ByteArray(b'123')]

    # Find() cases: (allocation count, function match string given to
    # alloc_fail_dbus(), value of the 'Foo' dictionary entry)
    find_cases = [
        (1, str_array, ['bar']),
        (2, str_array, ['bar']),
        (10, str_array, ['1', '2', '3', '4', '5', '6', '7', '8', '9']),
        (1, ":=_wpa_dbus_dict_entry_get_binarray", one_blob),
        (1,
         "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         one_blob),
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         [dbus.ByteArray(b'123') for _ in range(11)]),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         one_blob),
        (1,
         "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         path),
    ]
    for count, funcs, value in find_cases:
        with alloc_fail_dbus(dev[0], count, funcs, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({'Foo': value}))

    # AddService() with a 500 octet response, once for each of the first two
    # allocation counts in the byte array parser
    service = {'service_type': 'bonjour',
               'response': dbus.ByteArray(500*b'b')}
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(service)
3134
def test_dbus_p2p_discovery(dev, apdev):
    """D-Bus P2P discovery"""
    # The test steps are in run_dbus_p2p_discovery(); this wrapper only makes
    # sure the cleanup below happens regardless of the test result.
    try:
        run_dbus_p2p_discovery(dev, apdev)
    finally:
        # Remove the vendor elements that run_dbus_p2p_discovery() added to
        # dev[1] with VENDOR_ELEM_ADD 1.
        dev[1].request("VENDOR_ELEM_REMOVE 1 *")
3141
def run_dbus_p2p_discovery(dev, apdev):
    # Body of test_dbus_p2p_discovery(): dev[0] is driven over D-Bus and is
    # expected to find dev[1] and dev[2], which are put in P2P listen state
    # with distinguishable attributes. The caller removes the vendor elements
    # from dev[1] in case this function raises.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # dev[1]: secondary device type, vendor extension (11223344), and a
    # vendor specific element; all three are verified in deviceFound() below.
    dev[1].request("SET sec_device_type 1-0050F204-2")
    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
    dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()
    a1 = binascii.unhexlify(addr1.replace(':', ''))

    # dev[2]: Wi-Fi Display subelement 0; the peer's IEs property is expected
    # to match the wfd value built here.
    wfd_devinfo = "00001c440028"
    dev[2].request("SET wifi_display 1")
    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
    wfd = binascii.unhexlify('000006' + wfd_devinfo)
    dev[2].p2p_listen()
    addr2 = dev[2].p2p_dev_addr()
    a2 = binascii.unhexlify(addr2.replace(':', ''))

    # No peers are expected to be known before the first Find()
    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if 'Peers' not in res:
        raise Exception("GetAll result missing Peers")
    if len(res['Peers']) != 0:
        raise Exception("Unexpected peer(s) in the list")

    # Exercise Find() argument parsing; the operation is stopped immediately
    args = {'DiscoveryType': 'social',
            'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')],
            'Timeout': dbus.Int32(1)}
    p2p.Find(dbus.Dictionary(args))
    p2p.StopFind()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False
            self.found2 = False
            # Set by deviceFoundProperties(); not part of success()
            self.found_prop = False
            self.lost = False
            self.find_stopped = False

        def __enter__(self):
            # Schedule the test start and an overall 15 second timeout,
            # register the signal handlers, and run the main loop until one
            # of the handlers (or the timeout from TestDbus) stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.deviceFoundProperties,
                            WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceLost")
            self.add_signal(self.provisionDiscoveryResponseEnterPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryResponseEnterPin")
            self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
                            "FindStopped")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The signalled peer has to be listed in the Peers property
            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if len(res) < 1:
                raise Exception("Unexpected number of peers")
            if path not in res:
                raise Exception("Mismatch in peer object path")
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("peer properties: " + str(res))

            if res['DeviceAddress'] == a1:
                # dev[1]: verify the attributes configured before listen
                if 'SecondaryDeviceTypes' not in res:
                    raise Exception("Missing SecondaryDeviceTypes")
                sec = res['SecondaryDeviceTypes']
                if len(sec) < 1:
                    raise Exception("Secondary device type missing")
                if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
                    raise Exception("Secondary device type mismatch")

                if 'VendorExtension' not in res:
                    raise Exception("Missing VendorExtension")
                vendor = res['VendorExtension']
                if len(vendor) < 1:
                    raise Exception("Vendor extension missing")
                if b"\x11\x22\x33\x44" not in vendor:
                    # Report the property that actually failed (this used to
                    # claim a secondary device type mismatch)
                    raise Exception("Vendor extension mismatch")

                if 'VSIE' not in res:
                    raise Exception("Missing VSIE")
                vendor = res['VSIE']
                if len(vendor) < 1:
                    raise Exception("VSIE missing")
                if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66":
                    raise Exception("VSIE mismatch")

                self.found = True
            elif res['DeviceAddress'] == a2:
                # dev[2]: verify the Wi-Fi Display IEs
                if 'IEs' not in res:
                    raise Exception("IEs missing")
                if res['IEs'] != wfd:
                    raise Exception("IEs mismatch")
                self.found2 = True
            else:
                raise Exception("Unexpected peer device address")

            if self.found and self.found2:
                # Both peers seen: stop the search and continue with the
                # most recently reported peer.
                p2p.StopFind()
                p2p.RejectPeer(path)
                p2p.ProvisionDiscoveryRequest(path, 'display')

        def deviceLost(self, path):
            logger.debug("deviceLost: path=%s" % path)
            if not self.found or not self.found2:
                # This may happen if a previous test case ended up scheduling
                # deviceLost event and that event did not get delivered before
                # starting the next test execution.
                logger.debug("Ignore deviceLost before the deviceFound events")
                return
            self.lost = True
            # The peer entry is gone, so rejecting it again has to fail
            try:
                p2p.RejectPeer(path)
                raise Exception("Invalid RejectPeer accepted")
            except dbus.exceptions.DBusException as e:
                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
            self.loop.quit()

        def deviceFoundProperties(self, path, properties):
            logger.debug("deviceFoundProperties: path=%s" % path)
            logger.debug("peer properties: " + str(properties))
            if properties['DeviceAddress'] == a1:
                self.found_prop = True

        def provisionDiscoveryResponseEnterPin(self, peer_object):
            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
            # NOTE(review): Flush() is presumably what triggers the DeviceLost
            # signal that ends the test - confirm.
            p2p.Flush()

        def findStopped(self):
            logger.debug("findStopped")
            self.find_stopped = True

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            # One-shot timeout callback: do not reschedule
            return False

        def success(self):
            return self.found and self.lost and self.found2 and self.find_stopped

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
    dev[1].p2p_stop_find()

    # Listen() on dev[0] and verify that dev[2] can discover it
    p2p.Listen(1)
    dev[2].p2p_stop_find()
    dev[2].request("P2P_FLUSH")
    if not dev[2].discover_peer(addr0):
        raise Exception("Peer not found")
    p2p.StopFind()
    dev[2].p2p_stop_find()

    # ExtendedListen(): unknown key is rejected; valid parameters and an
    # empty dictionary are accepted
    try:
        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))

    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
    p2p.ExtendedListen(dbus.Dictionary({}))
    dev[0].global_request("P2P_EXT_LISTEN")
3321
def test_dbus_p2p_discovery_freq(dev, apdev):
    """D-Bus P2P discovery on a specific non-social channel"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # NOTE(review): addr1 is not used below.
    addr1 = dev[1].p2p_dev_addr()
    # NOTE(review): autogo() presumably starts dev[1] as a group owner on
    # 2422 MHz, the same frequency that Find() is given below - confirm.
    autogo(dev[1], freq=2422)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False

        def __enter__(self):
            # Schedule the test start and a 5 second timeout, register for
            # DeviceFound, and run the main loop until deviceFound() (or the
            # timeout handler from TestDbus) stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(5000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            # Any DeviceFound signal completes the test; the peer identity is
            # not checked.
            logger.debug("deviceFound: path=%s" % path)
            self.found = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            # Search only on the specified frequency
            p2p.Find(dbus.Dictionary({'freq': 2422}))
            # One-shot timeout callback: do not reschedule
            return False

        def success(self):
            return self.found

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    # Cleanup; only reached when the checks above passed
    dev[1].remove_group()
    p2p.StopFind()
3362
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # NOTE(review): addr0/addr1 are not used below.
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    def expect_invalid_args(call, name, label=None):
        # Invoke call() and require it to fail with a DBusException that
        # mentions InvalidArgs. name is used when the call is unexpectedly
        # accepted; label (defaults to name) is used when the error text does
        # not match. Any other exception type is propagated unchanged.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid %s: %s" %
                                (label or name, str(e)))
            return
        raise Exception("Invalid %s accepted" % name)

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
    upnp_service = 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'

    # Bonjour: add, flush, and add again
    args = {'service_type': 'bonjour',
            'query': bonjour_query,
            'response': bonjour_response}
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # DeleteService() does not accept the 'response' entry
    expect_invalid_args(lambda: p2p.DeleteService(args), "DeleteService()")

    # Deleting works once with the query alone; a second delete has to fail
    args = {'service_type': 'bonjour',
            'query': bonjour_query}
    p2p.DeleteService(args)
    expect_invalid_args(lambda: p2p.DeleteService(args), "DeleteService()")

    # UPnP: add and delete; a second delete has to fail
    args = {'service_type': 'upnp',
            'version': 0x10,
            'service': upnp_service}
    p2p.AddService(args)
    p2p.DeleteService(args)
    expect_invalid_args(lambda: p2p.DeleteService(args), "DeleteService()")

    # Invalid DeleteService() arguments
    tests = [{'service_type': 'foo'},
             {'service_type': 'foo', 'query': bonjour_query},
             {'service_type': 'upnp'},
             {'service_type': 'upnp', 'version': 0x10},
             {'service_type': 'upnp',
              'service': upnp_service},
             {'version': 0x10,
              'service': upnp_service},
             {'service_type': 'upnp', 'foo': 'bar'},
             {'service_type': 'bonjour'},
             {'service_type': 'bonjour', 'query': 'foo'},
             {'service_type': 'bonjour', 'foo': 'bar'}]
    for args in tests:
        expect_invalid_args(lambda: p2p.DeleteService(args),
                            "DeleteService()")

    # Invalid AddService() arguments
    tests = [{'service_type': 'foo'},
             {'service_type': 'upnp'},
             {'service_type': 'upnp', 'version': 0x10},
             {'service_type': 'upnp',
              'service': upnp_service},
             {'version': 0x10,
              'service': upnp_service},
             {'service_type': 'upnp', 'foo': 'bar'},
             {'service_type': 'bonjour'},
             {'service_type': 'bonjour', 'query': 'foo'},
             {'service_type': 'bonjour', 'response': 'foo'},
             {'service_type': 'bonjour', 'query': bonjour_query},
             {'service_type': 'bonjour', 'response': bonjour_response},
             {'service_type': 'bonjour', 'query': dbus.ByteArray(500*b'a')},
             {'service_type': 'bonjour', 'foo': 'bar'}]
    for args in tests:
        expect_invalid_args(lambda: p2p.AddService(args), "AddService()")

    # A request can be cancelled once with the returned reference; the
    # already cancelled reference and reference 0 have to be rejected. The
    # mismatch reports for these two checks used to name AddService().
    args = {'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    expect_invalid_args(lambda: p2p.ServiceDiscoveryCancelRequest(ref),
                        "ServiceDiscoveryCancelRequest()")
    expect_invalid_args(
        lambda: p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0)),
        "ServiceDiscoveryCancelRequest()")

    args = {'service_type': 'upnp',
            'version': 0x10,
            'service': 'ssdp:foo'}
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    # Invalid ServiceDiscoveryRequest() arguments
    tests = [{'service_type': 'foo'},
             {'foo': 'bar'},
             {'tlv': 'foo'},
             {},
             {'version': 0},
             {'service_type': 'upnp',
              'service': 'ssdp:foo'},
             {'service_type': 'upnp',
              'version': 0x10},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': dbus.ObjectPath(path + "/Peers")},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': path + "/Peers"},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566")}]
    for args in tests:
        expect_invalid_args(lambda: p2p.ServiceDiscoveryRequest(args),
                            "ServiceDiscoveryRequest",
                            label="ServiceDiscoveryRequest()")

    args = {'foo': 'bar'}
    expect_invalid_args(
        lambda: p2p.ServiceDiscoveryResponse(dbus.Dictionary(args,
                                                             signature='sv')),
        "ServiceDiscoveryResponse", label="ServiceDiscoveryResponse()")
3511
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # NOTE(review): addr0/addr1 are not used below.
    addr0 = dev[0].p2p_dev_addr()
    # dev[1] registers a Bonjour service and listens so that dev[0] can find
    # it and query its services.
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Schedule the test start and a 15 second timeout, register the
            # signal handlers, and run the main loop until
            # serviceDiscoveryResponse() (or the timeout handler from
            # TestDbus) stops it.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Send a service discovery query to the peer that was just found
            args = {'peer_object': path,
                    'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}
            p2p.ServiceDiscoveryRequest(args)

        def serviceDiscoveryResponse(self, sd_request):
            # Any response completes the test; its contents are only logged.
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            # One-shot timeout callback: do not reschedule
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    # Cleanup; only reached when the checks above passed
    dev[1].p2p_stop_find()
3563
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    # The actual test steps live in the underscore-prefixed helper so that the
    # cleanup below runs regardless of how the test body exits.
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        # The test body enables external SD processing on dev[0]
        # (ServiceDiscoveryExternal(1)); make sure it is disabled again even
        # if the body raised before reaching its own cleanup.
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3570
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    # dev[1] sends an SD query to dev[0]. dev[0] has external SD processing
    # enabled over D-Bus, so the query shows up as a ServiceDiscoveryRequest
    # signal and is answered through the ServiceDiscoveryResponse method.
    # dev[1] must then report exactly that response.
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    p2p_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    own_addr = dev[0].p2p_dev_addr()
    dev[1].p2p_dev_addr()  # address is fetched but not needed afterwards
    reply_hex = "0300000101"

    dev[1].request("P2P_FLUSH")
    dev[1].request("P2P_SERV_DISC_REQ " + own_addr + " 02000001")
    dev[1].p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True when the SD request signal has been handled.
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.on_device_found, "DeviceFound"),
                        (self.on_sd_request, "ServiceDiscoveryRequest"))
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name)
            self.loop.run()
            return self

        def on_device_found(self, path):
            # Only logged; the peer drives the exchange in this test.
            logger.debug("deviceFound: path=%s" % path)

        def on_sd_request(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Echo the identifying fields of the request back and attach the
            # canned response TLVs.
            reply = {key: sd_request[key]
                     for key in ('peer_object', 'frequency', 'dialog_token')}
            reply['tlvs'] = dbus.ByteArray(binascii.unhexlify(reply_hex))
            p2p_iface.ServiceDiscoveryResponse(dbus.Dictionary(reply,
                                                               signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_iface.ServiceDiscoveryExternal(1)
            p2p_iface.ServiceUpdate()
            p2p_iface.Listen(15)
            # One-shot timer callback.
            return False

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    event = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if event is None:
        raise Exception("Service discovery timed out")
    if own_addr not in event:
        raise Exception("Unexpected address in SD Response: " + event)
    # The fifth space-separated field of the event carries the response data.
    received_hex = event.split(' ')[4]
    if received_hex != reply_hex:
        raise Exception("Unexpected response data SD Response: " + event)
    dev[1].p2p_stop_find()

    p2p_iface.StopFind()
    p2p_iface.ServiceDiscoveryExternal(0)
3637
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # Expected flow; each signal handler triggers the next step:
    #   run_test: GroupAdd(persistent)
    #   PersistentGroupAdded: remember the persistent group path
    #   GroupStarted (1st): Disconnect the group instance
    #   GroupFinished (1st): GroupAdd() re-invoking the persistent group
    #   GroupStarted (2nd): wlan1 joins the group using a PIN
    #   ProvisionDiscoveryRequestDisplayPin: authorize the peer via WPS.Start
    #   StaAuthorized: verify peer/group properties, RemoveClient, Disconnect
    #   StaDeauthorized: only recorded
    #   GroupFinished (2nd): RemovePersistentGroup
    #   PersistentGroupRemoved: test done
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # True until the first GroupStarted signal has been handled.
            self.first = True
            # Set just before the final Disconnect so that groupFinished()
            # removes the persistent group instead of restarting it.
            self.waiting_end = False
            # Set whenever a handler detects a failure; checked by success().
            self.exceptions = False
            self.deauthorized = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def _fail(self, msg):
            # Record the failure in self.exceptions before raising so that
            # success() reports it even if the exception itself does not
            # reach the test function.
            self.exceptions = True
            raise Exception(msg)

        def _expect_set_failure(self, g_obj, vals, expected):
            # Set(WPSVendorExtensions) with vals must be rejected with a D-Bus
            # error whose text contains expected.
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException as e:
                if expected not in str(e):
                    self._fail("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
                return
            self._fail("Invalid Set(WPSVendorExtensions) accepted")

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self._fail("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self._fail("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            # The local device is the GO, so no peer GO object is expected.
            if go != '/':
                self._fail("Unexpected PeerGO value: " + str(go))
            if self.first:
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                # Second start of the group: have wlan1 join it with a PIN.
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Keep the peer properties; DeviceAddress is needed for WPS.Start.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object

            # Specifying both P2PDeviceAddress and Bssid has to be rejected.
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin'}
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            try:
                wps.Start(params)
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self._fail("Unexpected error message: " + str(e))
            else:
                self._fail("Invalid WPS.Start() accepted")

            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self._fail("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self._fail("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self._fail("Unexpected number of group members")

            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({'WPSVendorExtensions': [ext]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self._fail("Unexpected number of vendor extensions")
            if res[0] != ext:
                self._fail("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted as
            # well.
            res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value that was read back (res2); res is the local list
            # that was just extended and would trivially have two entries.
            if len(res2) != 2:
                self._fail("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self._fail("Vendor extension value changed")

            # Twelve entries in total; this Set() is expected to be rejected.
            for _ in range(10):
                res.append(dbus.ByteArray(b'\xaa\xbb'))
            self._expect_set_failure(g_obj, res, "Error.Failed")

            # Dictionary form with an unknown key.
            vals = dbus.Dictionary({'Foo': [ext]}, signature='sv')
            self._expect_set_failure(g_obj, vals, "InvalidArgs")

            # Values that are not arrays of byte arrays.
            self._expect_set_failure(g_obj, ["foo"], "Error.Failed")
            self._expect_set_failure(g_obj, [["foo"]], "Error.Failed")

            p2p.RemoveClient({'peer': self.peer_path})

            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False from a timeout_add() callback removes the timer.
            return False

        def success(self):
            # The full flow completed, the client was deauthorized and no
            # handler flagged a failure.
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
3895
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # Expected flow; each signal handler triggers the next step:
    #   run_test: GroupAdd() on 2412 MHz
    #   GroupStarted: wlan1 joins the group using PBC
    #   ProvisionDiscoveryPBCRequest: authorize the peer via WPS.Start
    #   StaAuthorized: Disconnect the group
    #   GroupFinished: test done
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # NOTE(review): first and waiting_end are not read anywhere in
            # this test; presumably leftovers from test_dbus_p2p_autogo. Kept
            # in case TestDbus (not visible here) relies on them - confirm.
            self.first = True
            self.waiting_end = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.provisionDiscoveryPBCRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryPBCRequest")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Keep the peer properties; DeviceAddress is needed for WPS.Start.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Type': 'pbc'}
            logger.info("Authorize peer to connect to the group")
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # The client got in; tear the group down to finish the test.
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'frequency': 2412})
            p2p.GroupAdd(params)
            # Returning False from a timeout_add() callback removes the timer.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
3981
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    p2p_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].p2p_dev_addr()  # address is fetched but not needed afterwards

    # Flow: GroupAdd -> GroupStarted (WPS PIN set up on both sides, wlan1
    # connects as a plain WPS station) -> StaAuthorized (station disconnects,
    # group is torn down) -> GroupFinished.
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            subscriptions = (
                (self.on_group_started, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted"),
                (self.on_group_finished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.on_sta_authorized, WPAS_DBUS_IFACE,
                 "StaAuthorized"),
            )
            for handler, iface, signal_name in subscriptions:
                self.add_signal(handler, iface, signal_name)
            self.loop.run()
            return self

        def on_group_started(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            # Convert the six BSSID octets into colon-separated hex text.
            octets = struct.unpack('6B', group_props['BSSID'])
            bssid = ':'.join("%02x" % octet for octet in octets)

            pin = '12345670'
            group_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                          properties['interface_object'])
            wps = dbus.Interface(group_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start({'Role': 'enrollee',
                       'Type': 'pin',
                       'Pin': pin})

            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.request("WPS_PIN " + bssid + " " + pin)
            # Needed later by the StaAuthorized handler to stop the group.
            self.group_p2p = dbus.Interface(group_if_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def on_group_finished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def on_sta_authorized(self, name):
            logger.debug("staAuthorized: " + name)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_iface.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # One-shot timer callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4051
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # dev[1] runs an autonomous GO on 2412 MHz and dev[2] is in listen state so
    # that dev[0] can discover both of them.
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    # Remember the GO's group interface name; the signal handlers create fresh
    # WpaSupplicant('wlan1') objects and need to point them at this interface.
    dev1_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    # Expected flow; each signal handler triggers the next step:
    #   run_test: Find(social)
    #   DeviceFound (both peers seen): Connect(join) to the GO with a PIN
    #   GroupStarted: property checks, PresenceRequest, Invite dev[2]
    #   InvitationResult: the GO removes the group
    #   GroupFinished: test done
    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # D-Bus object paths of dev[2] (peer to invite) and dev[1] (GO);
            # filled in by deviceFound().
            self.peer = None
            self.go = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationResult")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # Identify the peer by looking for its address (without colons) in
            # the object path.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path
            # Join only after both the GO and the peer to invite are known.
            if self.peer and self.go:
                logger.info("Join the group")
                p2p.StopFind()
                args = {'peer': self.go,
                        'join': True,
                        'wps_method': 'pin',
                        'frequency': 2412}
                # The value returned by Connect() is used as the PIN below.
                pin = p2p.Connect(args)

                # Authorize that PIN on the GO's group interface.
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                dev1.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            # NOTE(review): unlike test_dbus_p2p_autogo, the failures raised in
            # this handler are not recorded in a flag; presumably the test
            # then fails through the 15 second timeout instead - confirm.
            role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            # As a client, PeerGO must be the GO peer found during discovery.
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            # NOTE(review): ext is never used; the Set() call below passes res
            # (the full GetAll() result) instead. This looks unintentional, but
            # the expected error text may depend on it - confirm before
            # changing.
            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            args = {'duration1': 30000, 'interval1': 102400,
                    'duration2': 20000, 'interval2': 102400}
            group_p2p.PresenceRequest(args)

            # Invite dev[2] to the group; the outcome arrives through the
            # InvitationResult signal.
            args = {'peer': self.peer}
            group_p2p.Invite(args)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            # The test expects status value 1 for this invitation.
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Have the GO remove the group; presumably this is what leads to
            # the GroupFinished signal that ends the test.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_ifname = dev1_group_ifname
            dev1.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False from a timeout_add() callback removes the timer.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
4176
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Set up a persistent group between the devices and leave dev[0] listening
    # with automatic persistent reconnect disabled.
    form(dev[0], dev[1])
    invitee_addr = dev[0].p2p_dev_addr()
    dev[0].p2p_listen()
    dev[0].global_request("SET persistent_reconnect 0")

    found = dev[1].discover_peer(invitee_addr, social=True)
    if not found:
        raise Exception("Peer " + invitee_addr + " not found")
    peer_info = dev[1].get_peer(invitee_addr)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True when the InvitationReceived signal is seen.
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.on_invitation_received,
                            WPAS_DBUS_IFACE_P2PDEVICE, "InvitationReceived")
            self.loop.run()
            return self

        def on_invitation_received(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            # Send the invitation from wlan1 for the persistent group that it
            # has stored for this peer.
            inviter = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            inviter.global_request("P2P_INVITE persistent=" +
                                   peer_info['persistent'] +
                                   " peer=" + invitee_addr)
            # One-shot timer callback.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    for device in (dev[0], dev[1]):
        device.p2p_stop_find()
4225
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    # The actual test steps live in the underscore-prefixed helper so that the
    # cleanup below runs regardless of how the test body exits.
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        # The test body sets SsidPostfix through P2PDeviceConfig; send
        # P2P_SET ssid_postfix with an empty value afterwards so the postfix
        # does not stay configured on dev[0].
        dev[0].request("P2P_SET ssid_postfix ")
4232
def _test_dbus_p2p_config(dev, apdev):
    """Exercise Get/Set of the P2PDeviceConfig D-Bus property on dev[0].

    The sequence is:
      1. Read the property, write the same value back and verify that a
         second read returns the same number of entries with equal values.
      2. Set SsidPostfix, VendorExtension and SecondaryDeviceTypes and check
         that exactly one entry is reported for each list-valued item.
      3. Clear those items and check that the list-valued ones are no longer
         reported at all.
      4. With P2P disabled, verify that both Get and Set fail with the
         "P2P is not available for this interface" error.
      5. Verify that wrongly typed or unknown dictionary entries are rejected
         with an InvalidArgs error.

    Raises:
        Exception: if any of the steps above does not behave as expected.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def get_config():
        # Read the current P2PDeviceConfig dictionary.
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(values):
        # Write a P2PDeviceConfig dictionary (already a D-Bus value).
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", values,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Step 1: a Get -> Set -> Get round trip must not change anything.
    res = get_config()
    set_config(res)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    # Step 2: set a postfix and one entry for each list-valued item.
    changes = {'SsidPostfix': 'foo',
               'VendorExtension': [dbus.ByteArray(b'\x11\x22\x33\x44')],
               'SecondaryDeviceTypes': [dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88')]}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Step 3: empty arrays remove the list-valued items again.
    changes = {'SsidPostfix': '',
               'VendorExtension': dbus.Array([], signature="ay"),
               'SecondaryDeviceTypes': dbus.Array([], signature="ay")}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Step 4: both accessors must fail while P2P is disabled. P2P is always
    # re-enabled in the finally clauses so later steps and tests can use it.
    p2p_disabled_err = "Error.Failed: P2P is not available for this interface"

    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if p2p_disabled_err not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    try:
        dev[0].request("P2P_SET disabled 1")
        changes = {'SsidPostfix': 'foo'}
        set_config(dbus.Dictionary(changes, signature='sv'))
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if p2p_disabled_err not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Step 5: wrong value types and an unknown key must be rejected.
    tests = [{'DeviceName': 123},
             {'SsidPostfix': 123},
             {'Foo': 'Bar'}]
    for changes in tests:
        try:
            set_config(dbus.Dictionary(changes, signature='sv'))
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
4321
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    # Flow: add a persistent group through D-Bus and disconnect it as soon as
    # it has started, remembering the persistent group object path reported by
    # PersistentGroupAdded. Then exercise the PersistentGroup Properties
    # accessors, AddPersistentGroup, RemoveAllPersistentGroups and the error
    # path of RemovePersistentGroup on that (by then removed) object.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path from PersistentGroupAdded; stays None if the signal
            # is never delivered so that success() can report it.
            self.persistent = None

        def __enter__(self):
            """Schedule the test step, subscribe to signals, run the loop."""
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            """Disconnect the group right after it has started."""
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            """Stop the main loop once the group is gone."""
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            """Remember the object path of the new persistent group."""
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            """Request a persistent group; returns False to run only once."""
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            return False

        def success(self):
            """Return True if the persistent group object path was reported."""
            return self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Read, modify and re-read the persistent group properties. Only the ssid
    # entry is allowed to differ after the Set.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    # The value is expected to be read back wrapped in double quotes.
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # A second, manually added persistent group must show up in the list.
    args = dbus.Dictionary({'ssid': 'DIRECT-testing',
                            'psk': '1234567890'}, signature='sv')
    p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The first group has just been removed, so removing it again must fail.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
4414
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # Signal-driven flow (every step runs from a D-Bus signal handler):
    #  1. run_test: add a persistent group on dev[0].
    #  2. groupStarted (not yet invited): wlan1 scans for the group BSS and
    #     joins it with a PIN.
    #  3. provisionDiscoveryRequestDisplayPin: authorize the peer via
    #     WPS.Start on the group interface and wait for wlan1 to connect.
    #  4. staAuthorized: wlan1 leaves the group and the group is disconnected.
    #  5. groupFinished (not yet invited): verify that an Invite with a bogus
    #     persistent group object is rejected, then re-invoke the group with
    #     a valid Invite.
    #  6. staAuthorized runs again for the re-invoked group and the following
    #     groupFinished marks the test as done.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # True once the re-invoked group has finished.
            self.done = False
            # True once the valid Invite has been issued.
            self.invited = False

        def __enter__(self):
            """Schedule the test step, subscribe to signals, run the loop."""
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            """Track the group interface; on the first group make wlan1 join."""
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if not self.invited:
                g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
                res = g_obj.GetAll(WPAS_DBUS_GROUP,
                                   dbus_interface=dbus.PROPERTIES_IFACE,
                                   byte_arrays=True)
                # Convert the 6-octet BSSID into aa:bb:cc:dd:ee:ff notation.
                bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])])
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.scan_for_bss(bssid, freq=2412)
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            """Finish after the re-invoked group; otherwise start the Invite."""
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                self.done = True
                self.loop.quit()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("SET persistent_reconnect 1")
                dev1.p2p_listen()

                # "path" is the value returned by prepare_dbus() in the
                # enclosing function, not a persistent group object, so this
                # Invite is expected to be rejected.
                args = {'persistent_group_object': dbus.ObjectPath(path),
                        'peer': self.peer_path}
                try:
                    p2p.Invite(args)
                    raise Exception("Invalid Invite accepted")
                except dbus.exceptions.DBusException as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected error message for invalid Invite: " + str(e))

                args = {'persistent_group_object': self.persistent,
                        'peer': self.peer_path}
                p2p.Invite(args)
                self.invited = True

                # Saved for staAuthorized(), which passes it to
                # group_form_result().
                self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                           timeout=15)
                if self.sta_group_ev is None:
                    raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            """Remember the object path of the new persistent group."""
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            """Cache the properties of the discovered peer."""
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            """Authorize the requesting peer with a fixed PIN via WPS.Start."""
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            # Needed later as the 'peer' argument of Invite.
            self.peer_path = peer_object
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            # Open the wlan1 handle before WPS.Start, as in the original order,
            # so the group start event is waited for on that same handle.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            """Tear the group down from both ends once the peer is authorized."""
            logger.debug("staAuthorized: " + name)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
            if ev is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            """Request a persistent group; returns False to run only once."""
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            return False

        def success(self):
            """Return True if the re-invoked group has finished."""
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4549
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # Flow: dev[0] listens through D-Bus while wlan1 discovers it and starts
    # GO Negotiation. The GONegotiationRequest handler first tries a Connect
    # on a frequency that must be refused and then accepts the request. The
    # resulting group is disconnected as soon as it starts and GroupFinished
    # completes the test.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True when GroupFinished has been received.
            self.done = False

        def __enter__(self):
            """Schedule the test step, subscribe to signals, run the loop."""
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra add_signal() keyword arguments),
            # registered in this order on the P2PDevice interface.
            subscriptions = [
                (self.deviceFound, "DeviceFound", {}),
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.groupFinished, "GroupFinished", {}),
            ]
            for handler, signal_name, extra in subscriptions:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Log the discovered peer; no further action is needed."""
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            """Reject-test and then accept the incoming GO Negotiation."""
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)

            accept_args = {'peer': path, 'wps_method': 'display',
                           'pin': '12345670', 'go_intent': 15,
                           'persistent': False}
            # Same request with an explicit frequency that is expected to be
            # refused with ConnectChannelUnsupported.
            unsupported_args = dict(accept_args)
            unsupported_args['frequency'] = 5175

            try:
                p2p.Connect(unsupported_args)
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))
            else:
                raise Exception("Invalid Connect accepted")

            p2p.Connect(accept_args)

        def goNegotiationSuccess(self, properties):
            """Log the negotiation result."""
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Disconnect the group right after it has started."""
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            group_iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_iface.Disconnect()

        def groupFinished(self, properties):
            """Record completion and stop the main loop."""
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            """Start listening and let wlan1 initiate GO Negotiation.

            Returns False so that the GLib timeout source runs only once.
            """
            logger.debug("run_test")
            p2p.Listen(10)
            initiator = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            found = initiator.discover_peer(addr0)
            if not found:
                raise Exception("Peer not found")
            initiator.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            return False

        def success(self):
            """Return True if GroupFinished was seen."""
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4631
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # Flow: dev[0] runs a social scan through D-Bus. On DeviceFound it first
    # checks that an authorize-only Connect without a PIN is rejected, then
    # pre-authorizes the peer and listens while wlan1 initiates GO
    # Negotiation. Once the group has started wlan1 leaves it, the resulting
    # StaDeauthorized signal makes dev[0] disconnect the group, and
    # GroupFinished ends the test. PeerJoined and PeerDisconnected must both
    # have been seen on the group object for the test to pass.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Result flags checked by success().
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            """Schedule the test step, subscribe to signals, run the loop."""
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Pre-authorize the peer and let wlan1 start GO Negotiation."""
            logger.debug("deviceFound: path=%s" % path)
            # Keypad method without a 'pin' entry must be rejected.
            args = {'peer': path, 'wps_method': 'keypad',
                    'go_intent': 15, 'authorize_only': True}
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 15, 'authorize_only': True}
            p2p.Connect(args)
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Saved for groupStarted(), which passes it to group_form_result().
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            """Log the negotiation result."""
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Remember the group interface and make wlan1 leave the group."""
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def staDeauthorized(self, name):
            """Disconnect the group after the station has been deauthorized."""
            logger.debug("staDeauthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def peerJoined(self, peer):
            """Record that PeerJoined was signalled."""
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            """Record that PeerDisconnected was signalled."""
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            """Record completion and stop the main loop."""
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            """Start a social scan; returns False to run only once."""
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            """Return True if the group finished and both peer signals came."""
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4732
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # Flow: dev[0] runs a social scan through D-Bus and initiates GO
    # Negotiation (go_intent 0) towards the discovered peer. wlan1 answers
    # with go_intent 15. As soon as the group has started dev[0] disconnects
    # from it and wlan1 removes it. The main loop ends when the peer object
    # reports an empty Groups property after having reported a non-empty one.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Result flags checked by success().
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            """Schedule the test step, subscribe to signals, run the loop."""
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, interface, signal name, extra add_signal() keywords),
            # registered in this order.
            subscriptions = [
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            ]
            for handler, iface, signal_name, extra in subscriptions:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Initiate GO Negotiation and have wlan1 complete it."""
            logger.debug("deviceFound: path=%s" % path)
            # Open the wlan1 handle before Connect so that its event monitor
            # is attached when the GO Negotiation Request arrives.
            responder = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = {'peer': path, 'wps_method': 'keypad',
                            'pin': '12345670', 'go_intent': 0}
            p2p.Connect(connect_args)

            request_ev = responder.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                                     timeout=15)
            if request_ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            responder.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started_ev = responder.wait_global_event(["P2P-GROUP-STARTED"],
                                                     timeout=15)
            if started_ev is None:
                raise Exception("Group formation timed out")
            # Saved for groupStarted(), which passes it to group_form_result().
            self.sta_group_ev = started_ev
            # Explicitly release the monitor connections of this handle
            # (groupStarted() opens its own handle with monitor=False).
            responder.close_monitor_global()
            responder.close_monitor_mon()
            responder = None

        def goNegotiationSuccess(self, properties):
            """Log the negotiation result."""
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Leave the group on dev[0] and remove it on wlan1."""
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            group_iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_iface.Disconnect()
            responder = WpaSupplicant('wlan1', '/tmp/wpas-wlan1',
                                      monitor=False)
            responder.group_form_result(self.sta_group_ev)
            responder.remove_group()
            responder = None

        def groupFinished(self, properties):
            """Record GroupFinished; the loop is stopped elsewhere."""
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            """Track the peer Groups property and stop once it is empty again."""
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if "Groups" not in changed_properties:
                return

            group_count = len(changed_properties["Groups"])
            if group_count > 0:
                self.peer_group_added = True
                return
            if not self.peer_group_added:
                # This is likely a leftover event from an earlier test case,
                # ignore it to allow this test case to go through its steps.
                logger.info("Ignore propertiesChanged indicating group removal before group has been added")
                return
            self.peer_group_removed = True
            self.loop.quit()

        def run_test(self, *args):
            """Start a social scan; returns False to run only once."""
            logger.debug("run_test")
            find_args = dbus.Dictionary({'DiscoveryType': 'social'})
            p2p.Find(find_args)
            return False

        def success(self):
            """Return True if the group finished and was added and removed."""
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4831
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    # Flow: dev[0] runs a social scan through D-Bus and initiates GO
    # Negotiation with go_intent 0; wlan1 answers with go_intent 15. Once the
    # group has started only wlan1 removes the group (dev[0] does not
    # disconnect by itself). The main loop ends when the peer object reports
    # an empty Groups property after having reported a non-empty one.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Result flags checked by success().
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            """Schedule the test step, subscribe to signals, run the loop."""
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Initiate GO Negotiation and have wlan1 complete it."""
            logger.debug("deviceFound: path=%s" % path)
            # Open the wlan1 handle before Connect so that its event monitor
            # is attached when the GO Negotiation Request arrives.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 0}
            p2p.Connect(args)

            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Saved for groupStarted(), which passes it to group_form_result().
            self.sta_group_ev = ev
            # Explicitly release the monitor connections of this handle
            # (groupStarted() opens its own handle with monitor=False).
            dev1.close_monitor_global()
            dev1.close_monitor_mon()
            dev1 = None

        def goNegotiationSuccess(self, properties):
            """Log the negotiation result."""
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            """Have wlan1 remove the group; dev[0] takes no action itself."""
            logger.debug("groupStarted: " + str(properties))
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False)
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def groupFinished(self, properties):
            """Record GroupFinished; the loop is stopped elsewhere."""
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            """Track the peer Groups property and stop once it is empty again."""
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            if len(changed_properties["Groups"]) == 0:
                if not self.peer_group_added:
                    # A removal seen before any addition cannot belong to the
                    # group formed here; skip it so the test can proceed.
                    logger.info("Ignore propertiesChanged indicating group removal before group has been added")
                    return
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            """Start a social scan; returns False to run only once."""
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            """Return True if the group finished and was added and removed."""
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4922
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    # Run the real test body with p2p_group_idle set to 1 and always set it
    # back to 0 afterwards, including when the body raises.
    wpas = dev[0]
    try:
        wpas.global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        wpas.global_request("SET p2p_group_idle 0")
4930
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    # Test body for test_dbus_p2p_group_idle_timeout(); dev[0] is driven over
    # D-Bus while wlan1 is driven through its control interface from within
    # the signal handlers.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Progress flags set by the signal handlers below
            self.done = False
            self.group_started = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra add_signal kwargs)
            receivers = [
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            ]
            for handler, iface_name, signal_name, extra in receivers:
                self.add_signal(handler, iface_name, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            # Start GO Negotiation from the D-Bus side ...
            p2p.Connect({'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670', 'go_intent': 0})

            # ... and complete it from wlan1 once the request has been seen.
            request = peer.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                             timeout=15)
            if request is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer.wait_global_event(["P2P-GROUP-STARTED"],
                                             timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Saved for groupStarted(), which parses it on a new wlan1 wrapper
            self.sta_group_ev = started
            peer.close_monitor_global()
            peer.close_monitor_mon()
            peer = None

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            group_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                          properties['interface_object'])
            go = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False)
            go.group_form_result(self.sta_group_ev)
            first_sta = go.group_request("STA-FIRST").splitlines()[0]
            # Force disassociation with different reason code so that the
            # P2P Client using D-Bus does not get normal group termination event
            # from the GO.
            go.group_request("DEAUTHENTICATE " + first_sta + " reason=0 test=0")
            go.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only Peer "Groups" changes seen after GroupStarted are of
            # interest here.
            relevant = (interface_name == WPAS_DBUS_P2P_PEER and
                        self.group_started and
                        "Groups" in changed_properties)
            if not relevant:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            else:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps this from being scheduled again
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")
5029
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Both signals have to be seen for the test to pass
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra add_signal kwargs); all of these
            # are P2PDevice interface signals
            receivers = [
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.wpsFailed, "WpsFailed", {}),
                (self.groupFormationFailure, "GroupFormationFailure", {}),
            ]
            for handler, signal_name, extra in receivers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name, **extra)
            self.loop.run()
            return self

        def _quit_when_both_seen(self):
            # Stop the main loop once both failure signals have been reported
            if self.wps_failed and self.formation_failure:
                self.loop.quit()

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            # Respond with a PIN that differs from the one wlan1 entered in
            # run_test() so that provisioning cannot succeed.
            p2p.Connect({'peer': path, 'wps_method': 'display',
                         'pin': '12345670', 'go_intent': 15})

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            self._quit_when_both_seen()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            self._quit_when_both_seen()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            initiator = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            found = initiator.discover_peer(addr0)
            if not found:
                raise Exception("Peer not found")
            initiator.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")
5105
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # Peer object paths for dev[2] and dev[1] once discovered
            self.peer = None
            self.go = None
            # Group object paths in the order GroupStarted reported them
            self.group1 = None
            self.group2 = None
            self.groups_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra add_signal kwargs)
            receivers = [
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {'byte_arrays': True}),
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.peerJoined, WPAS_DBUS_GROUP, "PeerJoined", {}),
            ]
            for handler, iface_name, signal_name, extra in receivers:
                self.add_signal(handler, iface_name, signal_name, **extra)
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The peer object path is matched against the device address
            # with the colons removed.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path
            if not self.go or self.group1:
                return

            logger.info("Join the group")
            p2p.StopFind()
            pin = '12345670'
            # Authorize the PIN on the already running GO (dev[1]) first
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = dev1_group_ifname
            go_dev.group_request("WPS_PIN any " + pin)
            p2p.Connect({'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'pin': pin,
                         'frequency': 2412})

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            dev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(dev_props))

            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            logger.debug("Group properties: " + str(group_props))

            if not self.group1:
                # First group: the one joined as a client of dev[1]
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            elif not self.group2:
                # Second group: the autonomous GO started above
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = group_props['BSSID']

            if self.group1 and self.group2:
                self._authorize_and_join_peer()

        def _authorize_and_join_peer(self):
            # Authorize wlan2 on the second group through the WPS D-Bus
            # interface and then make wlan2 join that group.
            logger.info("Authorize peer to join the group")
            peer_addr = binascii.unhexlify(addr2.replace(':', ''))
            wps_args = {'Role': 'enrollee',
                        'P2PDeviceAddress': dbus.ByteArray(peer_addr),
                        'Bssid': dbus.ByteArray(peer_addr),
                        'Type': 'pin',
                        'Pin': '12345670'}
            dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS).Start(wps_args)

            octets = struct.unpack('6B', self.g2_bssid)
            go_bssid = ':'.join(["%02x" % octet for octet in octets])
            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.scan_for_bss(go_bssid, freq=2412)
            joiner.global_request("P2P_CONNECT " + go_bssid + " 12345670 join freq=2412")
            started = joiner.wait_global_event(["P2P-GROUP-STARTED"],
                                               timeout=15)
            if started is None:
                raise Exception("Group join timed out")
            # Saved for peerJoined(), which parses it on a new wlan2 wrapper
            self.dev2_group_ev = started

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))

            finished = properties['group_object']
            if self.group1 == finished:
                self.group1 = None
            elif self.group2 == finished:
                self.group2 = None

            if self.group1 or self.group2:
                return
            # Both groups are gone
            self.done = True
            self.loop.quit()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            if self.groups_removed:
                return
            self.check_results()

            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.group_form_result(self.dev2_group_ev)
            joiner.remove_group()

            # Tear down both groups through their own interface objects
            logger.info("Disconnect group2")
            dbus.Interface(self.g2_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

            logger.info("Disconnect group1")
            dbus.Interface(self.g1_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()
            self.groups_removed = True

        def check_results(self):
            logger.info("Check results with two concurrent groups in operation")

            group1_props = bus.get_object(WPAS_DBUS_SERVICE, self.group1).GetAll(
                WPAS_DBUS_GROUP,
                dbus_interface=dbus.PROPERTIES_IFACE,
                byte_arrays=True)

            group2_props = bus.get_object(WPAS_DBUS_SERVICE, self.group2).GetAll(
                WPAS_DBUS_GROUP,
                dbus_interface=dbus.PROPERTIES_IFACE,
                byte_arrays=True)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(group1_props))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(group2_props))

            dev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(dev_props))

            # Expected roles: client in group 1, GO in group 2, and plain
            # device on the main P2PDevice interface
            if group1_props['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + group1_props['Role'])
            if group2_props['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + group2_props['Role'])
            if dev_props['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + dev_props['Role'])

            if len(group2_props['Members']) != 1:
                raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
5296
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    # Cancel() before any operation has been started is expected to be
    # rejected with a D-Bus error.
    try:
        p2p.Cancel()
    except dbus.exceptions.DBusException:
        pass
    else:
        raise Exception("Unexpected p2p.Cancel() success")

    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Start a connection attempt and cancel it right away; the test
            # is complete once both calls have returned without error.
            p2p.Connect({'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670', 'go_intent': 0})
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_args = dbus.Dictionary({'DiscoveryType': 'social'})
            p2p.Find(find_args)
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")
5343
def test_dbus_p2p_ip_addr(dev, apdev):
    """D-Bus P2P and IP address parameters"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Each IP address property must read back exactly as it was written
    vals = [("IpAddrGo", "192.168.43.1"),
            ("IpAddrMask", "255.255.255.0"),
            ("IpAddrStart", "192.168.43.100"),
            ("IpAddrEnd", "192.168.43.199")]
    for field, value in vals:
        if_obj.Set(WPAS_DBUS_IFACE, field, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)
        val = if_obj.Get(WPAS_DBUS_IFACE, field,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != value:
            raise Exception("Unexpected %s value: %s" % (field, val))

    set_ip_addr_info(dev[1])

    dev[0].global_request("SET p2p_go_intent 0")

    # Form the group using NFC connection handover: build the request and
    # select messages and report the handover to both devices.
    req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
    if "FAIL" in req:
        raise Exception("Failed to generate NFC connection handover request")
    sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
    if "FAIL" in sel:
        raise Exception("Failed to generate NFC connection handover select")
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel)
    if "FAIL" in res:
        raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
    res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel)
    if "FAIL" in res:
        raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.loop.quit()

            if 'IpAddrGo' not in properties:
                # Do not index the missing key: that would raise KeyError
                # inside the signal handler. Leaving self.done False makes
                # the test fail with "Expected signals not seen".
                logger.info("IpAddrGo missing from GroupStarted")
                return
            ip_addr_go = properties['IpAddrGo']
            addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3])
            if addr != "192.168.42.1":
                # A mismatch is only logged; it does not fail the test
                logger.info("Unexpected IpAddrGo value: " + addr)
            self.done = True

        def run_test(self, *args):
            # Group formation was already triggered above; only the
            # GroupStarted signal needs to be awaited.
            logger.debug("run_test")
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5415
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def introspect():
        # Single place for the Introspect call repeated throughout the test
        return if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)

    res = introspect()
    logger.info("Initial Introspect: " + str(res))
    # Names that have to show up in the complete introspection data
    required = ("Introspectable", "GroupStarted", "FastReauth", "PassiveScan")
    if res is None or not all(name in res for name in required):
        raise Exception("Unexpected initial Introspect response: " + str(res))

    # Allocation failure in wpa_dbus_introspect itself: no data is returned
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        res2 = introspect()
        logger.info("Introspect: " + str(res2))
        if res2 is not None:
            raise Exception("Unexpected Introspect response")

    # Allocation failures while adding an interface: a response is still
    # returned, but it has to be shorter than the complete one.
    partial_cases = [(1, "=add_interface;wpa_dbus_introspect"),
                     (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
                     (2, "=add_interface;wpa_dbus_introspect")]
    for count, funcs in partial_cases:
        with alloc_fail(dev[0], count, funcs):
            res2 = introspect()
            logger.info("Introspect: " + str(res2))
            if res2 is None:
                raise Exception("No Introspect response")
            if len(res2) >= len(res):
                raise Exception("Unexpected Introspect response")
5461
def run_busctl(service, obj):
    """Run "busctl introspect" on a D-Bus object and log its output.

    service is the D-Bus service name and obj the object path to inspect.
    Raises HwsimSkip if busctl is not installed and Exception if busctl
    reports a "Duplicate property" on stderr.
    """
    if not shutil.which("busctl"):
        raise HwsimSkip("No busctl available")
    logger.info("busctl introspect %s %s" % (service, obj))
    cmd = subprocess.Popen(['busctl', 'introspect', service, obj],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
    # communicate() reads both pipes and waits for the process to exit, so
    # no separate wait() is needed.
    stdout, stderr = cmd.communicate()
    # Decode both streams; logging the raw bytes object would show its
    # b'...' repr instead of the actual text.
    stdout = stdout.decode(errors="replace")
    stderr = stderr.decode(errors="replace")
    logger.info("busctl stdout:\n%s" % stdout.strip())
    if stderr:
        logger.info("busctl stderr: %s" % stderr.strip())
    if "Duplicate property" in stderr:
        raise Exception("Duplicate property")
5476
def test_dbus_introspect_busctl(dev, apdev):
    """D-Bus introspection with busctl"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    ifaces = dbus_get(dbus, wpas_obj, "Interfaces")
    # Root object and the Interfaces node first, then the first interface
    for obj_path in (WPAS_DBUS_PATH, WPAS_DBUS_PATH + "/Interfaces"):
        run_busctl(WPAS_DBUS_SERVICE, obj_path)
    iface_path = ifaces[0]
    run_busctl(WPAS_DBUS_SERVICE, iface_path)

    # Scan for an AP and add a network before introspecting the BSSs/0 and
    # Networks/0 objects.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    for suffix in ("/BSSs/0", "/Networks/0"):
        run_busctl(WPAS_DBUS_SERVICE, iface_path + suffix)
5494
def test_dbus_ap(dev, apdev):
    """D-Bus AddNetwork for AP mode"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.started = False
            self.sta_added = False
            self.sta_removed = False
            self.authorized = False
            self.deauthorized = False
            # True only while the Stations property has matched what the
            # StationAdded/StationRemoved signals implied
            self.stations = False
            self.network_selected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.stationAdded, WPAS_DBUS_IFACE, "StationAdded")
            self.add_signal(self.stationRemoved, WPAS_DBUS_IFACE,
                            "StationRemoved")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                # The AP is up; connect wlan1 to it as a station
                self.started = True
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.connect(ssid, psk=passphrase, scan_freq="2412")

        def stationAdded(self, station, properties):
            logger.debug("stationAdded: %s" % str(station))
            logger.debug(str(properties))
            self.sta_added = True
            res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
                             dbus_interface=dbus.PROPERTIES_IFACE)
            logger.info("Stations: " + str(res))
            if len(res) == 1:
                self.stations = True
            else:
                # dbus-python logs exceptions raised in signal handlers
                # instead of propagating them, so the failure is reported
                # through self.stations (left False) in success().
                raise Exception("Missing Stations entry: " + str(res))

        def stationRemoved(self, station):
            logger.debug("stationRemoved: %s" % str(station))
            self.sta_removed = True
            res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
                             dbus_interface=dbus.PROPERTIES_IFACE)
            logger.info("Stations: " + str(res))
            if len(res) != 0:
                # Record the failure for success() and stop the loop now
                # instead of waiting for the timeout; the exception itself
                # is only logged by dbus-python.
                self.stations = False
                self.loop.quit()
                raise Exception("Unexpected Stations entry: " + str(res))
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            self.authorized = True
            # Disconnect the station to trigger the removal signals
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.request("DISCONNECT")

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'mode': 2,
                                    'frequency': 2412,
                                    'scan_freq': 2412},
                                   signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            # self.stations is part of the result so that a Stations
            # property mismatch detected in the handlers fails the test.
            return self.started and self.sta_added and self.sta_removed and \
                self.authorized and self.deauthorized and self.stations

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5599
def test_dbus_ap_scan(dev, apdev):
    """D-Bus AddNetwork for AP mode and scan"""
    # Flow: dev[0] is put into AP mode through D-Bus AddNetwork() and
    # SelectNetwork(). Once its State property reports "completed", a D-Bus
    # Scan() is requested and the test waits for the ScanDone signal.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = hapd.own_addr()

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.started = False
            self.scan_completed = False

        def __enter__(self):
            # Start the AP almost immediately and give up after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.propertiesChanged, "PropertiesChanged"),
                        (self.scanDone, "ScanDone"))
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def run_connect(self, *args):
            logger.debug("run_connect")
            ap_profile = {'ssid': ssid,
                          'key_mgmt': 'WPA-PSK',
                          'psk': passphrase,
                          'mode': 2,
                          'frequency': 2412,
                          'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(ap_profile,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # False makes this a one-shot timeout callback
            return False

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            self.started = True
            logger.info("Try to scan in AP mode")
            iface.Scan({'Type': 'active',
                        'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
            logger.info("Scan() returned")

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            if not self.started:
                # Ignore scans reported before the AP was up
                return
            self.scan_completed = True
            self.loop.quit()

        def success(self):
            return self.started and self.scan_completed

    with TestDbusConnect(bus) as session:
        if not session.success():
            raise Exception("Expected signals not seen")
5660
def test_dbus_connect_wpa_eap(dev, apdev):
    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
    # Flow: an EAP-PEAP/MSCHAPv2 network is added and selected through D-Bus
    # and the test passes once the State property reports "completed".
    skip_without_tkip(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa-eap"
    params = hostapd.wpa_eap_params(ssid=ssid)
    params["wpa"] = "3"
    params["rsn_pairwise"] = "CCMP"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Kick off the connection right away; abort after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.propertiesChanged, "PropertiesChanged"),
                        (self.eap, "EAP"))
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def run_connect(self, *args):
            logger.debug("run_connect")
            eap_profile = {'ssid': ssid,
                           'key_mgmt': 'WPA-EAP',
                           'eap': 'PEAP',
                           'identity': 'user',
                           'password': 'password',
                           'ca_cert': 'auth_serv/ca.pem',
                           'phase2': 'auth=MSCHAPV2',
                           'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(eap_profile,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # False makes this a one-shot timeout callback
            return False

        def eap(self, status, parameter):
            # EAP progress is only logged; it does not affect the result.
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            self.done = True
            self.loop.quit()

        def success(self):
            return self.done

    with TestDbusConnect(bus) as session:
        if not session.success():
            raise Exception("Expected signals not seen")
5717
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    wpas = dev[0]
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        # The test body switches to AP_SCAN 2; always switch back to 1 so
        # that later tests are not affected, even on failure.
        wpas.request("AP_SCAN 1")
5724
def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    # Test body for test_dbus_ap_scan_2_ap_mode_scan: start an open AP on
    # dev[0] with AP_SCAN 2, force the driver scan to fail while a D-Bus
    # Scan() is issued, and verify that the AP stays up and still accepts a
    # station afterwards.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ap = dev[0]
    if "OK" not in ap.request("AP_SCAN 2"):
        raise Exception("Failed to set AP_SCAN 2")

    net_id = ap.add_network()
    ap.set_network(net_id, "mode", "2")
    ap.set_network_quoted(net_id, "ssid", "wpas-ap-open")
    ap.set_network(net_id, "key_mgmt", "NONE")
    ap.set_network(net_id, "frequency", "2412")
    ap.set_network(net_id, "scan_freq", "2412")
    ap.set_network(net_id, "disabled", "0")
    ap.select_network(net_id)
    if ap.wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) is None:
        raise Exception("AP failed to start")

    with fail_test(ap, 1, "wpa_driver_nl80211_scan"):
        iface.Scan({'Type': 'active',
                    'AllowRoam': True,
                    'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
        ev = ap.wait_event(["CTRL-EVENT-SCAN-FAILED",
                            "AP-DISABLED"], timeout=5)
        if ev is None:
            raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
        if "AP-DISABLED" in ev:
            raise Exception("Unexpected AP-DISABLED event")
        if "retry=1" in ev:
            # A retry was scheduled; wait for that second attempt to be
            # reported as well.
            ev = ap.wait_event(["CTRL-EVENT-SCAN-FAILED",
                                "AP-DISABLED"], timeout=5)
            if ev is None:
                raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
            if "AP-DISABLED" in ev:
                raise Exception("Unexpected AP-DISABLED event - retry")

    # The AP must still be usable after the failed scan.
    sta = dev[1]
    sta.connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
    sta.request("DISCONNECT")
    sta.wait_disconnected()
    ap.request("DISCONNECT")
    ap.wait_disconnected()
5768
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    sta = dev[0]
    sta.connect("test-open", key_mgmt="NONE", scan_freq="2412")

    # Coverage only: the call is made on the global wpa_supplicant object
    # to exercise the code path; its effect is not verified here.
    wpas.ExpectDisconnect()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
5783
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    # SaveConfig() is expected to be rejected with a specific error here.
    expected = "fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith(expected):
            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
    else:
        raise Exception("SaveConfig() accepted unexpectedly")
5794
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    wpas = dev[0]
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        # Always clear the elements configured for ID 1 so that a failure
        # in the test body does not leak state into later tests.
        wpas.request("VENDOR_ELEM_REMOVE 1 *")
5801
def _test_dbus_vendor_elem(dev, apdev):
    # Test body for test_dbus_vendor_elem: exercise the D-Bus VendorElemAdd,
    # VendorElemGet and VendorElemRem methods with invalid arguments first
    # and then with a valid add/get/remove sequence on ID 1.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def expect_invalid_args(method, call_args, detail, accepted, unexpected):
        # Call method(*call_args) and require that it fails with a D-Bus
        # error mentioning both "InvalidArgs" and the given detail string.
        # 'accepted' is raised when the call succeeds; 'unexpected' is the
        # message prefix used when the error text does not match.
        try:
            method(*call_args)
        except dbus.exceptions.DBusException as e:
            err = str(e)
            if "InvalidArgs" not in err or detail not in err:
                raise Exception(unexpected + err)
            return
        raise Exception(accepted)

    def check_elem_data(expected, suffix):
        # Read back the elements for ID 1 and compare them byte by byte
        # against the expected buffer.
        val = iface.VendorElemGet(1)
        if len(val) != len(expected):
            raise Exception("Unexpected VendorElemGet length" + suffix)
        for got, want in zip(val, expected):
            if got != dbus.Byte(want):
                raise Exception("Unexpected VendorElemGet data" + suffix)

    # Start from a known empty state
    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    # --- VendorElemAdd() argument validation ---
    expect_invalid_args(iface.VendorElemAdd,
                        (-1, dbus.ByteArray(b"\x00\x00")), "Invalid ID",
                        "Invalid VendorElemAdd() accepted",
                        "Unexpected error message for invalid VendorElemAdd[1]: ")
    expect_invalid_args(iface.VendorElemAdd,
                        (1, dbus.ByteArray(b'')), "Invalid value",
                        "Invalid VendorElemAdd() accepted",
                        "Unexpected error message for invalid VendorElemAdd[2]: ")
    # Second octet claims one payload byte, but none follows
    expect_invalid_args(iface.VendorElemAdd,
                        (1, dbus.ByteArray(b"\x00\x01")), "Parse error",
                        "Invalid VendorElemAdd() accepted",
                        "Unexpected error message for invalid VendorElemAdd[3]: ")

    # --- VendorElemGet() argument validation ---
    expect_invalid_args(iface.VendorElemGet, (-1,), "Invalid ID",
                        "Invalid VendorElemGet() accepted",
                        "Unexpected error message for invalid VendorElemGet[1]: ")
    expect_invalid_args(iface.VendorElemGet, (1,), "ID value does not exist",
                        "Invalid VendorElemGet() accepted",
                        "Unexpected error message for invalid VendorElemGet[2]: ")

    # --- VendorElemRem() argument validation ---
    expect_invalid_args(iface.VendorElemRem,
                        (-1, dbus.ByteArray(b"\x00\x00")), "Invalid ID",
                        "Invalid VendorElemRemove() accepted",
                        "Unexpected error message for invalid VendorElemRemove[1]: ")
    expect_invalid_args(iface.VendorElemRem,
                        (1, dbus.ByteArray(b'')), "Invalid value",
                        "Invalid VendorElemRemove() accepted",
                        "Unexpected error message for invalid VendorElemRemove[2]: ")

    # Wildcard removal is accepted even when nothing is configured
    iface.VendorElemRem(1, b"*")

    # --- Valid add/get sequence ---
    ie = dbus.ByteArray(b"\x00\x01\x00")
    iface.VendorElemAdd(1, ie)
    check_elem_data(ie, "")

    # A second add appends to the existing data for the same ID
    ie2 = dbus.ByteArray(b"\xe0\x00")
    iface.VendorElemAdd(1, ie2)
    check_elem_data(ie + ie2, "[2]")

    # Malformed element in a removal request must be rejected
    expect_invalid_args(iface.VendorElemRem,
                        (1, dbus.ByteArray(b"\x01\x01")), "Parse error",
                        "Invalid VendorElemRemove() accepted",
                        "Unexpected error message for invalid VendorElemRemove[3]: ")

    # Removing the first element leaves only the second one (length check)
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    # After wildcard removal the ID must no longer exist
    iface.VendorElemRem(1, b"*")
    expect_invalid_args(iface.VendorElemGet, (1,), "ID value does not exist",
                        "Invalid VendorElemGet() accepted after removal",
                        "Unexpected error message for invalid VendorElemGet after removal: ")
5905
def test_dbus_assoc_reject(dev, apdev):
    """D-Bus AssocStatusCode"""
    # Flow: the AP is configured with max_listen_interval=1 and the test
    # expects the AssocStatusCode property to report status code 51 when
    # dev[0] tries to associate.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-open"
    params = {"ssid": ssid,
              "max_listen_interval": "1"}
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.assoc_status_seen = False
            self.state = 0

        def __enter__(self):
            # Start connecting right away; abort after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def run_connect(self, *args):
            open_profile = {'ssid': ssid,
                            'key_mgmt': 'NONE',
                            'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(open_profile,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # False makes this a one-shot timeout callback
            return False

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'AssocStatusCode' not in properties:
                return
            status = properties['AssocStatusCode']
            if status == 51:
                self.assoc_status_seen = True
            else:
                logger.info("Unexpected status code: " + str(status))
            # Stop after the first reported status code either way
            iface.Disconnect()
            self.loop.quit()

        def success(self):
            return self.assoc_status_seen

    with TestDbusConnect(bus) as session:
        if not session.success():
            raise Exception("Expected signals not seen")
5956
def test_dbus_mesh(dev, apdev):
    """D-Bus mesh"""
    # Flow: dev[1] starts an open mesh network first, then dev[0] joins it
    # from within the main loop. When the peer link is reported over D-Bus,
    # the MeshPeers and MeshGroup properties are verified and the mesh group
    # is removed on wlan1; the resulting peer disconnection triggers group
    # removal on wlan0, and MeshGroupRemoved ends the test.
    check_mesh_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH)

    add_open_mesh_network(dev[1])
    addr1 = dev[1].own_addr()

    class TestDbusMesh(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once MeshGroupRemoved has been received
            self.done = False

        def __enter__(self):
            # Start the test almost immediately; abort after 15 seconds.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH,
                            "MeshGroupStarted")
            self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH,
                            "MeshGroupRemoved")
            self.add_signal(self.meshPeerConnected, WPAS_DBUS_IFACE_MESH,
                            "MeshPeerConnected")
            self.add_signal(self.meshPeerDisconnected, WPAS_DBUS_IFACE_MESH,
                            "MeshPeerDisconnected")
            self.loop.run()
            return self

        def meshGroupStarted(self, args):
            # Only logged; the test does not depend on this signal.
            logger.debug("MeshGroupStarted: " + str(args))

        def meshGroupRemoved(self, args):
            # Final step: the group is gone, so stop the main loop.
            logger.debug("MeshGroupRemoved: " + str(args))
            self.done = True
            self.loop.quit()

        def meshPeerConnected(self, args):
            logger.debug("MeshPeerConnected: " + str(args))

            # Exactly one peer is expected and it must be dev[1]
            res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            logger.debug("MeshPeers: " + str(res))
            if len(res) != 1:
                raise Exception("Unexpected number of MeshPeer values")
            # The property holds raw address bytes; compare as hex without
            # the colon separators used by own_addr().
            if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''):
                raise Exception("Unexpected peer address")

            res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            logger.debug("MeshGroup: " + str(res))
            if res != b"wpas-mesh-open":
                raise Exception("Unexpected MeshGroup")
            # Tear down the peer side to provoke MeshPeerDisconnected
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.mesh_group_remove()

        def meshPeerDisconnected(self, args):
            logger.debug("MeshPeerDisconnected: " + str(args))
            # Remove the local group as well to provoke MeshGroupRemoved
            dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
            dev0.mesh_group_remove()

        def run_test(self, *args):
            logger.debug("run_test")
            # Join the mesh from inside the main loop so that the signals
            # registered in __enter__ are received.
            dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
            add_open_mesh_network(dev0)
            # False makes this a one-shot timeout callback
            return False

        def success(self):
            return self.done

    with TestDbusMesh(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
6031
def test_dbus_roam(dev, apdev):
    """D-Bus Roam"""
    # Flow: two APs share the same SSID and passphrase. After dev[0] has
    # connected to one of them, Roam() is called with the BSSID of the other
    # one and the test waits for State "completed" with RoamComplete set.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)
    hapd2 = hostapd.add_ap(apdev[1], params)
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq=2412)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # 0 = waiting for initial connection, 1 = roam requested,
            # 2 = roam completed
            self.state = 0

        def __enter__(self):
            # Start connecting right away; abort after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def run_connect(self, *args):
            logger.debug("run_connect")
            psk_profile = {'ssid': ssid,
                           'key_mgmt': 'WPA-PSK',
                           'psk': passphrase,
                           'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(psk_profile,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # False makes this a one-shot timeout callback
            return False

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            if self.state == 0:
                self.state = 1
                # Find out which AP was picked and roam to the other one.
                cur = properties["CurrentBSS"]
                bss_obj = bus.get_object(WPAS_DBUS_SERVICE, cur)
                octets = bss_obj.Get(WPAS_DBUS_BSS, 'BSSID',
                                     dbus_interface=dbus.PROPERTIES_IFACE)
                bssid_str = ':'.join('%02x' % octet for octet in octets)
                if bssid_str == bssid2:
                    dst = bssid
                else:
                    dst = bssid2
                iface.Roam(dst)
            elif self.state == 1:
                if properties.get("RoamComplete"):
                    self.state = 2
                    self.loop.quit()

        def success(self):
            return self.state == 2

    with TestDbusConnect(bus) as session:
        if not session.success():
            raise Exception("Expected signals not seen")
6099
def test_dbus_creds(dev, apdev):
    """D-Bus interworking credentials"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    args = {'domain': 'server.w1.fi',
            'realm': 'server.w1.fi',
            'roaming_consortium': '50a9bf',
            'required_roaming_consortium': '23bf50',
            'eap': 'TTLS',
            'phase2': 'auth=MSCHAPV2',
            'username': 'user',
            'password': 'password',
            'domain_suffix_match': 'server.w1.fi',
            'ca_cert': 'auth_serv/ca.pem'}

    # Add one credential over D-Bus and read every field back through the
    # control interface.
    cred_path = iface.AddCred(dbus.Dictionary(args, signature='sv'))
    for field, expected in args.items():
        if field == 'password':
            # The password is the one field not compared on read-back
            continue
        actual = dev[0].get_cred(0, field)
        if actual != expected:
            raise Exception('Credential add failed: %s does not match %s' % (actual, expected))

    iface.RemoveCred(cred_path)
    if "FAIL" not in dev[0].get_cred(0, 'domain'):
        raise Exception("Credential remove failed")

    # Removal of multiple credentials
    new_creds = [
        {'domain': 'server1.w1.fi', 'realm': 'server1.w1.fi', 'eap': 'TTLS'},
        {'domain': 'server2.w1.fi', 'realm': 'server2.w1.fi', 'eap': 'TTLS'},
    ]
    for idx, cred in enumerate(new_creds):
        iface.AddCred(dbus.Dictionary(cred, signature='sv'))
        if "FAIL" in dev[0].get_cred(idx, 'domain'):
            raise Exception("Failed to add credential")

    iface.RemoveAllCreds()
    for idx in range(len(new_creds)):
        if "FAIL" not in dev[0].get_cred(idx, 'domain'):
            raise Exception("Credential remove failed")
6144
def test_dbus_interworking(dev, apdev):
    """D-Bus interworking selection"""
    # Flow: a credential is added and InterworkingSelect() is started over
    # D-Bus. The test requires an InterworkingAPAdded signal that refers to
    # that credential and a final InterworkingSelectDone signal.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    params = {"ssid": "test-interworking", "wpa": "2",
              "wpa_key_mgmt": "WPA-EAP", "rsn_pairwise": "CCMP",
              "ieee8021x": "1", "eapol_version": "2",
              "eap_server": "1", "eap_user_file": "auth_serv/eap_user.conf",
              "ca_cert": "auth_serv/ca.pem",
              "server_cert": "auth_serv/server.pem",
              "private_key": "auth_serv/server.key",
              "interworking": "1",
              "domain_name": "server.w1.fi",
              "nai_realm": "0,server.w1.fi,21[2:4][5:7]",
              "roaming_consortium": "2233445566",
              "hs20": "1", "anqp_domain_id": "1234"}

    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusInterworking(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.interworking_ap_seen = False
            self.interworking_select_done = False

        def __enter__(self):
            # Start the selection right away; abort after 15 seconds.
            gobject.timeout_add(1, self.run_select)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.interworkingAPAdded, "InterworkingAPAdded"),
                        (self.interworkingSelectDone,
                         "InterworkingSelectDone"))
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def run_select(self, *args):
            cred_fields = {"domain": "server.w1.fi",
                           "realm": "server.w1.fi",
                           "eap": "TTLS",
                           "phase2": "auth=MSCHAPV2",
                           "username": "user",
                           "password": "password",
                           "domain_suffix_match": "server.w1.fi",
                           "ca_cert": "auth_serv/ca.pem"}
            # Remember the credential path to match it against the signal
            self.cred = iface.AddCred(dbus.Dictionary(cred_fields,
                                                      signature='sv'))
            iface.InterworkingSelect()
            # False makes this a one-shot timeout callback
            return False

        def interworkingAPAdded(self, bss, cred, properties):
            logger.debug("interworkingAPAdded: bss=%s cred=%s %s" % (bss, cred, str(properties)))
            if self.cred == cred:
                self.interworking_ap_seen = True

        def interworkingSelectDone(self):
            logger.debug("interworkingSelectDone")
            self.interworking_select_done = True
            self.loop.quit()

        def success(self):
            return self.interworking_ap_seen and self.interworking_select_done

    with TestDbusInterworking(bus) as session:
        if not session.success():
            raise Exception("Expected signals not seen")
6210