]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
tests: Avoid confusing "DETACH failed" exception prints in D-Bus tests
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67 1# wpa_supplicant D-Bus interface tests
910eb5b5 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
2ec82e67
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
2ec82e67
JM
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
c37ef330 12import struct
5c9ba341 13import sys
2ec82e67 14
910eb5b5 15try:
5c9ba341
JM
16 if sys.version_info[0] > 2:
17 from gi.repository import GObject as gobject
18 else:
19 import gobject
910eb5b5
JM
20 import dbus
21 dbus_imported = True
22except ImportError:
23 dbus_imported = False
24
2ec82e67
JM
25import hostapd
26from wpasupplicant import WpaSupplicant
708ec753 27from utils import HwsimSkip, alloc_fail, fail_test
1992af11 28from p2p_utils import *
2ec82e67 29from test_ap_tdls import connect_2sta_open
089e7ca3 30from test_ap_eap import check_altsubject_match_support
afe28053 31from test_nfc_p2p import set_ip_addr_info
504c7ffd 32from test_wpas_mesh import check_mesh_support, add_open_mesh_network
2ec82e67
JM
33
34WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
35WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
36WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
37WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
38WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
39WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
40WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
41WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
42WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
43WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
504c7ffd 44WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh"
2ec82e67
JM
45
def prepare_dbus(dev):
    """Connect to the wpa_supplicant D-Bus service for the given device.

    Returns a (bus, wpas_obj, path, if_obj) tuple: the system bus, the
    wpa_supplicant root object, the object path of the interface matching
    dev.ifname, and the proxy object for that path.

    Raises HwsimSkip when the dbus module could not be imported or when any
    step of the D-Bus setup fails.
    """
    if not dbus_imported:
        reason = "No dbus module available"
        logger.info(reason)
        raise HwsimSkip(reason)
    try:
        # Importing here also loads the dbus.mainloop.glib submodule.
        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)
        system_bus = dbus.SystemBus()
        root_obj = system_bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        root_iface = dbus.Interface(root_obj, WPAS_DBUS_SERVICE)
        iface_path = root_iface.GetInterface(dev.ifname)
        iface_obj = system_bus.get_object(WPAS_DBUS_SERVICE, iface_path)
        return (system_bus, root_obj, iface_path, iface_obj)
    except Exception as e:
        # Any failure here means D-Bus is unusable in this environment.
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
2ec82e67
JM
61
class TestDbus(object):
    """Base class for D-Bus signal tests driven by a GLib main loop.

    Subclasses provide __enter__ (register signals, run the loop); this class
    tracks the registered signal matches and removes them in __exit__.
    """

    def __init__(self, bus):
        self.bus = bus
        self.signals = []
        self.loop = gobject.MainLoop()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal `name` on `interface` and remember it."""
        match = self.bus.add_signal_receiver(handler,
                                             dbus_interface=interface,
                                             signal_name=name,
                                             byte_arrays=byte_arrays)
        self.signals.append(match)

    def timeout(self, *args):
        """Timer callback: stop the main loop; False keeps it one-shot."""
        logger.debug("timeout")
        self.loop.quit()
        return False

    def __exit__(self, type, value, traceback):
        # Drop every signal match registered through add_signal().
        for match in self.signals:
            match.remove()
82
0e126c6d
JM
class alloc_fail_dbus(object):
    """Context manager that arms TEST_ALLOC_FAIL around a D-Bus operation.

    The wrapped operation is expected to fail with a DBusException whose text
    contains `expected`; that exception is then suppressed.
    """

    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):
        self._dev = dev
        self._count = count
        self._funcs = funcs
        self._operation = operation
        self._expected = expected

    def __enter__(self):
        arm_cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
        reply = self._dev.request(arm_cmd)
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # No exception at all: the operation wrongly succeeded.
        if type is None:
            raise Exception("%s succeeded during out-of-memory" % self._operation)
        # The expected D-Bus error: swallow it.
        is_dbus_error = type == dbus.exceptions.DBusException
        if is_dbus_error and self._expected in str(value):
            return True
        # Some other error: verify the allocation failure was actually hit.
        remaining = self._dev.request("GET_ALLOC_FAIL")
        if remaining != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" % self._operation)
        return False
103
3a59cda1
JM
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Start a WPA2-PSK AP with WPS enabled and return the hostapd handle."""
    params = dict(ssid=ssid,
                  eap_server="1",
                  wps_state="2",
                  wpa_passphrase="12345678",
                  wpa="2",
                  wpa_key_mgmt="WPA-PSK",
                  rsn_pairwise="CCMP",
                  ap_pin="12345670",
                  uuid=ap_uuid)
    return hostapd.add_ap(ap, params)
2ec82e67
JM
111
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def iface_prop(name):
        # Read one property of the wpa_supplicant Interface object.
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))

    for iface in (WPAS_DBUS_IFACE, WPAS_DBUS_IFACE_WPS):
        props = if_obj.GetAll(iface, dbus_interface=dbus.PROPERTIES_IFACE)
        logger.debug("GetAll(%s, %s): %s" % (iface, path, str(props)))

    # Nothing has been scanned or configured yet: both lists must be empty.
    for name in ('BSSs', 'Networks'):
        res = iface_prop(name)
        if len(res) != 0:
            raise Exception("Unexpected " + name + " entry: " + str(res))

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    # Exactly one BSS entry, carrying the AP's BSSID.
    res = iface_prop('BSSs')
    if len(res) != 1:
        raise Exception("Missing BSSs entry: " + str(res))
    bss_path = res[0]
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_path)
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_path, str(props)))
    bssid_str = ':'.join('%02x' % octet for octet in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    # Exactly one network entry, carrying the configured SSID.
    res = iface_prop('Networks')
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_path = res[0]
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, net_path)
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, net_path, str(props)))
    if props['Properties']['ssid'] != '"test"':
        raise Exception("Unexpected SSID in network entry")
171
73ece491
JM
def test_dbus_getall_oom(dev, apdev):
    """D-Bus GetAll wpa_config_get_all() OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    networks = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(networks) != 1:
        raise Exception("Missing Networks entry: " + str(networks))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, networks[0])

    # Fail each of the first 49 allocations in turn; a D-Bus error from
    # GetAll is acceptable here and deliberately ignored.
    for count in range(1, 50):
        with alloc_fail(dev[0], count, "wpa_config_get_all"):
            try:
                net_obj.GetAll(WPAS_DBUS_NETWORK,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException:
                pass
192
2ec82e67
JM
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read a fi.w1.wpa_supplicant1 property and optionally verify it.

    Returns the value read. Raises Exception when `expect` is given (not
    None) and the value differs from it.
    """
    val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                       dbus_interface=dbus.PROPERTIES_IFACE,
                       byte_arrays=byte_arrays)
    mismatch = expect is not None and val != expect
    if not mismatch:
        return val
    raise Exception("Unexpected %s: %s (expected: %s)" %
                    (prop, str(val), str(expect)))
201
def dbus_set(dbus, wpas_obj, prop, val):
    """Write a fi.w1.wpa_supplicant1 property through the Properties iface."""
    props_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, dbus_interface=props_iface)
