]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
tests: DPP and provisioning DPP and legacy AKMs
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67 1# wpa_supplicant D-Bus interface tests
910eb5b5 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
2ec82e67
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
2ec82e67
JM
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
c37ef330 12import struct
5c9ba341 13import sys
2ec82e67 14
910eb5b5 15try:
5c9ba341
JM
16 if sys.version_info[0] > 2:
17 from gi.repository import GObject as gobject
18 else:
19 import gobject
910eb5b5
JM
20 import dbus
21 dbus_imported = True
22except ImportError:
23 dbus_imported = False
24
2ec82e67
JM
25import hostapd
26from wpasupplicant import WpaSupplicant
708ec753 27from utils import HwsimSkip, alloc_fail, fail_test
1992af11 28from p2p_utils import *
2ec82e67 29from test_ap_tdls import connect_2sta_open
089e7ca3 30from test_ap_eap import check_altsubject_match_support
afe28053 31from test_nfc_p2p import set_ip_addr_info
504c7ffd 32from test_wpas_mesh import check_mesh_support, add_open_mesh_network
2ec82e67
JM
33
# Well-known D-Bus service name and root object path of wpa_supplicant.
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"

# Interface names; all of them live under the service name prefix.
WPAS_DBUS_IFACE = WPAS_DBUS_SERVICE + ".Interface"
WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
WPAS_DBUS_NETWORK = WPAS_DBUS_SERVICE + ".Network"
WPAS_DBUS_BSS = WPAS_DBUS_SERVICE + ".BSS"
WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
WPAS_DBUS_P2P_PEER = WPAS_DBUS_SERVICE + ".Peer"
WPAS_DBUS_GROUP = WPAS_DBUS_SERVICE + ".Group"
WPAS_DBUS_PERSISTENT_GROUP = WPAS_DBUS_SERVICE + ".PersistentGroup"
WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh"
2ec82e67
JM
45
def prepare_dbus(dev):
    """Connect to wpa_supplicant over the system D-Bus.

    Returns a tuple (bus, wpas_obj, path, if_obj): the system bus, the
    proxy for the wpa_supplicant root object, the object path of the
    interface matching dev.ifname, and the proxy for that path.

    Raises HwsimSkip if the dbus module could not be imported or if any
    step of the connection setup fails.
    """
    if not dbus_imported:
        reason = "No dbus module available"
        logger.info(reason)
        raise HwsimSkip(reason)
    try:
        # Importing the glib main loop glue also makes dbus.mainloop.glib
        # available; it has to be the default loop for signal delivery.
        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)
        system_bus = dbus.SystemBus()
        root_obj = system_bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        manager = dbus.Interface(root_obj, WPAS_DBUS_SERVICE)
        iface_path = manager.GetInterface(dev.ifname)
        iface_obj = system_bus.get_object(WPAS_DBUS_SERVICE, iface_path)
        return (system_bus, root_obj, iface_path, iface_obj)
    except Exception as e:
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
2ec82e67
JM
61
class TestDbus(object):
    """Base class for test helpers that wait for D-Bus signals.

    Subclasses provide __enter__ (typically registering signal handlers
    and running self.loop); __exit__ removes every registered receiver.
    """

    def __init__(self, bus):
        self.bus = bus
        self.signals = []
        self.loop = gobject.MainLoop()

    def __exit__(self, type, value, traceback):
        # Drop all signal receivers registered through add_signal().
        for receiver in self.signals:
            receiver.remove()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for the given signal and remember the match."""
        self.signals.append(
            self.bus.add_signal_receiver(handler,
                                         dbus_interface=interface,
                                         signal_name=name,
                                         byte_arrays=byte_arrays))

    def timeout(self, *args):
        """Timer callback: stop the main loop; False keeps it one-shot."""
        logger.debug("timeout")
        self.loop.quit()
        return False
82
0e126c6d
JM
class alloc_fail_dbus(object):
    """Context manager expecting a D-Bus call to fail on an allocation failure.

    On entry the TEST_ALLOC_FAIL control command is issued with
    "<count>:<funcs>". On exit the wrapped operation must have raised: a
    DBusException whose text contains `expected` is swallowed; any other
    exception propagates, unless GET_ALLOC_FAIL shows the failure was not
    triggered, in which case a new Exception is raised.
    """

    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):
        self._dev = dev
        self._count = count
        self._funcs = funcs
        self._operation = operation
        self._expected = expected

    def __enter__(self):
        reply = self._dev.request("TEST_ALLOC_FAIL %d:%s" %
                                  (self._count, self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        if type is None:
            # The operation completed although an allocation should have
            # failed.
            raise Exception("%s succeeded during out-of-memory" %
                            self._operation)
        is_dbus_error = type == dbus.exceptions.DBusException
        if is_dbus_error and self._expected in str(value):
            return True
        remaining = self._dev.request("GET_ALLOC_FAIL")
        if remaining != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" %
                            self._operation)
        return False
103
3a59cda1
JM
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Start a WPA2-PSK AP with WPS enabled and return the hostapd handle."""
    # WPS-related settings
    config = {"eap_server": "1",
              "wps_state": "2",
              "ap_pin": "12345670",
              "uuid": ap_uuid}
    # Network and security settings
    config.update({"ssid": ssid,
                   "wpa": "2",
                   "wpa_passphrase": "12345678",
                   "wpa_key_mgmt": "WPA-PSK",
                   "rsn_pairwise": "CCMP"})
    return hostapd.add_ap(ap, config)
2ec82e67
JM
111
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # GetAll on the root object and on both interface-level interfaces.
    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    # Log the real service name and path instead of a hand-typed copy.
    logger.debug("GetAll(%s, %s) ==> %s" % (WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                                            str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path,
                                         str(props)))

    # Both lists are expected to be empty before the scan and add_network.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected BSSs entry: " + str(res))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected Networks entry: " + str(res))

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    # Exactly one BSS entry is expected, carrying the AP's BSSID.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing BSSs entry: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    bssid_str = ':'.join('%02x' % octet for octet in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    # Exactly one network entry is expected, with the configured SSID.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0],
                                         str(props)))
    ssid = props['Properties']['ssid']
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
171
73ece491
JM
def test_dbus_getall_oom(dev, apdev):
    """D-Bus GetAll wpa_config_get_all() OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    sta = dev[0]

    net_id = sta.add_network()
    sta.set_network(net_id, "disabled", "0")
    sta.set_network_quoted(net_id, "ssid", "test")

    networks = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(networks) != 1:
        raise Exception("Missing Networks entry: " + str(networks))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, networks[0])

    # Fail the 1st..49th allocation within wpa_config_get_all(); a D-Bus
    # error from GetAll is acceptable here and is ignored.
    for count in range(1, 50):
        with alloc_fail(sta, count, "wpa_config_get_all"):
            try:
                net_obj.GetAll(WPAS_DBUS_NETWORK,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException:
                pass
192
2ec82e67
JM
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read a fi.w1.wpa_supplicant1 property from wpas_obj.

    If expect is not None, the value must equal it; otherwise an
    Exception is raised. Returns the value that was read.
    """
    value = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE,
                         byte_arrays=byte_arrays)
    if expect is None or value == expect:
        return value
    raise Exception("Unexpected %s: %s (expected: %s)" %
                    (prop, str(value), str(expect)))
201
def dbus_set(dbus, wpas_obj, prop, val):
    """Write a fi.w1.wpa_supplicant1 property on wpas_obj."""
    properties_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
                 dbus_interface=properties_iface)
205
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def expect_dbus_error(operation, accepted_msg, expected_err):
        # Run operation() and require it to fail with a DBusException whose
        # text contains expected_err; raise accepted_msg if it succeeds.
        try:
            operation()
            raise Exception(accepted_msg)
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message: " + str(e))

    # DebugLevel: valid change, two invalid values, then restore.
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    invalid_levels = [(3, "Error.Failed: wrong property type"),
                      ("foo", "Error.Failed: wrong debug level value")]
    for (bad, err) in invalid_levels:
        expect_dbus_error(
            lambda: dbus_set(dbus, wpas_obj, "DebugLevel", bad),
            "Invalid DebugLevel value accepted: " + str(bad), err)
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # Boolean debug flags: toggle, reject a string value, then restore.
    bool_props = [("DebugTimestamp", "Invalid DebugTimestamp value accepted"),
                  ("DebugShowKeys", "Invalid DebugShowKeys value accepted")]
    for (name, accepted_msg) in bool_props:
        dbus_get(dbus, wpas_obj, name, expect=True)
        dbus_set(dbus, wpas_obj, name, False)
        dbus_get(dbus, wpas_obj, name, expect=False)
        expect_dbus_error(lambda: dbus_set(dbus, wpas_obj, name, "foo"),
                          accepted_msg, "Error.Failed: wrong property type")
        dbus_set(dbus, wpas_obj, name, True)
        dbus_get(dbus, wpas_obj, name, expect=True)

    # Read-only list properties.
    interfaces = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(interfaces) != 1:
        raise Exception("Unexpected Interfaces value: " + str(interfaces))

    methods = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(methods) < 5 or "TTLS" not in methods:
        raise Exception("Unexpected EapMethods value: " + str(methods))

    capa = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(capa) < 2 or "p2p" not in capa:
        raise Exception("Unexpected Capabilities value: " + str(capa))

    # WFDIEs: set, read back, reject a truncated value, and clear.
    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    ies = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(ies))
    readback = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if ies != readback:
        raise Exception("WFDIEs value changed")
    expect_dbus_error(
        lambda: dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00')),
        "Invalid WFDIEs value accepted", "InvalidArgs")
    for value in (b'', ies, b''):
        dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(value))
    readback = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(readback) > 0:
        raise Exception("WFDIEs not cleared properly")

    # Setting a read-only property has to be rejected.
    methods = dbus_get(dbus, wpas_obj, "EapMethods")
    expect_dbus_error(lambda: dbus_set(dbus, wpas_obj, "EapMethods", methods),
                      "Invalid Set accepted",
                      "InvalidArgs: Property is read-only")

    # Unknown method on the properties interface.
    expect_dbus_error(
        lambda: wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                                dbus_interface=dbus.PROPERTIES_IFACE),
        "Unknown method accepted", "UnknownMethod")

    # Unknown interface name in Get.
    expect_dbus_error(
        lambda: wpas_obj.Get("foo", "DebugShowKeys",
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "Invalid Get accepted", "InvalidArgs: No such property")

    # Without introspection the proxy can send wrongly-typed arguments.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    expect_dbus_error(
        lambda: test_obj.Get(123, "DebugShowKeys",
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "Invalid Get accepted", "InvalidArgs: Invalid arguments")
    expect_dbus_error(
        lambda: test_obj.Get(WPAS_DBUS_SERVICE, 123,
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "Invalid Get accepted", "InvalidArgs: Invalid arguments")

    # Doubly wrapped variant in Set.
    expect_dbus_error(
        lambda: wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
                             dbus.ByteArray(b'', variant_level=2),
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "Invalid Set accepted", "InvalidArgs: invalid message format")
328
f0ef6889
DW
def test_dbus_set_global_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Start from a known ModelName value.
    dev[0].set("model_name", "")
    # (property name, expected initial value, value to set)
    cases = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')]

    for (name, initial, updated) in cases:
        current = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != initial:
            raise Exception("Unexpected " + name + " value: " + str(current))

        if_obj.Set(WPAS_DBUS_IFACE, name, updated,
                   dbus_interface=dbus.PROPERTIES_IFACE)

        current = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != updated:
            raise Exception("Unexpected " + name + " value after set: " +
                            str(current))
    # Restore ModelName.
    dev[0].set("model_name", "")
f0ef6889 350
2ec82e67
JM
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def must_fail(operation, accepted_msg, expected_err):
        # operation() has to raise a DBusException containing expected_err.
        try:
            operation()
            raise Exception(accepted_msg)
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message: " + str(e))

    # A method that does not exist on the WPS interface.
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    must_fail(lambda: wps.Foo(), "Unknown method accepted", "UnknownMethod")

    # An existing method called with a wrong signature; introspection is
    # disabled on this proxy so the argument is sent as given.
    raw_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    raw_wps = dbus.Interface(raw_obj, WPAS_DBUS_IFACE_WPS)
    must_fail(lambda: raw_wps.Start(123),
              "WPS.Start with incorrect signature accepted",
              "InvalidArgs: Invalid arg")
371
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    sta = dev[0]
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        # Put back every parameter the test body modifies, even on failure.
        sta.request("SET wps_cred_processing 0")
        sta.request("SET config_methods display keypad virtual_display nfc_interface p2ps")
        restore = [("device_name", "Device A"),
                   ("manufacturer", ""),
                   ("model_name", ""),
                   ("model_number", ""),
                   ("serial_number", ""),
                   ("device_type", "0-00000000-0")]
        for (field, value) in restore:
            sta.set(field, value)
2ec82e67
JM
385
def _test_dbus_get_set_wps(dev, apdev):
    """Exercise Get/Set of the WPS interface properties over D-Bus.

    Covers ConfigMethods, ProcessCredentials, the device description
    strings and DeviceType, and finally checks that a ProcessCredentials
    change is signalled on both the WPS interface and the standard
    properties interface. Raises Exception on any mismatch.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    # ProcessCredentials is expected to read False only for
    # wps_cred_processing=1.
    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        expected_val = i != 1
        if val != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))

    # (control interface parameter, D-Bus property)
    tests = [("device_name", "DeviceName"),
             ("manufacturer", "Manufacturer"),
             ("model_name", "ModelName"),
             ("model_number", "ModelNumber"),
             ("serial_number", "SerialNumber")]

    for f1, f2 in tests:
        val2 = "test-value-test"
        dev[0].set(f1, val2)
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != val2:
            raise Exception("Get(%s) returned unexpected value" % f2)
        val2 = "TEST-value"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2,
                   dbus_interface=dbus.PROPERTIES_IFACE)
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != val2:
            raise Exception("Get(%s) returned unexpected value after Set" % f2)

    # Bytes the checks below expect for device_type "5-0050F204-1".
    expected_dev_type = [0x00, 0x05, 0x00, 0x50, 0xf2, 0x04, 0x00, 0x01]

    def dev_type_mismatch(value):
        # Compare every byte independently; a chained "a != b != c"
        # comparison would not do that.
        return any(value[i] != b for i, b in enumerate(expected_dev_type))

    dev[0].set("device_type", "5-0050F204-1")
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if dev_type_mismatch(val):
        raise Exception("DeviceType mismatch")
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val,
               dbus_interface=dbus.PROPERTIES_IFACE)
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if dev_type_mismatch(val):
        raise Exception("DeviceType mismatch after Set")

    val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08'
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2),
               dbus_interface=dbus.PROPERTIES_IFACE)
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE,
                     byte_arrays=True)
    if val != val2:
        raise Exception("DeviceType mismatch after Set (2)")

    class TestDbusGetSet(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # PropertiesChanged signal on the WPS interface itself.
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            # PropertiesChanged signal on the standard properties interface.
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # This is the flag the signal handlers check; with it set the
            # loop can stop once both signals are seen, before the timeout.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
518
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Each dictionary is passed to WPS.Start() and has to be rejected with
    # an InvalidArgs error.
    failures = [{'Role': 'foo', 'Type': 'pbc'},
                {'Role': 123, 'Type': 'pbc'},
                {'Type': 'pbc'},
                {'Role': 'enrollee'},
                {'Role': 'registrar'},
                {'Role': 'enrollee', 'Type': 123},
                {'Role': 'enrollee', 'Type': 'foo'},
                {'Role': 'enrollee', 'Type': 'pbc',
                 'Bssid': '02:33:44:55:66:77'},
                {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
                {'Role': 'enrollee', 'Type': 'pbc',
                 'Bssid': dbus.ByteArray(b'12345')},
                {'Role': 'enrollee', 'Type': 'pbc',
                 'P2PDeviceAddress': 12345},
                {'Role': 'enrollee', 'Type': 'pbc',
                 'P2PDeviceAddress': dbus.ByteArray(b'12345')},
                {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'}]
    for args in failures:
        try:
            wps.Start(args)
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
                raise Exception("Unexpected error message: " + str(e))
548
0e126c6d
JM
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    sta = dev[0]
    props_iface = dbus.PROPERTIES_IFACE

    # Interface State getter
    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_state", "Get"):
        if_obj.Get(WPAS_DBUS_IFACE, "State", dbus_interface=props_iface)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq=2412)

    time.sleep(0.05)
    # BSSs getter, first and second allocation
    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_bsss", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "BSSs", dbus_interface=props_iface)

    bss_list = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                          dbus_interface=props_iface)
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_list[0])
    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates", dbus_interface=props_iface)
    # Here a D-Bus error is tolerated rather than required.
    with alloc_fail(sta, 1,
                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
        try:
            bss_obj.Get(WPAS_DBUS_BSS, "Rates", dbus_interface=props_iface)
        except dbus.exceptions.DBusException:
            pass

    net_id = sta.add_network()
    sta.set_network(net_id, "disabled", "0")
    sta.set_network_quoted(net_id, "ssid", "test")

    # Networks getter, first and second allocation
    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_networks", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                       dbus_interface=props_iface)

    # Global property getters
    with alloc_fail_dbus(sta, 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    for count in range(1, 6):
        with alloc_fail_dbus(sta, count, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    # WPS ConfigMethods setter
    with alloc_fail_dbus(sta, 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        methods = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", methods,
                   dbus_interface=props_iface)

    # WPS.Start handler
    with alloc_fail_dbus(sta, 1,
                         "=wpa_config_add_network;wpas_dbus_handler_wps_start",
                         "WPS.Start",
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
608
2ec82e67
JM
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # The test body switches wps_cred_processing to 2; always reset it.
        sta.request("SET wps_cred_processing 0")
615
def _test_dbus_wps_pbc(dev, apdev):
    """Run WPS PBC through D-Bus and wait for success and credentials."""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    sta = dev[0]

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq="2412")
    sta.request("SET wps_cred_processing 2")

    # The single scanned BSS has to advertise WPS in PBC mode.
    bss_list = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_list) != 1:
        raise Exception("Missing BSSs entry: " + str(bss_list))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_list[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_list[0],
                                         str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    wps_info = props['WPS']
    if 'Type' not in wps_info:
        raise Exception("No Type field in the WPS dictionary")
    if wps_info['Type'] != 'pbc':
        raise Exception("Unexpected WPS Type: " + wps_info['Type'])

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            # Stop once both the event and the credentials have arrived.
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            request = {'Role': 'enrollee', 'Type': 'pbc'}
            self.wps.Start(request)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    sta.wait_connected(timeout=10)
    sta.request("DISCONNECT")
    hapd.disable()
    sta.flush_scan_cache()
685
3a59cda1
JM
def test_dbus_wps_pbc_overlap(dev, apdev):
    """D-Bus WPS/PBC operation and signal for PBC overlap"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    sta = dev[0]

    # Two APs with PBC active at the same time.
    hapd = start_ap(apdev[0])
    hapd2 = start_ap(apdev[1], ssid="test-wps2",
                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
    hapd.request("WPS_PBC")
    hapd2.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq="2412")
    bssid2 = apdev[1]['bssid']
    sta.scan_for_bss(bssid2, freq="2412")

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.wps = wps
            self.overlap_seen = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "pbc-overlap":
                return
            self.overlap_seen = True
            self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            request = {'Role': 'enrollee', 'Type': 'pbc'}
            self.wps.Start(request)
            return False

        def success(self):
            return self.overlap_seen

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    sta.request("WPS_CANCEL")
    sta.request("DISCONNECT")
    hapd.disable()
    sta.flush_scan_cache()
736
2ec82e67
JM
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        # The test body switches wps_cred_processing to 2; always reset it.
        sta.request("SET wps_cred_processing 0")
743
def _test_dbus_wps_pin(dev, apdev):
    """Run WPS PIN (PIN enabled on the AP first) through D-Bus."""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    sta = dev[0]

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq="2412")
    sta.request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False
            self.success_seen = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            # Stop once both the event and the credentials have arrived.
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # The BSSID is passed as raw bytes, not as a colon-separated
            # string.
            bssid_hex = bssid.replace(':','').encode()
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid_hex))
            request = {'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': bssid_ay}
            wps.Start(request)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    sta.wait_connected(timeout=10)