205
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def expect_dbus_error(call, fragment, accepted_msg):
        # Run call(); it must raise a DBusException containing fragment.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if fragment not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception(accepted_msg)

    def expect_set_rejected(prop, value, fragment, accepted_msg):
        expect_dbus_error(lambda: dbus_set(dbus, wpas_obj, prop, value),
                          fragment, accepted_msg)

    # DebugLevel: valid change, two invalid values, then restore.
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    invalid_levels = [(3, "Error.Failed: wrong property type"),
                      ("foo", "Error.Failed: wrong debug level value")]
    for (val, err) in invalid_levels:
        expect_set_rejected("DebugLevel", val, err,
                            "Invalid DebugLevel value accepted: " + str(val))
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # Boolean debug flags: toggle, reject a string value, then restore.
    for flag in ("DebugTimestamp", "DebugShowKeys"):
        dbus_get(dbus, wpas_obj, flag, expect=True)
        dbus_set(dbus, wpas_obj, flag, False)
        dbus_get(dbus, wpas_obj, flag, expect=False)
        expect_set_rejected(flag, "foo", "Error.Failed: wrong property type",
                            "Invalid " + flag + " value accepted")
        dbus_set(dbus, wpas_obj, flag, True)
        dbus_get(dbus, wpas_obj, flag, expect=True)

    res = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(res) != 1:
        raise Exception("Unexpected Interfaces value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(res) < 5 or "TTLS" not in res:
        raise Exception("Unexpected EapMethods value: " + str(res))

    res = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(res) < 2 or "p2p" not in res:
        raise Exception("Unexpected Capabilities value: " + str(res))

    # WFDIEs: round-trip a value, reject a bad one, then clear.
    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    val = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if val != res:
        raise Exception("WFDIEs value changed")
    expect_dbus_error(lambda: dbus_set(dbus, wpas_obj, "WFDIEs",
                                       dbus.ByteArray(b'\x00')),
                      "InvalidArgs", "Invalid WFDIEs value accepted")
    for ies in (b'', val, b''):
        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(ies))
    res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(res) != 0:
        raise Exception("WFDIEs not cleared properly")

    # Writing a read-only property must be refused.
    res = dbus_get(dbus, wpas_obj, "EapMethods")
    expect_set_rejected("EapMethods", res,
                        "InvalidArgs: Property is read-only",
                        "Invalid Set accepted")

    # Unknown Properties method.
    expect_dbus_error(
        lambda: wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                                dbus_interface=dbus.PROPERTIES_IFACE),
        "UnknownMethod", "Unknown method accepted")

    # Unknown interface name in Get.
    expect_dbus_error(
        lambda: wpas_obj.Get("foo", "DebugShowKeys",
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: No such property", "Invalid Get accepted")

    # Without introspection the proxy sends wrongly typed arguments as-is.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    for get_args in ((123, "DebugShowKeys"), (WPAS_DBUS_SERVICE, 123)):
        expect_dbus_error(
            lambda: test_obj.Get(get_args[0], get_args[1],
                                 dbus_interface=dbus.PROPERTIES_IFACE),
            "InvalidArgs: Invalid arguments", "Invalid Get accepted")

    # Doubly wrapped variant is an invalid message format for Set.
    expect_dbus_error(
        lambda: wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
                             dbus.ByteArray(b'', variant_level=2),
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: invalid message format", "Invalid Set accepted")
328
f0ef6889
DW
def test_dbus_set_global_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    dev[0].set("model_name", "")
    # (property name, expected initial value, value to write)
    props = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')]

    for name, initial, updated in props:
        res = if_obj.Get(WPAS_DBUS_IFACE, name,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != initial:
            raise Exception("Unexpected " + name + " value: " + str(res))

        if_obj.Set(WPAS_DBUS_IFACE, name, updated,
                   dbus_interface=dbus.PROPERTIES_IFACE)

        res = if_obj.Get(WPAS_DBUS_IFACE, name,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != updated:
            raise Exception("Unexpected " + name + " value after set: " + str(res))
    # Restore the model name changed through ModelName above.
    dev[0].set("model_name", "")
f0ef6889 350
2ec82e67
JM
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # A method that does not exist on the WPS interface.
    try:
        wps.Foo()
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Unknown method accepted")

    # An existing method called with the wrong signature; introspection is
    # disabled so the proxy does not reject the argument locally.
    raw_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    raw_wps = dbus.Interface(raw_obj, WPAS_DBUS_IFACE_WPS)
    try:
        raw_wps.Start(123)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("WPS.Start with incorrect signature accepted")
371
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    # Values written back after the test, in the original order.
    restore = [("device_name", "Device A"),
               ("manufacturer", ""),
               ("model_name", ""),
               ("model_number", ""),
               ("serial_number", ""),
               ("device_type", "0-00000000-0")]
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
        dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
        for field, value in restore:
            dev[0].set(field, value)
2ec82e67
JM
385
def _test_dbus_get_set_wps(dev, apdev):
    """Exercise Get/Set on the WPS D-Bus properties of dev[0].

    Covers ConfigMethods, ProcessCredentials, the device description
    strings and DeviceType, then verifies that changing ProcessCredentials
    emits both the deprecated interface-specific PropertiesChanged signal
    and the standard org.freedesktop.DBus.Properties one.

    Raises Exception on any mismatch; the caller restores the settings.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    # ProcessCredentials is reported False only for wps_cred_processing=1.
    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        expected_val = i != 1
        if val != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))

    # (control interface field, D-Bus property name)
    tests = [("device_name", "DeviceName"),
             ("manufacturer", "Manufacturer"),
             ("model_name", "ModelName"),
             ("model_number", "ModelNumber"),
             ("serial_number", "SerialNumber")]

    for f1, f2 in tests:
        val2 = "test-value-test"
        dev[0].set(f1, val2)
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != val2:
            raise Exception("Get(%s) returned unexpected value" % f2)
        val2 = "TEST-value"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2,
                   dbus_interface=dbus.PROPERTIES_IFACE)
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != val2:
            raise Exception("Get(%s) returned unexpected value after Set" % f2)

    # Binary form of device type "5-0050F204-1". Every one of the eight
    # bytes is compared; the earlier chained "a != x != b != y" expression
    # silently skipped mismatches in bytes 1 and 2.
    expected_dev_type = [0x00, 0x05, 0x00, 0x50, 0xf2, 0x04, 0x00, 0x01]

    def dev_type_matches(value):
        return [int(b) for b in value[:8]] == expected_dev_type

    dev[0].set("device_type", "5-0050F204-1")
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if not dev_type_matches(val):
        raise Exception("DeviceType mismatch")
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val,
               dbus_interface=dbus.PROPERTIES_IFACE)
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if not dev_type_matches(val):
        raise Exception("DeviceType mismatch after Set")

    val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08'
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2),
               dbus_interface=dbus.PROPERTIES_IFACE)
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE,
                     byte_arrays=True)
    if val != val2:
        raise Exception("DeviceType mismatch after Set (2)")

    class TestDbusGetSet(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Deprecated interface-specific PropertiesChanged signal.
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            # Standard org.freedesktop.DBus.Properties signal.
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # Must be the flag the signal handlers test; the previous
            # "dbus_sets_done" name was never read, so the loop always ran
            # until the 1 s timeout instead of quitting on the signals.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
518
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invaldi WPS operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    pbc_enrollee = {'Role': 'enrollee', 'Type': 'pbc'}

    def pbc_with(**extra):
        # PBC enrollee request plus one additional (invalid) entry.
        args = dict(pbc_enrollee)
        args.update(extra)
        return args

    # Every argument set below must be rejected with InvalidArgs.
    failures = [{'Role': 'foo', 'Type': 'pbc'},
                {'Role': 123, 'Type': 'pbc'},
                {'Type': 'pbc'},
                {'Role': 'enrollee'},
                {'Role': 'registrar'},
                {'Role': 'enrollee', 'Type': 123},
                {'Role': 'enrollee', 'Type': 'foo'},
                pbc_with(Bssid='02:33:44:55:66:77'),
                {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
                pbc_with(Bssid=dbus.ByteArray(b'12345')),
                pbc_with(P2PDeviceAddress=12345),
                pbc_with(P2PDeviceAddress=dbus.ByteArray(b'12345')),
                pbc_with(Foo='bar')]
    for args in failures:
        try:
            wps.Start(args)
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
548
0e126c6d
JM
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    def get_iface_prop(name):
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
        get_iface_prop("State")

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)

    time.sleep(0.05)
    for count in range(1, 3):
        with alloc_fail_dbus(dev[0], count, "=wpas_dbus_getter_bsss", "Get"):
            get_iface_prop("BSSs")

    bss_paths = get_iface_prop('BSSs')
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                    dbus_interface=dbus.PROPERTIES_IFACE)
    # For this allocation site a D-Bus error is tolerated, not required.
    with alloc_fail(dev[0], 1,
                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
        try:
            bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                        dbus_interface=dbus.PROPERTIES_IFACE)
        except dbus.exceptions.DBusException:
            pass

    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    for count in range(1, 3):
        with alloc_fail_dbus(dev[0], count, "=wpas_dbus_getter_networks", "Get"):
            get_iface_prop("Networks")

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    for count in range(1, 6):
        with alloc_fail_dbus(dev[0], count, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        methods = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", methods,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
                         "WPS.Start",
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
608
2ec82e67
JM
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # The test body switches credential processing to mode 2.
        sta.request("SET wps_cred_processing 0")
615
def _test_dbus_wps_pbc(dev, apdev):
    """Run WPS PBC over D-Bus; expect the success event and Credentials."""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    # The scanned BSS must advertise active PBC in its WPS dictionary.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing BSSs entry: " + str(res))
    bss_path = res[0]
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_path)
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_path, str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    wps_info = props['WPS']
    if 'Type' not in wps_info:
        raise Exception("No Type field in the WPS dictionary")
    if wps_info['Type'] != 'pbc':
        raise Exception("Unexpected WPS Type: " + wps_info['Type'])

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
685
3a59cda1
JM
def test_dbus_wps_pbc_overlap(dev, apdev):
    """D-Bus WPS/PBC operation and signal for PBC overlap"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Two APs with PBC active at the same time.
    hapd = start_ap(apdev[0])
    hapd2 = start_ap(apdev[1], ssid="test-wps2",
                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
    hapd.request("WPS_PBC")
    hapd2.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq="2412")

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.overlap_seen = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.loop.run()
            return self

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "pbc-overlap":
                return
            self.overlap_seen = True
            self.loop.quit()

        def success(self):
            return self.overlap_seen

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].request("WPS_CANCEL")
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
736
2ec82e67
JM
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        # The test body switches credential processing to mode 2.
        sta.request("SET wps_cred_processing 0")
743
def _test_dbus_wps_pin(dev, apdev):
    """Run WPS PIN (PIN set on the AP) over D-Bus and check the signals."""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def start_pin(self, *args):
            logger.debug("start_pin")
            # BSSID goes over D-Bus as raw bytes, not as a text string.
            bssid_hex = bssid.replace(':', '').encode()
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid_hex))
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': bssid_ay})
            return False

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
797
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        # The test body switches credential processing to mode 2.
        sta.request("SET wps_cred_processing 0")
804
def _test_dbus_wps_pin2(dev, apdev):
    """Run WPS PIN over D-Bus with the PIN generated by wpa_supplicant.

    WPS.Start is called without a 'Pin' entry; the PIN returned in the
    reply is then configured on the AP. Raises Exception unless both the
    WPS "success" event and the Credentials signal are seen before the
    15 s timeout.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Must exist before any signal arrives: wpsEvent() and
            # success() read it, and it was previously only created inside
            # credentials(), giving AttributeError if "success" came first
            # or no Credentials signal was delivered at all.
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode()))
            # No 'Pin' given: the PIN to use comes back in the reply.
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            pin = res['Pin']
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
860
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    # The helper sets wps_cred_processing to 2 on dev[0]; put it back to 0
    # regardless of whether the helper returns normally or raises.
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
867
def _test_dbus_wps_pin_m2d(dev, apdev):
    """Run WPS PIN enrollment where the AP learns the PIN only after M2D.

    The enrollee is started with PIN 12345670; when an "m2d" WPS Event is
    signalled, the same PIN is configured on the AP with WPS_PIN. The test
    passes once both the "success" Event and the Credentials signal have
    been seen and dev[0] reports a connection.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            # Schedule the WPS start and a 15 second timeout, subscribe to
            # the WPS signals, then block in the main loop.
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "m2d":
                # Only now give the AP the PIN the enrollee is using.
                ap_ctrl = hostapd.Hostapd(apdev[0]['ifname'])
                ap_ctrl.request("WPS_PIN any 12345670")
                return
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            hex_bssid = bssid.replace(':', '')
            raw_bssid = binascii.unhexlify(hex_bssid.encode())
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            # One-shot timeout callback
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
923
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    # The helper sets wps_cred_processing to 2 on dev[0]; put it back to 0
    # regardless of whether the helper returns normally or raises.
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        dev[0].request("SET wps_cred_processing 0")
930
def _test_dbus_wps_reg(dev, apdev):
    """Run a WPS operation with dev[0] in the registrar role.

    The AP is configured with PIN 12345670 and WPS.Start() is called with
    Role 'registrar' and the same PIN. Only the Credentials signal is
    required for success; WPS Event signals are just logged.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            # The registrar start is delayed by 100 ms; the overall timeout
            # is 15 seconds.
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            hex_bssid = bssid.replace(':', '')
            raw_bssid = binascii.unhexlify(hex_bssid.encode())
            start_args = {'Role': 'registrar', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            # One-shot timeout callback
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
978
2a8e3f35
JM
def test_dbus_wps_cancel(dev, apdev):
    """D-Bus WPS Cancel operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']

    # First Cancel() is issued before any WPS operation has been started.
    wps.Cancel()

    dev[0].scan_for_bss(bssid, freq="2412")
    raw_bssid = binascii.unhexlify(bssid.replace(':', '').encode())
    start_args = {'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                  'Bssid': dbus.ByteArray(raw_bssid)}
    wps.Start(start_args)

    # Second Cancel() is issued right after starting the PIN operation.
    wps.Cancel()
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
994
2ec82e67
JM
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Seventeen short SSIDs: b"1", b"2", ..., b"17"
    many_ssids = [dbus.ByteArray(str(n).encode()) for n in range(1, 18)]
    # One 33-octet SSID
    long_ssid = dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")

    # Every argument dictionary below is expected to be rejected by Scan().
    invalid_args = [
        {},
        {'Type': 123},
        {'Type': 'foo'},
        {'Type': 'active', 'Foo': 'bar'},
        {'Type': 'active', 'SSIDs': 'foo'},
        {'Type': 'active', 'SSIDs': ['foo']},
        {'Type': 'active', 'SSIDs': many_ssids},
        {'Type': 'active', 'SSIDs': [long_ssid]},
        {'Type': 'active', 'IEs': 'foo'},
        {'Type': 'active', 'IEs': ['foo']},
        {'Type': 'active', 'Channels': 2412},
        {'Type': 'active', 'Channels': [2412]},
        {'Type': 'active',
         'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]},
        {'Type': 'active',
         'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]},
        {'Type': 'active', 'AllowRoam': "yes"},
        {'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]},
        {'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]},
    ]
    err = "InvalidArgs"

    for t in invalid_args:
        try:
            iface.Scan(t)
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
        else:
            # Scan() returned without a D-Bus error
            raise Exception("Invalid Scan() arguments accepted: " + str(t))
1042
0e126c6d
JM
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # The same single-channel list is used by every Scan() call below.
    channels = [(dbus.UInt32(2412), dbus.UInt32(20))]
    passive_scan = {'Type': 'passive', 'Channels': channels}

    # Allocation failure while cloning the scan parameters
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_scan_clone_params;wpas_dbus_handler_scan",
                         "Scan", expected="ScanError: Scan request rejected"):
        iface.Scan(passive_scan)

    # Allocation failure while parsing the Channels entry
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan(passive_scan)

    # Allocation failure while parsing the IEs entry
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'active',
                    'IEs': [dbus.ByteArray(b"\xdd\x00")],
                    'Channels': channels})

    # Allocation failure while parsing the SSIDs entry
    with alloc_fail_dbus(dev[0], 1,
                         "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'active',
                    'SSIDs': [dbus.ByteArray(b"open"),
                              dbus.ByteArray()],
                    'Channels': channels})
0e126c6d 1074
2ec82e67
JM
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})

    class TestDbusScan(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = 0     # number of ScanDone signals seen
            self.bss_added = False
            self.fail_reason = None     # set when a BSS looks wrong

        def __enter__(self):
            # Schedule the first scan and a 15 second timeout, subscribe to
            # the scan related signals, then block in the main loop.
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in ((self.scanDone, "ScanDone"),
                                    (self.bssAdded, "BSSAdded"),
                                    (self.bssRemoved, "BSSRemoved")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            done = self.scan_completed
            if done == 1:
                # Second scan: passive, single channel, roaming allowed
                iface.Scan({'Type': 'passive',
                            'AllowRoam': True,
                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
                return
            if done == 2:
                # Third scan: passive, no channel list, roaming not allowed
                iface.Scan({'Type': 'passive',
                            'AllowRoam': False})
                return
            if done == 3 and self.bss_added:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            if 'WPS' in properties and 'Type' in properties['WPS']:
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            # First scan: active, with SSID and IE lists that each include
            # an empty entry.
            first_scan = {'Type': 'active',
                          'SSIDs': [dbus.ByteArray(b"open"),
                                    dbus.ByteArray()],
                          'IEs': [dbus.ByteArray(b"\xdd\x00"),
                                  dbus.ByteArray()],
                          'AllowRoam': False,
                          'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(first_scan)
            # One-shot timeout callback
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as t:
        if t.fail_reason:
            raise Exception(t.fail_reason)
        if not t.success():
            raise Exception("Expected signals not seen")

    # The scan result has to be visible through the BSSs property and go
    # away after FlushBSS(0).
    bss_list = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_list) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    bss_list = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_list) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
1155
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan through the control interface and wait until it runs.
    reply = dev[0].request("SCAN freq=2412-2462")
    if "OK" not in reply:
        raise Exception("Failed to start scan")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    # A D-Bus Scan() issued now has to fail with a ScanError.
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1177
0238e8ba
JM
def test_dbus_scan_abort(dev, apdev):
    """D-Bus scan trigger and abort"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    scan_args = {'Type': 'active', 'AllowRoam': False}

    # Round 1: abort only after the scan has been reported as started.
    iface.Scan(scan_args)
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    iface.AbortScan()
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan abort result timed out")
    dev[0].dump_monitor()

    # Round 2: abort immediately after requesting the scan.
    iface.Scan(scan_args)
    iface.AbortScan()

    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1199
2ec82e67
JM
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Steps taken when State becomes "completed":
        # current step -> (next step, Interface method to call)
        ON_COMPLETED = {0: (1, "Disconnect"),
                        2: (3, "Disconnect"),
                        4: (5, "Reattach"),
                        5: (6, "Disconnect")}

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0      # position in the connect/disconnect sequence

        def __enter__(self):
            # Schedule the initial connection and a 15 second timeout,
            # subscribe to the signals, then block in the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')

            if new_state == "completed":
                if self.state in self.ON_COMPLETED:
                    self.state, method = self.ON_COMPLETED[self.state]
                    getattr(iface, method)()
                elif self.state == 7:
                    # Last connection: check SignalPoll() and remove the
                    # network.
                    self.state = 8
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    iface.Reconnect()
                elif self.state == 8:
                    self.state = 9
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-PSK',
                       'psk': passphrase,
                       'scan_freq': 2412}
            args = dbus.Dictionary(network, signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback
            return False

        def success(self):
            seen_all = (self.network_added and
                        self.network_removed and
                        self.network_selected)
            if not seen_all:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1302
53f4ed68
JM
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            # Schedule the connection attempt and a 15 second timeout,
            # subscribe to the signals, then block in the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            self.connected = True
            self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PSK_PASSPHRASE":
                return
            # The network was added without a 'psk' entry, so the
            # passphrase (in double quotes) is supplied on request.
            iface.NetworkReply(path, field, '"' + passphrase + '"')

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-PSK',
                       'mem_only_psk': 1,
                       'scan_freq': 2412}
            args = dbus.Dictionary(network, signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1356
0e126c6d
JM
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            # Only 1.5 seconds per attempt here; this context manager is
            # entered once per loop iteration below.
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'scan_freq': 2412},
                                   signature='sv')
            # Either D-Bus call may fail because of the forced allocation
            # failure; stop the loop instead of waiting for the timeout.
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Repeat the connect sequence while requesting an allocation failure at
    # the i-th matching allocation. Stop after five iterations in which
    # GET_ALLOC_FAIL does not report a value starting with "0:".
    # NOTE(review): the nesting of the bookkeeping below relative to the
    # "with" block was reconstructed from a listing without indentation -
    # confirm against the repository copy.
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if i < 3:
                raise Exception("Connection succeeded during out-of-memory")
            if not state.startswith('0:'):
                count += 1
                if count == 5:
                    break
        except Exception as e:
            # Failures inside an iteration are tolerated (best effort), but
            # they are logged instead of being dropped silently, and
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            # NOTE(review): this also discards the "Connection succeeded
            # during out-of-memory" check raised just above - confirm
            # whether that check is meant to fail the test.
            logger.info("Iteration %d - exception ignored: %s" % (i, str(e)))

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1498
2ec82e67
JM
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Disconnect() has to fail with a NotConnected error.
    try:
        iface.Disconnect()
    except dbus.exceptions.DBusException as e:
        if "NotConnected" not in str(e):
            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
    else:
        raise Exception("Disconnect() accepted when not connected")

    # Reattach() has to fail with a NotConnected error as well.
    try:
        iface.Reattach()
    except dbus.exceptions.DBusException as e:
        if "NotConnected" not in str(e):
            raise Exception("Unexpected error message for invalid Reattach: " + str(e))
    else:
        raise Exception("Reattach() accepted when not connected")
1517
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    def set_altsubject_match(netw, value):
        # Update altsubject_match of the given network through the
        # Properties property of its Network object.
        net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw)
        args = dbus.Dictionary({'altsubject_match': value},
                               signature='sv')
        net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                    dbus_interface=dbus.PROPERTIES_IFACE)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            self.state = 0

        def __enter__(self):
            # Schedule the connection attempt and a 15 second timeout,
            # subscribe to the signals, then block in the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')

            if new_state == "completed":
                if self.state == 0:
                    # First connection: log off and require the dNSName
                    # that the server certificate reported.
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    set_altsubject_match(self.netw, self.server_dnsname)
                elif self.state == 2:
                    # Second connection: disconnect and require a value
                    # that differs from the reported dNSName.
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    set_altsubject_match(self.netw,
                                         self.server_dnsname + "FOO")
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            if len(args['altsubject']) < 1:
                raise Exception("Missing dNSName")
            dnsname = args['altsubject'][0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            # The run ends when the non-matching constraint is reported as
            # a certificate verification failure.
            mismatch = (status == 'remote certificate verification' and
                        parameter == 'AltSubject mismatch')
            if self.state == 4 and mismatch:
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field == "PASSWORD":
                iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'IEEE8021X',
                       'eapol_flags': 0,
                       'eap': 'TTLS',
                       'anonymous_identity': 'ttls',
                       'identity': 'pap user',
                       'ca_cert': 'auth_serv/ca.pem',
                       'phase2': 'auth=PAP',
                       'scan_freq': 2412}
            args = dbus.Dictionary(network, signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 5

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1632
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # scan_freq given as a single UInt32
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'WPA-PSK',
                            'psk': "12345678",
                            'identity': dbus.ByteArray([1, 2]),
                            'priority': dbus.Int32(0),
                            'scan_freq': dbus.UInt32(2412)},
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # scan_freq and freq_list given as space separated strings
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'NONE',
                            'scan_freq': "2412 2432",
                            'freq_list': "2412 2417 2432"},
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(net_id, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # Operations on the already removed network have to be rejected.
    try:
        iface.RemoveNetwork(netw)
        raise Exception("Invalid RemoveNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    try:
        iface.SelectNetwork(netw)
        raise Exception("Invalid SelectNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))

    args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
                            'identity': "testuser", 'scan_freq': '2412'},
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: read the initial value, toggle it both ways and
    # check that a non-boolean value is rejected.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if not res:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(Enabled,1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))

    # Setting Properties with only 'ssid' must leave 'identity' untouched.
    args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # Called a second time with no networks left.
    iface.RemoveAllNetworks()

    # Argument dictionaries that AddNetwork() has to reject
    tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'),
             dbus.Dictionary({'identity': dbus.ByteArray()},
                             signature='sv'),
             dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv'),
             dbus.Dictionary({'identity': ""}, signature='sv')]
    for args in tests:
        try:
            iface.AddNetwork(args)
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1745
0e126c6d
JM
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def add_then_remove(net_args):
        # AddNetwork() may succeed even though an allocation failure was
        # injected; remove the network again in that case. A D-Bus error
        # from either call is accepted silently.
        try:
            iface.RemoveNetwork(iface.AddNetwork(net_args))
        except dbus.exceptions.DBusException:
            pass

    # Allocation failure while fetching the Properties of an existing network
    first_args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
                                  'identity': "testuser",
                                  'scan_freq': '2412'},
                                 signature='sv')
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, iface.AddNetwork(first_args))
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    # Allocation failure while parsing the object path in RemoveNetwork()
    bogus_path = dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234")
    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        iface.RemoveNetwork(bogus_path)

    # Allocation failures while registering the new network object
    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                               signature='sv')
        add_then_remove(args)

    for count in (1, 2):
        with alloc_fail(dev[0], count, "=wpas_dbus_register_network"):
            add_then_remove(args)

    # Allocation failure while creating the configuration entry
    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                               signature='sv')
        iface.AddNetwork(args)

    # Allocation failures while applying individual network properties
    set_props = '=set_network_properties;wpas_dbus_handler_add_network'
    cases = [
        (1,
         'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
         dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'ssid': 'foo'}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'eap': 'foo'}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'priority': dbus.UInt32(1)}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'priority': dbus.Int32(1)}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, signature='sv')),
    ]
    for count, funcs, net_args in cases:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            iface.AddNetwork(net_args)

    # None of the failed attempts may have left a network block behind,
    # neither on D-Bus nor in the control interface view.
    remaining = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(remaining) > 0:
        raise Exception("Unexpected network block added")
    if len(dev[0].list_networks()) > 0:
        raise Exception("Unexpected network block visible")
1825
2ec82e67
JM
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    regdom_change = ["CTRL-EVENT-REGDOM-CHANGE"]
    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        # Switch to US and back to 00, waiting briefly for the regdom change
        # event on the interface and then on the global monitor each time.
        for country in ("US", "00"):
            dev[0].request("SET country " + country)
            if dev[0].wait_event(regdom_change, timeout=1) is None:
                dev[0].wait_global_event(regdom_change, timeout=1)
        subprocess.call(['iw', 'reg', 'set', '00'])
1844
def _test_dbus_interface(dev, apdev):
    """Exercise CreateInterface/GetInterface/RemoveInterface and their errors.

    Creates an interface for 'lo' with driver 'none', verifies that it can
    be looked up, removes it, and checks that invalid requests are rejected
    with the expected D-Bus error names.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    def expect_error(call, what, error):
        # Run call() and require a D-Bus error whose text contains 'error';
        # 'what' names the operation in the failure messages.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if error not in str(e):
                raise Exception("Unexpected error message for invalid %s: " % what + str(e))
        else:
            raise Exception("Invalid %s() accepted" % what)

    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # Creating the same interface again must be rejected
    params = dbus.Dictionary({'Ifname': 'lo',
                              'Driver': 'none',
                              'ConfigFile': 'foo',
                              'BridgeIfname': 'foo',},
                             signature='sv')
    expect_error(lambda: wpas.CreateInterface(params),
                 "CreateInterface", "InterfaceExists")

    # Removing twice: the second attempt refers to an unknown interface
    wpas.RemoveInterface(path)
    expect_error(lambda: wpas.RemoveInterface(path),
                 "RemoveInterface", "InterfaceUnknown")

    # Unknown dictionary entry
    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none',
                              'Foo': 123},
                             signature='sv')
    expect_error(lambda: wpas.CreateInterface(params),
                 "CreateInterface", "InvalidArgs")

    # Missing Ifname
    params = dbus.Dictionary({'Driver': 'none'}, signature='sv')
    expect_error(lambda: wpas.CreateInterface(params),
                 "CreateInterface", "InvalidArgs")

    # The interface was removed above, so lookup must fail. The failure
    # message names GetInterface (the original text said RemoveInterface).
    expect_error(lambda: wpas.GetInterface("lo"),
                 "GetInterface", "InterfaceUnknown")
1901
0e126c6d
JM
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    def lo_params():
        # Arguments for creating an interface on 'lo' with driver 'none'
        return dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                               signature='sv')

    # Allocation failure while parsing the argument dictionary
    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        wpas.CreateInterface(lo_params())

    # Fail the Nth allocation within interface addition for increasing N
    # until CreateInterface() succeeds without the failure having triggered.
    for attempt in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % attempt)
        try:
            npath = wpas.CreateInterface(lo_params())
            wpas.RemoveInterface(npath)
        except dbus.exceptions.DBusException:
            # Expected outcome while the injected failure is being hit
            continue

        logger.info("CreateInterface succeeds after %d allocation failures" % attempt)
        state = dev[0].request('GET_ALLOC_FAIL')
        logger.info("GET_ALLOC_FAIL: " + state)
        dev[0].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL 0:")
        if attempt < 5:
            raise Exception("CreateInterface succeeded during out-of-memory")
        if not state.startswith('0:'):
            break

    # Allocation failure while copying each of the string arguments
    for key in ('Driver', 'Ifname', 'ConfigFile', 'BridgeIfname'):
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            wpas.CreateInterface(dbus.Dictionary({key: 'foo'},
                                                 signature='sv'))
1936
2ec82e67
JM
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/GetBlob/RemoveBlob parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def expect_error(call, what, error):
        # Run call() and require a D-Bus error whose text contains 'error';
        # 'what' names the operation in the failure messages.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if error not in str(e):
                raise Exception("Unexpected error message for invalid %s: " % what + str(e))
        else:
            raise Exception("Invalid %s() accepted" % what)

    blob = dbus.ByteArray(b"\x01\x02\x03")
    iface.AddBlob('blob1', blob)
    # Adding another blob with the same name must be rejected
    expect_error(lambda: iface.AddBlob('blob1',
                                       dbus.ByteArray(b"\x01\x02\x04")),
                 "AddBlob", "BlobExists")

    # The stored data must match what was added, byte by byte
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for got, expected in zip(res, blob):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected blob data")
    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal, both RemoveBlob() and GetBlob() must report BlobUnknown
    iface.RemoveBlob('blob1')
    expect_error(lambda: iface.RemoveBlob('blob1'),
                 "RemoveBlob", "BlobUnknown")
    expect_error(lambda: iface.GetBlob('blob1'),
                 "GetBlob", "BlobUnknown")

    class TestDbusBlob(TestDbus):
        # Adds and removes 'blob2' from within the main loop and records
        # whether the BlobAdded and BlobRemoved signals were seen for it.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # Returning False keeps this timeout from being rescheduled
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2011
0e126c6d
JM
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first, second, and third allocation within the AddBlob
    # handler in turn.
    for count in range(1, 4):
        with alloc_fail_dbus(dev[0], count, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', dbus.ByteArray(b"\x01\x02\x03\x04"))
0e126c6d 2021
2ec82e67
JM
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Call AutoScan() with an arbitrary string, a "periodic:1" parameter,
    # and finally an empty string.
    for param in ("foo", "periodic:1", ""):
        iface.AutoScan(param)
    # Send an empty AUTOSCAN parameter over the control interface as well
    dev[0].request("AUTOSCAN ")
2031
0e126c6d
JM
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    autoscan = dbus.Interface(if_obj, WPAS_DBUS_IFACE).AutoScan

    # First allocation within the AutoScan handler is made to fail
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
        autoscan("foo")
    # Send an empty AUTOSCAN parameter over the control interface afterwards
    dev[0].request("AUTOSCAN ")
2040
2ec82e67
JM
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)
    addr1 = dev[1].p2p_interface_addr()

    def must_fail(call, accepted_msg, error, mismatch_prefix):
        # Run call() and require a D-Bus error whose text contains 'error'.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if error not in str(e):
                raise Exception(mismatch_prefix + str(e))
        else:
            raise Exception(accepted_msg)

    def reject_invalid(name, arg):
        # The named TDLS method must reject 'arg' with InvalidArgs.
        must_fail(lambda: getattr(iface, name)(arg),
                  "Invalid " + name + "() accepted",
                  "InvalidArgs",
                  "Unexpected error message for invalid " + name + ": ")

    reject_invalid("TDLSDiscover", "foo")
    reject_invalid("TDLSStatus", "foo")

    # No TDLS link has been set up with the other station at this point
    if iface.TDLSStatus(addr1) != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    reject_invalid("TDLSSetup", "foo")
    reject_invalid("TDLSTeardown", "foo")

    # Well-formed address, but no such peer
    must_fail(lambda: iface.TDLSTeardown("00:11:22:33:44:55"),
              "TDLSTeardown accepted for unknown peer",
              "UnknownError: error performing TDLS teardown",
              "Unexpected error message: ")

    reject_invalid("TDLSChannelSwitch", {})
    reject_invalid("TDLSCancelChannelSwitch", "foo")
2102
0e126c6d
JM
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # TDLSSetup() with the first allocation in wpa_tdls_add_peer failing;
    # the expected D-Bus error text is passed to alloc_fail_dbus.
    expected = "UnknownError: error performing TDLS setup"
    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected):
        iface.TDLSSetup("00:11:22:33:44:55")