797
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    sta = dev[0]
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        # The test body switches wps_cred_processing to 2; always reset it.
        sta.request("SET wps_cred_processing 0")
804
def _test_dbus_wps_pin2(dev, apdev):
    """Run WPS PIN through D-Bus using the PIN returned by WPS.Start().

    WPS.Start() is called without a Pin entry; the 'Pin' value from its
    reply is then enabled on the AP with WPS_PIN. The test waits for both
    the "success" WPS event and the Credentials signal.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Must exist before any signal handler runs: wpsEvent() and
            # success() read it even if no Credentials signal has arrived.
            self.credentials_received = False
            self.failed = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':','').encode()))
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            # Enable the PIN from the WPS.Start() reply on the AP.
            pin = res['Pin']
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
860
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        # The helper sets wps_cred_processing to 2; always put it back to 0,
        # including when the helper raised.
        dev[0].request("SET wps_cred_processing 0")
867
def _test_dbus_wps_pin_m2d(dev, apdev):
    # WPS enrollee run with a fixed PIN; the AP only gets that PIN once the
    # "m2d" WPS event has been signalled over D-Bus.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "m2d":
                # Only now configure the PIN on the AP side.
                ap = hostapd.Hostapd(apdev[0]['ifname'])
                ap.request("WPS_PIN any 12345670")
                return
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            raw_bssid = binascii.unhexlify(bssid.replace(':', '').encode())
            start_args = {'Role': 'enrollee', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            # Returning False keeps this timeout callback from repeating.
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
923
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        # The helper sets wps_cred_processing to 2; always put it back to 0,
        # including when the helper raised.
        dev[0].request("SET wps_cred_processing 0")
930
def _test_dbus_wps_reg(dev, apdev):
    # WPS run with wpa_supplicant in the 'registrar' role; the run is
    # considered done as soon as the Credentials signal has been received.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            # Events are only logged here; they do not affect the result.
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            raw_bssid = binascii.unhexlify(bssid.replace(':', '').encode())
            start_args = {'Role': 'registrar', 'Type': 'pin',
                          'Pin': '12345670',
                          'Bssid': dbus.ByteArray(raw_bssid)}
            wps.Start(start_args)
            # Returning False keeps this timeout callback from repeating.
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
978
2a8e3f35
JM
def test_dbus_wps_cancel(dev, apdev):
    """D-Bus WPS Cancel operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']

    # First Cancel() is issued before any WPS operation has been started.
    wps.Cancel()
    dev[0].scan_for_bss(bssid, freq="2412")

    # Second Cancel() follows immediately after starting an enrollee run.
    raw_bssid = binascii.unhexlify(bssid.replace(':', '').encode())
    start_args = {'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                  'Bssid': dbus.ByteArray(raw_bssid)}
    wps.Start(start_args)
    wps.Cancel()
    # The result of this wait is intentionally not checked.
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
994
2ec82e67
JM
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # 17 SSID entries: b"1" .. b"17"
    seventeen_ssids = [dbus.ByteArray(str(n).encode()) for n in range(1, 18)]
    # A single SSID that is 33 octets long
    long_ssid = dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")

    # (Scan() argument dictionary, substring expected in the D-Bus error)
    cases = [
        ({}, "InvalidArgs"),
        ({'Type': 123}, "InvalidArgs"),
        ({'Type': 'foo'}, "InvalidArgs"),
        ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
        ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
        ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
        ({'Type': 'active', 'SSIDs': seventeen_ssids}, "InvalidArgs"),
        ({'Type': 'active', 'SSIDs': [long_ssid]}, "InvalidArgs"),
        ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
        ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
        ({'Type': 'active', 'Channels': 2412}, "InvalidArgs"),
        ({'Type': 'active', 'Channels': [2412]}, "InvalidArgs"),
        ({'Type': 'active',
          'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]},
         "InvalidArgs"),
        ({'Type': 'active',
          'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]},
         "InvalidArgs"),
        ({'Type': 'active', 'AllowRoam': "yes"}, "InvalidArgs"),
        ({'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]},
         "InvalidArgs"),
        ({'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]},
         "InvalidArgs"),
    ]

    for scan_args, expected in cases:
        try:
            iface.Scan(scan_args)
        except dbus.exceptions.DBusException as e:
            if expected not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(scan_args), str(e)))
        else:
            # Scan() returning normally means the bad arguments got through.
            raise Exception("Invalid Scan() arguments accepted: " + str(scan_args))
1042
0e126c6d
JM
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def chan_2412():
        # Fresh single-entry channel list for every Scan() call
        return [(dbus.UInt32(2412), dbus.UInt32(20))]

    # (allocation failure location, extra alloc_fail_dbus() keyword
    #  arguments, Scan() arguments)
    cases = [
        ("wpa_scan_clone_params;wpas_dbus_handler_scan",
         {'expected': "ScanError: Scan request rejected"},
         {'Type': 'passive',
          'Channels': chan_2412()}),
        ("=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
         {},
         {'Type': 'passive',
          'Channels': chan_2412()}),
        ("=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
         {},
         {'Type': 'active',
          'IEs': [dbus.ByteArray(b"\xdd\x00")],
          'Channels': chan_2412()}),
        ("=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
         {},
         {'Type': 'active',
          'SSIDs': [dbus.ByteArray(b"open"),
                    dbus.ByteArray()],
          'Channels': chan_2412()}),
    ]

    for where, extra, scan_args in cases:
        with alloc_fail_dbus(dev[0], 1, where, "Scan", **extra):
            iface.Scan(scan_args)
1074
2ec82e67
JM
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})

    class TestDbusScan(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = 0    # number of ScanDone signals seen
            self.bss_added = False
            self.fail_reason = None    # set when a BSS has unexpected data

        def __enter__(self):
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in ((self.scanDone, "ScanDone"),
                                    (self.bssAdded, "BSSAdded"),
                                    (self.bssRemoved, "BSSRemoved")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            count = self.scan_completed
            if count == 1:
                # Second scan: passive, roaming allowed, single channel
                iface.Scan({'Type': 'passive',
                            'AllowRoam': True,
                            'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
                return
            if count == 2:
                # Third scan: passive without a channel list
                iface.Scan({'Type': 'passive',
                            'AllowRoam': False})
                return
            if count == 3 and self.bss_added:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            if 'WPS' in properties and 'Type' in properties['WPS']:
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            # First scan: active, with SSID and IE lists on one channel
            first_scan = {'Type': 'active',
                          'SSIDs': [dbus.ByteArray(b"open"),
                                    dbus.ByteArray()],
                          'IEs': [dbus.ByteArray(b"\xdd\x00"),
                                  dbus.ByteArray()],
                          'AllowRoam': False,
                          'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(first_scan)
            # Returning False keeps this timeout callback from repeating.
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as t:
        if t.fail_reason:
            raise Exception(t.fail_reason)
        if not t.success():
            raise Exception("Expected signals not seen")

    def get_bss_list():
        return if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    if len(get_bss_list()) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    if len(get_bss_list()) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
1155
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    sta = dev[0]

    # Start a scan through the control interface and wait until it is running
    if "OK" not in sta.request("SCAN freq=2412-2462"):
        raise Exception("Failed to start scan")
    if sta.wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    # A D-Bus Scan() issued during that scan has to be rejected
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    if sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1177
0238e8ba
JM
def test_dbus_scan_abort(dev, apdev):
    """D-Bus scan trigger and abort"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    sta = dev[0]
    scan_args = {'Type': 'active', 'AllowRoam': False}

    # Round 1: abort after the scan has been reported as started
    iface.Scan(scan_args)
    if sta.wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    iface.AbortScan()
    if sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan abort result timed out")
    sta.dump_monitor()

    # Round 2: abort right after the scan request
    iface.Scan(scan_args)
    iface.AbortScan()

    if sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1199
2ec82e67
JM
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            # Step counter advanced by State changes; 9 is the final value
            # and -1 marks a failed SignalPoll check.
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in (
                    (self.networkAdded, "NetworkAdded"),
                    (self.networkRemoved, "NetworkRemoved"),
                    (self.networkSelected, "NetworkSelected"),
                    (self.propertiesChanged, "PropertiesChanged")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' not in properties:
                return
            new_state = properties['State']
            if new_state == "completed":
                self.handle_completed()
            elif new_state == "disconnected":
                self.handle_disconnected()

        def handle_completed(self):
            # Steps taken on each transition to State "completed"
            step = self.state
            if step == 0:
                self.state = 1
                iface.Disconnect()
            elif step == 2:
                self.state = 3
                iface.Disconnect()
            elif step == 4:
                self.state = 5
                iface.Reattach()
            elif step == 5:
                self.state = 6
                iface.Disconnect()
            elif step == 7:
                self.state = 8
                res = iface.SignalPoll()
                logger.debug("SignalPoll: " + str(res))
                if 'frequency' not in res or res['frequency'] != 2412:
                    self.state = -1
                    logger.info("Unexpected SignalPoll result")
                iface.RemoveNetwork(self.netw)

        def handle_disconnected(self):
            # Steps taken on each transition to State "disconnected"
            step = self.state
            if step == 1:
                self.state = 2
                iface.SelectNetwork(self.netw)
            elif step == 3:
                self.state = 4
                iface.Reassociate()
            elif step == 6:
                self.state = 7
                iface.Reconnect()
            elif step == 8:
                self.state = 9
                self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            net_params = dbus.Dictionary({'ssid': ssid,
                                          'key_mgmt': 'WPA-PSK',
                                          'psk': passphrase,
                                          'scan_freq': 2412},
                                         signature='sv')
            self.netw = iface.AddNetwork(net_params)
            iface.SelectNetwork(self.netw)
            # Returning False keeps this timeout callback from repeating.
            return False

        def success(self):
            if not (self.network_added and self.network_removed and
                    self.network_selected):
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1302
53f4ed68
JM
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in (
                    (self.propertiesChanged, "PropertiesChanged"),
                    (self.networkRequest, "NetworkRequest")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' not in properties:
                return
            if properties['State'] == "completed":
                self.connected = True
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PSK_PASSPHRASE":
                return
            # The network is added without a psk, so the passphrase is
            # supplied here, wrapped in double quotes.
            iface.NetworkReply(path, field, '"' + passphrase + '"')

        def run_connect(self, *args):
            logger.debug("run_connect")
            net_params = dbus.Dictionary({'ssid': ssid,
                                          'key_mgmt': 'WPA-PSK',
                                          'mem_only_psk': 1,
                                          'scan_freq': 2412},
                                         signature='sv')
            self.netw = iface.AddNetwork(net_params)
            iface.SelectNetwork(self.netw)
            # Returning False keeps this timeout callback from repeating.
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1356
0e126c6d
JM
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            # Step counter advanced by State changes; 7 is the final value
            # and -1 marks a failed SignalPoll check.
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            # 1.5 second timeout per iteration (this test runs the sequence
            # many times in a loop).
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'scan_freq': 2412},
                                   signature='sv')
            # Either D-Bus call may fail because of the injected allocation
            # failure; in that case just end this iteration.
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            # Returning False keeps this timeout callback from repeating.
            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Fail the i-th allocation in turn. The loop stops after five iterations
    # in which GET_ALLOC_FAIL no longer starts with "0:".
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

                state = dev[0].request('GET_ALLOC_FAIL')
                logger.info("GET_ALLOC_FAIL: " + state)
                dev[0].dump_monitor()
                dev[0].request("TEST_ALLOC_FAIL 0:")
                if i < 3:
                    raise Exception("Connection succeeded during out-of-memory")
                if not state.startswith('0:'):
                    count += 1
                    if count == 5:
                        break
        except Exception as e:
            # Failures in individual iterations are tolerated by this test
            # (as before), but they are now logged, and KeyboardInterrupt /
            # SystemExit are no longer swallowed as with the bare "except:".
            logger.info("Iteration %d - exception ignored: %s" % (i, str(e)))

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1498
2ec82e67
JM
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Both methods must be rejected with a NotConnected error here.
    for method in ("Disconnect", "Reattach"):
        try:
            getattr(iface, method)()
        except dbus.exceptions.DBusException as e:
            if "NotConnected" not in str(e):
                raise Exception("Unexpected error message for invalid " + method + ": " + str(e))
        else:
            raise Exception(method + "() accepted when not connected")
1517
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            # Step counter; 5 is reached when the AltSubject mismatch has
            # been reported for the non-matching constraint.
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' not in properties:
                return
            new_state = properties['State']

            if new_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    network = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
                    constraint = dbus.Dictionary(
                        {'altsubject_match': self.server_dnsname},
                        signature='sv')
                    network.Set(WPAS_DBUS_NETWORK, "Properties", constraint,
                                dbus_interface=dbus.PROPERTIES_IFACE)
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    network = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
                    constraint = dbus.Dictionary(
                        {'altsubject_match': self.server_dnsname + "FOO"},
                        signature='sv')
                    network.Set(WPAS_DBUS_NETWORK, "Properties", constraint,
                                dbus_interface=dbus.PROPERTIES_IFACE)
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            if len(args['altsubject']) < 1:
                raise Exception("Missing dNSName")
            dnsname = args['altsubject'][0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            mismatch = (status == 'remote certificate verification' and
                        parameter == 'AltSubject mismatch')
            if self.state == 4 and mismatch:
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field == "PASSWORD":
                iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            net_params = dbus.Dictionary({'ssid': ssid,
                                          'key_mgmt': 'IEEE8021X',
                                          'eapol_flags': 0,
                                          'eap': 'TTLS',
                                          'anonymous_identity': 'ttls',
                                          'identity': 'pap user',
                                          'ca_cert': 'auth_serv/ca.pem',
                                          'phase2': 'auth=PAP',
                                          'scan_freq': 2412},
                                         signature='sv')
            self.netw = iface.AddNetwork(net_params)
            iface.SelectNetwork(self.netw)
            # Returning False keeps this timeout callback from repeating.
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 5

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1632
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # AddNetwork() with typed D-Bus values; scan_freq given as UInt32
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'WPA-PSK',
                            'psk': "12345678",
                            'identity': dbus.ByteArray([1, 2]),
                            'priority': dbus.Int32(0),
                            'scan_freq': dbus.UInt32(2412)},
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # AddNetwork() with frequency lists given as strings
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'NONE',
                            'scan_freq': "2412 2432",
                            'freq_list': "2412 2417 2432"},
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(net_id, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # Operations on the already removed network have to be rejected
    try:
        iface.RemoveNetwork(netw)
        raise Exception("Invalid RemoveNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    try:
        iface.SelectNetwork(netw)
        raise Exception("Invalid SelectNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            # Message previously named RemoveNetwork here (copy-paste slip)
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))

    args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
                            'identity': "testuser", 'scan_freq': '2412'},
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: default value, toggling, and wrong value type
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(Enabled,1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))

    # Set(Properties) must update ssid and leave identity as it was
    args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # Called a second time with no networks left
    iface.RemoveAllNetworks()

    # AddNetwork() arguments that have to be rejected with InvalidArgs
    tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'),
             dbus.Dictionary({'identity': dbus.ByteArray()},
                             signature='sv'),
             dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv'),
             dbus.Dictionary({'identity': ""}, signature='sv')]
    for args in tests:
        try:
            iface.AddNetwork(args)
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1745
0e126c6d
JM
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Reading the Properties of an existing network while the config dump
    # allocation fails must be reported as a D-Bus error.
    foo1_args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
                                 'identity': "testuser",
                                 'scan_freq': '2412'},
                                signature='sv')
    first_net = iface.AddNetwork(foo1_args)
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, first_net)
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    # RemoveNetwork with allocation failure while decomposing the path.
    bogus_path = dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234")
    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        iface.RemoveNetwork(bogus_path)

    # The same open-network parameters are reused by the next three steps.
    args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                           signature='sv')

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        try:
            netw = iface.AddNetwork(args)
        except dbus.exceptions.DBusException:
            pass
        else:
            # Currently, AddNetwork() succeeds even if os_strdup() for path
            # fails, so remove the network if that occurs.
            try:
                iface.RemoveNetwork(netw)
            except dbus.exceptions.DBusException:
                pass

    for fail_count in range(1, 3):
        with alloc_fail(dev[0], fail_count, "=wpas_dbus_register_network"):
            try:
                netw = iface.AddNetwork(args)
            except dbus.exceptions.DBusException:
                continue
            # Currently, AddNetwork() succeeds even if network registration
            # fails, so remove the network if that occurs.
            try:
                iface.RemoveNetwork(netw)
            except dbus.exceptions.DBusException:
                pass

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                               signature='sv')
        netw = iface.AddNetwork(args)

    # Allocation failures while parsing individual network properties; each
    # entry is (failure count, backtrace match, AddNetwork arguments).
    set_props = '=set_network_properties;wpas_dbus_handler_add_network'
    oom_cases = [
        (1,
         'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
         dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'ssid': 'foo'}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'eap': 'foo'}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'priority': dbus.UInt32(1)}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'priority': dbus.Int32(1)}, signature='sv')),
        (1, set_props,
         dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, signature='sv')),
    ]
    for count, funcs, args in oom_cases:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            netw = iface.AddNetwork(args)

    # None of the failed attempts may leave a network block behind.
    dbus_networks = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                               dbus_interface=dbus.PROPERTIES_IFACE)
    if len(dbus_networks) > 0:
        raise Exception("Unexpected network block added")
    ctrl_networks = dev[0].list_networks()
    if len(ctrl_networks) > 0:
        raise Exception("Unexpected network block visible")
1825
2ec82e67
JM
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        # Switch to US and then back to the world regdomain.
        for command in ("SET country US", "SET country 00"):
            dev[0].request(command)
            # The regdom event may show up on either the per-interface or
            # the global control interface; the result is not checked.
            if dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                 timeout=1) is None:
                dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                         timeout=1)
        subprocess.call(['iw', 'reg', 'set', '00'])
1844
def _test_dbus_interface(dev, apdev):
    """Exercise CreateInterface/GetInterface/RemoveInterface error handling.

    Creates an interface for 'lo' with driver 'none', then checks that
    duplicate creation, double removal, unknown or missing arguments, and
    lookup of a removed interface are each rejected with the expected
    D-Bus error name. Raises Exception on any unexpected result.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # A second CreateInterface for the same ifname must be rejected.
    params = dbus.Dictionary({'Ifname': 'lo',
                              'Driver': 'none',
                              'ConfigFile': 'foo',
                              'BridgeIfname': 'foo'},
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Removing the same interface twice must fail the second time.
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
        raise Exception("Invalid RemoveInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))

    # Unknown dictionary entry.
    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none',
                              'Foo': 123},
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Missing Ifname.
    params = dbus.Dictionary({'Driver': 'none'}, signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # The interface was removed above, so looking it up must now fail.
    try:
        wpas.GetInterface("lo")
        raise Exception("Invalid GetInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
1901
0e126c6d
JM
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    _bus, wpas_obj, _path, _if_obj = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    # Allocation failure while parsing the CreateInterface dictionary.
    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        lo_params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                                    signature='sv')
        wpas.CreateInterface(lo_params)

    # Fail the Nth allocation inside interface addition, for increasing N,
    # until CreateInterface gets through without the failure triggering.
    for attempt in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % attempt)
        lo_params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                                    signature='sv')
        try:
            created = wpas.CreateInterface(lo_params)
            wpas.RemoveInterface(created)
        except dbus.exceptions.DBusException:
            # Expected while the injected failure is still being hit.
            continue

        logger.info("CreateInterface succeeds after %d allocation failures" % attempt)
        state = dev[0].request('GET_ALLOC_FAIL')
        logger.info("GET_ALLOC_FAIL: " + state)
        dev[0].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL 0:")
        if attempt < 5:
            raise Exception("CreateInterface succeeded during out-of-memory")
        if not state.startswith('0:'):
            break

    # Allocation failure while copying each string argument.
    for key in ['Driver', 'Ifname', 'ConfigFile', 'BridgeIfname']:
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            wpas.CreateInterface(dbus.Dictionary({key: 'foo'},
                                                 signature='sv'))
1936
2ec82e67
JM
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/RemoveBlob/GetBlob parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    blob = dbus.ByteArray(b"\x01\x02\x03")
    iface.AddBlob('blob1', blob)

    # Adding a second blob under the same name must be rejected.
    try:
        iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04"))
        raise Exception("Invalid AddBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))

    # The stored blob must read back byte for byte.
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    if any(got != dbus.Byte(want) for got, want in zip(res, blob)):
        raise Exception("Unexpected blob data")
    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal, both RemoveBlob and GetBlob must report BlobUnknown.
    iface.RemoveBlob('blob1')
    try:
        iface.RemoveBlob('blob1')
        raise Exception("Invalid RemoveBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    try:
        iface.GetBlob('blob1')
        raise Exception("Invalid GetBlob() accepted")
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))

    class TestDbusBlob(TestDbus):
        # Adds and removes 'blob2' from the main loop and records whether
        # the BlobAdded and BlobRemoved signals were delivered for it.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # Returning False keeps the timeout from firing again.
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2011
0e126c6d
JM
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first, second and third allocation made by the AddBlob
    # handler in turn; each call is expected to end in a D-Bus error.
    for i in range(1, 4):
        with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', dbus.ByteArray(b"\x01\x02\x03\x04"))
0e126c6d 2021
2ec82e67
JM
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    _bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Three AutoScan parameter strings in sequence; the last one is empty.
    for setting in ("foo", "periodic:1", ""):
        iface.AutoScan(setting)
    # Also clear autoscan through the control interface.
    dev[0].request("AUTOSCAN ")
2031
0e126c6d
JM
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    _bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    autoscan_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # AutoScan is called with the first allocation in its handler failing.
    with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
        autoscan_iface.AutoScan("foo")

    # Clear autoscan through the control interface afterwards.
    dev[0].request("AUTOSCAN ")
2040
2ec82e67
JM
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    _bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)
    peer_addr = dev[1].p2p_interface_addr()

    def expect_rejected(method, arg, accepted_msg, expected, error_prefix):
        # Call iface.<method>(arg) and require a D-Bus error whose text
        # contains 'expected'.
        try:
            getattr(iface, method)(arg)
        except dbus.exceptions.DBusException as err:
            if expected not in str(err):
                raise Exception(error_prefix + str(err))
        else:
            raise Exception(accepted_msg)

    expect_rejected("TDLSDiscover", "foo",
                    "Invalid TDLSDiscover() accepted", "InvalidArgs",
                    "Unexpected error message for invalid TDLSDiscover: ")
    expect_rejected("TDLSStatus", "foo",
                    "Invalid TDLSStatus() accepted", "InvalidArgs",
                    "Unexpected error message for invalid TDLSStatus: ")

    # A valid address without a TDLS link is reported, not rejected.
    status = iface.TDLSStatus(peer_addr)
    if status != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    expect_rejected("TDLSSetup", "foo",
                    "Invalid TDLSSetup() accepted", "InvalidArgs",
                    "Unexpected error message for invalid TDLSSetup: ")
    expect_rejected("TDLSTeardown", "foo",
                    "Invalid TDLSTeardown() accepted", "InvalidArgs",
                    "Unexpected error message for invalid TDLSTeardown: ")
    expect_rejected("TDLSTeardown", "00:11:22:33:44:55",
                    "TDLSTeardown accepted for unknown peer",
                    "UnknownError: error performing TDLS teardown",
                    "Unexpected error message: ")
    expect_rejected("TDLSChannelSwitch", {},
                    "Invalid TDLSChannelSwitch() accepted", "InvalidArgs",
                    "Unexpected error message for invalid TDLSChannelSwitch: ")
    expect_rejected("TDLSCancelChannelSwitch", "foo",
                    "Invalid TDLSCancelChannelSwitch() accepted",
                    "InvalidArgs",
                    "Unexpected error message for invalid TDLSCancelChannelSwitch: ")
2102
0e126c6d
JM
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    _bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    tdls_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # TDLSSetup is called with the first allocation in wpa_tdls_add_peer
    # failing.
    expected_error = "UnknownError: error performing TDLS setup"
    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected_error):
        tdls_iface.TDLSSetup("00:11:22:33:44:55")
2111
2ec82e67
JM
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Runs discover -> setup -> status -> teardown -> status as a chain
        # of main loop timeouts and records the two status results.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            # Step 1: discovery, then schedule the setup step.
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            # Step 2: setup, then schedule the status check.
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            # Step 3: record the link status, then tear the link down.
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            # Step 4: record the status after teardown and stop the loop.
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2178
82aea622
JM
def test_dbus_tdls_channel_switch(dev, apdev):
    """D-Bus TDLS channel switch configuration"""
    capa_flags = int(dev[0].get_driver_status_field('capa.flags'), 16)
    if not capa_flags & 0x800000000:
        raise HwsimSkip("Driver does not support TDLS channel switching")

    bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)

    addr1 = dev[1].p2p_interface_addr()

    class TestDbusTdls(TestDbus):
        # Sets up a TDLS link from main loop timeouts, then exercises
        # TDLSChannelSwitch with invalid and valid arguments.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.tdls_setup = False
            self.tdls_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True

            invalid_args = [
                # Unknown dict entry
                dbus.Dictionary({'Foobar': dbus.Byte(1)}, signature='sv'),
                # Missing OperClass
                dbus.Dictionary({}, signature='sv'),
                # Missing Frequency
                dbus.Dictionary({'OperClass': dbus.Byte(1)}, signature='sv'),
                # Missing PeerAddress
                dbus.Dictionary({'OperClass': dbus.Byte(1),
                                 'Frequency': dbus.UInt32(2417)},
                                signature='sv'),
            ]
            for bad in invalid_args:
                # A call that is accepted is not treated as a failure here;
                # only an error other than InvalidArgs is.
                try:
                    iface.TDLSChannelSwitch(bad)
                except Exception as err:
                    if "InvalidArgs" not in str(err):
                        raise Exception("Unexpected exception")

            # Valid parameters
            valid = dbus.Dictionary({'OperClass': dbus.Byte(1),
                                     'Frequency': dbus.UInt32(2417),
                                     'PeerAddress': addr1,
                                     'SecChannelOffset': dbus.UInt32(0),
                                     'CenterFrequency1': dbus.UInt32(0),
                                     'CenterFrequency2': dbus.UInt32(0),
                                     'Bandwidth': dbus.UInt32(20),
                                     'HT': dbus.Boolean(False),
                                     'VHT': dbus.Boolean(False)},
                                    signature='sv')
            iface.TDLSChannelSwitch(valid)

            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            iface.TDLSCancelChannelSwitch(addr1)
            self.tdls_done = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_done

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2295
2ec82e67
JM
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    _bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def get_prop(name):
        # Read one property of the interface object.
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    # With engine path "foo": if the call fails, it must be with the EAPOL
    # reinit error. A successful call is accepted as well.
    for module_path in ("bar", ""):
        try:
            iface.SetPKCS11EngineAndModulePath("foo", module_path)
        except dbus.exceptions.DBusException as err:
            if "Error.Failed: Reinit of the EAPOL" not in str(err):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(err))

    # With an empty engine path the call must succeed and both properties
    # must read back as set.
    for module_path in ("bar", ""):
        iface.SetPKCS11EngineAndModulePath("", module_path)
        engine = get_prop("PKCS11EnginePath")
        if engine != "":
            raise Exception("Unexpected PKCS11EnginePath value: " + engine)
        module = get_prop("PKCS11ModulePath")
        if module != module_path:
            raise Exception("Unexpected PKCS11ModulePath value: " + module)
2332
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    # Run the test body, then always put ap_scan back to 1 through the
    # control interface, whether or not the body raised.
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        restore_cmd = "AP_SCAN 1"
        dev[0].request(restore_cmd)
2339
def _test_dbus_apscan(dev, apdev):
    """Check ApScan get/set round trips and rejection of invalid values."""
    _bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])

    def read_ap_scan():
        return if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_ap_scan(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    current = read_ap_scan()
    if current != 1:
        raise Exception("Unexpected initial ApScan value: %d" % current)

    # Each value 0..2 must read back as written.
    for wanted in range(3):
        write_ap_scan(dbus.UInt32(wanted))
        current = read_ap_scan()
        if current != wanted:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (current, wanted))

    # Wrong D-Bus type.
    try:
        write_ap_scan(dbus.Int16(-1))
    except dbus.exceptions.DBusException as err:
        if "Error.Failed: wrong property type" not in str(err):
            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(err))
    else:
        raise Exception("Invalid Set(ApScan,-1) accepted")

    # Right type, out-of-range value.
    try:
        write_ap_scan(dbus.UInt32(123))
    except dbus.exceptions.DBusException as err:
        if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(err):
            raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(err))
    else:
        raise Exception("Invalid Set(ApScan,123) accepted")

    write_ap_scan(dbus.UInt32(1))
2374
8b67f649
LR
def test_dbus_pmf(dev, apdev):
    """D-Bus Get/Set Pmf"""
    # Run the test body, then always set pmf back to 0 through the control
    # interface, whether or not the body raised.
    try:
        _test_dbus_pmf(dev, apdev)
    finally:
        restore_cmd = "SET pmf 0"
        dev[0].request(restore_cmd)
2381
def _test_dbus_pmf(dev, apdev):
    """Check that the Pmf property reads back what was written."""
    _bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])

    def read_pmf():
        return if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def write_pmf(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Pmf", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Start from a known value set through the control interface.
    dev[0].set("pmf", "0")
    current = read_pmf()
    if current != "0":
        raise Exception("Unexpected initial Pmf value: %s" % current)

    # The property is exchanged as a string; 0..2 must round-trip.
    for level in range(3):
        write_pmf(str(level))
        current = read_pmf()
        if current != str(level):
            raise Exception("Unexpected Pmf value %s (expected %d)" % (current, level))

    write_pmf("1")
2401
2ec82e67
JM
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values must read back as written.
    for i in [False, True]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A non-boolean value must be rejected as a wrong property type.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(FastReauth,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))

    # Leave the property at the value it had when the test started.
    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
2429
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # Valid values must read back as written. The expected value is named
    # explicitly so that the failure message can report it.
    expected_age = 179
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(expected_age),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected_age:
        raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, expected_age))

    expected_count = 3
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(expected_count),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected_count:
        raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, expected_count))

    # BSSExpireAge: wrong type, then a value below the minimum of 10.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireAge,9) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))

    # BSSExpireCount: wrong type, then zero.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(BSSExpireCount,0) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))

    # Finish by setting age 180 and count 2.
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
2484
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    # Run the test body, then always go back to the world regulatory
    # domain, both in wpa_supplicant and through iw.
    try:
        _test_dbus_country(dev, apdev)
    finally:
        dev[0].request("SET country 00")
        reset_regdom = ['iw', 'reg', 'set', '00']
        subprocess.call(reset_regdom)