2111
2ec82e67
JM
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Runs discover -> setup -> status check -> teardown -> status check
        # as a chain of one-shot timeouts inside the main loop.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            # Step 1: discovery; schedule setup 100 ms later
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            # Step 2: setup; schedule the status check 500 ms later
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            # Step 3: verify the link is up, then tear it down
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            self._note_status(status, "connected", "tdls_setup")
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            # Step 4: verify the peer is gone and stop the main loop
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            self._note_status(status, "peer does not exist", "tdls_teardown")
            self.loop.quit()
            return False

        def _note_status(self, status, expected, flag):
            # Set the named flag on a matching status; otherwise only log.
            if status == expected:
                setattr(self, flag, True)
            else:
                logger.info("Unexpected TDLSStatus: " + status)

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2178
82aea622
JM
def test_dbus_tdls_channel_switch(dev, apdev):
    """D-Bus TDLS channel switch configuration"""
    # Skip unless the driver capability flags include bit 0x800000000
    capa_flags = int(dev[0].get_driver_status_field('capa.flags'), 16)
    if (capa_flags & 0x800000000) == 0:
        raise HwsimSkip("Driver does not support TDLS channel switching")

    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Runs discover -> setup -> channel switch requests -> cancel as a
        # chain of one-shot timeouts inside the main loop.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True

            invalid_requests = [
                # Unknown dict entry
                dbus.Dictionary({'Foobar': dbus.Byte(1)},
                                signature='sv'),
                # Missing OperClass
                dbus.Dictionary({}, signature='sv'),
                # Missing Frequency
                dbus.Dictionary({'OperClass': dbus.Byte(1)},
                                signature='sv'),
                # Missing PeerAddress
                dbus.Dictionary({'OperClass': dbus.Byte(1),
                                 'Frequency': dbus.UInt32(2417)},
                                signature='sv'),
            ]
            for request in invalid_requests:
                # NOTE: only the error text is checked here; a request that
                # is accepted does not cause a failure.
                try:
                    iface.TDLSChannelSwitch(request)
                except Exception as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected exception")

            # Valid parameters
            request = dbus.Dictionary({'OperClass': dbus.Byte(1),
                                       'Frequency': dbus.UInt32(2417),
                                       'PeerAddress': addr1,
                                       'SecChannelOffset': dbus.UInt32(0),
                                       'CenterFrequency1': dbus.UInt32(0),
                                       'CenterFrequency2': dbus.UInt32(0),
                                       'Bandwidth': dbus.UInt32(20),
                                       'HT': dbus.Boolean(False),
                                       'VHT': dbus.Boolean(False)},
                                      signature='sv')
            iface.TDLSChannelSwitch(request)

            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            iface.TDLSCancelChannelSwitch(addr1)
            self.tdls_done = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_done

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2295
2ec82e67
JM
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def set_paths_tolerating_reinit_error(engine, module):
        # Success is accepted; a D-Bus error is accepted only when it is
        # the EAPOL reinit failure.
        try:
            iface.SetPKCS11EngineAndModulePath(engine, module)
        except dbus.exceptions.DBusException as e:
            if "Error.Failed: Reinit of the EAPOL" not in str(e):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))

    def verify_paths(engine, module):
        # Read both properties back and compare with the expected values.
        for prop, expected in (("PKCS11EnginePath", engine),
                               ("PKCS11ModulePath", module)):
            value = if_obj.Get(WPAS_DBUS_IFACE, prop,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            if value != expected:
                raise Exception("Unexpected " + prop + " value: " + value)

    set_paths_tolerating_reinit_error("foo", "bar")
    set_paths_tolerating_reinit_error("foo", "")

    # With an empty engine path the call must succeed and the module path
    # must be readable back; clearing both must succeed too.
    for module in ("bar", ""):
        iface.SetPKCS11EngineAndModulePath("", module)
        verify_paths("", module)
2332
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        # Always set ap_scan back to 1, even if the test body failed
        restore_cmd = "AP_SCAN 1"
        dev[0].request(restore_cmd)
2339
def _test_dbus_apscan(dev, apdev):
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def read_apscan():
        return if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_apscan(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def rejected(value, label, error):
        # Setting ApScan to 'value' must fail with a D-Bus error containing
        # 'error'; 'label' is how the value appears in failure messages.
        try:
            write_apscan(value)
        except dbus.exceptions.DBusException as e:
            if error not in str(e):
                raise Exception("Unexpected error message for invalid Set(ApScan," + label + "): " + str(e))
        else:
            raise Exception("Invalid Set(ApScan," + label + ") accepted")

    initial = read_apscan()
    if initial != 1:
        raise Exception("Unexpected initial ApScan value: %d" % initial)

    # Each of the values 0, 1, 2 must be accepted and read back unchanged
    for mode in (0, 1, 2):
        write_apscan(dbus.UInt32(mode))
        current = read_apscan()
        if current != mode:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (current, mode))

    # Wrong D-Bus type, then an out-of-range value of the right type
    rejected(dbus.Int16(-1), "-1", "Error.Failed: wrong property type")
    rejected(dbus.UInt32(123), "123",
             "Error.Failed: ap_scan must be 0, 1, or 2")

    write_apscan(dbus.UInt32(1))
2374
8b67f649
LR
def test_dbus_pmf(dev, apdev):
    """D-Bus Get/Set Pmf"""
    try:
        _test_dbus_pmf(dev, apdev)
    finally:
        # Always set pmf back to 0, even if the test body failed
        restore_cmd = "SET pmf 0"
        dev[0].request(restore_cmd)
2381
def _test_dbus_pmf(dev, apdev):
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def read_pmf():
        return if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_pmf(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Pmf", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Start from a known value; the property is exchanged as a string
    dev[0].set("pmf", "0")
    initial = read_pmf()
    if initial != "0":
        raise Exception("Unexpected initial Pmf value: %s" % initial)

    # Each of "0", "1", "2" must be accepted and read back unchanged
    for level in (0, 1, 2):
        write_pmf(str(level))
        current = read_pmf()
        if current != str(level):
            raise Exception("Unexpected Pmf value %s (expected %d)" % (current, level))

    write_pmf("1")
2401
2ec82e67
JM
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # The property is expected to start out enabled
    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values must be accepted and read back unchanged
    for i in [False, True]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A non-boolean value must be rejected as a wrong property type. The
    # failure message names FastReauth (the original text said ApScan).
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(FastReauth,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))

    # Leave the property enabled
    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
2429
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def set_and_verify(prop, value):
        # Set a UInt32 property and require that it reads back unchanged.
        # The expected value is passed to the message explicitly; the
        # original code referenced an undefined name here, so a mismatch
        # raised NameError instead of this exception.
        if_obj.Set(WPAS_DBUS_IFACE, prop, dbus.UInt32(value),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != value:
            raise Exception("Unexpected %s value %d (expected %d)" % (prop, res, value))

    def expect_rejected(prop, value, label, error):
        # Setting 'prop' to 'value' must fail with a D-Bus error containing
        # 'error'; 'label' is how the value appears in failure messages.
        try:
            if_obj.Set(WPAS_DBUS_IFACE, prop, value,
                       dbus_interface=dbus.PROPERTIES_IFACE)
        except dbus.exceptions.DBusException as e:
            if error not in str(e):
                raise Exception("Unexpected error message for invalid Set(%s,%s): " % (prop, label) + str(e))
        else:
            raise Exception("Invalid Set(%s,%s) accepted" % (prop, label))

    set_and_verify("BSSExpireAge", 179)
    set_and_verify("BSSExpireCount", 3)

    # Wrong D-Bus type and a too-small value for BSSExpireAge
    expect_rejected("BSSExpireAge", dbus.Int16(-1), "-1",
                    "Error.Failed: wrong property type")
    expect_rejected("BSSExpireAge", dbus.UInt32(9), "9",
                    "Error.Failed: BSSExpireAge must be >= 10")

    # Wrong D-Bus type and zero for BSSExpireCount
    expect_rejected("BSSExpireCount", dbus.Int16(-1), "-1",
                    "Error.Failed: wrong property type")
    expect_rejected("BSSExpireCount", dbus.UInt32(0), "0",
                    "Error.Failed: BSSExpireCount must be > 0")

    # Leave the values at 180 and 2 for the following tests
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
2484
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    try:
        _test_dbus_country(dev, apdev)
    finally:
        # Always return to the "00" country code, both in wpa_supplicant
        # and through the iw tool, even if the test body failed.
        world = '00'
        dev[0].request("SET country " + world)
        subprocess.call(['iw', 'reg', 'set', world])
2492
def _test_dbus_country(dev, apdev):
    # Body of test_dbus_country: set and read back the Country property,
    # verify the resulting regulatory domain change events, and verify that
    # invalid Country values are rejected with the expected error text.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def wait_regdom_change():
        # Wait for a regdom change event on the interface; if none is seen
        # there, fall back to the global event stream.
        event = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if event is not None:
            return event
        # For now, work around separate P2P Device interface event delivery
        event = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                         timeout=1)
        if event is None:
            raise Exception("regdom change event not seen")
        return event

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    dev[0].dump_monitor()

    if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
               dbus_interface=dbus.PROPERTIES_IFACE)
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    ev = wait_regdom_change()
    if "init=USER type=COUNTRY alpha2=FI" not in ev:
        raise Exception("Unexpected event contents: " + ev)

    # Wrong D-Bus type for the property value.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
    else:
        raise Exception("Invalid Set(Country,-1) accepted")

    # Right type, but not a valid country code.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
                   dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: invalid country code" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
    else:
        raise Exception("Invalid Set(Country,F) accepted")

    if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
               dbus_interface=dbus.PROPERTIES_IFACE)

    ev = wait_regdom_change()
    # init=CORE was previously used due to invalid db.txt data for 00. For
    # now, allow both it and the new init=USER after fixed db.txt.
    world_seen = "init=CORE type=WORLD" in ev or "init=USER type=WORLD" in ev
    if not world_seen:
        raise Exception("Unexpected event contents: " + ev)
2546
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    # Run the real test body and always set the scan interval back to 5
    # afterwards, whether or not the body raised.
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        restore_cmd = "SCAN_INTERVAL 5"
        dev[0].request(restore_cmd)
2553
def _test_dbus_scan_interval(dev, apdev):
    # Body of test_dbus_scan_interval: set and read back the ScanInterval
    # property, then verify that a wrong-typed value and a negative value
    # are both rejected with the expected error text.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    expected = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(expected),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        # The message previously referenced an undefined name, which turned
        # a value mismatch into a NameError instead of this diagnostic.
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    # UInt16 is not the type accepted for this property.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,100) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))

    # Correct type, but negative values must be refused.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))

    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