2492
def _test_dbus_country(dev, apdev):
    """Check Country get/set, the resulting regdom events, and invalid values."""
    _bus, _wpas_obj, _path, if_obj = prepare_dbus(dev[0])

    def set_country(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Country", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def wait_regdom_change():
        # Wait for the regulatory domain change event and return its text.
        event = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if event is not None:
            return event
        # For now, work around separate P2P Device interface event delivery
        event = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                         timeout=1)
        if event is None:
            raise Exception("regdom change event not seen")
        return event

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    dev[0].dump_monitor()

    set_country("FI")
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    ev = wait_regdom_change()
    if "init=USER type=COUNTRY alpha2=FI" not in ev:
        raise Exception("Unexpected event contents: " + ev)

    # Wrong D-Bus type.
    try:
        set_country(dbus.Int16(-1))
    except dbus.exceptions.DBusException as err:
        if "Error.Failed: wrong property type" not in str(err):
            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(err))
    else:
        raise Exception("Invalid Set(Country,-1) accepted")

    # Single-character country code.
    try:
        set_country("F")
    except dbus.exceptions.DBusException as err:
        if "Error.Failed: invalid country code" not in str(err):
            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(err))
    else:
        raise Exception("Invalid Set(Country,F) accepted")

    set_country("00")

    ev = wait_regdom_change()
    # init=CORE was previously used due to invalid db.txt data for 00. For
    # now, allow both it and the new init=USER after fixed db.txt.
    world_markers = ("init=CORE type=WORLD", "init=USER type=WORLD")
    if not any(marker in ev for marker in world_markers):
        raise Exception("Unexpected event contents: " + ev)
2546
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval

    Runs the test body and always issues SCAN_INTERVAL 5 afterwards,
    regardless of whether the body raised.
    """
    restore_cmd = "SCAN_INTERVAL 5"
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        dev[0].request(restore_cmd)
2553
def _test_dbus_scan_interval(dev, apdev):
    """Set, read back and invalidate the ScanInterval property over D-Bus.

    Writes a valid value and verifies it can be read back, checks that a
    value of the wrong D-Bus type and a negative value are both rejected
    with the expected error text, and finally sets the property to 5.

    Raises:
        Exception: if the value read back differs from the value written,
            if an invalid Set() is accepted, or if the error text does not
            match.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    expected = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(expected),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        # The expected value used to be formatted from an undefined name,
        # which turned a real mismatch into a NameError.
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    try:
        # UInt16 is not the type that was accepted above (Int32).
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,100) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))

    try:
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(ScanInterval,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))

    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
2582
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])

    dev[1].p2p_find(social=True)

    class TestDbusProbe(TestDbus):
        # Adds a P2P group, subscribes to Probe Request reporting on the
        # group interface once GroupStarted is signalled, and stops the
        # main loop on the first ProbeRequest signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE,
                            "ProbeRequest", byte_arrays=True)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_path = properties['interface_object']
            group_obj = bus.get_object(WPAS_DBUS_SERVICE, group_path)
            self.iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE)
            self.iface.SubscribeProbeReq()
            self.group_p2p = dbus.Interface(group_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            p2p_iface.GroupAdd(dbus.Dictionary({ 'frequency': 2412 }))
            return False

        def success(self):
            return self.reported

    def require_signals(run):
        if not run.success():
            raise Exception("Expected signals not seen")

    with TestDbusProbe(bus) as t:
        require_signals(t)
        t.iface.UnsubscribeProbeReq()
        # A second unsubscribe has nothing left to remove and must fail.
        try:
            t.iface.UnsubscribeProbeReq()
        except dbus.exceptions.DBusException as e:
            if "NoSubscription" not in str(e):
                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
        else:
            raise Exception("Invalid UnsubscribeProbeReq() accepted")
        t.group_p2p.Disconnect()

    with TestDbusProbe(bus) as t:
        require_signals(t)
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
2ec82e67 2646
0e126c6d
JM
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Need to make sure this process has not already subscribed to avoid false
    # failures due to the operation succeeding due to os_strdup() not even
    # getting called.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException:
        was_subscribed = False
    else:
        was_subscribed = True

    failing_func = "wpas_dbus_handler_subscribe_preq"
    with alloc_fail_dbus(dev[0], 1, failing_func, "SubscribeProbeReq"):
        iface.SubscribeProbeReq()

    if was_subscribed:
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.
        iface.SubscribeProbeReq()
2670
2ec82e67
JM
def test_dbus_p2p_invalid(dev, apdev):
    """D-Bus invalid P2P operations

    Issues a series of invalid P2PDevice method calls and property reads
    and verifies that each one fails with a D-Bus error containing the
    expected text. Some calls are made while P2P is temporarily disabled
    with "P2P_SET disabled 1"; P2P is re-enabled afterwards in every case.

    Raises:
        Exception: if an invalid operation is accepted or fails with an
            unexpected error message.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def expect_error(call, name, expected, label=None, p2p_disabled=False):
        # Run call() and require a DBusException whose text contains
        # expected. name is used in the "accepted" message and label
        # (defaults to name) in the "unexpected error" message. With
        # p2p_disabled, P2P is disabled around the call and always
        # re-enabled afterwards.
        if label is None:
            label = name
        try:
            if p2p_disabled:
                dev[0].request("P2P_SET disabled 1")
            call()
        except dbus.exceptions.DBusException as e:
            if expected not in str(e):
                raise Exception("Unexpected error message for invalid " + label + ": " + str(e))
        else:
            raise Exception("Invalid " + name + " accepted")
        finally:
            if p2p_disabled:
                dev[0].request("P2P_SET disabled 0")

    not_available = "Error.Failed: P2P is not available for this interface"

    expect_error(lambda: p2p.RejectPeer(path + "/Peers/00112233445566"),
                 "RejectPeer",
                 "UnknownError: Failed to call wpas_p2p_reject",
                 label="RejectPeer()")
    expect_error(lambda: p2p.RejectPeer("/foo"),
                 "RejectPeer", "InvalidArgs", label="RejectPeer()")

    tests = [ { },
              { 'peer': 'foo' },
              { 'foo': "bar" },
              { 'iface': "abc" },
              { 'iface': 123 } ]
    for t in tests:
        expect_error(lambda: p2p.RemoveClient(t),
                     "RemoveClient", "InvalidArgs", label="RemoveClient()")

    tests = [ {'DiscoveryType': 'foo'},
              {'RequestedDeviceTypes': 'foo'},
              {'RequestedDeviceTypes': ['foo']},
              {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
                                        '10','11','12','13','14','15','16',
                                        '17']},
              {'RequestedDeviceTypes': dbus.Array([], signature="s")},
              {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
              {'RequestedDeviceTypes': dbus.Array([], signature="i")},
              {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'),
                                        dbus.ByteArray(b'1234567')]},
              {'Foo': dbus.Int16(1)},
              {'Foo': dbus.UInt16(1)},
              {'Foo': dbus.Int64(1)},
              {'Foo': dbus.UInt64(1)},
              {'Foo': dbus.Double(1.23)},
              {'Foo': dbus.Signature('s')},
              {'Foo': 'bar'}]
    for t in tests:
        expect_error(lambda: p2p.Find(dbus.Dictionary(t)),
                     "Find", "InvalidArgs", label="Find()")

    for p in [ "/foo",
               "/fi/w1/wpa_supplicant1/Interfaces/1234",
               "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
        expect_error(lambda: p2p.RemovePersistentGroup(dbus.ObjectPath(p)),
                     "RemovePersistentGroup", "InvalidArgs")

    expect_error(lambda: p2p.Listen(5), "Listen",
                 "UnknownError: Could not start P2P listen",
                 p2p_disabled=True)

    # Without introspection the proxy cannot check the argument type on the
    # client side, so the string reaches the service.
    # NOTE(review): rationale inferred from introspect=False - confirm.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    expect_error(lambda: test_p2p.Listen("foo"), "Listen", "InvalidArgs")

    expect_error(lambda: p2p.ExtendedListen(dbus.Dictionary({})),
                 "ExtendedListen",
                 "UnknownError: failed to initiate a p2p_ext_listen",
                 p2p_disabled=True)

    def presence_request():
        args = { 'duration1': 30000, 'interval1': 102400,
                 'duration2': 20000, 'interval2': 102400 }
        p2p.PresenceRequest(args)
    expect_error(presence_request, "PresenceRequest",
                 "UnknownError: Failed to invoke presence request",
                 p2p_disabled=True)

    expect_error(lambda: p2p.GroupAdd(
                     dbus.Dictionary({'frequency': dbus.Int32(-1)})),
                 "GroupAdd", "InvalidArgs")

    expect_error(lambda: p2p.GroupAdd(
                     dbus.Dictionary({'persistent_group_object':
                                      dbus.ObjectPath(path),
                                      'frequency': 2412})),
                 "GroupAdd", "InvalidArgs")

    expect_error(lambda: p2p.Disconnect(), "Disconnect",
                 "UnknownError: failed to disconnect")

    expect_error(lambda: p2p.Flush(), "Flush", not_available,
                 p2p_disabled=True)

    def connect_while_disabled():
        args = { 'peer': path,
                 'join': True,
                 'wps_method': 'pbc',
                 'frequency': 2412 }
        p2p.Connect(args)
    expect_error(connect_while_disabled, "Connect", not_available,
                 p2p_disabled=True)

    tests = [ { 'frequency': dbus.Int32(-1) },
              { 'wps_method': 'pbc' },
              { 'wps_method': 'foo' } ]
    for args in tests:
        expect_error(lambda: p2p.Connect(args), "Connect", "InvalidArgs")

    expect_error(lambda: p2p.Invite({ 'peer': path }), "Invite",
                 not_available, p2p_disabled=True)

    # This failure used to be reported as an invalid Connect.
    expect_error(lambda: p2p.Invite({ 'foo': 'bar' }), "Invite",
                 "InvalidArgs")

    pd_failed = "UnknownError: Failed to send provision discovery request"
    tests = [ (path, 'display', "InvalidArgs"),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'display', pd_failed),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'keypad', pd_failed),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'pbc', pd_failed),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'pushbutton', pd_failed),
              (dbus.ObjectPath(path + "/Peers/00112233445566"),
               'foo', "InvalidArgs") ]
    for (p, method, err) in tests:
        expect_error(lambda: p2p.ProvisionDiscoveryRequest(p, method),
                     "ProvisionDiscoveryRequest", err)

    expect_error(lambda: if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                                    dbus_interface=dbus.PROPERTIES_IFACE),
                 "Get(Peers)", not_available, p2p_disabled=True)
2892
0e126c6d
JM
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    string_array = "_wpa_dbus_dict_entry_get_string_array"
    byte_array = "_wpa_dbus_dict_entry_get_byte_array"

    # (count, function specifier, value sent under the 'Foo' key); each
    # entry becomes one Find() call inside alloc_fail_dbus().
    find_cases = [
        (1, string_array, [ 'bar' ]),
        (2, string_array, [ 'bar' ]),
        (10, string_array, [ str(n) for n in range(1, 10) ]),
        (1, ":=_wpa_dbus_dict_entry_get_binarray",
         [ dbus.ByteArray(b'123') ]),
        (1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         [ dbus.ByteArray(b'123') ]),
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         [ dbus.ByteArray(b'123') for _ in range(11) ]),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         [ dbus.ByteArray(b'123') ]),
        (1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         path),
    ]
    for count, func, value in find_cases:
        with alloc_fail_dbus(dev[0], count, func, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({ 'Foo': value }))

    service = { 'service_type': 'bonjour',
                'response': dbus.ByteArray(500*b'b') }
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count, byte_array,
                             "AddService", "InvalidArgs"):
            p2p.AddService(service)
2949
2ec82e67
JM
def test_dbus_p2p_discovery(dev, apdev):
    """D-Bus P2P discovery

    Runs the test body and always removes the vendor elements added on
    dev[1] afterwards, regardless of whether the body raised.
    """
    cleanup_cmd = "VENDOR_ELEM_REMOVE 1 *"
    try:
        run_dbus_p2p_discovery(dev, apdev)
    finally:
        dev[1].request(cleanup_cmd)
2956
def run_dbus_p2p_discovery(dev, apdev):
    """Run P2P device discovery over D-Bus and validate peer properties.

    dev[1] listens with a secondary device type and two vendor elements
    configured; dev[2] listens with a Wi-Fi Display subelement configured.
    dev[0] runs Find() over D-Bus and the DeviceFound handler checks the
    Peer properties of both devices. Afterwards Listen(), StopFind() and
    ExtendedListen() are exercised, including an invalid ExtendedListen()
    argument.

    Raises:
        Exception: if the initial Peers list is missing or not empty, the
            expected signals are not seen, dev[2] cannot discover dev[0],
            or the invalid ExtendedListen() is accepted or fails with an
            unexpected error.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    dev[1].request("SET sec_device_type 1-0050F204-2")
    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
    dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()
    a1 = binascii.unhexlify(addr1.replace(':',''))

    wfd_devinfo = "00001c440028"
    dev[2].request("SET wifi_display 1")
    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
    wfd = binascii.unhexlify('000006' + wfd_devinfo)
    dev[2].p2p_listen()
    addr2 = dev[2].p2p_dev_addr()
    a2 = binascii.unhexlify(addr2.replace(':',''))

    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if 'Peers' not in res:
        raise Exception("GetAll result missing Peers")
    if len(res['Peers']) != 0:
        raise Exception("Unexpected peer(s) in the list")

    args = {'DiscoveryType': 'social',
            'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')],
            'Timeout': dbus.Int32(1) }
    p2p.Find(dbus.Dictionary(args))
    p2p.StopFind()

    def require_nonempty(props, key, missing_msg, empty_msg):
        # Return props[key], raising if the key is absent or its value is
        # empty.
        if key not in props:
            raise Exception(missing_msg)
        value = props[key]
        if len(value) < 1:
            raise Exception(empty_msg)
        return value

    def check_first_peer(props):
        # Validate the properties reported for dev[1].
        sec = require_nonempty(props, 'SecondaryDeviceTypes',
                               "Missing SecondaryDeviceTypes",
                               "Secondary device type missing")
        if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
            raise Exception("Secondary device type mismatch")

        vendor = require_nonempty(props, 'VendorExtension',
                                  "Missing VendorExtension",
                                  "Vendor extension missing")
        if b"\x11\x22\x33\x44" not in vendor:
            # This used to be reported as a secondary device type mismatch.
            raise Exception("Vendor extension mismatch")

        vsie = require_nonempty(props, 'VSIE', "Missing VSIE",
                                "VSIE missing")
        if vsie != b"\xdd\x06\x00\x11\x22\x33\x55\x66":
            raise Exception("VSIE mismatch")

    def check_second_peer(props):
        # Validate the properties reported for dev[2].
        if 'IEs' not in props:
            raise Exception("IEs missing")
        if props['IEs'] != wfd:
            raise Exception("IEs mismatch")

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False
            self.found2 = False
            self.found_prop = False
            self.lost = False
            self.find_stopped = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.deviceFoundProperties,
                            WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceLost")
            self.add_signal(self.provisionDiscoveryResponseEnterPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryResponseEnterPin")
            self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
                            "FindStopped")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if len(res) < 1:
                raise Exception("Unexpected number of peers")
            if path not in res:
                raise Exception("Mismatch in peer object path")
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("peer properties: " + str(res))

            if res['DeviceAddress'] == a1:
                check_first_peer(res)
                self.found = True
            elif res['DeviceAddress'] == a2:
                check_second_peer(res)
                self.found2 = True
            else:
                raise Exception("Unexpected peer device address")

            # Once both peers have been validated, stop the search and
            # continue with the most recently reported peer.
            if self.found and self.found2:
                p2p.StopFind()
                p2p.RejectPeer(path)
                p2p.ProvisionDiscoveryRequest(path, 'display')

        def deviceLost(self, path):
            logger.debug("deviceLost: path=%s" % path)
            if not self.found or not self.found2:
                # This may happen if a previous test case ended up scheduling
                # deviceLost event and that event did not get delivered before
                # starting the next test execution.
                logger.debug("Ignore deviceLost before the deviceFound events")
                return
            self.lost = True
            # Rejecting a peer that has just been lost must fail.
            try:
                p2p.RejectPeer(path)
                raise Exception("Invalid RejectPeer accepted")
            except dbus.exceptions.DBusException as e:
                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
            self.loop.quit()

        def deviceFoundProperties(self, path, properties):
            logger.debug("deviceFoundProperties: path=%s" % path)
            logger.debug("peer properties: " + str(properties))
            if properties['DeviceAddress'] == a1:
                self.found_prop = True

        def provisionDiscoveryResponseEnterPin(self, peer_object):
            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
            p2p.Flush()

        def findStopped(self):
            logger.debug("findStopped")
            self.find_stopped = True

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            return False

        def success(self):
            return self.found and self.lost and self.found2 and self.find_stopped

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
    dev[1].p2p_stop_find()

    p2p.Listen(1)
    dev[2].p2p_stop_find()
    dev[2].request("P2P_FLUSH")
    if not dev[2].discover_peer(addr0):
        raise Exception("Peer not found")
    p2p.StopFind()
    dev[2].p2p_stop_find()

    try:
        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))

    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
    p2p.ExtendedListen(dbus.Dictionary({}))
    dev[0].global_request("P2P_EXT_LISTEN")