2582
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # dev[1] runs a social-channel P2P find until the end of the test;
    # presumably this is the source of the Probe Request frames that get
    # reported below - TODO confirm.
    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        # Starts a P2P group over D-Bus, subscribes to Probe Request
        # reporting on the group's interface object and records whether a
        # ProbeRequest signal was received.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            # Schedule the test start and a 15 second timeout, register the
            # signal handlers and then block in the main loop. The loop is
            # stopped by probeRequest() (or presumably by the TestDbus
            # timeout handler - confirm against TestDbus).
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            # Subscribe to Probe Request reporting on the interface object
            # named in the GroupStarted properties and keep proxies to it
            # for the checks done after the main loop exits.
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
            self.iface.SubscribeProbeReq()
            self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

        def probeRequest(self, args):
            # The first reported Probe Request is enough; stop the main loop.
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            self.loop.quit()

        def run_test(self, *args):
            # Timer callback: request a P2P group on 2412 MHz. Returning
            # False keeps the timer from firing again.
            logger.debug("run_test")
            p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            params = dbus.Dictionary({'frequency': 2412})
            p2p.GroupAdd(params)
            return False

        def success(self):
            return self.reported

    # First round: after a report is seen, unsubscribe once (must work) and
    # a second time (must fail with NoSubscription), then remove the group.
    with TestDbusProbe(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        t.iface.UnsubscribeProbeReq()
        try:
            t.iface.UnsubscribeProbeReq()
            raise Exception("Invalid UnsubscribeProbeReq() accepted")
        except dbus.exceptions.DBusException as e:
            if "NoSubscription" not in str(e):
                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
        t.group_p2p.Disconnect()

    # Second round: same flow with a new group, but without unsubscribing.
    with TestDbusProbe(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
2ec82e67 2646
0e126c6d
JM
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Need to make sure this process has not already subscribed to avoid false
    # failures due to the operation succeeding due to os_strdup() not even
    # getting called.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException:
        # Nothing to unsubscribe: there was no earlier subscription.
        was_subscribed = False
    else:
        was_subscribed = True

    # SubscribeProbeReq() is issued while an allocation failure is armed in
    # wpas_dbus_handler_subscribe_preq.
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
                         "SubscribeProbeReq"):
        iface.SubscribeProbeReq()

    if not was_subscribed:
        return

    # On purpose, leave ProbeReq subscription in place to test automatic
    # cleanup.
    iface.SubscribeProbeReq()
2670
2ec82e67
JM
def test_dbus_p2p_invalid(dev, apdev):
    """D-Bus invalid P2P operations"""
    # Every section below issues a P2PDevice call (or property access) that
    # must be rejected and then checks the text of the resulting D-Bus
    # error. Sections that need P2P to be unavailable send
    # "P2P_SET disabled 1" first and always restore "P2P_SET disabled 0" in
    # a finally clause so that later sections run with P2P enabled again.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # RejectPeer: well-formed peer path that does not match a known peer.
    try:
        p2p.RejectPeer(path + "/Peers/00112233445566")
        raise Exception("Invalid RejectPeer accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))

    # RejectPeer: path that is not a peer object path at all.
    try:
        p2p.RejectPeer("/foo")
        raise Exception("Invalid RejectPeer accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))

    # RemoveClient: missing, unknown or wrongly typed dictionary entries.
    tests = [{},
             {'peer': 'foo'},
             {'foo': "bar"},
             {'iface': "abc"},
             {'iface': 123}]
    for t in tests:
        try:
            p2p.RemoveClient(t)
            raise Exception("Invalid RemoveClient accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))

    # Find: invalid discovery type, malformed RequestedDeviceTypes values
    # and unknown keys carrying a variety of D-Bus value types.
    tests = [{'DiscoveryType': 'foo'},
             {'RequestedDeviceTypes': 'foo'},
             {'RequestedDeviceTypes': ['foo']},
             {'RequestedDeviceTypes': ['1', '2', '3', '4', '5', '6', '7', '8',
                                       '9', '10', '11', '12', '13', '14', '15',
                                       '16', '17']},
             {'RequestedDeviceTypes': dbus.Array([], signature="s")},
             {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
             {'RequestedDeviceTypes': dbus.Array([], signature="i")},
             {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'),
                                       dbus.ByteArray(b'1234567')]},
             {'Foo': dbus.Int16(1)},
             {'Foo': dbus.UInt16(1)},
             {'Foo': dbus.Int64(1)},
             {'Foo': dbus.UInt64(1)},
             {'Foo': dbus.Double(1.23)},
             {'Foo': dbus.Signature('s')},
             {'Foo': 'bar'}]
    for t in tests:
        try:
            p2p.Find(dbus.Dictionary(t))
            raise Exception("Invalid Find accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Find(): " + str(e))

    # RemovePersistentGroup: object paths that are not persistent groups.
    for p in ["/foo",
              "/fi/w1/wpa_supplicant1/Interfaces/1234",
              "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"]:
        try:
            p2p.RemovePersistentGroup(dbus.ObjectPath(p))
            raise Exception("Invalid RemovePersistentGroup accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))

    # Listen while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Listen(5)
        raise Exception("Invalid Listen accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Could not start P2P listen" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Listen with a string argument. The proxy is created with
    # introspect=False; presumably this keeps the client library from
    # rejecting the wrongly typed argument locally - TODO confirm.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    try:
        test_p2p.Listen("foo")
        raise Exception("Invalid Listen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))

    # ExtendedListen while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.ExtendedListen(dbus.Dictionary({}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # PresenceRequest while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = {'duration1': 30000, 'interval1': 102400,
                'duration2': 20000, 'interval2': 102400}
        p2p.PresenceRequest(args)
        raise Exception("Invalid PresenceRequest accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to invoke presence request" not in str(e):
            raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # GroupAdd: negative frequency.
    try:
        params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
        p2p.GroupAdd(params)
        raise Exception("Invalid GroupAdd accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))

    # GroupAdd: persistent_group_object pointing at the interface path
    # instead of a persistent group object.
    try:
        params = dbus.Dictionary({'persistent_group_object':
                                  dbus.ObjectPath(path),
                                  'frequency': 2412})
        p2p.GroupAdd(params)
        raise Exception("Invalid GroupAdd accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))

    # Disconnect on an interface that no group was started on in this test.
    try:
        p2p.Disconnect()
        raise Exception("Invalid Disconnect accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to disconnect" not in str(e):
            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))

    # Flush while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Flush()
        raise Exception("Invalid Flush accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Flush: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = {'peer': path,
                'join': True,
                'wps_method': 'pbc',
                'frequency': 2412}
        p2p.Connect(args)
        raise Exception("Invalid Connect accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Connect: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect: invalid or incomplete argument dictionaries.
    tests = [{'frequency': dbus.Int32(-1)},
             {'wps_method': 'pbc'},
             {'wps_method': 'foo'}]
    for args in tests:
        try:
            p2p.Connect(args)
            raise Exception("Invalid Connect accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Connect: " + str(e))

    # Invite while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        args = {'peer': path}
        p2p.Invite(args)
        raise Exception("Invalid Invite accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Invite: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Invite: unknown dictionary key.
    try:
        args = {'foo': 'bar'}
        p2p.Invite(args)
        raise Exception("Invalid Invite accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            # This message used to name Connect although the failing call
            # is Invite.
            raise Exception("Unexpected error message for invalid Invite: " + str(e))

    # ProvisionDiscoveryRequest: (peer path, method, expected error text).
    tests = [(path, 'display', "InvalidArgs"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'display',
              "UnknownError: Failed to send provision discovery request"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'keypad',
              "UnknownError: Failed to send provision discovery request"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'pbc',
              "UnknownError: Failed to send provision discovery request"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'pushbutton',
              "UnknownError: Failed to send provision discovery request"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'foo', "InvalidArgs")]
    for (p, method, err) in tests:
        try:
            p2p.ProvisionDiscoveryRequest(p, method)
            raise Exception("Invalid ProvisionDiscoveryRequest accepted")
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))

    # Reading the Peers property while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Get(Peers) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")
2892
0e126c6d
JM
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    # Each call below is made inside alloc_fail_dbus() with a given failure
    # count and function-name pattern; alloc_fail_dbus() is told to expect
    # the named method to end with InvalidArgs.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def find_with_alloc_fail(count, funcs, value):
        # Issue Find({'Foo': value}) with the allocation failure armed.
        with alloc_fail_dbus(dev[0], count, funcs, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({'Foo': value}))

    def one_blob():
        # Fresh single-element byte array list for each call.
        return [dbus.ByteArray(b'123')]

    # String array parsing.
    for count in (1, 2):
        find_with_alloc_fail(count, "_wpa_dbus_dict_entry_get_string_array",
                             ['bar'])
    find_with_alloc_fail(10, "_wpa_dbus_dict_entry_get_string_array",
                         [str(num) for num in range(1, 10)])

    # Array of byte arrays parsing.
    find_with_alloc_fail(1, ":=_wpa_dbus_dict_entry_get_binarray", one_blob())
    find_with_alloc_fail(1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
                         one_blob())
    find_with_alloc_fail(2, "=_wpa_dbus_dict_entry_get_binarray",
                         [dbus.ByteArray(b'123') for _ in range(11)])
    find_with_alloc_fail(1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
                         one_blob())

    # Variant value handling in the Find handler.
    find_with_alloc_fail(1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
                         path)

    # Byte array parsing for AddService; the same arguments are used for
    # both failure counts.
    service_args = {'service_type': 'bonjour',
                    'response': dbus.ByteArray(500*b'b')}
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(service_args)
2950
2ec82e67
JM
def test_dbus_p2p_discovery(dev, apdev):
    """D-Bus P2P discovery"""
    # Run the real test body and always remove the vendor elements that it
    # adds on dev[1], whether or not the body raised.
    try:
        run_dbus_p2p_discovery(dev, apdev)
    finally:
        cleanup_cmd = "VENDOR_ELEM_REMOVE 1 *"
        dev[1].request(cleanup_cmd)
2957
def run_dbus_p2p_discovery(dev, apdev):
    # Body of test_dbus_p2p_discovery: dev[1] and dev[2] listen with
    # distinctive device information, dev[0] discovers them over D-Bus and
    # the peer properties reported for each are verified. Afterwards Listen,
    # StopFind and ExtendedListen are exercised.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # dev[1]: secondary device type plus two vendor elements that are
    # checked in deviceFound() below.
    dev[1].request("SET sec_device_type 1-0050F204-2")
    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
    dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()
    a1 = binascii.unhexlify(addr1.replace(':', ''))

    # dev[2]: Wi-Fi Display subelement that is expected in the IEs property.
    wfd_devinfo = "00001c440028"
    dev[2].request("SET wifi_display 1")
    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
    wfd = binascii.unhexlify('000006' + wfd_devinfo)
    dev[2].p2p_listen()
    addr2 = dev[2].p2p_dev_addr()
    a2 = binascii.unhexlify(addr2.replace(':', ''))

    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if 'Peers' not in res:
        raise Exception("GetAll result missing Peers")
    if len(res['Peers']) != 0:
        raise Exception("Unexpected peer(s) in the list")

    # Start and immediately stop a find that uses the optional arguments.
    args = {'DiscoveryType': 'social',
            'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')],
            'Timeout': dbus.Int32(1)}
    p2p.Find(dbus.Dictionary(args))
    p2p.StopFind()

    class TestDbusP2p(TestDbus):
        # Drives the discovery from the main loop and records which signals
        # were seen.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False
            self.found2 = False
            self.found_prop = False
            self.lost = False
            self.find_stopped = False

        def __enter__(self):
            # Schedule the test start and a 15 second timeout, register the
            # signal handlers and block in the main loop until deviceLost()
            # quits it (or presumably the TestDbus timeout handler does -
            # confirm against TestDbus).
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.deviceFoundProperties,
                            WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceLost")
            self.add_signal(self.provisionDiscoveryResponseEnterPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryResponseEnterPin")
            self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
                            "FindStopped")
            self.loop.run()
            return self

        def deviceFound(self, path):
            # Check that the new peer is listed in Peers and validate its
            # properties based on which device it is.
            logger.debug("deviceFound: path=%s" % path)
            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if len(res) < 1:
                raise Exception("Unexpected number of peers")
            if path not in res:
                raise Exception("Mismatch in peer object path")
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("peer properties: " + str(res))

            if res['DeviceAddress'] == a1:
                if 'SecondaryDeviceTypes' not in res:
                    raise Exception("Missing SecondaryDeviceTypes")
                sec = res['SecondaryDeviceTypes']
                if len(sec) < 1:
                    raise Exception("Secondary device type missing")
                if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
                    raise Exception("Secondary device type mismatch")

                if 'VendorExtension' not in res:
                    raise Exception("Missing VendorExtension")
                vendor = res['VendorExtension']
                if len(vendor) < 1:
                    raise Exception("Vendor extension missing")
                if b"\x11\x22\x33\x44" not in vendor:
                    # This used to report a secondary device type mismatch
                    # although the value checked is the vendor extension.
                    raise Exception("Vendor extension mismatch")

                if 'VSIE' not in res:
                    raise Exception("Missing VSIE")
                vendor = res['VSIE']
                if len(vendor) < 1:
                    raise Exception("VSIE missing")
                if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66":
                    raise Exception("VSIE mismatch")

                self.found = True
            elif res['DeviceAddress'] == a2:
                if 'IEs' not in res:
                    raise Exception("IEs missing")
                if res['IEs'] != wfd:
                    raise Exception("IEs mismatch")
                self.found2 = True
            else:
                raise Exception("Unexpected peer device address")

            # Once both peers have been seen, stop the find and then reject
            # and send a provision discovery request to the peer reported
            # last.
            if self.found and self.found2:
                p2p.StopFind()
                p2p.RejectPeer(path)
                p2p.ProvisionDiscoveryRequest(path, 'display')

        def deviceLost(self, path):
            logger.debug("deviceLost: path=%s" % path)
            if not self.found or not self.found2:
                # This may happen if a previous test case ended up scheduling
                # deviceLost event and that event did not get delivered before
                # starting the next test execution.
                logger.debug("Ignore deviceLost before the deviceFound events")
                return
            self.lost = True
            # Rejecting the peer that was just lost has to fail.
            try:
                p2p.RejectPeer(path)
                raise Exception("Invalid RejectPeer accepted")
            except dbus.exceptions.DBusException as e:
                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
            self.loop.quit()

        def deviceFoundProperties(self, path, properties):
            # NOTE(review): found_prop is recorded here but is not part of
            # success(); confirm whether that is intentional.
            logger.debug("deviceFoundProperties: path=%s" % path)
            logger.debug("peer properties: " + str(properties))
            if properties['DeviceAddress'] == a1:
                self.found_prop = True

        def provisionDiscoveryResponseEnterPin(self, peer_object):
            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
            p2p.Flush()

        def findStopped(self):
            logger.debug("findStopped")
            self.find_stopped = True

        def run_test(self, *args):
            # Timer callback: start the find. Returning False keeps the
            # timer from firing again.
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            return False

        def success(self):
            return self.found and self.lost and self.found2 and self.find_stopped

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
    dev[1].p2p_stop_find()

    # Reverse direction: dev[0] listens and dev[2] has to discover it.
    p2p.Listen(1)
    dev[2].p2p_stop_find()
    dev[2].request("P2P_FLUSH")
    if not dev[2].discover_peer(addr0):
        raise Exception("Peer not found")
    p2p.StopFind()
    dev[2].p2p_stop_find()

    # ExtendedListen: an unknown key is rejected, then valid calls follow.
    try:
        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))

    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
    p2p.ExtendedListen(dbus.Dictionary({}))
    dev[0].global_request("P2P_EXT_LISTEN")
3137
6c44d97b
JM
def test_dbus_p2p_discovery_freq(dev, apdev):
    """D-Bus P2P discovery on a specific non-social channel"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # dev[1] is set up with autogo() on 2422 MHz, the same frequency that
    # the Find call below is restricted to.
    addr1 = dev[1].p2p_dev_addr()
    autogo(dev[1], freq=2422)

    class TestDbusP2p(TestDbus):
        # Starts a frequency-limited find and records whether any
        # DeviceFound signal arrives.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False

        def __enter__(self):
            # Schedule the test start and a 5 second timeout, register the
            # DeviceFound handler and block in the main loop until
            # deviceFound() quits it (or presumably the TestDbus timeout
            # handler does - confirm against TestDbus).
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(5000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            # Any found device ends the wait; the peer address is not
            # compared against addr1 here.
            logger.debug("deviceFound: path=%s" % path)
            self.found = True
            self.loop.quit()

        def run_test(self, *args):
            # Timer callback: start a find limited to 2422 MHz. Returning
            # False keeps the timer from firing again.
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'freq': 2422}))
            return False

        def success(self):
            return self.found

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
    p2p.StopFind()
3178
2ec82e67
JM
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery"""
    # Exercises AddService, DeleteService, FlushService,
    # ServiceDiscoveryRequest, ServiceDiscoveryCancelRequest and
    # ServiceDiscoveryResponse with both valid and invalid arguments. Every
    # invalid call has to fail with InvalidArgs.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # NOTE(review): addr0/addr1 are not used below; the calls are kept in
    # case p2p_dev_addr() has side effects - confirm before removing.
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))

    # Bonjour service: add, flush and add again.
    args = {'service_type': 'bonjour',
            'query': bonjour_query,
            'response': bonjour_response}
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # DeleteService with the 'response' entry still included is rejected.
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # Deleting by query works once; the second attempt has to fail.
    args = {'service_type': 'bonjour',
            'query': bonjour_query}
    p2p.DeleteService(args)
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # UPnP service: add, delete, and a second delete that has to fail.
    args = {'service_type': 'upnp',
            'version': 0x10,
            'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}
    p2p.AddService(args)
    p2p.DeleteService(args)
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # DeleteService: invalid or incomplete argument dictionaries.
    tests = [{'service_type': 'foo'},
             {'service_type': 'foo', 'query': bonjour_query},
             {'service_type': 'upnp'},
             {'service_type': 'upnp', 'version': 0x10},
             {'service_type': 'upnp',
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'version': 0x10,
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'service_type': 'upnp', 'foo': 'bar'},
             {'service_type': 'bonjour'},
             {'service_type': 'bonjour', 'query': 'foo'},
             {'service_type': 'bonjour', 'foo': 'bar'}]
    for args in tests:
        try:
            p2p.DeleteService(args)
            raise Exception("Invalid DeleteService() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # AddService: invalid or incomplete argument dictionaries.
    tests = [{'service_type': 'foo'},
             {'service_type': 'upnp'},
             {'service_type': 'upnp', 'version': 0x10},
             {'service_type': 'upnp',
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'version': 0x10,
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'service_type': 'upnp', 'foo': 'bar'},
             {'service_type': 'bonjour'},
             {'service_type': 'bonjour', 'query': 'foo'},
             {'service_type': 'bonjour', 'response': 'foo'},
             {'service_type': 'bonjour', 'query': bonjour_query},
             {'service_type': 'bonjour', 'response': bonjour_response},
             {'service_type': 'bonjour', 'query': dbus.ByteArray(500*b'a')},
             {'service_type': 'bonjour', 'foo': 'bar'}]
    for args in tests:
        try:
            p2p.AddService(args)
            raise Exception("Invalid AddService() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddService(): " + str(e))

    # ServiceDiscoveryRequest with a raw TLV: cancelling works once; a
    # repeated cancel and a cancel of reference 0 both have to fail.
    args = {'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    try:
        p2p.ServiceDiscoveryCancelRequest(ref)
        raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            # This message used to name AddService() although the failing
            # call is ServiceDiscoveryCancelRequest().
            raise Exception("Unexpected error message for invalid ServiceDiscoveryCancelRequest(): " + str(e))
    try:
        p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
        raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ServiceDiscoveryCancelRequest(): " + str(e))

    # Valid UPnP ServiceDiscoveryRequest followed by its cancellation.
    args = {'service_type': 'upnp',
            'version': 0x10,
            'service': 'ssdp:foo'}
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    # ServiceDiscoveryRequest: invalid or incomplete argument dictionaries,
    # including peer_object values that do not name a known peer.
    tests = [{'service_type': 'foo'},
             {'foo': 'bar'},
             {'tlv': 'foo'},
             {},
             {'version': 0},
             {'service_type': 'upnp',
              'service': 'ssdp:foo'},
             {'service_type': 'upnp',
              'version': 0x10},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': dbus.ObjectPath(path + "/Peers")},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': path + "/Peers"},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566")}]
    for args in tests:
        try:
            p2p.ServiceDiscoveryRequest(args)
            raise Exception("Invalid ServiceDiscoveryRequest accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))

    # ServiceDiscoveryResponse: unknown dictionary key.
    args = {'foo': 'bar'}
    try:
        p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
        raise Exception("Invalid ServiceDiscoveryResponse accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
3327
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    bus, _wpas_obj, _path, iface_obj = prepare_dbus(dev[0])
    p2p_iface = dbus.Interface(iface_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    # dev[1] registers a Bonjour service and goes to listen state so that
    # dev[0] can find it and send a service discovery query to it.
    responder = dev[1]
    responder.request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    responder.p2p_listen()
    addr1 = responder.p2p_dev_addr()

    class SdQueryTest(TestDbus):
        # Drives P2P Find over D-Bus, sends an SD request to each found peer
        # and finishes once a ServiceDiscoveryResponse signal is seen.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Schedule the test body and the overall 15000 ms timeout before
            # entering the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, peer_path):
            logger.debug("deviceFound: path=%s", peer_path)
            query = {}
            query['peer_object'] = peer_path
            query['tlv'] = dbus.ByteArray(b"\x02\x00\x00\x01")
            p2p_iface.ServiceDiscoveryRequest(query)

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s",
                         str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social',
                                           'Timeout': dbus.Int32(10)})
            p2p_iface.Find(find_params)
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done

    with SdQueryTest(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    responder.p2p_stop_find()
3379
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    # The actual test lives in the helper; this wrapper only guarantees that
    # the cleanup command below is sent whether the helper passes or raises.
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        # Always send the command that turns external SD processing back off
        # on dev[0].
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3386
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    # Body of test_dbus_p2p_service_discovery_external: dev[1] sends an SD
    # request to dev[0], and dev[0] answers it through the D-Bus
    # ServiceDiscoveryResponse() method instead of internal processing.
    bus, _wpas_obj, _path, iface_obj = prepare_dbus(dev[0])
    p2p_iface = dbus.Interface(iface_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hex encoded response TLVs that the external handler replies with
    resp = "0300000101"

    requester = dev[1]
    requester.request("P2P_FLUSH")
    requester.request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    requester.p2p_find(social=True)

    class ExternalSdTest(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryRequest")
            self.loop.run()
            return self

        def deviceFound(self, peer_path):
            logger.debug("deviceFound: path=%s", peer_path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s",
                         str(sd_request))
            self.sd = True
            # Echo the addressing fields from the request and attach the
            # canned response TLVs.
            reply = {}
            for key in ('peer_object', 'frequency', 'dialog_token'):
                reply[key] = sd_request[key]
            reply['tlvs'] = dbus.ByteArray(binascii.unhexlify(resp))
            p2p_iface.ServiceDiscoveryResponse(dbus.Dictionary(reply,
                                                               signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_iface.ServiceDiscoveryExternal(1)
            p2p_iface.ServiceUpdate()
            p2p_iface.Listen(15)
            return False

        def success(self):
            return self.sd

    with ExternalSdTest(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    # The requester must report the response that was provided over D-Bus.
    ev = requester.wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    fields = ev.split(' ')
    if fields[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    requester.p2p_stop_find()

    p2p_iface.StopFind()
    p2p_iface.ServiceDiscoveryExternal(0)
3453
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # Sequence driven by the signal handlers below:
        #   run_test: add a persistent group
        #   first GroupStarted: disconnect the group instance
        #   GroupFinished: re-start the group from the persistent entry
        #   second GroupStarted: ask wlan1 to join with a PIN
        #   ProvisionDiscoveryRequestDisplayPin: authorize the peer via WPS
        #   StaAuthorized: verify peer/group properties, remove the client
        #       and disconnect the group
        #   GroupFinished (waiting_end set): remove the persistent group
        #   PersistentGroupRemoved: done
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.exceptions = False
            self.deauthorized = False
            self.done = False

        def _fail(self, msg):
            # Record the failure before raising so that success() reports it.
            # NOTE(review): presumably needed because an exception raised in
            # a signal handler does not propagate out of the main loop -
            # confirm against the dbus-python documentation.
            self.exceptions = True
            raise Exception(msg)

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self._fail("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self._fail("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                self._fail("Unexpected PeerGO value: " + str(go))
            if self.first:
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object

            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)

            # A request that carries 'Bssid' but no 'Pin' is expected to be
            # rejected with InvalidArgs.
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin'}
            try:
                wps.Start(params)
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self._fail("Unexpected error message: " + str(e))
            else:
                self._fail("Invalid WPS.Start() accepted")

            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self._fail("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self._fail("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self._fail("Unexpected number of group members")

            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({'WPSVendorExtensions': [ext]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self._fail("Unexpected number of vendor extensions")
            if res[0] != ext:
                self._fail("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted as
            # well.
            res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value read back (res2); the locally built list (res)
            # trivially has two entries at this point.
            if len(res2) != 2:
                self._fail("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self._fail("Vendor extension value changed")

            # Grow the list to 12 entries; Set() is expected to reject it.
            for i in range(10):
                res.append(dbus.ByteArray(b'\xaa\xbb'))

            # (value, substring expected in the D-Bus error) for each invalid
            # Set(WPSVendorExtensions) attempt, in the order they are tried.
            invalid_sets = [
                (res, "Error.Failed"),
                (dbus.Dictionary({'Foo': [ext]}, signature='sv'),
                 "InvalidArgs"),
                (["foo"], "Error.Failed"),
                ([["foo"]], "Error.Failed"),
            ]
            for vals, expected in invalid_sets:
                try:
                    g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                              dbus_interface=dbus.PROPERTIES_IFACE)
                except dbus.exceptions.DBusException as e:
                    if expected not in str(e):
                        self._fail("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
                else:
                    self._fail("Invalid Set(WPSVendorExtensions) accepted")

            p2p.RemoveClient({'peer': self.peer_path})

            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            return False

        def success(self):
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
3711
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    bus, _wpas_obj, _path, iface_obj = prepare_dbus(dev[0])
    p2p_iface = dbus.Interface(iface_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class AutoGoPbcTest(TestDbus):
        # Start a GO on 2412 MHz, let wlan1 join with PBC, authorize it via
        # the WPS interface and tear the group down once it is authorized.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.provisionDiscoveryPBCRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryPBCRequest")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            # The group is up; have the second station join it using PBC.
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, peer_path):
            logger.debug("deviceFound: path=%s", peer_path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, peer_path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s",
                         peer_object)
            self.peer_path = peer_object
            # Decode the last path component into a colon separated address
            # string (the value is not used further below).
            raw = binascii.unhexlify(peer_object.split('/')[-1])
            octets = struct.unpack('6B', raw)
            addr = ':'.join("%02x" % octet for octet in octets)
            wps_params = {}
            wps_params['Role'] = 'registrar'
            wps_params['P2PDeviceAddress'] = self.peer['DeviceAddress']
            wps_params['Type'] = 'pbc'
            logger.info("Authorize peer to connect to the group")
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(wps_params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            dbus.Interface(self.g_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_iface.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            return False

        def success(self):
            return self.done

    with AutoGoPbcTest(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
3797
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    bus, _wpas_obj, _path, iface_obj = prepare_dbus(dev[0])
    p2p_iface = dbus.Interface(iface_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class AutoGoLegacyTest(TestDbus):
        # Start a GO on 2412 MHz and let wlan1 connect to it as a plain WPS
        # PIN enrollee (no P2P join), then tear everything down.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            # Format the six BSSID octets as a colon separated hex string.
            octets = struct.unpack('6B', group_props['BSSID'])
            bssid = ':'.join("%02x" % octet for octet in octets)

            pin = '12345670'
            wps_params = {'Role': 'enrollee',
                          'Type': 'pin',
                          'Pin': pin}
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS).Start(wps_params)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.request("WPS_PIN " + bssid + " " + pin)
            # Kept for staAuthorized(), which disconnects the group.
            self.group_p2p = dbus.Interface(g_if_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_iface.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            return False

        def success(self):
            return self.done

    with AutoGoLegacyTest(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")
3867
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    bus, _wpas_obj, _path, iface_obj = prepare_dbus(dev[0])
    p2p_iface = dbus.Interface(iface_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    # dev[1] runs the GO that dev[0] joins; dev[2] is the peer that gets
    # invited to the group afterwards.
    dev[1].p2p_start_go(freq=2412)
    go_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    class JoinTest(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationResult")
            self.loop.run()
            return self

        def deviceFound(self, found_path):
            logger.debug("deviceFound: path=%s", found_path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, found_path)
            peer_props = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                         dbus_interface=dbus.PROPERTIES_IFACE,
                                         byte_arrays=True)
            logger.debug('peer properties: ' + str(peer_props))
            # Object paths embed the device address without colons.
            if addr2.replace(':', '') in found_path:
                self.peer = found_path
            elif addr1.replace(':', '') in found_path:
                self.go = found_path
            if not (self.peer and self.go):
                # Wait until both the GO and the peer to invite are known.
                return

            logger.info("Join the group")
            p2p_iface.StopFind()
            connect_args = {'peer': self.go,
                            'join': True,
                            'wps_method': 'pin',
                            'frequency': 2412}
            pin = p2p_iface.Connect(connect_args)

            # Give the PIN returned by Connect() to the GO.
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = go_group_ifname
            go_dev.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])

            def p2p_prop(prop_name):
                # Read one P2PDevice property of the group interface.
                return g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, prop_name,
                                    dbus_interface=dbus.PROPERTIES_IFACE)

            role = p2p_prop("Role")
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = p2p_prop("Group")
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = p2p_prop("PeerGO")
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
            else:
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            presence_args = {'duration1': 30000, 'interval1': 102400,
                             'duration2': 20000, 'interval2': 102400}
            group_p2p.PresenceRequest(presence_args)

            group_p2p.Invite({'peer': self.peer})

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Invitation handled; have the GO remove the group.
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = go_group_ifname
            go_dev.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_iface.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with JoinTest(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
3992
1992af11
JM
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    bus, _wpas_obj, _path, iface_obj = prepare_dbus(dev[0])
    p2p_iface = dbus.Interface(iface_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Run form() between the two devices first; the invitation sent below
    # refers to a persistent group entry on dev[1].
    form(dev[0], dev[1])
    addr0 = dev[0].p2p_dev_addr()
    dev[0].p2p_listen()
    dev[0].global_request("SET persistent_reconnect 0")

    found = dev[1].discover_peer(addr0, social=True)
    if not found:
        raise Exception("Peer " + addr0 + " not found")
    peer = dev[1].get_peer(addr0)

    class InvitationReceivedTest(TestDbus):
        # Passes as soon as dev[0] emits the InvitationReceived signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationReceived")
            self.loop.run()
            return self

        def invitationReceived(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            inviter = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            inviter.global_request("P2P_INVITE persistent=" +
                                   peer['persistent'] + " peer=" + addr0)
            return False

        def success(self):
            return self.done

    with InvitationReceivedTest(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    for station in (dev[0], dev[1]):
        station.p2p_stop_find()
4041
2ec82e67
JM
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    # The helper changes SsidPostfix; the finally block sends the command
    # that clears it again even when the helper raises.
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        # Note the trailing space: the command sets an empty postfix.
        dev[0].request("P2P_SET ssid_postfix ")
4048
def _test_dbus_p2p_config(dev, apdev):
    # Body of test_dbus_p2p_config: exercise Get/Set of the P2PDeviceConfig
    # property, including error cases (P2P disabled, invalid values).
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    p2p_unavailable = "Error.Failed: P2P is not available for this interface"

    def get_config():
        # Read the current P2PDeviceConfig dictionary.
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(value):
        # Write P2PDeviceConfig; value is passed to Set() as-is.
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Writing back exactly what was read must not change anything.
    res = get_config()
    set_config(res)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    changes = {'SsidPostfix': 'foo',
               'VendorExtension': [dbus.ByteArray(b'\x11\x22\x33\x44')],
               'SecondaryDeviceTypes': [dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88')]}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Empty arrays remove the previously configured entries.
    changes = {'SsidPostfix': '',
               'VendorExtension': dbus.Array([], signature="ay"),
               'SecondaryDeviceTypes': dbus.Array([], signature="ay")}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Get must fail while P2P is disabled on the interface.
    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if p2p_unavailable not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Set must fail while P2P is disabled on the interface.
    try:
        dev[0].request("P2P_SET disabled 1")
        changes = {'SsidPostfix': 'foo'}
        set_config(dbus.Dictionary(changes, signature='sv'))
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if p2p_unavailable not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Wrong value types and unknown keys must be rejected with InvalidArgs.
    tests = [{'DeviceName': 123},
             {'SsidPostfix': 123},
             {'Foo': 'Bar'}]
    for changes in tests:
        try:
            set_config(dbus.Dictionary(changes, signature='sv'))
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
4137
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    # Flow: add a persistent group over D-Bus, disconnect it as soon as it
    # has started, then exercise the PersistentGroup object that was
    # announced for it (Get/Set Properties, Add/Remove persistent groups).
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path reported by PersistentGroupAdded; stays None until
            # that signal has been received.
            self.persistent = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Tear the group down right away through its own interface
            # object; only the persistent group entry is needed afterwards.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            # The rest of the test needs the persistent group path, so the
            # run only counts as successful if PersistentGroupAdded was seen.
            return self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Changing the SSID through Set(Properties) must leave every other
    # property untouched.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # A second persistent group added explicitly must show up in the
    # PersistentGroups property next to the first one.
    args = dbus.Dictionary({'ssid': 'DIRECT-testing',
                            'psk': '1234567890'}, signature='sv')
    p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The original group was just removed, so removing it again must fail
    # with NetworkUnknown.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
4230
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # Flow: start a persistent group, let wlan1 join it with a PIN, tear the
    # group down, then re-invoke it with Invite() and tear it down again.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False
            # Set once Invite() for the persistent group has been issued.
            self.invited = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name), registered in order.
            signal_table = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound"),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.persistentGroupAdded, WPAS_DBUS_IFACE_P2PDEVICE,
                 "PersistentGroupAdded"),
                (self.provisionDiscoveryRequestDisplayPin,
                 WPAS_DBUS_IFACE_P2PDEVICE,
                 "ProvisionDiscoveryRequestDisplayPin"),
                (self.staAuthorized, WPAS_DBUS_IFACE,
                 "StaAuthorized"),
            )
            for handler, iface, signal_name in signal_table:
                self.add_signal(handler, iface, signal_name)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if self.invited:
                # Re-invoked group: the peer was already told to connect.
                return

            # First group instance: find the group BSSID and have wlan1
            # join the group with a PIN.
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            octets = struct.unpack('6B', group_props['BSSID'])
            bssid = ':'.join(["%02x" % octet for octet in octets])
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                # Second teardown: the re-invoked group is gone, test done.
                self.done = True
                self.loop.quit()
                return

            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.global_request("SET persistent_reconnect 1")
            sta.p2p_listen()

            # The interface object path is not a persistent group object, so
            # this Invite() has to be rejected with InvalidArgs.
            bad_args = {'persistent_group_object': dbus.ObjectPath(path),
                        'peer': self.peer_path}
            try:
                p2p.Invite(bad_args)
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Invite: " + str(e))
            else:
                raise Exception("Invalid Invite accepted")

            invite_args = {'persistent_group_object': self.persistent,
                           'peer': self.peer_path}
            p2p.Invite(invite_args)
            self.invited = True

            self.sta_group_ev = sta.wait_global_event(["P2P-GROUP-STARTED"],
                                                      timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            # Decode the last path component as a 6-octet address; the
            # formatted result is not used further in this handler.
            raw_peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ':'.join(["%02x" % octet
                             for octet in struct.unpack('6B', raw_peer)])
            wps_params = {'Role': 'registrar',
                          'P2PDeviceAddress': self.peer['DeviceAddress'],
                          'Bssid': self.peer['DeviceAddress'],
                          'Type': 'pin',
                          'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(wps_params)
            self.sta_group_ev = sta.wait_global_event(["P2P-GROUP-STARTED"],
                                                      timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # Remove the group on the peer first, then disconnect locally.
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.group_form_result(self.sta_group_ev)
            sta.remove_group()
            removed_ev = sta.wait_global_event(["P2P-GROUP-REMOVED"],
                                               timeout=10)
            if removed_ev is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            group_params = dbus.Dictionary({'persistent': True,
                                            'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(group_params)
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4365
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # Flow: listen over D-Bus, let wlan1 start GO Negotiation, answer it
    # with Connect() from the GONegotiationRequest handler, then disconnect
    # the group once it has started.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra add_signal kwargs)
            signal_table = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationRequest, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationRequest", {'byte_arrays': True}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
            )
            for handler, iface, signal_name, extra in signal_table:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)

            # A Connect() with frequency 5175 has to be rejected with
            # ConnectChannelUnsupported.
            bad_args = {'peer': path, 'wps_method': 'display', 'pin': '12345670',
                        'go_intent': 15, 'persistent': False, 'frequency': 5175}
            try:
                p2p.Connect(bad_args)
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))
            else:
                raise Exception("Invalid Connect accepted")

            # Same request without the frequency is the real connection.
            good_args = {'peer': path, 'wps_method': 'display', 'pin': '12345670',
                         'go_intent': 15, 'persistent': False}
            p2p.Connect(good_args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                          properties['interface_object'])
            group_p2p = dbus.Interface(group_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not sta.discover_peer(addr0):
                raise Exception("Peer not found")
            sta.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4447
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # Flow: pre-authorize GO Negotiation with the discovered peer
    # (authorize_only), let wlan1 initiate the negotiation, then have wlan1
    # leave the group and disconnect the group once the station is
    # deauthorized.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # keypad method without a PIN must be rejected with InvalidArgs.
            args = {'peer': path, 'wps_method': 'keypad',
                    'go_intent': 15, 'authorize_only': True}
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 15, 'authorize_only': True}
            p2p.Connect(args)
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which passes it to group_form_result().
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def staDeauthorized(self, name):
            # Log prefix spelled to match the handler and signal name.
            logger.debug("staDeauthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4548
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # Flow: find the peer, initiate GO Negotiation over D-Bus, let wlan1
    # accept it, disconnect once the group has started and wait for the
    # peer's Groups property to be filled and then emptied again.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra add_signal kwargs)
            signal_table = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            )
            for handler, iface, signal_name, extra in signal_table:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                            'go_intent': 0}
            p2p.Connect(connect_args)

            go_neg_ev = sta.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if go_neg_ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            sta.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started_ev = sta.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if started_ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = started_ev
            # Close the monitor connections and drop the reference before
            # leaving the handler.
            sta.close_monitor_global()
            sta.close_monitor_mon()
            sta = None

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                          properties['interface_object'])
            group_p2p = dbus.Interface(group_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False)
            sta.group_form_result(self.sta_group_ev)
            sta.remove_group()
            sta = None

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only the Groups property of Peer objects is of interest.
            if interface_name != WPAS_DBUS_P2P_PEER or "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
                return
            # Groups is empty from here on.
            if not self.peer_group_added:
                # This is likely a leftover event from an earlier test case,
                # ignore it to allow this test case to go through its steps.
                logger.info("Ignore propertiesChanged indicating group removal before group has been added")
                return
            self.peer_group_removed = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4647
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    # Flow: initiate GO Negotiation over D-Bus with go_intent 0 while wlan1
    # uses go_intent 15, then have wlan1 remove the group and wait for the
    # peer's Groups property to be filled and then emptied again.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra add_signal kwargs)
            signal_table = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            )
            for handler, iface, signal_name, extra in signal_table:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                            'go_intent': 0}
            p2p.Connect(connect_args)

            go_neg_ev = sta.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if go_neg_ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            sta.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started_ev = sta.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if started_ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = started_ev
            # Close the monitor connections and drop the reference before
            # leaving the handler.
            sta.close_monitor_global()
            sta.close_monitor_mon()
            sta = None

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The proxy is created but not used afterwards; the group is
            # removed from the wlan1 side instead.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False)
            sta.group_form_result(self.sta_group_ev)
            sta.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only the Groups property of Peer objects is of interest.
            if interface_name != WPAS_DBUS_P2P_PEER or "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            elif self.peer_group_added:
                # Groups became empty after having been non-empty.
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4738
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    wpas = dev[0]
    try:
        # Run the actual test with p2p_group_idle set to 1.
        wpas.global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        # Always put p2p_group_idle back to 0, even if the test failed.
        wpas.global_request("SET p2p_group_idle 0")
4746
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    # Body of test_dbus_p2p_group_idle_timeout: initiate GO Negotiation over
    # D-Bus with go_intent 0, then have wlan1 deauthenticate the station
    # with a test reason code and remove the group. The test waits for the
    # peer's Groups property to be filled and then emptied again.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.group_started = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra add_signal kwargs)
            signal_table = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            )
            for handler, iface, signal_name, extra in signal_table:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                            'go_intent': 0}
            p2p.Connect(connect_args)

            go_neg_ev = sta.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if go_neg_ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            sta.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started_ev = sta.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if started_ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = started_ev
            # Close the monitor connections and drop the reference before
            # leaving the handler.
            sta.close_monitor_global()
            sta.close_monitor_mon()
            sta = None

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            # The proxy is created but not used afterwards; the group is
            # torn down from the wlan1 side instead.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False)
            sta.group_form_result(self.sta_group_ev)
            first_sta_addr = sta.group_request("STA-FIRST").splitlines()[0]
            # Force disassociation with different reason code so that the
            # P2P Client using D-Bus does not get normal group termination event
            # from the GO.
            sta.group_request("DEAUTHENTICATE " + first_sta_addr + " reason=0 test=0")
            sta.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Ignore anything that is not a Peer object change, and anything
            # that arrives before this test's group has started.
            if interface_name != WPAS_DBUS_P2P_PEER or not self.group_started:
                return
            if "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            else:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4845
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    # Flow: wlan1 connects with PIN 87654321 while the D-Bus side answers
    # with PIN 12345670. The test expects both WpsFailed and
    # GroupFormationFailure, and no GroupStarted.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, D-Bus interface, signal name, extra add_signal kwargs)
            signal_table = (
                (self.goNegotiationRequest, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationRequest", {'byte_arrays': True}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
                 "WpsFailed", {}),
                (self.groupFormationFailure, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFormationFailure", {}),
            )
            for handler, iface, signal_name, extra in signal_table:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            connect_args = {'peer': path, 'wps_method': 'display', 'pin': '12345670',
                            'go_intent': 15}
            p2p.Connect(connect_args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            # Stop only after both failure signals have been seen; they may
            # arrive in either order.
            if self.formation_failure:
                self.loop.quit()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            if self.wps_failed:
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not sta.discover_peer(addr0):
                raise Exception("Peer not found")
            sta.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4921
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    # Scenario: dev[0] (controlled over D-Bus) joins the group operated by
    # dev[1] and then starts its own autonomous GO, so that two groups are
    # in operation on dev[0] at the same time.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None
            # D-Bus object paths of the two groups; reset to None again
            # when the matching GroupFinished signal is received.
            self.group1 = None
            self.group2 = None
            self.groups_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged", byte_arrays=True)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            """Record the discovered peer/GO and join the GO's group."""
            logger.debug("deviceFound: path=%s" % path)
            # The peer object path contains the device address without
            # colons.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path

            # Nothing to do until the GO has been found; also skip once the
            # first group is already up.
            if not self.go or self.group1:
                return

            logger.info("Join the group")
            p2p.StopFind()
            pin = '12345670'
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_ifname = dev1_group_ifname
            dev1.group_request("WPS_PIN any " + pin)
            join_args = {'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'pin': pin,
                         'frequency': 2412}
            p2p.Connect(join_args)

        def groupStarted(self, properties):
            """Track started groups; the second one triggers the peer join."""
            logger.debug("groupStarted: " + str(properties))
            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            group_path = properties['group_object']
            g_obj = bus.get_object(WPAS_DBUS_SERVICE, group_path)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            if not self.group1:
                # First group: the one joined on dev[1]'s GO.
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            elif not self.group2:
                # Second group: the autonomous GO requested above.
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = res['BSSID']

            if not (self.group1 and self.group2):
                return

            logger.info("Authorize peer to join the group")
            a2 = binascii.unhexlify(addr2.replace(':', ''))
            wps_params = {'Role': 'enrollee',
                          'P2PDeviceAddress': dbus.ByteArray(a2),
                          'Bssid': dbus.ByteArray(a2),
                          'Type': 'pin',
                          'Pin': '12345670'}
            g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
            g_wps.Start(wps_params)

            # Convert the 6-octet BSSID into colon separated hex notation.
            octets = struct.unpack('6B', self.g2_bssid)
            bssid = ':'.join(["%02x" % octet for octet in octets])
            dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            dev2.scan_for_bss(bssid, freq=2412)
            dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
            ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group join timed out")
            self.dev2_group_ev = ev

        def groupFinished(self, properties):
            """Stop the main loop once both groups have been removed."""
            logger.debug("groupFinished: " + str(properties))

            finished = properties['group_object']
            if self.group1 == finished:
                self.group1 = None
            elif self.group2 == finished:
                self.group2 = None

            if self.group1 or self.group2:
                return
            self.done = True
            self.loop.quit()

        def peerJoined(self, peer):
            """Verify the group state and tear both groups down (once)."""
            logger.debug("peerJoined: " + peer)
            if self.groups_removed:
                return
            self.check_results()

            dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            dev2.group_form_result(self.dev2_group_ev)
            dev2.remove_group()

            logger.info("Disconnect group2")
            g2_p2p = dbus.Interface(self.g2_if_obj,
                                    WPAS_DBUS_IFACE_P2PDEVICE)
            g2_p2p.Disconnect()

            logger.info("Disconnect group1")
            g1_p2p = dbus.Interface(self.g1_if_obj,
                                    WPAS_DBUS_IFACE_P2PDEVICE)
            g1_p2p.Disconnect()
            self.groups_removed = True

        def check_results(self):
            """Check the Role and Members values while both groups are up."""
            logger.info("Check results with two concurrent groups in operation")

            def group_props(obj_path):
                # Fetch all Group interface properties of one group object.
                grp = bus.get_object(WPAS_DBUS_SERVICE, obj_path)
                return grp.GetAll(WPAS_DBUS_GROUP,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)

            res1 = group_props(self.group1)
            res2 = group_props(self.group2)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(res1))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(res2))

            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            if res1['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + res1['Role'])
            if res2['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + res2['Role'])
            if prop['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])

            if len(res2['Members']) != 1:
                raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from being re-run.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
0e126c6d 5112
f572ae80
JM
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Cancel() is expected to be rejected here, before this test has
    # started any P2P operation.
    try:
        p2p.Cancel()
    except dbus.exceptions.DBusException:
        pass
    else:
        raise Exception("Unexpected p2p.Cancel() success")

    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            """Start a connection to the found peer and cancel it at once."""
            logger.debug("deviceFound: path=%s" % path)
            connect_args = {'peer': path, 'wps_method': 'keypad',
                            'pin': '12345670', 'go_intent': 0}
            p2p.Connect(connect_args)
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from being re-run.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5159
afe28053
JM
def test_dbus_p2p_ip_addr(dev, apdev):
    """D-Bus P2P and IP address parameters"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Each IP address property has to read back exactly as it was written.
    vals = [("IpAddrGo", "192.168.43.1"),
            ("IpAddrMask", "255.255.255.0"),
            ("IpAddrStart", "192.168.43.100"),
            ("IpAddrEnd", "192.168.43.199")]
    for field, value in vals:
        if_obj.Set(WPAS_DBUS_IFACE, field, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)
        val = if_obj.Get(WPAS_DBUS_IFACE, field,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != value:
            raise Exception("Unexpected %s value: %s" % (field, val))

    set_ip_addr_info(dev[1])

    dev[0].global_request("SET p2p_go_intent 0")

    # Group formation is triggered with an NFC connection handover that is
    # reported to both devices.
    req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
    if "FAIL" in req:
        raise Exception("Failed to generate NFC connection handover request")
    sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
    if "FAIL" in sel:
        raise Exception("Failed to generate NFC connection handover select")
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel)
    if "FAIL" in res:
        raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
    res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel)
    if "FAIL" in res:
        raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            """Check that GroupStarted carries the IpAddrGo value."""
            logger.debug("groupStarted: " + str(properties))
            self.loop.quit()

            if 'IpAddrGo' not in properties:
                # Return with self.done still False so that the test is
                # reported as failed instead of hitting a KeyError inside
                # the signal handler.
                logger.info("IpAddrGo missing from GroupStarted")
                return
            ip_addr_go = properties['IpAddrGo']
            addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1],
                                    ip_addr_go[2], ip_addr_go[3])
            if addr != "192.168.42.1":
                # NOTE(review): a mismatch is only logged and the test still
                # passes - confirm whether this is intentional.
                logger.info("Unexpected IpAddrGo value: " + addr)
            self.done = True

        def run_test(self, *args):
            logger.debug("run_test")
            # Returning False keeps the timeout callback from being re-run.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5231
0e126c6d
JM
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def introspect():
        # Issue one Introspect call on the interface object.
        return if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)

    res = introspect()
    logger.info("Initial Introspect: " + str(res))
    if res is None or "Introspectable" not in res or "GroupStarted" not in res:
        raise Exception("Unexpected initial Introspect response: " + str(res))
    if "FastReauth" not in res or "PassiveScan" not in res:
        raise Exception("Unexpected initial Introspect response: " + str(res))

    # An allocation failure at the top of the introspection handler is
    # expected to leave the call without any response data.
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        res2 = introspect()
        logger.info("Introspect: " + str(res2))
        if res2 is not None:
            raise Exception("Unexpected Introspect response")

    # Allocation failures while adding interfaces are expected to give a
    # response that is shorter than the complete one.
    partial_cases = [(1, "=add_interface;wpa_dbus_introspect"),
                     (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
                     (2, "=add_interface;wpa_dbus_introspect")]
    for count, funcs in partial_cases:
        with alloc_fail(dev[0], count, funcs):
            res2 = introspect()
            logger.info("Introspect: " + str(res2))
            if res2 is None:
                raise Exception("No Introspect response")
            if len(res2) >= len(res):
                raise Exception("Unexpected Introspect response")
9bbce257 5277
a21eadcf
JM
def run_busctl(service, obj):
    """Run "busctl introspect" for a D-Bus object and log the output.

    service: D-Bus service name passed to busctl
    obj: D-Bus object path to introspect

    Raises Exception("Duplicate property") if busctl reports a duplicate
    property on stderr.
    """
    logger.info("busctl introspect %s %s" % (service, obj))
    cmd = subprocess.Popen(['busctl', 'introspect', service, obj],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
    # communicate() reads both pipes until EOF and waits for the process to
    # terminate, so no separate wait() call is needed.
    raw_out, raw_err = cmd.communicate()
    # The pipes deliver bytes; decode before logging so that the log shows
    # the text instead of a bytes repr.
    logger.info("busctl stdout:\n%s" % raw_out.decode(errors='replace').strip())
    if len(raw_err) > 0:
        err = raw_err.decode(errors='replace')
        logger.info("busctl stderr: %s" % err.strip())
        if "Duplicate property" in err:
            raise Exception("Duplicate property")
5290
def test_dbus_introspect_busctl(dev, apdev):
    """D-Bus introspection with busctl"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    ifaces = dbus_get(dbus, wpas_obj, "Interfaces")
    iface_path = ifaces[0]

    # Introspect the root object, the Interfaces node, and the first
    # interface object.
    for obj_path in (WPAS_DBUS_PATH,
                     WPAS_DBUS_PATH + "/Interfaces",
                     iface_path):
        run_busctl(WPAS_DBUS_SERVICE, obj_path)

    # Run a scan and add a network before introspecting the BSSs/0 and
    # Networks/0 objects.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    netid = dev[0].add_network()
    dev[0].set_network(netid, "disabled", "0")
    dev[0].set_network_quoted(netid, "ssid", "test")

    for suffix in ("/BSSs/0", "/Networks/0"):
        run_busctl(WPAS_DBUS_SERVICE, iface_path + suffix)
5308
9bbce257
JM
def test_dbus_ap(dev, apdev):
    """D-Bus AddNetwork for AP mode"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.started = False
            self.network_selected = False
            self.sta_added = False
            self.sta_removed = False
            self.authorized = False
            self.deauthorized = False
            # True once the Stations property showed exactly one entry after
            # StationAdded; cleared again if an entry is still listed after
            # StationRemoved.
            self.stations = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.stationAdded, WPAS_DBUS_IFACE, "StationAdded")
            self.add_signal(self.stationRemoved, WPAS_DBUS_IFACE,
                            "StationRemoved")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            """Connect a station from wlan1 once State is "completed"."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                self.started = True
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.connect(ssid, psk=passphrase, scan_freq="2412")

        def stationAdded(self, station, properties):
            """Verify that the Stations property lists the new station."""
            logger.debug("stationAdded: %s" % str(station))
            logger.debug(str(properties))
            self.sta_added = True
            res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
                             dbus_interface=dbus.PROPERTIES_IFACE)
            logger.info("Stations: " + str(res))
            if len(res) == 1:
                self.stations = True
            else:
                # This exception is only reported in the log; the failure
                # reaches the test result through self.stations in success().
                raise Exception("Missing Stations entry: " + str(res))

        def stationRemoved(self, station):
            """Verify that the Stations property is empty and stop the loop."""
            logger.debug("stationRemoved: %s" % str(station))
            self.sta_removed = True
            res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
                             dbus_interface=dbus.PROPERTIES_IFACE)
            logger.info("Stations: " + str(res))
            # Stop the main loop on the error path as well so that a failure
            # does not have to wait for the timeout.
            self.loop.quit()
            if len(res) != 0:
                self.stations = False
                raise Exception("Unexpected Stations entry: " + str(res))

        def staAuthorized(self, name):
            """Disconnect the station again once it has been authorized."""
            logger.debug("staAuthorized: " + name)
            self.authorized = True
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.request("DISCONNECT")

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_connect(self, *args):
            """Add and select the AP mode (mode=2) network over D-Bus."""
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'mode': 2,
                                    'frequency': 2412,
                                    'scan_freq': 2412},
                                   signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # Returning False keeps the timeout callback from being re-run.
            return False

        def success(self):
            # self.stations is included because exceptions raised in the
            # signal handlers do not propagate out of the main loop.
            return self.started and self.sta_added and self.sta_removed and \
                self.authorized and self.deauthorized and self.stations

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
d9e2a057
JM
5413
def test_dbus_connect_wpa_eap(dev, apdev):
    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa-eap"
    params = hostapd.wpa_eap_params(ssid=ssid)
    # wpa=3 enables both WPA and WPA2 on the AP.
    params["wpa"] = "3"
    params["rsn_pairwise"] = "CCMP"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            """Finish the test when the State property becomes "completed"."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' not in properties:
                return
            if properties['State'] == "completed":
                self.done = True
                self.loop.quit()

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))

        def run_connect(self, *args):
            """Add and select the PEAP/MSCHAPv2 network over D-Bus."""
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-EAP',
                       'eap': 'PEAP',
                       'identity': 'user',
                       'password': 'password',
                       'ca_cert': 'auth_serv/ca.pem',
                       'phase2': 'auth=MSCHAPV2',
                       'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # Returning False keeps the timeout callback from being re-run.
            return False

        def success(self):
            return self.done

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
708ec753
JM
5469
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    # Restore the AP_SCAN 1 setting regardless of the test result.
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")

def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """Start AP mode with AP_SCAN 2 and request a D-Bus scan that fails.

    The scan failure is injected into wpa_driver_nl80211_scan and the AP is
    expected to stay enabled so that dev[1] can still connect afterwards.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    if "OK" not in dev[0].request("AP_SCAN 2"):
        raise Exception("Failed to set AP_SCAN 2")

    netid = dev[0].add_network()
    dev[0].set_network(netid, "mode", "2")
    dev[0].set_network_quoted(netid, "ssid", "wpas-ap-open")
    for field, value in [("key_mgmt", "NONE"),
                         ("frequency", "2412"),
                         ("scan_freq", "2412"),
                         ("disabled", "0")]:
        dev[0].set_network(netid, field, value)
    dev[0].select_network(netid)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) is None:
        raise Exception("AP failed to start")

    with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
        scan_args = {'Type': 'active',
                     'AllowRoam': True,
                     'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
        iface.Scan(scan_args)
        ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
                                "AP-DISABLED"], timeout=5)
        if ev is None:
            raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
        if "AP-DISABLED" in ev:
            raise Exception("Unexpected AP-DISABLED event")
        if "retry=1" in ev:
            # Wait for the retry to scan happen
            ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
                                    "AP-DISABLED"], timeout=5)
            if ev is None:
                raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
            if "AP-DISABLED" in ev:
                raise Exception("Unexpected AP-DISABLED event - retry")

    # The AP has to remain usable after the failed scan.
    dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
    for sta in (dev[1], dev[0]):
        sta.request("DISCONNECT")
        sta.wait_disconnected()
d679ab74
JM
5520
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    ssid = "test-open"
    hapd = hostapd.add_ap(apdev[0], {"ssid": ssid})
    dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")

    # Only exercises the code path for coverage; the effect of the call is
    # not verified in any other way.
    wpas.ExpectDisconnect()
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
2299b543
JM
5535
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # SaveConfig() is expected to be rejected with this specific error.
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        reason = str(e)
        if not reason.startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
            raise Exception("Unexpected error message for SaveConfig(): " + reason)
    else:
        raise Exception("SaveConfig() accepted unexpectedly")
ce96e65c
JM
5546
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    # Remove any vendor elements left for frame type 1 regardless of the
    # test result.
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        dev[0].request("VENDOR_ELEM_REMOVE 1 *")

def _test_dbus_vendor_elem(dev, apdev):
    """Exercise VendorElemAdd/VendorElemGet/VendorElemRem over D-Bus.

    The invalid-argument cases are covered first, followed by adding,
    reading back, and removing elements for frame type 1. Raises Exception
    on any unexpected result.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    # Invalid VendorElemAdd() arguments
    try:
        ie = dbus.ByteArray(b"\x00\x00")
        iface.VendorElemAdd(-1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))

    try:
        ie = dbus.ByteArray(b'')
        iface.VendorElemAdd(1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))

    try:
        ie = dbus.ByteArray(b"\x00\x01")
        iface.VendorElemAdd(1, ie)
        raise Exception("Invalid VendorElemAdd() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))

    # Invalid VendorElemGet() arguments
    try:
        iface.VendorElemGet(-1)
        raise Exception("Invalid VendorElemGet() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))

    try:
        iface.VendorElemGet(1)
        raise Exception("Invalid VendorElemGet() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))

    # Invalid VendorElemRem() arguments
    try:
        ie = dbus.ByteArray(b"\x00\x00")
        iface.VendorElemRem(-1, ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))

    try:
        ie = dbus.ByteArray(b'')
        iface.VendorElemRem(1, ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemRemove[2]: " + str(e))

    iface.VendorElemRem(1, b"*")

    # Add one element and read it back
    ie = dbus.ByteArray(b"\x00\x01\x00")
    iface.VendorElemAdd(1, ie)

    val = iface.VendorElemGet(1)
    if len(val) != len(ie):
        raise Exception("Unexpected VendorElemGet length")
    for got, expected in zip(val, ie):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected VendorElemGet data")

    # A second element is expected to be appended after the first one
    ie2 = dbus.ByteArray(b"\xe0\x00")
    iface.VendorElemAdd(1, ie2)

    ies = ie + ie2
    val = iface.VendorElemGet(1)
    if len(val) != len(ies):
        raise Exception("Unexpected VendorElemGet length[2]")
    for got, expected in zip(val, ies):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected VendorElemGet data[2]")

    try:
        test_ie = dbus.ByteArray(b"\x01\x01")
        iface.VendorElemRem(1, test_ie)
        raise Exception("Invalid VendorElemRemove() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemRemove[3]: " + str(e))

    # Removing the first element has to leave only the second one
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    # Wildcard removal clears the remaining data
    iface.VendorElemRem(1, b"*")
    try:
        iface.VendorElemGet(1)
        raise Exception("Invalid VendorElemGet() accepted after removal")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
            raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))
dbd183c7
JM
5657
def test_dbus_assoc_reject(dev, apdev):
    """D-Bus AssocStatusCode"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-open"
    # NOTE(review): max_listen_interval=1 presumably makes the AP reject the
    # association (status code 51 is what the test expects) - confirm.
    ap_params = {"ssid": ssid, "max_listen_interval": "1"}
    hapd = hostapd.add_ap(apdev[0], ap_params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.assoc_status_seen = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            """Check the first reported AssocStatusCode and stop the loop."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'AssocStatusCode' not in properties:
                return
            status = properties['AssocStatusCode']
            if status == 51:
                self.assoc_status_seen = True
            else:
                logger.info("Unexpected status code: " + str(status))
            iface.Disconnect()
            self.loop.quit()

        def run_connect(self, *args):
            """Add and select the open network over D-Bus."""
            network = {'ssid': ssid,
                       'key_mgmt': 'NONE',
                       'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            # Returning False keeps the timeout callback from being re-run.
            return False

        def success(self):
            return self.assoc_status_seen

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
504c7ffd
JM
5708
def test_dbus_mesh(dev, apdev):
    """D-Bus mesh"""
    check_mesh_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH)

    # dev[1] starts the mesh first; wlan0 joins from run_test().
    add_open_mesh_network(dev[1])
    addr1 = dev[1].own_addr()

    class TestDbusMesh(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = [(self.meshGroupStarted, "MeshGroupStarted"),
                        (self.meshGroupRemoved, "MeshGroupRemoved"),
                        (self.meshPeerConnected, "MeshPeerConnected"),
                        (self.meshPeerDisconnected, "MeshPeerDisconnected")]
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_MESH, signal_name)
            self.loop.run()
            return self

        def meshGroupStarted(self, args):
            logger.debug("MeshGroupStarted: " + str(args))

        def meshGroupRemoved(self, args):
            logger.debug("MeshGroupRemoved: " + str(args))
            self.done = True
            self.loop.quit()

        def meshPeerConnected(self, args):
            """Check MeshPeers/MeshGroup and remove the mesh on wlan1."""
            logger.debug("MeshPeerConnected: " + str(args))

            peers = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers',
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("MeshPeers: " + str(peers))
            if len(peers) != 1:
                raise Exception("Unexpected number of MeshPeer values")
            # Compare the peer address as hex digits without colons.
            peer_hex = binascii.hexlify(peers[0]).decode()
            if peer_hex != addr1.replace(':', ''):
                raise Exception("Unexpected peer address")

            group = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup',
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("MeshGroup: " + str(group))
            if group != b"wpas-mesh-open":
                raise Exception("Unexpected MeshGroup")
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.mesh_group_remove()

        def meshPeerDisconnected(self, args):
            """Remove the local mesh group once the peer has left."""
            logger.debug("MeshPeerDisconnected: " + str(args))
            dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
            dev0.mesh_group_remove()

        def run_test(self, *args):
            logger.debug("run_test")
            dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
            add_open_mesh_network(dev0)
            # Returning False keeps the timeout callback from being re-run.
            return False

        def success(self):
            return self.done

    with TestDbusMesh(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")