3136
6c44d97b
JM
def test_dbus_p2p_discovery_freq(dev, apdev):
    """D-Bus P2P discovery on a specific non-social channel"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    autogo(dev[1], freq=2422)

    class TestDbusP2p(TestDbus):
        # Starts Find() on 2422 MHz and stops the main loop on the first
        # DeviceFound signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(5000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            self.found = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = dbus.Dictionary({'freq': 2422})
            p2p.Find(find_params)
            return False

        def success(self):
            return self.found

    with TestDbusP2p(bus) as result:
        seen = result.success()
        if not seen:
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
    p2p.StopFind()
3177
2ec82e67
JM
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery

    Exercises AddService(), DeleteService(), FlushService(),
    ServiceDiscoveryRequest(), ServiceDiscoveryCancelRequest() and
    ServiceDiscoveryResponse() with valid and invalid arguments and checks
    that every invalid call is rejected with InvalidArgs.

    Raises:
        Exception: if an invalid call is accepted or fails with an error
            other than InvalidArgs.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # The addresses are not used below; the calls are kept so the sequence
    # of requests made to the devices is unchanged.
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    def expect_invalid_args(call, name, label):
        # Run call() and require a DBusException containing "InvalidArgs".
        # name is used in the "accepted" message and label in the
        # "unexpected error" message.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid " + label + ": " + str(e))
        else:
            raise Exception("Invalid " + name + " accepted")

    def expect_bad_delete(service):
        expect_invalid_args(lambda: p2p.DeleteService(service),
                            "DeleteService()", "DeleteService()")

    def expect_bad_cancel(ref):
        # These failures used to be reported as an invalid AddService().
        expect_invalid_args(lambda: p2p.ServiceDiscoveryCancelRequest(ref),
                            "ServiceDiscoveryCancelRequest()",
                            "ServiceDiscoveryCancelRequest()")

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
    upnp_service = 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'

    args = { 'service_type': 'bonjour',
             'query': bonjour_query,
             'response': bonjour_response }
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # The arguments accepted by AddService() above (query plus response)
    # are expected to be rejected by DeleteService().
    expect_bad_delete(args)

    args = { 'service_type': 'bonjour',
             'query': bonjour_query }
    p2p.DeleteService(args)
    # The service is already gone, so a second delete must fail.
    expect_bad_delete(args)

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': upnp_service }
    p2p.AddService(args)
    p2p.DeleteService(args)
    expect_bad_delete(args)

    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'foo', 'query': bonjour_query },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': upnp_service },
              { 'version': 0x10,
                'service': upnp_service },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_bad_delete(args)

    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': upnp_service },
              { 'version': 0x10,
                'service': upnp_service },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'response': 'foo' },
              { 'service_type': 'bonjour', 'query': bonjour_query },
              { 'service_type': 'bonjour', 'response': bonjour_response },
              { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*b'a') },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(lambda: p2p.AddService(args),
                            "AddService()", "AddService()")

    args = { 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01") }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    # Cancelling the same request twice, or one identified by 0, must fail.
    expect_bad_cancel(ref)
    expect_bad_cancel(dbus.UInt64(0))

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'ssdp:foo' }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    tests = [ { 'service_type': 'foo' },
              { 'foo': 'bar' },
              { 'tlv': 'foo' },
              { },
              { 'version': 0 },
              { 'service_type': 'upnp',
                'service': 'ssdp:foo' },
              { 'service_type': 'upnp',
                'version': 0x10 },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers") },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': path + "/Peers" },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
    for args in tests:
        expect_invalid_args(lambda: p2p.ServiceDiscoveryRequest(args),
                            "ServiceDiscoveryRequest",
                            "ServiceDiscoveryRequest()")

    args = { 'foo': 'bar' }
    expect_invalid_args(
        lambda: p2p.ServiceDiscoveryResponse(dbus.Dictionary(args,
                                                             signature='sv')),
        "ServiceDiscoveryResponse", "ServiceDiscoveryResponse()")
3326
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    # dev[1] registers a Bonjour service and starts listening; dev[0] then
    # queries it through the D-Bus P2PDevice interface below.
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # Starts a P2P find, sends an SD request to every peer that is
        # reported and completes on the first ServiceDiscoveryResponse.

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Kick off the test almost immediately and arm a 15 second
            # timeout callback before entering the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.deviceFound, iface, "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse, iface,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            query = {'peer_object': path,
                     'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}
            p2p.ServiceDiscoveryRequest(query)

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social',
                                           'Timeout': dbus.Int32(10)})
            p2p.Find(find_params)
            # Returning False keeps the timeout source from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
3378
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        # Always switch external SD processing back off, even when the
        # test body raised.
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3385
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    """Answer an SD request from dev[1] with a response sent over D-Bus."""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hex encoded SD response TLVs that dev[0] returns and dev[1] must see.
    resp = "0300000101"

    dev[1].request("P2P_FLUSH")
    dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    dev[1].p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        # Enables external SD handling, listens, and replies to the first
        # ServiceDiscoveryRequest signal.

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.deviceFound, iface, "DeviceFound")
            self.add_signal(self.serviceDiscoveryRequest, iface,
                            "ServiceDiscoveryRequest")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Echo the addressing fields of the request back in the response.
            reply = {}
            for key in ('peer_object', 'frequency', 'dialog_token'):
                reply[key] = sd_request[key]
            reply['tlvs'] = dbus.ByteArray(binascii.unhexlify(resp))
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(reply,
                                                         signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            return False

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    fields = ev.split(' ')
    if fields[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    dev[1].p2p_stop_find()

    p2p.StopFind()
    p2p.ServiceDiscoveryExternal(0)
3452
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # Signal-driven sequence:
        #   GroupAdd(persistent) -> GroupStarted -> Disconnect
        #   -> GroupFinished -> GroupAdd(persistent_group_object)
        #   -> GroupStarted -> wlan1 joins with a PIN
        #   -> ProvisionDiscoveryRequestDisplayPin -> WPS.Start
        #   -> StaAuthorized -> property checks, RemoveClient, Disconnect
        #   -> GroupFinished -> RemovePersistentGroup
        #   -> PersistentGroupRemoved -> done

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.exceptions = False
            self.deauthorized = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = [
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound"),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.persistentGroupAdded, WPAS_DBUS_IFACE_P2PDEVICE,
                 "PersistentGroupAdded"),
                (self.persistentGroupRemoved, WPAS_DBUS_IFACE_P2PDEVICE,
                 "PersistentGroupRemoved"),
                (self.provisionDiscoveryRequestDisplayPin,
                 WPAS_DBUS_IFACE_P2PDEVICE,
                 "ProvisionDiscoveryRequestDisplayPin"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
                (self.staDeauthorized, WPAS_DBUS_IFACE, "StaDeauthorized"),
            ]
            for handler, iface, signal in handlers:
                self.add_signal(handler, iface, signal)
            self.loop.run()
            return self

        def _fail_test(self, msg):
            # Record the failure in self.exceptions so that success() reports
            # it, then raise to stop the current signal handler.
            self.exceptions = True
            raise Exception(msg)

        def _expect_set_rejected(self, g_obj, value, expected):
            # Set(WPSVendorExtensions) with an invalid value must fail with a
            # D-Bus error whose text contains the expected substring.
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', value,
                          dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException as e:
                if expected not in str(e):
                    self._fail_test("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
                return
            self._fail_test("Invalid Set(WPSVendorExtensions) accepted")

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self._fail_test("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self._fail_test("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                self._fail_test("Unexpected PeerGO value: " + str(go))
            if self.first:
                # First start: tear the group down so that it gets re-started
                # from the stored persistent group in groupFinished().
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                # Second start: have the other station join the group.
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object

            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)

            # A registrar request that carries a Bssid and no Pin has to be
            # rejected as InvalidArgs.
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin'}
            try:
                wps.Start(params)
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self._fail_test("Unexpected error message: " + str(e))
            else:
                self._fail_test("Invalid WPS.Start() accepted")

            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self._fail_test("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self._fail_test("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self._fail_test("Unexpected number of group members")

            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({'WPSVendorExtensions': [ext]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self._fail_test("Unexpected number of vendor extensions")
            if res[0] != ext:
                self._fail_test("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted
            # as well.
            res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value read back (res2); res is the local list that
            # was just extended, so its length says nothing about what was
            # stored.
            if len(res2) != 2:
                self._fail_test("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self._fail_test("Vendor extension value changed")

            # Twelve entries in total; the Set() is expected to be rejected.
            for i in range(10):
                res.append(dbus.ByteArray(b'\xaa\xbb'))
            self._expect_set_rejected(g_obj, res, "Error.Failed")

            # Dictionary form with an unknown key.
            vals = dbus.Dictionary({'Foo': [ext]}, signature='sv')
            self._expect_set_rejected(g_obj, vals, "InvalidArgs")

            # Values that are not arrays of byte arrays.
            self._expect_set_rejected(g_obj, ["foo"], "Error.Failed")
            self._expect_set_rejected(g_obj, [["foo"]], "Error.Failed")

            p2p.RemoveClient({'peer': self.peer_path})

            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            return False

        def success(self):
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
3710
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # GroupAdd -> GroupStarted -> wlan1 joins with PBC
        # -> ProvisionDiscoveryPBCRequest -> WPS.Start(pbc)
        # -> StaAuthorized -> Disconnect -> GroupFinished -> done

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = [
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound"),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.provisionDiscoveryPBCRequest, WPAS_DBUS_IFACE_P2PDEVICE,
                 "ProvisionDiscoveryPBCRequest"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            ]
            for handler, iface, signal in handlers:
                self.add_signal(handler, iface, signal)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            # The last path component is decoded as the hex encoded peer
            # address; the result is not used further.
            peer_hex = peer_object.split('/')[-1]
            peer = binascii.unhexlify(peer_hex)
            octets = struct.unpack('6B', peer)
            addr = ':'.join(["%02x" % octet for octet in octets])
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Type': 'pbc'}
            logger.info("Authorize peer to connect to the group")
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
3796
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # GroupAdd -> GroupStarted -> WPS PIN provisioning of wlan1
        # -> StaAuthorized -> disconnect -> GroupFinished -> done

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = [
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            ]
            for handler, iface, signal in handlers:
                self.add_signal(handler, iface, signal)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Read the group BSSID so that the other station can scan for it.
            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            group_props = g_obj.GetAll(WPAS_DBUS_GROUP,
                                       dbus_interface=dbus.PROPERTIES_IFACE,
                                       byte_arrays=True)
            octets = struct.unpack('6B', group_props['BSSID'])
            bssid = ':'.join(["%02x" % octet for octet in octets])

            pin = '12345670'
            params = {'Role': 'enrollee',
                      'Type': 'pin',
                      'Pin': pin}
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)

            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.request("WPS_PIN " + bssid + " " + pin)
            # Kept for staAuthorized(), which ends the group through it.
            self.group_p2p = dbus.Interface(g_if_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")
3866
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    # dev[1] runs the group that dev[0] joins; dev[2] only listens so that
    # it can be invited later.
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    class TestDbusP2p(TestDbus):
        # Find -> DeviceFound (GO and peer) -> Connect(join)
        # -> GroupStarted -> property checks, PresenceRequest, Invite
        # -> InvitationResult -> GO removes the group -> GroupFinished

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = [
                (self.deviceFound, "DeviceFound"),
                (self.groupStarted, "GroupStarted"),
                (self.groupFinished, "GroupFinished"),
                (self.invitationResult, "InvitationResult"),
            ]
            for handler, signal in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # Object paths are matched against the address without colons.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path
            if not (self.peer and self.go):
                return

            logger.info("Join the group")
            p2p.StopFind()
            connect_args = {'peer': self.go,
                            'join': True,
                            'wps_method': 'pin',
                            'frequency': 2412}
            pin = p2p.Connect(connect_args)

            # Hand the PIN returned by Connect() to the GO.
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = dev1_group_ifname
            go_dev.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])

            def p2pdev_property(prop):
                return g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, prop,
                                    dbus_interface=dbus.PROPERTIES_IFACE)

            role = p2pdev_property("Role")
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = p2pdev_property("Group")
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = p2pdev_property("PeerGO")
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
            else:
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            presence = {'duration1': 30000, 'interval1': 102400,
                        'duration2': 20000, 'interval2': 102400}
            group_p2p.PresenceRequest(presence)

            group_p2p.Invite({'peer': self.peer})

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Removing the group on the GO ends the test through
            # groupFinished().
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = dev1_group_ifname
            go_dev.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
3991
1992af11
JM
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    form(dev[0], dev[1])
    addr0 = dev[0].p2p_dev_addr()
    dev[0].p2p_listen()
    dev[0].global_request("SET persistent_reconnect 0")

    if not dev[1].discover_peer(addr0, social=True):
        raise Exception("Peer " + addr0 + " not found")
    peer = dev[1].get_peer(addr0)

    class TestDbusP2p(TestDbus):
        # Has wlan1 send an invitation and waits for the InvitationReceived
        # signal on dev[0].

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.invitationReceived,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationReceived")
            self.loop.run()
            return self

        def invitationReceived(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            inviter = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
            inviter.global_request(cmd)
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    dev[0].p2p_stop_find()
    dev[1].p2p_stop_find()
4040
2ec82e67
JM
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        # Clear the SSID postfix again, whether or not the test body raised.
        dev[0].request("P2P_SET ssid_postfix ")
4047
def _test_dbus_p2p_config(dev, apdev):
    """Exercise Get/Set of the P2PDeviceConfig property over D-Bus.

    Covers a read/write round trip, setting and clearing vendor extensions
    and secondary device types, access while P2P is disabled, and rejection
    of invalid values.
    """
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])

    def get_config():
        # Read the current P2PDeviceConfig dictionary.
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(value):
        # Write P2PDeviceConfig; value is passed to Set() unmodified.
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def set_changes(changes):
        # Write a plain dict as an a{sv} dictionary.
        set_config(dbus.Dictionary(changes, signature='sv'))

    # Writing back what was read must not change anything.
    res = get_config()
    set_config(res)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    set_changes({'SsidPostfix': 'foo',
                 'VendorExtension': [dbus.ByteArray(b'\x11\x22\x33\x44')],
                 'SecondaryDeviceTypes': [dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88')]})

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Empty arrays remove the entries from the reported configuration.
    set_changes({'SsidPostfix': '',
                 'VendorExtension': dbus.Array([], signature="ay"),
                 'SecondaryDeviceTypes': dbus.Array([], signature="ay")})

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Both Get and Set have to fail while P2P is disabled.
    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    try:
        dev[0].request("P2P_SET disabled 1")
        set_changes({'SsidPostfix': 'foo'})
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Wrong value types and unknown keys are rejected as InvalidArgs.
    tests = [{'DeviceName': 123},
             {'SsidPostfix': 123},
             {'Foo': 'Bar'}]
    for changes in tests:
        try:
            set_changes(changes)
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
4136
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    # Flow: start a persistent autonomous group over D-Bus, tear it down as
    # soon as it has started, then exercise the PersistentGroup object
    # (Get/Set Properties, AddPersistentGroup, RemoveAllPersistentGroups,
    # RemovePersistentGroup on a stale path).
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path reported by PersistentGroupAdded; stays None until
            # that signal has been received.
            self.persistent = None

        def __enter__(self):
            # __enter__ runs the main loop to completion, so the body of the
            # "with" statement executes only after the loop has stopped.
            gobject.timeout_add(1, self.run_test)
            # NOTE(review): self.timeout comes from TestDbus (not visible
            # here); presumably it stops the loop after 15 seconds.
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Disconnect through the group interface object right away; the
            # test only needs the persistent group entry to be created.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            # Without the PersistentGroupAdded signal there is no object path
            # to test against, so report that as a missing signal instead of
            # failing later with an AttributeError.
            return self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Setting a new ssid must change only the ssid property.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    # The value is read back with surrounding double quotes.
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # Add a second persistent group explicitly; two entries are expected.
    args = dbus.Dictionary({'ssid': 'DIRECT-testing',
                            'psk': '1234567890'}, signature='sv')
    p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The original object path is stale now, so removal must be rejected.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
4229
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # Flow: dev[0] (driven over D-Bus) starts a persistent group, the peer
    # (wlan1, driven over the control interface) joins it with a PIN and
    # leaves, and after the first group has finished dev[0] re-invokes the
    # persistent group with Invite.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # NOTE(review): first/waiting_end are not read anywhere in this
            # class; kept in case the TestDbus base class relies on them.
            self.first = True
            self.waiting_end = False
            self.done = False
            # Set once Invite has been issued; tells the first group
            # lifecycle apart from the re-invoked one.
            self.invited = False

        def __enter__(self):
            # The main loop runs inside __enter__, so the "with" body is
            # reached only after the loop has stopped.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if not self.invited:
                # First group: have the peer find the GO by BSSID and join
                # it using a PIN.
                g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
                res = g_obj.GetAll(WPAS_DBUS_GROUP,
                                   dbus_interface=dbus.PROPERTIES_IFACE,
                                   byte_arrays=True)
                bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])])
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.scan_for_bss(bssid, freq=2412)
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                # The re-invoked group has ended; the test is complete.
                self.done = True
                self.loop.quit()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("SET persistent_reconnect 1")
                dev1.p2p_listen()

                # The interface object path is not a persistent group
                # object, so this Invite has to be rejected.
                args = {'persistent_group_object': dbus.ObjectPath(path),
                        'peer': self.peer_path}
                try:
                    p2p.Invite(args)
                    raise Exception("Invalid Invite accepted")
                except dbus.exceptions.DBusException as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected error message for invalid Invite: " + str(e))

                args = {'persistent_group_object': self.persistent,
                        'peer': self.peer_path}
                p2p.Invite(args)
                self.invited = True

                self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                           timeout=15)
                if self.sta_group_ev is None:
                    raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Cache the peer properties; DeviceAddress is needed when the
            # peer is authorized in provisionDiscoveryRequestDisplayPin.
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            # Create the peer wrapper before starting WPS, keeping the
            # original ordering relative to the P2P-GROUP-STARTED wait.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # Peer is connected: remove the group on the peer side first and
            # then disconnect the GO through its group interface.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
            if ev is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4364
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # dev[0] (driven over D-Bus) only listens; the peer (wlan1, driven over
    # the control interface) initiates GO Negotiation and dev[0] accepts it
    # from the GONegotiationRequest signal handler.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # The main loop runs inside __enter__; the "with" body is reached
            # only after the loop has stopped.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra add_signal keyword arguments)
            receivers = (
                (self.deviceFound, "DeviceFound", {}),
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.groupFinished, "GroupFinished", {}),
            )
            for handler, signal_name, extra in receivers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)

            accepted = {'peer': path, 'wps_method': 'display',
                        'pin': '12345670', 'go_intent': 15,
                        'persistent': False}
            # Same request with frequency 5175 added; it is expected to be
            # rejected with ConnectChannelUnsupported.
            rejected = dict(accepted, frequency=5175)
            try:
                p2p.Connect(rejected)
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))
            else:
                raise Exception("Invalid Connect accepted")

            p2p.Connect(accepted)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Tear the group down immediately through its interface object.
            iface_path = properties['interface_object']
            group_iface = bus.get_object(WPAS_DBUS_SERVICE, iface_path)
            dbus.Interface(group_iface, WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not peer.discover_peer(addr0):
                raise Exception("Peer not found")
            peer.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4446
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # dev[0] (driven over D-Bus) pre-authorizes GO Negotiation with the
    # discovered peer using authorize_only; the peer (wlan1, driven over the
    # control interface) then initiates the negotiation.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            # The main loop runs inside __enter__; the "with" body is reached
            # only after the loop has stopped.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # keypad method without a pin is expected to be rejected with
            # InvalidArgs.
            args = {'peer': path, 'wps_method': 'keypad',
                    'go_intent': 15, 'authorize_only': True}
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 15, 'authorize_only': True}
            p2p.Connect(args)
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted, which passes it to group_form_result().
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def staDeauthorized(self, name):
            # Log tag spelled to match the handler and signal name so the
            # debug output can be searched for.
            logger.debug("staDeauthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4547
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # dev[0] (driven over D-Bus) initiates GO Negotiation with go_intent 0;
    # the peer (wlan1, driven over the control interface) answers with
    # go_intent=15. The test also tracks the peer object's "Groups" property
    # through PropertiesChanged.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # The main loop runs inside __enter__; the "with" body is reached
            # only after the loop has stopped.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, interface, signal name, extra add_signal kwargs)
            receivers = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            )
            for handler, iface, signal_name, extra in receivers:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The peer wrapper is created before Connect, as in the original
            # ordering, and then used to wait for the peer-side events.
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            p2p.Connect({'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670', 'go_intent': 0})

            if peer.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                      timeout=15) is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer.wait_global_event(["P2P-GROUP-STARTED"],
                                             timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted, which passes it to group_form_result().
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Disconnect the D-Bus side first, then remove the group on the
            # peer side.
            group_iface = bus.get_object(WPAS_DBUS_SERVICE,
                                         properties['interface_object'])
            dbus.Interface(group_iface, WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer.group_form_result(self.sta_group_ev)
            peer.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is stopped from propertiesChanged, not from here.
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if (interface_name != WPAS_DBUS_P2P_PEER or
                    "Groups" not in changed_properties):
                return
            peer_groups = changed_properties["Groups"]
            if len(peer_groups) > 0:
                self.peer_group_added = True
                return
            if not self.peer_group_added:
                # This is likely a leftover event from an earlier test case,
                # ignore it to allow this test case to go through its steps.
                logger.info("Ignore propertiesChanged indicating group removal before group has been added")
                return
            self.peer_group_removed = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4642
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    # dev[0] (driven over D-Bus) connects with go_intent 0 while the peer
    # (wlan1, driven over the control interface) uses go_intent=15; the peer
    # then removes the group and the D-Bus side must see the peer's "Groups"
    # property become non-empty and later empty.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # The main loop runs inside __enter__; the "with" body is reached
            # only after the loop has stopped.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, interface, signal name, extra add_signal kwargs)
            receivers = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            )
            for handler, iface, signal_name, extra in receivers:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The peer wrapper is created before Connect, as in the original
            # ordering, and then used to wait for the peer-side events.
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            p2p.Connect({'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670', 'go_intent': 0})

            if peer.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                      timeout=15) is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer.wait_global_event(["P2P-GROUP-STARTED"],
                                             timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted, which passes it to group_form_result().
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The proxy object is created but not used further in this
            # handler; the group is removed from the peer side only.
            group_iface = bus.get_object(WPAS_DBUS_SERVICE,
                                         properties['interface_object'])
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer.group_form_result(self.sta_group_ev)
            peer.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is stopped from propertiesChanged, not from here.
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if (interface_name != WPAS_DBUS_P2P_PEER or
                    "Groups" not in changed_properties):
                return
            peer_groups = changed_properties["Groups"]
            if len(peer_groups) > 0:
                self.peer_group_added = True
            elif self.peer_group_added:
                # An empty list counts as removal only after an addition has
                # been observed.
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4730
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    # Run the actual test with p2p_group_idle set to 1 on dev[0] and always
    # restore it to 0 afterwards, whether or not the test passed.
    sta = dev[0]
    try:
        sta.global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        sta.global_request("SET p2p_group_idle 0")
4738
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    # Body of test_dbus_p2p_group_idle_timeout: dev[0] (driven over D-Bus)
    # connects with go_intent 0, the peer (wlan1, driven over the control
    # interface) uses go_intent=15, deauthenticates the client and then
    # removes the group.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.group_started = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # The main loop runs inside __enter__; the "with" body is reached
            # only after the loop has stopped.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, interface, signal name, extra add_signal kwargs)
            receivers = (
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                 "DeviceFound", {}),
                (self.goNegotiationSuccess, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GONegotiationSuccess", {'byte_arrays': True}),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted", {}),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished", {}),
                (self.propertiesChanged, dbus.PROPERTIES_IFACE,
                 "PropertiesChanged", {}),
            )
            for handler, iface, signal_name, extra in receivers:
                self.add_signal(handler, iface, signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The peer wrapper is created before Connect, as in the original
            # ordering, and then used to wait for the peer-side events.
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            p2p.Connect({'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670', 'go_intent': 0})

            if peer.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                      timeout=15) is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer.wait_global_event(["P2P-GROUP-STARTED"],
                                             timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted, which passes it to group_form_result().
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            # The proxy object is created but not used further in this
            # handler.
            group_iface = bus.get_object(WPAS_DBUS_SERVICE,
                                         properties['interface_object'])
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer.group_form_result(self.sta_group_ev)
            first_sta = peer.group_request("STA-FIRST")
            ifaddr = first_sta.splitlines()[0]
            # Force disassociation with different reason code so that the
            # P2P Client using D-Bus does not get normal group termination event
            # from the GO.
            peer.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
            peer.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            # The loop is stopped from propertiesChanged, not from here.
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only peer-object updates received after GroupStarted that carry
            # the "Groups" property are of interest.
            if interface_name != WPAS_DBUS_P2P_PEER or not self.group_started:
                return
            if "Groups" not in changed_properties:
                return
            peer_groups = changed_properties["Groups"]
            if len(peer_groups) > 0:
                self.peer_group_added = True
            else:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4834
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    # dev[0] (driven over D-Bus) accepts GO Negotiation with PIN 12345670
    # while the peer (wlan1, driven over the control interface) enters
    # 87654321; both WpsFailed and GroupFormationFailure must be signalled
    # and GroupStarted must not be.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            # The main loop runs inside __enter__; the "with" body is reached
            # only after the loop has stopped.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra add_signal keyword arguments)
            receivers = (
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.wpsFailed, "WpsFailed", {}),
                (self.groupFormationFailure, "GroupFormationFailure", {}),
            )
            for handler, signal_name, extra in receivers:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name, **extra)
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            p2p.Connect({'peer': path, 'wps_method': 'display',
                         'pin': '12345670', 'go_intent': 15})

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            # Stop only once both failure signals have been seen; they may
            # arrive in either order.
            if self.formation_failure:
                self.loop.quit()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            if self.wps_failed:
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not peer.discover_peer(addr0):
                raise Exception("Peer not found")
            peer.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            # Returning False keeps the timeout callback from being re-armed.
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4910
4911def test_dbus_p2p_two_groups(dev, apdev):
4912 """D-Bus P2P with two concurrent groups"""
910eb5b5 4913 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4914 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4915
4916 dev[0].request("SET p2p_no_group_iface 0")
4917 addr0 = dev[0].p2p_dev_addr()
4918 addr1 = dev[1].p2p_dev_addr()
4919 addr2 = dev[2].p2p_dev_addr()
4920 dev[1].p2p_start_go(freq=2412)
cb346b49 4921 dev1_group_ifname = dev[1].group_ifname
2ec82e67
JM
4922
4923 class TestDbusP2p(TestDbus):
4924 def __init__(self, bus):
4925 TestDbus.__init__(self, bus)
4926 self.done = False
4927 self.peer = None
4928 self.go = None
4929 self.group1 = None
4930 self.group2 = None
5a217649 4931 self.groups_removed = False
2ec82e67
JM
4932
4933 def __enter__(self):
4934 gobject.timeout_add(1, self.run_test)
4935 gobject.timeout_add(15000, self.timeout)
4936 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4937 "PropertiesChanged", byte_arrays=True)
4938 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4939 "DeviceFound")
4940 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4941 "GroupStarted")
4942 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4943 "GroupFinished")
4944 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4945 "PeerJoined")
4946 self.loop.run()
4947 return self
4948
4949 def propertiesChanged(self, interface_name, changed_properties,
4950 invalidated_properties):
4951 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4952
4953 def deviceFound(self, path):
4954 logger.debug("deviceFound: path=%s" % path)
4955 if addr2.replace(':','') in path:
4956 self.peer = path
4957 elif addr1.replace(':','') in path:
4958 self.go = path
4959 if self.go and not self.group1:
4960 logger.info("Join the group")
4961 p2p.StopFind()
4962 pin = '12345670'
4963 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
4964 dev1.group_ifname = dev1_group_ifname
4965 dev1.group_request("WPS_PIN any " + pin)
2ec82e67
JM
4966 args = { 'peer': self.go,
4967 'join': True,
4968 'wps_method': 'pin',
4969 'pin': pin,
4970 'frequency': 2412 }
4971 p2p.Connect(args)
4972
4973 def groupStarted(self, properties):
4974 logger.debug("groupStarted: " + str(properties))
4975 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4976 dbus_interface=dbus.PROPERTIES_IFACE)
4977 logger.debug("p2pdevice properties: " + str(prop))
4978
4979 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4980 properties['group_object'])
4981 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4982 dbus_interface=dbus.PROPERTIES_IFACE,
4983 byte_arrays=True)
4984 logger.debug("Group properties: " + str(res))
4985
4986 if not self.group1:
4987 self.group1 = properties['group_object']
4988 self.group1iface = properties['interface_object']
4989 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4990 self.group1iface)
4991
4992 logger.info("Start autonomous GO")
4993 params = dbus.Dictionary({ 'frequency': 2412 })
4994 p2p.GroupAdd(params)
4995 elif not self.group2:
4996 self.group2 = properties['group_object']
4997 self.group2iface = properties['interface_object']
4998 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4999 self.group2iface)
5000 self.g2_bssid = res['BSSID']
5001
5002 if self.group1 and self.group2:
5003 logger.info("Authorize peer to join the group")
5004 a2 = binascii.unhexlify(addr2.replace(':',''))
5005 params = { 'Role': 'enrollee',
5006 'P2PDeviceAddress': dbus.ByteArray(a2),
5007 'Bssid': dbus.ByteArray(a2),
5008 'Type': 'pin',
5009 'Pin': '12345670' }
5010 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
5011 g_wps.Start(params)
5012
c37ef330 5013 bssid = ':'.join([ "%02x" % i for i in struct.unpack('6B', self.g2_bssid) ])
2ec82e67
JM
5014 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
5015 dev2.scan_for_bss(bssid, freq=2412)
5016 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
bc6e3288 5017 ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
cb346b49
JM
5018 if ev is None:
5019 raise Exception("Group join timed out")
5020 self.dev2_group_ev = ev
2ec82e67
JM
5021
5022 def groupFinished(self, properties):
5023 logger.debug("groupFinished: " + str(properties))
5024
5025 if self.group1 == properties['group_object']:
5026 self.group1 = None
5027 elif self.group2 == properties['group_object']:
5028 self.group2 = None
5029
5030 if not self.group1 and not self.group2:
5031 self.done = True
5032 self.loop.quit()
5033
5034 def peerJoined(self, peer):
5035 logger.debug("peerJoined: " + peer)
5a217649
JM
5036 if self.groups_removed:
5037 return
2ec82e67
JM
5038 self.check_results()
5039
5040 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
cb346b49 5041 dev2.group_form_result(self.dev2_group_ev)
2ec82e67
JM
5042 dev2.remove_group()
5043
5044 logger.info("Disconnect group2")
5045 group_p2p = dbus.Interface(self.g2_if_obj,
5046 WPAS_DBUS_IFACE_P2PDEVICE)
5047 group_p2p.Disconnect()
5048
5049 logger.info("Disconnect group1")
5050 group_p2p = dbus.Interface(self.g1_if_obj,
5051 WPAS_DBUS_IFACE_P2PDEVICE)
5052 group_p2p.Disconnect()
5a217649 5053 self.groups_removed = True
2ec82e67
JM
5054
5055 def check_results(self):
5056 logger.info("Check results with two concurrent groups in operation")
5057
5058 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
5059 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
5060 dbus_interface=dbus.PROPERTIES_IFACE,
5061 byte_arrays=True)
5062
5063 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
5064 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
5065 dbus_interface=dbus.PROPERTIES_IFACE,
5066 byte_arrays=True)
5067
5068 logger.info("group1 = " + self.group1)
5069 logger.debug("Group properties: " + str(res1))
5070
5071 logger.info("group2 = " + self.group2)
5072 logger.debug("Group properties: " + str(res2))
5073
5074 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
5075 dbus_interface=dbus.PROPERTIES_IFACE)
5076 logger.debug("p2pdevice properties: " + str(prop))
5077
5078 if res1['Role'] != 'client':
5079 raise Exception("Group1 role reported incorrectly: " + res1['Role'])
5080 if res2['Role'] != 'GO':
5081 raise Exception("Group2 role reported incorrectly: " + res2['Role'])
5082 if prop['Role'] != 'device':
5083 raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])
5084
5085 if len(res2['Members']) != 1:
5086 raise Exception("Unexpected Members value for group 2")
5087
5088 def run_test(self, *args):
5089 logger.debug("run_test")
5090 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5091 return False
5092
5093 def success(self):
5094 return self.done
5095
5096 with TestDbusP2p(bus) as t:
5097 if not t.success():
5098 raise Exception("Expected signals not seen")
5099
5100 dev[1].remove_group()
0e126c6d 5101
f572ae80
JM
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Cancel() is first issued before any Connect(); the test treats a
    # successful return at this point as a failure.
    try:
        p2p.Cancel()
    except dbus.exceptions.DBusException:
        pass
    else:
        raise Exception("Unexpected p2p.Cancel() success")

    # The returned address is not used by this test.
    dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        # Starts a social P2P find and, on DeviceFound, issues Connect()
        # immediately followed by Cancel().
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            connect_params = {'peer': path,
                              'wps_method': 'keypad',
                              'pin': '12345670',
                              'go_intent': 0}
            p2p.Connect(connect_params)
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social'})
            p2p.Find(find_params)
            # Returning False keeps the glib timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5148
afe28053
JM
def test_dbus_p2p_ip_addr(dev, apdev):
    """D-Bus P2P and IP address parameters"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Write each IP address property over D-Bus and verify that reading it
    # back returns the same value.
    vals = [("IpAddrGo", "192.168.43.1"),
            ("IpAddrMask", "255.255.255.0"),
            ("IpAddrStart", "192.168.43.100"),
            ("IpAddrEnd", "192.168.43.199")]
    for field, value in vals:
        if_obj.Set(WPAS_DBUS_IFACE, field, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)
        val = if_obj.Get(WPAS_DBUS_IFACE, field,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != value:
            raise Exception("Unexpected %s value: %s" % (field, val))

    set_ip_addr_info(dev[1])

    dev[0].global_request("SET p2p_go_intent 0")

    # Build an NFC connection handover request/select pair and report it to
    # both devices (responder first, then initiator).
    req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
    if "FAIL" in req:
        raise Exception("Failed to generate NFC connection handover request")
    sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
    if "FAIL" in sel:
        raise Exception("Failed to generate NFC connection handover select")
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel)
    if "FAIL" in res:
        raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
    res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel)
    if "FAIL" in res:
        raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")

    class TestDbusP2p(TestDbus):
        # Waits for the GroupStarted signal and inspects its IpAddrGo entry.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.loop.quit()

            if 'IpAddrGo' not in properties:
                logger.info("IpAddrGo missing from GroupStarted")
                # Stop here instead of falling through to a KeyError in the
                # signal handler; self.done stays False, so the test is
                # still reported as failed.
                return
            ip_addr_go = properties['IpAddrGo']
            addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1],
                                    ip_addr_go[2], ip_addr_go[3])
            # NOTE(review): presumably this is the GO address that
            # set_ip_addr_info() configures on dev[1] - confirm. A mismatch
            # is only logged and does not fail the test.
            if addr != "192.168.42.1":
                logger.info("Unexpected IpAddrGo value: " + addr)
            self.done = True

        def run_test(self, *args):
            # Nothing to trigger here: group formation was already started
            # by the NFC handover reports above.
            logger.debug("run_test")
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5220
0e126c6d
JM
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def introspect():
        # Single place for the Introspect() call used by every step below.
        return if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)

    res = introspect()
    logger.info("Initial Introspect: " + str(res))
    # The full response must mention each of these names.
    required = ("Introspectable", "GroupStarted", "FastReauth", "PassiveScan")
    if res is None or any(name not in res for name in required):
        raise Exception("Unexpected initial Introspect response: " + str(res))

    # Allocation failure in wpa_dbus_introspect itself: no response expected.
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        res2 = introspect()
        logger.info("Introspect: " + str(res2))
        if res2 is not None:
            raise Exception("Unexpected Introspect response")

    # Allocation failures further down the call chain: a response is still
    # expected, but it has to be shorter than the full one.
    partial_failures = [
        (1, "=add_interface;wpa_dbus_introspect"),
        (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
        (2, "=add_interface;wpa_dbus_introspect"),
    ]
    for count, funcs in partial_failures:
        with alloc_fail(dev[0], count, funcs):
            res2 = introspect()
            logger.info("Introspect: " + str(res2))
            if res2 is None:
                raise Exception("No Introspect response")
            if len(res2) >= len(res):
                raise Exception("Unexpected Introspect response")
9bbce257 5266
a21eadcf
JM
def run_busctl(service, obj):
    """Run "busctl introspect" on a D-Bus object and check its output.

    service: D-Bus service name passed to busctl
    obj: D-Bus object path passed to busctl

    Both output streams are logged. Raises Exception("Duplicate property")
    if busctl reports a duplicate property on stderr.
    """
    logger.info("busctl introspect %s %s" % (service, obj))
    cmd = subprocess.Popen(['busctl', 'introspect', service, obj],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
    # communicate() returns only after the process has exited, so no
    # separate wait() is needed.
    raw_out, raw_err = cmd.communicate()
    # Decode both streams so the log shows text instead of a bytes repr;
    # undecodable bytes are replaced rather than aborting the test.
    stdout = raw_out.decode("utf-8", "replace")
    stderr = raw_err.decode("utf-8", "replace")
    logger.info("busctl stdout:\n%s" % stdout.strip())
    if len(stderr) > 0:
        logger.info("busctl stderr: %s" % stderr.strip())
    if "Duplicate property" in stderr:
        raise Exception("Duplicate property")
5279
def test_dbus_introspect_busctl(dev, apdev):
    """D-Bus introspection with busctl"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    ifaces = dbus_get(dbus, wpas_obj, "Interfaces")

    # Introspect the service root and the interfaces container first, then
    # the first interface object.
    for obj_path in (WPAS_DBUS_PATH, WPAS_DBUS_PATH + "/Interfaces"):
        run_busctl(WPAS_DBUS_SERVICE, obj_path)
    run_busctl(WPAS_DBUS_SERVICE, ifaces[0])

    # Create one BSS entry (via a scan) and one network entry so that the
    # "/BSSs/0" and "/Networks/0" objects can be introspected as well.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    ap_bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(ap_bssid, freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    for suffix in ("/BSSs/0", "/Networks/0"):
        run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + suffix)
5297
9bbce257
JM
def test_dbus_ap(dev, apdev):
    """D-Bus AddNetwork for AP mode"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    def read_stations():
        # Fetch and log the current value of the Stations property.
        stations = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
                              dbus_interface=dbus.PROPERTIES_IFACE)
        logger.info("Stations: " + str(stations))
        return stations

    class TestDbusConnect(TestDbus):
        # Sequence driven by signals: add and select an AP mode network,
        # connect wlan1 once the state is "completed", disconnect it when
        # it is authorized, and stop the loop on StationRemoved.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.started = False
            self.sta_added = False
            self.sta_removed = False
            self.authorized = False
            self.deauthorized = False
            self.stations = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            handlers = [(self.networkAdded, "NetworkAdded"),
                        (self.networkSelected, "NetworkSelected"),
                        (self.propertiesChanged, "PropertiesChanged"),
                        (self.stationAdded, "StationAdded"),
                        (self.stationRemoved, "StationRemoved"),
                        (self.staAuthorized, "StaAuthorized"),
                        (self.staDeauthorized, "StaDeauthorized")]
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' not in properties:
                return
            if properties['State'] != "completed":
                return
            # State is "completed": connect a station from wlan1.
            self.started = True
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.connect(ssid, psk=passphrase, scan_freq="2412")

        def stationAdded(self, station, properties):
            logger.debug("stationAdded: %s" % str(station))
            logger.debug(str(properties))
            self.sta_added = True
            stations = read_stations()
            if len(stations) != 1:
                raise Exception("Missing Stations entry: " + str(stations))
            self.stations = True

        def stationRemoved(self, station):
            logger.debug("stationRemoved: %s" % str(station))
            self.sta_removed = True
            stations = read_stations()
            if len(stations) != 0:
                self.stations = False
                raise Exception("Unexpected Stations entry: " + str(stations))
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            self.authorized = True
            # Disconnect the station again so the removal signals follow.
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'WPA-PSK',
                                       'psk': passphrase,
                                       'mode': 2,
                                       'frequency': 2412,
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # Returning False keeps the glib timeout from firing again.
            return False

        def success(self):
            # self.stations is tracked by the handlers but is not part of
            # the pass criteria.
            return all((self.started, self.sta_added, self.sta_removed,
                        self.authorized, self.deauthorized))

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
d9e2a057
JM
5402
def test_dbus_connect_wpa_eap(dev, apdev):
    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa-eap"
    ap_params = hostapd.wpa_eap_params(ssid=ssid)
    ap_params.update({"wpa": "3", "rsn_pairwise": "CCMP"})
    hapd = hostapd.add_ap(apdev[0], ap_params)

    class TestDbusConnect(TestDbus):
        # Adds and selects an EAP-PEAP network over D-Bus and waits for the
        # interface State property to become "completed".
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' not in properties:
                return
            if properties['State'] == "completed":
                self.done = True
                self.loop.quit()

        def eap(self, status, parameter):
            # EAP signals are only logged.
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'WPA-EAP',
                                       'eap': 'PEAP',
                                       'identity': 'user',
                                       'password': 'password',
                                       'ca_cert': 'auth_serv/ca.pem',
                                       'phase2': 'auth=MSCHAPV2',
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # Returning False keeps the glib timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
708ec753
JM
5458
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    # Wrapper around the actual test body: the finally clause sends
    # "AP_SCAN 1" to dev[0] whether or not the body raised.
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")
5465
def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """Body of test_dbus_ap_scan_2_ap_mode_scan.

    Starts an open AP mode network on dev[0] with AP_SCAN 2, issues a D-Bus
    Scan() while the driver scan is forced to fail, and then checks that
    dev[1] can still connect to the AP.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def wait_scan_failed(suffix):
        # Wait for the scan failure event; AP-DISABLED or a timeout is an
        # error. The suffix distinguishes the first attempt from the retry.
        ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED", "AP-DISABLED"],
                               timeout=5)
        if ev is None:
            raise Exception("CTRL-EVENT-SCAN-FAILED not seen" + suffix)
        if "AP-DISABLED" in ev:
            raise Exception("Unexpected AP-DISABLED event" + suffix)
        return ev

    if "OK" not in dev[0].request("AP_SCAN 2"):
        raise Exception("Failed to set AP_SCAN 2")

    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "mode", "2")
    dev[0].set_network_quoted(net_id, "ssid", "wpas-ap-open")
    for field, value in (("key_mgmt", "NONE"),
                         ("frequency", "2412"),
                         ("scan_freq", "2412"),
                         ("disabled", "0")):
        dev[0].set_network(net_id, field, value)
    dev[0].select_network(net_id)
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) is None:
        raise Exception("AP failed to start")

    with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
        scan_args = {'Type': 'active',
                     'AllowRoam': True,
                     'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
        iface.Scan(scan_args)
        ev = wait_scan_failed("")
        if "retry=1" in ev:
            # Wait for the retry to scan happen
            wait_scan_failed(" - retry")

    # The AP must still be usable after the failed scan.
    dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
    dev[1].request("DISCONNECT")
    dev[1].wait_disconnected()
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
d679ab74
JM
5509
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    # Connect dev[0] to an open AP so there is an association in place when
    # ExpectDisconnect() is called.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")

    # This does not really verify the behavior other than by going through the
    # code path for additional coverage.
    wpas.ExpectDisconnect()

    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
2299b543
JM
5524
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # SaveConfig() is expected to be rejected with this specific error.
    expected = "fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        err = str(e)
        if not err.startswith(expected):
            raise Exception("Unexpected error message for SaveConfig(): " + err)
    else:
        raise Exception("SaveConfig() accepted unexpectedly")
ce96e65c
JM
5535
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    # Wrapper around the actual test body: the finally clause sends
    # "VENDOR_ELEM_REMOVE 1 *" to dev[0] whether or not the body raised.
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5542
def _test_dbus_vendor_elem(dev, apdev):
    """Body of test_dbus_vendor_elem.

    Exercises VendorElemAdd(), VendorElemGet() and VendorElemRem() over
    D-Bus: first the invalid-argument error paths, then an add / get /
    remove round trip with two elements stored under ID 1.

    Raises Exception on any unexpected result.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def expect_invalid_args(call, detail, accepted_msg, label):
        # Run call() and require a D-Bus error whose text contains both
        # "InvalidArgs" and detail. accepted_msg is raised if the call
        # succeeds; label identifies the check in the failure message.
        try:
            call()
        except dbus.exceptions.DBusException as e:
            err = str(e)
            if "InvalidArgs" not in err or detail not in err:
                raise Exception("Unexpected error message for invalid %s: %s" % (label, err))
        else:
            raise Exception(accepted_msg)

    def check_elem_data(val, expected, suffix):
        # Compare VendorElemGet() output with the expected bytes; suffix
        # distinguishes the call sites in the failure message.
        if len(val) != len(expected):
            raise Exception("Unexpected VendorElemGet length" + suffix)
        for got, want in zip(val, expected):
            if got != dbus.Byte(want):
                raise Exception("Unexpected VendorElemGet data" + suffix)

    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    # Error cases for VendorElemAdd()
    add_accepted = "Invalid VendorElemAdd() accepted"
    expect_invalid_args(
        lambda: iface.VendorElemAdd(-1, dbus.ByteArray(b"\x00\x00")),
        "Invalid ID", add_accepted, "VendorElemAdd[1]")
    expect_invalid_args(
        lambda: iface.VendorElemAdd(1, dbus.ByteArray(b'')),
        "Invalid value", add_accepted, "VendorElemAdd[2]")
    expect_invalid_args(
        lambda: iface.VendorElemAdd(1, dbus.ByteArray(b"\x00\x01")),
        "Parse error", add_accepted, "VendorElemAdd[3]")

    # Error cases for VendorElemGet()
    get_accepted = "Invalid VendorElemGet() accepted"
    expect_invalid_args(
        lambda: iface.VendorElemGet(-1),
        "Invalid ID", get_accepted, "VendorElemGet[1]")
    expect_invalid_args(
        lambda: iface.VendorElemGet(1),
        "ID value does not exist", get_accepted, "VendorElemGet[2]")

    # Error cases for VendorElemRem(). Each check has its own label so that
    # a failure message identifies which one failed.
    rem_accepted = "Invalid VendorElemRemove() accepted"
    expect_invalid_args(
        lambda: iface.VendorElemRem(-1, dbus.ByteArray(b"\x00\x00")),
        "Invalid ID", rem_accepted, "VendorElemRemove[1]")
    expect_invalid_args(
        lambda: iface.VendorElemRem(1, dbus.ByteArray(b'')),
        "Invalid value", rem_accepted, "VendorElemRemove[2]")

    iface.VendorElemRem(1, b"*")

    # Add one element and read it back.
    ie = dbus.ByteArray(b"\x00\x01\x00")
    iface.VendorElemAdd(1, ie)
    check_elem_data(iface.VendorElemGet(1), ie, "")

    # Add a second element; Get() must now return both, in order.
    ie2 = dbus.ByteArray(b"\xe0\x00")
    iface.VendorElemAdd(1, ie2)
    check_elem_data(iface.VendorElemGet(1), ie + ie2, "[2]")

    expect_invalid_args(
        lambda: iface.VendorElemRem(1, dbus.ByteArray(b"\x01\x01")),
        "Parse error", rem_accepted, "VendorElemRemove[3]")

    # Removing the first element must leave only the second one.
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    # After removing everything, the ID must no longer exist.
    iface.VendorElemRem(1, b"*")
    expect_invalid_args(
        lambda: iface.VendorElemGet(1),
        "ID value does not exist",
        "Invalid VendorElemGet() accepted after removal",
        "VendorElemGet after removal")
dbd183c7
JM
5646
def test_dbus_assoc_reject(dev, apdev):
    """D-Bus AssocStatusCode"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-open"
    hapd = hostapd.add_ap(apdev[0], {"ssid": ssid,
                                     "max_listen_interval": "1"})

    class TestDbusConnect(TestDbus):
        # Tries to connect to the AP and waits for a PropertiesChanged
        # signal that carries AssocStatusCode; the expected value is 51.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.assoc_status_seen = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'AssocStatusCode' not in properties:
                return
            status = properties['AssocStatusCode']
            if status == 51:
                self.assoc_status_seen = True
            else:
                logger.info("Unexpected status code: " + str(status))
            # Stop after the first reported status code, whatever its value.
            iface.Disconnect()
            self.loop.quit()

        def run_connect(self, *args):
            network = dbus.Dictionary({'ssid': ssid,
                                       'key_mgmt': 'NONE',
                                       'scan_freq': 2412},
                                      signature='sv')
            self.netw = iface.AddNetwork(network)
            iface.SelectNetwork(self.netw)
            # Returning False keeps the glib timeout from firing again.
            return False

        def success(self):
            return self.assoc_status_seen

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
504c7ffd
JM
5697
def test_dbus_mesh(dev, apdev):
    """D-Bus mesh"""
    check_mesh_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH)

    add_open_mesh_network(dev[1])
    addr1 = dev[1].own_addr()

    def get_mesh_property(name):
        # Read one property of the Mesh interface as a byte array.
        return if_obj.Get(WPAS_DBUS_IFACE_MESH, name,
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    class TestDbusMesh(TestDbus):
        # Sequence driven by signals: wlan0 joins the mesh, the peer is
        # verified on MeshPeerConnected, wlan1 then leaves, wlan0 leaves on
        # MeshPeerDisconnected, and MeshGroupRemoved ends the test.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = [(self.meshGroupStarted, "MeshGroupStarted"),
                        (self.meshGroupRemoved, "MeshGroupRemoved"),
                        (self.meshPeerConnected, "MeshPeerConnected"),
                        (self.meshPeerDisconnected, "MeshPeerDisconnected")]
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_MESH, signal_name)
            self.loop.run()
            return self

        def meshGroupStarted(self, args):
            logger.debug("MeshGroupStarted: " + str(args))

        def meshGroupRemoved(self, args):
            logger.debug("MeshGroupRemoved: " + str(args))
            self.done = True
            self.loop.quit()

        def meshPeerConnected(self, args):
            logger.debug("MeshPeerConnected: " + str(args))

            peers = get_mesh_property('MeshPeers')
            logger.debug("MeshPeers: " + str(peers))
            if len(peers) != 1:
                raise Exception("Unexpected number of MeshPeer values")
            peer_hex = binascii.hexlify(peers[0]).decode()
            if peer_hex != addr1.replace(':', ''):
                raise Exception("Unexpected peer address")

            group = get_mesh_property('MeshGroup')
            logger.debug("MeshGroup: " + str(group))
            if group != b"wpas-mesh-open":
                raise Exception("Unexpected MeshGroup")

            # Checks passed: take the peer (wlan1) out of the mesh.
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer_dev.mesh_group_remove()

        def meshPeerDisconnected(self, args):
            logger.debug("MeshPeerDisconnected: " + str(args))
            local_dev = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
            local_dev.mesh_group_remove()

        def run_test(self, *args):
            logger.debug("run_test")
            local_dev = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
            add_open_mesh_network(local_dev)
            # Returning False keeps the glib timeout from firing again.
            return False

        def success(self):
            return self.done

    with TestDbusMesh(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")