]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
tests: Convert binascii.hexlify() output to a string object for python3
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67 1# wpa_supplicant D-Bus interface tests
910eb5b5 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
2ec82e67
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
2ec82e67
JM
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12
910eb5b5 13try:
b4de353c 14 import gobject
910eb5b5
JM
15 import dbus
16 dbus_imported = True
17except ImportError:
18 dbus_imported = False
19
2ec82e67
JM
20import hostapd
21from wpasupplicant import WpaSupplicant
708ec753 22from utils import HwsimSkip, alloc_fail, fail_test
1992af11 23from p2p_utils import *
2ec82e67 24from test_ap_tdls import connect_2sta_open
089e7ca3 25from test_ap_eap import check_altsubject_match_support
afe28053 26from test_nfc_p2p import set_ip_addr_info
504c7ffd 27from test_wpas_mesh import check_mesh_support, add_open_mesh_network
2ec82e67
JM
28
29WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
30WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
31WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
32WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
33WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
34WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
35WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
36WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
37WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
38WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
504c7ffd 39WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh"
2ec82e67
JM
40
41def prepare_dbus(dev):
910eb5b5
JM
42 if not dbus_imported:
43 logger.info("No dbus module available")
51c5aeb4 44 raise HwsimSkip("No dbus module available")
2ec82e67 45 try:
2ec82e67
JM
46 from dbus.mainloop.glib import DBusGMainLoop
47 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
48 bus = dbus.SystemBus()
49 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
50 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
51 path = wpas.GetInterface(dev.ifname)
52 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
910eb5b5 53 return (bus,wpas_obj,path,if_obj)
bab493b9 54 except Exception as e:
51c5aeb4 55 raise HwsimSkip("Could not connect to D-Bus: %s" % e)
2ec82e67
JM
56
57class TestDbus(object):
58 def __init__(self, bus):
59 self.loop = gobject.MainLoop()
60 self.signals = []
61 self.bus = bus
62
63 def __exit__(self, type, value, traceback):
64 for s in self.signals:
65 s.remove()
66
67 def add_signal(self, handler, interface, name, byte_arrays=False):
68 s = self.bus.add_signal_receiver(handler, dbus_interface=interface,
69 signal_name=name,
70 byte_arrays=byte_arrays)
71 self.signals.append(s)
72
73 def timeout(self, *args):
74 logger.debug("timeout")
75 self.loop.quit()
76 return False
77
0e126c6d
JM
78class alloc_fail_dbus(object):
79 def __init__(self, dev, count, funcs, operation="Operation",
80 expected="NoMemory"):
81 self._dev = dev
82 self._count = count
83 self._funcs = funcs
84 self._operation = operation
85 self._expected = expected
86 def __enter__(self):
87 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
88 if "OK" not in self._dev.request(cmd):
89 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
90 def __exit__(self, type, value, traceback):
91 if type is None:
92 raise Exception("%s succeeded during out-of-memory" % self._operation)
93 if type == dbus.exceptions.DBusException and self._expected in str(value):
94 return True
95 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
96 raise Exception("%s did not trigger allocation failure" % self._operation)
97 return False
98
3a59cda1
JM
99def start_ap(ap, ssid="test-wps",
100 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
2ec82e67
JM
101 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
102 "wpa_passphrase": "12345678", "wpa": "2",
103 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
3a59cda1 104 "ap_pin": "12345670", "uuid": ap_uuid}
afc26df2 105 return hostapd.add_ap(ap, params)
2ec82e67
JM
106
107def test_dbus_getall(dev, apdev):
108 """D-Bus GetAll"""
910eb5b5 109 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
110
111 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
112 dbus_interface=dbus.PROPERTIES_IFACE)
113 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))
114
115 props = if_obj.GetAll(WPAS_DBUS_IFACE,
116 dbus_interface=dbus.PROPERTIES_IFACE)
117 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))
118
119 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
120 dbus_interface=dbus.PROPERTIES_IFACE)
121 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))
122
123 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
124 dbus_interface=dbus.PROPERTIES_IFACE)
125 if len(res) != 0:
126 raise Exception("Unexpected BSSs entry: " + str(res))
127
128 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
129 dbus_interface=dbus.PROPERTIES_IFACE)
130 if len(res) != 0:
131 raise Exception("Unexpected Networks entry: " + str(res))
132
8b8a1864 133 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
2ec82e67
JM
134 bssid = apdev[0]['bssid']
135 dev[0].scan_for_bss(bssid, freq=2412)
136 id = dev[0].add_network()
137 dev[0].set_network(id, "disabled", "0")
138 dev[0].set_network_quoted(id, "ssid", "test")
139
140 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
141 dbus_interface=dbus.PROPERTIES_IFACE)
142 if len(res) != 1:
143 raise Exception("Missing BSSs entry: " + str(res))
144 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
145 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
146 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
147 bssid_str = ''
148 for item in props['BSSID']:
149 if len(bssid_str) > 0:
150 bssid_str += ':'
151 bssid_str += '%02x' % item
152 if bssid_str != bssid:
153 raise Exception("Unexpected BSSID in BSSs entry")
154
155 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
156 dbus_interface=dbus.PROPERTIES_IFACE)
157 if len(res) != 1:
158 raise Exception("Missing Networks entry: " + str(res))
159 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
160 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
161 dbus_interface=dbus.PROPERTIES_IFACE)
162 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
163 ssid = props['Properties']['ssid']
164 if ssid != '"test"':
165 raise Exception("Unexpected SSID in network entry")
166
73ece491
JM
167def test_dbus_getall_oom(dev, apdev):
168 """D-Bus GetAll wpa_config_get_all() OOM"""
169 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
170
171 id = dev[0].add_network()
172 dev[0].set_network(id, "disabled", "0")
173 dev[0].set_network_quoted(id, "ssid", "test")
174
175 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
176 dbus_interface=dbus.PROPERTIES_IFACE)
177 if len(res) != 1:
178 raise Exception("Missing Networks entry: " + str(res))
179 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
180 for i in range(1, 50):
181 with alloc_fail(dev[0], i, "wpa_config_get_all"):
182 try:
183 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
184 dbus_interface=dbus.PROPERTIES_IFACE)
bab493b9 185 except dbus.exceptions.DBusException as e:
73ece491
JM
186 pass
187
2ec82e67
JM
188def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
189 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
190 dbus_interface=dbus.PROPERTIES_IFACE,
191 byte_arrays=byte_arrays)
192 if expect is not None and val != expect:
193 raise Exception("Unexpected %s: %s (expected: %s)" %
194 (prop, str(val), str(expect)))
195 return val
196
197def dbus_set(dbus, wpas_obj, prop, val):
198 wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
199 dbus_interface=dbus.PROPERTIES_IFACE)
200
201def test_dbus_properties(dev, apdev):
202 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
910eb5b5 203 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
204
205 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
206 dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
207 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
208 for (val,err) in [ (3, "Error.Failed: wrong property type"),
209 ("foo", "Error.Failed: wrong debug level value") ]:
210 try:
211 dbus_set(dbus, wpas_obj, "DebugLevel", val)
212 raise Exception("Invalid DebugLevel value accepted: " + str(val))
bab493b9 213 except dbus.exceptions.DBusException as e:
2ec82e67
JM
214 if err not in str(e):
215 raise Exception("Unexpected error message: " + str(e))
216 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
217 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
218
219 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
220 dbus_set(dbus, wpas_obj, "DebugTimestamp", False)
221 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False)
222 try:
223 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo")
224 raise Exception("Invalid DebugTimestamp value accepted")
bab493b9 225 except dbus.exceptions.DBusException as e:
2ec82e67
JM
226 if "Error.Failed: wrong property type" not in str(e):
227 raise Exception("Unexpected error message: " + str(e))
228 dbus_set(dbus, wpas_obj, "DebugTimestamp", True)
229 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
230
231 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
232 dbus_set(dbus, wpas_obj, "DebugShowKeys", False)
233 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False)
234 try:
235 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo")
236 raise Exception("Invalid DebugShowKeys value accepted")
bab493b9 237 except dbus.exceptions.DBusException as e:
2ec82e67
JM
238 if "Error.Failed: wrong property type" not in str(e):
239 raise Exception("Unexpected error message: " + str(e))
240 dbus_set(dbus, wpas_obj, "DebugShowKeys", True)
241 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
242
243 res = dbus_get(dbus, wpas_obj, "Interfaces")
244 if len(res) != 1:
245 raise Exception("Unexpected Interfaces value: " + str(res))
246
247 res = dbus_get(dbus, wpas_obj, "EapMethods")
248 if len(res) < 5 or "TTLS" not in res:
249 raise Exception("Unexpected EapMethods value: " + str(res))
250
251 res = dbus_get(dbus, wpas_obj, "Capabilities")
252 if len(res) < 2 or "p2p" not in res:
253 raise Exception("Unexpected Capabilities value: " + str(res))
254
255 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
256 val = binascii.unhexlify("010006020304050608")
257 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
258 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
259 if val != res:
260 raise Exception("WFDIEs value changed")
261 try:
262 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
263 raise Exception("Invalid WFDIEs value accepted")
bab493b9 264 except dbus.exceptions.DBusException as e:
2ec82e67
JM
265 if "InvalidArgs" not in str(e):
266 raise Exception("Unexpected error message: " + str(e))
267 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
268 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
269 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
270 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
271 if len(res) != 0:
272 raise Exception("WFDIEs not cleared properly")
273
274 res = dbus_get(dbus, wpas_obj, "EapMethods")
275 try:
276 dbus_set(dbus, wpas_obj, "EapMethods", res)
277 raise Exception("Invalid Set accepted")
bab493b9 278 except dbus.exceptions.DBusException as e:
2ec82e67
JM
279 if "InvalidArgs: Property is read-only" not in str(e):
280 raise Exception("Unexpected error message: " + str(e))
281
282 try:
283 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
284 dbus_interface=dbus.PROPERTIES_IFACE)
285 raise Exception("Unknown method accepted")
bab493b9 286 except dbus.exceptions.DBusException as e:
2ec82e67
JM
287 if "UnknownMethod" not in str(e):
288 raise Exception("Unexpected error message: " + str(e))
289
290 try:
291 wpas_obj.Get("foo", "DebugShowKeys",
292 dbus_interface=dbus.PROPERTIES_IFACE)
293 raise Exception("Invalid Get accepted")
bab493b9 294 except dbus.exceptions.DBusException as e:
2ec82e67
JM
295 if "InvalidArgs: No such property" not in str(e):
296 raise Exception("Unexpected error message: " + str(e))
297
298 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
299 introspect=False)
300 try:
301 test_obj.Get(123, "DebugShowKeys",
302 dbus_interface=dbus.PROPERTIES_IFACE)
303 raise Exception("Invalid Get accepted")
bab493b9 304 except dbus.exceptions.DBusException as e:
2ec82e67
JM
305 if "InvalidArgs: Invalid arguments" not in str(e):
306 raise Exception("Unexpected error message: " + str(e))
307 try:
308 test_obj.Get(WPAS_DBUS_SERVICE, 123,
309 dbus_interface=dbus.PROPERTIES_IFACE)
310 raise Exception("Invalid Get accepted")
bab493b9 311 except dbus.exceptions.DBusException as e:
2ec82e67
JM
312 if "InvalidArgs: Invalid arguments" not in str(e):
313 raise Exception("Unexpected error message: " + str(e))
314
315 try:
316 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
317 dbus.ByteArray('', variant_level=2),
318 dbus_interface=dbus.PROPERTIES_IFACE)
319 raise Exception("Invalid Set accepted")
bab493b9 320 except dbus.exceptions.DBusException as e:
2ec82e67
JM
321 if "InvalidArgs: invalid message format" not in str(e):
322 raise Exception("Unexpected error message: " + str(e))
323
f0ef6889
DW
324def test_dbus_set_global_properties(dev, apdev):
325 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
326 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
327
e51e49fc 328 dev[0].set("model_name", "")
f0ef6889
DW
329 props = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]
330
331 for p in props:
332 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
333 dbus_interface=dbus.PROPERTIES_IFACE)
334 if res != p[1]:
335 raise Exception("Unexpected " + p[0] + " value: " + str(res))
336
337 if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2],
338 dbus_interface=dbus.PROPERTIES_IFACE)
339
340 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
341 dbus_interface=dbus.PROPERTIES_IFACE)
342 if res != p[2]:
343 raise Exception("Unexpected " + p[0] + " value after set: " + str(res))
e51e49fc 344 dev[0].set("model_name", "")
f0ef6889 345
2ec82e67
JM
346def test_dbus_invalid_method(dev, apdev):
347 """D-Bus invalid method"""
910eb5b5 348 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
349 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
350
351 try:
352 wps.Foo()
353 raise Exception("Unknown method accepted")
bab493b9 354 except dbus.exceptions.DBusException as e:
2ec82e67
JM
355 if "UnknownMethod" not in str(e):
356 raise Exception("Unexpected error message: " + str(e))
357
358 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
359 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
360 try:
361 test_wps.Start(123)
362 raise Exception("WPS.Start with incorrect signature accepted")
bab493b9 363 except dbus.exceptions.DBusException as e:
2ec82e67
JM
364 if "InvalidArgs: Invalid arg" not in str(e):
365 raise Exception("Unexpected error message: " + str(e))
366
367def test_dbus_get_set_wps(dev, apdev):
368 """D-Bus Get/Set for WPS properties"""
369 try:
81e787b7 370 _test_dbus_get_set_wps(dev, apdev)
2ec82e67
JM
371 finally:
372 dev[0].request("SET wps_cred_processing 0")
364e28c9 373 dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
4e6bd667
JM
374 dev[0].set("device_name", "Device A")
375 dev[0].set("manufacturer", "")
376 dev[0].set("model_name", "")
377 dev[0].set("model_number", "")
378 dev[0].set("serial_number", "")
379 dev[0].set("device_type", "0-00000000-0")
2ec82e67
JM
380
381def _test_dbus_get_set_wps(dev, apdev):
910eb5b5 382 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
383
384 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
385 dbus_interface=dbus.PROPERTIES_IFACE)
386
387 val = "display keypad virtual_display nfc_interface"
388 dev[0].request("SET config_methods " + val)
389
390 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
391 dbus_interface=dbus.PROPERTIES_IFACE)
392 if config != val:
393 raise Exception("Unexpected Get(ConfigMethods) result: " + config)
394
395 val2 = "push_button display"
396 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
397 dbus_interface=dbus.PROPERTIES_IFACE)
398 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
399 dbus_interface=dbus.PROPERTIES_IFACE)
400 if config != val2:
401 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)
402
403 dev[0].request("SET config_methods " + val)
404
405 for i in range(3):
406 dev[0].request("SET wps_cred_processing " + str(i))
407 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
408 dbus_interface=dbus.PROPERTIES_IFACE)
409 expected_val = False if i == 1 else True
410 if val != expected_val:
411 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))
412
4e6bd667
JM
413 tests = [ ("device_name", "DeviceName"),
414 ("manufacturer", "Manufacturer"),
415 ("model_name", "ModelName"),
416 ("model_number", "ModelNumber"),
417 ("serial_number", "SerialNumber") ]
418
419 for f1,f2 in tests:
420 val2 = "test-value-test"
421 dev[0].set(f1, val2)
422 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
423 dbus_interface=dbus.PROPERTIES_IFACE)
424 if val != val2:
425 raise Exception("Get(%s) returned unexpected value" % f2)
426 val2 = "TEST-value"
427 if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2,
428 dbus_interface=dbus.PROPERTIES_IFACE)
429 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
430 dbus_interface=dbus.PROPERTIES_IFACE)
431 if val != val2:
432 raise Exception("Get(%s) returned unexpected value after Set" % f2)
433
434 dev[0].set("device_type", "5-0050F204-1")
435 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
436 dbus_interface=dbus.PROPERTIES_IFACE)
437 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
438 raise Exception("DeviceType mismatch")
439 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val,
440 dbus_interface=dbus.PROPERTIES_IFACE)
441 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
442 dbus_interface=dbus.PROPERTIES_IFACE)
443 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
444 raise Exception("DeviceType mismatch after Set")
445
446 val2 = '\x01\x02\x03\x04\x05\x06\x07\x08'
447 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2),
448 dbus_interface=dbus.PROPERTIES_IFACE)
449 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
450 dbus_interface=dbus.PROPERTIES_IFACE,
451 byte_arrays=True)
452 if val != val2:
453 raise Exception("DeviceType mismatch after Set (2)")
454
2ec82e67
JM
455 class TestDbusGetSet(TestDbus):
456 def __init__(self, bus):
457 TestDbus.__init__(self, bus)
458 self.signal_received = False
459 self.signal_received_deprecated = False
460 self.sets_done = False
461
462 def __enter__(self):
463 gobject.timeout_add(1, self.run_sets)
464 gobject.timeout_add(1000, self.timeout)
465 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
466 "PropertiesChanged")
467 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
468 "PropertiesChanged")
469 self.loop.run()
470 return self
471
472 def propertiesChanged(self, properties):
473 logger.debug("PropertiesChanged: " + str(properties))
474 if properties.has_key("ProcessCredentials"):
475 self.signal_received_deprecated = True
476 if self.sets_done and self.signal_received:
477 self.loop.quit()
478
479 def propertiesChanged2(self, interface_name, changed_properties,
480 invalidated_properties):
481 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
482 if interface_name != WPAS_DBUS_IFACE_WPS:
483 return
484 if changed_properties.has_key("ProcessCredentials"):
485 self.signal_received = True
486 if self.sets_done and self.signal_received_deprecated:
487 self.loop.quit()
488
489 def run_sets(self, *args):
490 logger.debug("run_sets")
491 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
492 dbus.Boolean(1),
493 dbus_interface=dbus.PROPERTIES_IFACE)
494 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
495 dbus_interface=dbus.PROPERTIES_IFACE) != True:
bc6e3288 496 raise Exception("Unexpected Get(ProcessCredentials) result after Set")
2ec82e67
JM
497 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
498 dbus.Boolean(0),
499 dbus_interface=dbus.PROPERTIES_IFACE)
500 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
501 dbus_interface=dbus.PROPERTIES_IFACE) != False:
bc6e3288 502 raise Exception("Unexpected Get(ProcessCredentials) result after Set")
2ec82e67
JM
503
504 self.dbus_sets_done = True
505 return False
506
507 def success(self):
508 return self.signal_received and self.signal_received_deprecated
509
510 with TestDbusGetSet(bus) as t:
511 if not t.success():
512 raise Exception("No signal received for ProcessCredentials change")
513
514def test_dbus_wps_invalid(dev, apdev):
515 """D-Bus invaldi WPS operation"""
910eb5b5 516 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
517 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
518
519 failures = [ {'Role': 'foo', 'Type': 'pbc'},
520 {'Role': 123, 'Type': 'pbc'},
521 {'Type': 'pbc'},
522 {'Role': 'enrollee'},
523 {'Role': 'registrar'},
524 {'Role': 'enrollee', 'Type': 123},
525 {'Role': 'enrollee', 'Type': 'foo'},
526 {'Role': 'enrollee', 'Type': 'pbc',
527 'Bssid': '02:33:44:55:66:77'},
528 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
529 {'Role': 'enrollee', 'Type': 'pbc',
530 'Bssid': dbus.ByteArray('12345')},
531 {'Role': 'enrollee', 'Type': 'pbc',
532 'P2PDeviceAddress': 12345},
533 {'Role': 'enrollee', 'Type': 'pbc',
534 'P2PDeviceAddress': dbus.ByteArray('12345')},
535 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
536 for args in failures:
537 try:
538 wps.Start(args)
539 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
bab493b9 540 except dbus.exceptions.DBusException as e:
2ec82e67
JM
541 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
542 raise Exception("Unexpected error message: " + str(e))
543
0e126c6d
JM
544def test_dbus_wps_oom(dev, apdev):
545 """D-Bus WPS operation (OOM)"""
546 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
547 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
548
549 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
550 if_obj.Get(WPAS_DBUS_IFACE, "State",
551 dbus_interface=dbus.PROPERTIES_IFACE)
552
8b8a1864 553 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
0e126c6d
JM
554 bssid = apdev[0]['bssid']
555 dev[0].scan_for_bss(bssid, freq=2412)
556
1d32bc2c 557 time.sleep(0.05)
0e126c6d
JM
558 for i in range(1, 3):
559 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
560 if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
561 dbus_interface=dbus.PROPERTIES_IFACE)
562
563 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
564 dbus_interface=dbus.PROPERTIES_IFACE)
565 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
566 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
567 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
568 dbus_interface=dbus.PROPERTIES_IFACE)
20c7d26f
JM
569 with alloc_fail(dev[0], 1,
570 "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
571 try:
572 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
573 dbus_interface=dbus.PROPERTIES_IFACE)
bab493b9 574 except dbus.exceptions.DBusException as e:
20c7d26f 575 pass
0e126c6d
JM
576
577 id = dev[0].add_network()
578 dev[0].set_network(id, "disabled", "0")
579 dev[0].set_network_quoted(id, "ssid", "test")
580
581 for i in range(1, 3):
582 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
583 if_obj.Get(WPAS_DBUS_IFACE, "Networks",
584 dbus_interface=dbus.PROPERTIES_IFACE)
585
586 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
587 dbus_get(dbus, wpas_obj, "Interfaces")
588
589 for i in range(1, 6):
590 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
591 dbus_get(dbus, wpas_obj, "EapMethods")
592
593 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
594 expected="Error.Failed: Failed to set property"):
595 val2 = "push_button display"
596 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
597 dbus_interface=dbus.PROPERTIES_IFACE)
598
599 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
600 "WPS.Start",
601 expected="UnknownError: WPS start failed"):
602 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
603
2ec82e67
JM
604def test_dbus_wps_pbc(dev, apdev):
605 """D-Bus WPS/PBC operation and signals"""
606 try:
81e787b7 607 _test_dbus_wps_pbc(dev, apdev)
2ec82e67
JM
608 finally:
609 dev[0].request("SET wps_cred_processing 0")
610
611def _test_dbus_wps_pbc(dev, apdev):
910eb5b5 612 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
613 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
614
615 hapd = start_ap(apdev[0])
616 hapd.request("WPS_PBC")
617 bssid = apdev[0]['bssid']
618 dev[0].scan_for_bss(bssid, freq="2412")
619 dev[0].request("SET wps_cred_processing 2")
620
1d0a917f
JM
621 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
622 dbus_interface=dbus.PROPERTIES_IFACE)
623 if len(res) != 1:
624 raise Exception("Missing BSSs entry: " + str(res))
625 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
626 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
627 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
628 if 'WPS' not in props:
629 raise Exception("No WPS information in the BSS entry")
630 if 'Type' not in props['WPS']:
631 raise Exception("No Type field in the WPS dictionary")
632 if props['WPS']['Type'] != 'pbc':
633 raise Exception("Unexpected WPS Type: " + props['WPS']['Type'])
634
2ec82e67
JM
635 class TestDbusWps(TestDbus):
636 def __init__(self, bus, wps):
637 TestDbus.__init__(self, bus)
638 self.success_seen = False
639 self.credentials_received = False
640 self.wps = wps
641
642 def __enter__(self):
643 gobject.timeout_add(1, self.start_pbc)
644 gobject.timeout_add(15000, self.timeout)
645 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
646 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
647 "Credentials")
648 self.loop.run()
649 return self
650
651 def wpsEvent(self, name, args):
652 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
653 if name == "success":
654 self.success_seen = True
655 if self.credentials_received:
656 self.loop.quit()
657
658 def credentials(self, args):
659 logger.debug("credentials: " + str(args))
660 self.credentials_received = True
661 if self.success_seen:
662 self.loop.quit()
663
664 def start_pbc(self, *args):
665 logger.debug("start_pbc")
666 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
667 return False
668
669 def success(self):
670 return self.success_seen and self.credentials_received
671
672 with TestDbusWps(bus, wps) as t:
673 if not t.success():
674 raise Exception("Failure in D-Bus operations")
675
676 dev[0].wait_connected(timeout=10)
677 dev[0].request("DISCONNECT")
678 hapd.disable()
679 dev[0].flush_scan_cache()
680
3a59cda1
JM
681def test_dbus_wps_pbc_overlap(dev, apdev):
682 """D-Bus WPS/PBC operation and signal for PBC overlap"""
683 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
684 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
685
686 hapd = start_ap(apdev[0])
687 hapd2 = start_ap(apdev[1], ssid="test-wps2",
688 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
689 hapd.request("WPS_PBC")
690 hapd2.request("WPS_PBC")
691 bssid = apdev[0]['bssid']
692 dev[0].scan_for_bss(bssid, freq="2412")
693 bssid2 = apdev[1]['bssid']
694 dev[0].scan_for_bss(bssid2, freq="2412")
695
696 class TestDbusWps(TestDbus):
697 def __init__(self, bus, wps):
698 TestDbus.__init__(self, bus)
699 self.overlap_seen = False
700 self.wps = wps
701
702 def __enter__(self):
703 gobject.timeout_add(1, self.start_pbc)
704 gobject.timeout_add(15000, self.timeout)
705 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
706 self.loop.run()
707 return self
708
709 def wpsEvent(self, name, args):
710 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
711 if name == "pbc-overlap":
712 self.overlap_seen = True
713 self.loop.quit()
714
715 def start_pbc(self, *args):
716 logger.debug("start_pbc")
717 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
718 return False
719
720 def success(self):
721 return self.overlap_seen
722
723 with TestDbusWps(bus, wps) as t:
724 if not t.success():
725 raise Exception("Failure in D-Bus operations")
726
727 dev[0].request("WPS_CANCEL")
728 dev[0].request("DISCONNECT")
729 hapd.disable()
730 dev[0].flush_scan_cache()
731
2ec82e67
JM
732def test_dbus_wps_pin(dev, apdev):
733 """D-Bus WPS/PIN operation and signals"""
734 try:
81e787b7 735 _test_dbus_wps_pin(dev, apdev)
2ec82e67
JM
736 finally:
737 dev[0].request("SET wps_cred_processing 0")
738
739def _test_dbus_wps_pin(dev, apdev):
910eb5b5 740 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
741 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
742
743 hapd = start_ap(apdev[0])
744 hapd.request("WPS_PIN any 12345670")
745 bssid = apdev[0]['bssid']
746 dev[0].scan_for_bss(bssid, freq="2412")
747 dev[0].request("SET wps_cred_processing 2")
748
749 class TestDbusWps(TestDbus):
750 def __init__(self, bus):
751 TestDbus.__init__(self, bus)
752 self.success_seen = False
753 self.credentials_received = False
754
755 def __enter__(self):
756 gobject.timeout_add(1, self.start_pin)
757 gobject.timeout_add(15000, self.timeout)
758 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
759 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
760 "Credentials")
761 self.loop.run()
762 return self
763
764 def wpsEvent(self, name, args):
765 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
766 if name == "success":
767 self.success_seen = True
768 if self.credentials_received:
769 self.loop.quit()
770
771 def credentials(self, args):
772 logger.debug("credentials: " + str(args))
773 self.credentials_received = True
774 if self.success_seen:
775 self.loop.quit()
776
777 def start_pin(self, *args):
778 logger.debug("start_pin")
779 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
780 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
781 'Bssid': bssid_ay})
782 return False
783
784 def success(self):
785 return self.success_seen and self.credentials_received
786
787 with TestDbusWps(bus) as t:
788 if not t.success():
789 raise Exception("Failure in D-Bus operations")
790
791 dev[0].wait_connected(timeout=10)
792
793def test_dbus_wps_pin2(dev, apdev):
794 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
795 try:
81e787b7 796 _test_dbus_wps_pin2(dev, apdev)
2ec82e67
JM
797 finally:
798 dev[0].request("SET wps_cred_processing 0")
799
800def _test_dbus_wps_pin2(dev, apdev):
910eb5b5 801 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
802 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
803
804 hapd = start_ap(apdev[0])
805 bssid = apdev[0]['bssid']
806 dev[0].scan_for_bss(bssid, freq="2412")
807 dev[0].request("SET wps_cred_processing 2")
808
809 class TestDbusWps(TestDbus):
810 def __init__(self, bus):
811 TestDbus.__init__(self, bus)
812 self.success_seen = False
813 self.failed = False
814
815 def __enter__(self):
816 gobject.timeout_add(1, self.start_pin)
817 gobject.timeout_add(15000, self.timeout)
818 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
819 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
820 "Credentials")
821 self.loop.run()
822 return self
823
824 def wpsEvent(self, name, args):
825 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
826 if name == "success":
827 self.success_seen = True
828 if self.credentials_received:
829 self.loop.quit()
830
831 def credentials(self, args):
832 logger.debug("credentials: " + str(args))
833 self.credentials_received = True
834 if self.success_seen:
835 self.loop.quit()
836
837 def start_pin(self, *args):
838 logger.debug("start_pin")
839 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
840 res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
841 'Bssid': bssid_ay})
842 pin = res['Pin']
843 h = hostapd.Hostapd(apdev[0]['ifname'])
844 h.request("WPS_PIN any " + pin)
845 return False
846
847 def success(self):
848 return self.success_seen and self.credentials_received
849
850 with TestDbusWps(bus) as t:
851 if not t.success():
852 raise Exception("Failure in D-Bus operations")
853
854 dev[0].wait_connected(timeout=10)
855
856def test_dbus_wps_pin_m2d(dev, apdev):
857 """D-Bus WPS/PIN operation and signals with M2D"""
858 try:
81e787b7 859 _test_dbus_wps_pin_m2d(dev, apdev)
2ec82e67
JM
860 finally:
861 dev[0].request("SET wps_cred_processing 0")
862
863def _test_dbus_wps_pin_m2d(dev, apdev):
910eb5b5 864 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
865 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
866
867 hapd = start_ap(apdev[0])
868 bssid = apdev[0]['bssid']
869 dev[0].scan_for_bss(bssid, freq="2412")
870 dev[0].request("SET wps_cred_processing 2")
871
872 class TestDbusWps(TestDbus):
873 def __init__(self, bus):
874 TestDbus.__init__(self, bus)
875 self.success_seen = False
876 self.credentials_received = False
877
878 def __enter__(self):
879 gobject.timeout_add(1, self.start_pin)
880 gobject.timeout_add(15000, self.timeout)
881 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
882 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
883 "Credentials")
884 self.loop.run()
885 return self
886
887 def wpsEvent(self, name, args):
888 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
889 if name == "success":
890 self.success_seen = True
891 if self.credentials_received:
892 self.loop.quit()
893 elif name == "m2d":
894 h = hostapd.Hostapd(apdev[0]['ifname'])
895 h.request("WPS_PIN any 12345670")
896
897 def credentials(self, args):
898 logger.debug("credentials: " + str(args))
899 self.credentials_received = True
900 if self.success_seen:
901 self.loop.quit()
902
903 def start_pin(self, *args):
904 logger.debug("start_pin")
905 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
906 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
907 'Bssid': bssid_ay})
908 return False
909
910 def success(self):
911 return self.success_seen and self.credentials_received
912
913 with TestDbusWps(bus) as t:
914 if not t.success():
915 raise Exception("Failure in D-Bus operations")
916
917 dev[0].wait_connected(timeout=10)
918
919def test_dbus_wps_reg(dev, apdev):
920 """D-Bus WPS/Registrar operation and signals"""
921 try:
81e787b7 922 _test_dbus_wps_reg(dev, apdev)
2ec82e67
JM
923 finally:
924 dev[0].request("SET wps_cred_processing 0")
925
926def _test_dbus_wps_reg(dev, apdev):
910eb5b5 927 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
928 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
929
930 hapd = start_ap(apdev[0])
931 hapd.request("WPS_PIN any 12345670")
932 bssid = apdev[0]['bssid']
933 dev[0].scan_for_bss(bssid, freq="2412")
934 dev[0].request("SET wps_cred_processing 2")
935
936 class TestDbusWps(TestDbus):
937 def __init__(self, bus):
938 TestDbus.__init__(self, bus)
939 self.credentials_received = False
940
941 def __enter__(self):
942 gobject.timeout_add(100, self.start_reg)
943 gobject.timeout_add(15000, self.timeout)
944 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
945 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
946 "Credentials")
947 self.loop.run()
948 return self
949
950 def wpsEvent(self, name, args):
951 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
952
953 def credentials(self, args):
954 logger.debug("credentials: " + str(args))
955 self.credentials_received = True
956 self.loop.quit()
957
958 def start_reg(self, *args):
959 logger.debug("start_reg")
960 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
961 wps.Start({'Role': 'registrar', 'Type': 'pin',
962 'Pin': '12345670', 'Bssid': bssid_ay})
963 return False
964
965 def success(self):
966 return self.credentials_received
967
968 with TestDbusWps(bus) as t:
969 if not t.success():
970 raise Exception("Failure in D-Bus operations")
971
972 dev[0].wait_connected(timeout=10)
973
2a8e3f35
JM
974def test_dbus_wps_cancel(dev, apdev):
975 """D-Bus WPS Cancel operation"""
976 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
977 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
978
979 hapd = start_ap(apdev[0])
980 bssid = apdev[0]['bssid']
981
982 wps.Cancel()
983 dev[0].scan_for_bss(bssid, freq="2412")
984 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
985 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
986 'Bssid': bssid_ay})
987 wps.Cancel()
988 dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
989
2ec82e67
JM
990def test_dbus_scan_invalid(dev, apdev):
991 """D-Bus invalid scan method"""
910eb5b5 992 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
993 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
994
995 tests = [ ({}, "InvalidArgs"),
996 ({'Type': 123}, "InvalidArgs"),
997 ({'Type': 'foo'}, "InvalidArgs"),
998 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
999 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
1000 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
1001 ({'Type': 'active',
1002 'SSIDs': [ dbus.ByteArray("1"), dbus.ByteArray("2"),
1003 dbus.ByteArray("3"), dbus.ByteArray("4"),
1004 dbus.ByteArray("5"), dbus.ByteArray("6"),
1005 dbus.ByteArray("7"), dbus.ByteArray("8"),
1006 dbus.ByteArray("9"), dbus.ByteArray("10"),
1007 dbus.ByteArray("11"), dbus.ByteArray("12"),
1008 dbus.ByteArray("13"), dbus.ByteArray("14"),
1009 dbus.ByteArray("15"), dbus.ByteArray("16"),
1010 dbus.ByteArray("17") ]},
1011 "InvalidArgs"),
1012 ({'Type': 'active',
1013 'SSIDs': [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]},
1014 "InvalidArgs"),
1015 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
1016 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
1017 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
1018 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
1019 ({'Type': 'active',
1020 'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
1021 "InvalidArgs"),
1022 ({'Type': 'active',
1023 'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
1024 "InvalidArgs"),
1025 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
1026 ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
1027 "InvalidArgs"),
1028 ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
1029 "InvalidArgs")]
1030 for (t,err) in tests:
1031 try:
1032 iface.Scan(t)
1033 raise Exception("Invalid Scan() arguments accepted: " + str(t))
bab493b9 1034 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1035 if err not in str(e):
1036 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
1037
0e126c6d
JM
1038def test_dbus_scan_oom(dev, apdev):
1039 """D-Bus scan method and OOM"""
1040 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1041 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1042
1043 with alloc_fail_dbus(dev[0], 1,
1044 "wpa_scan_clone_params;wpas_dbus_handler_scan",
1045 "Scan", expected="ScanError: Scan request rejected"):
1046 iface.Scan({ 'Type': 'passive',
1047 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1048
1049 with alloc_fail_dbus(dev[0], 1,
1050 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
1051 "Scan"):
1052 iface.Scan({ 'Type': 'passive',
1053 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1054
1055 with alloc_fail_dbus(dev[0], 1,
1056 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
1057 "Scan"):
1058 iface.Scan({ 'Type': 'active',
1059 'IEs': [ dbus.ByteArray("\xdd\x00") ],
1060 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1061
1062 with alloc_fail_dbus(dev[0], 1,
1063 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
1064 "Scan"):
1065 iface.Scan({ 'Type': 'active',
1066 'SSIDs': [ dbus.ByteArray("open"),
1067 dbus.ByteArray() ],
1068 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1069
2ec82e67
JM
1070def test_dbus_scan(dev, apdev):
1071 """D-Bus scan and related signals"""
910eb5b5 1072 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1073 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1074
8b8a1864 1075 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
2ec82e67
JM
1076
1077 class TestDbusScan(TestDbus):
1078 def __init__(self, bus):
1079 TestDbus.__init__(self, bus)
1080 self.scan_completed = 0
1081 self.bss_added = False
1d0a917f 1082 self.fail_reason = None
2ec82e67
JM
1083
1084 def __enter__(self):
1085 gobject.timeout_add(1, self.run_scan)
1086 gobject.timeout_add(15000, self.timeout)
1087 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
1088 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
1089 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
1090 self.loop.run()
1091 return self
1092
1093 def scanDone(self, success):
1094 logger.debug("scanDone: success=%s" % success)
1095 self.scan_completed += 1
1096 if self.scan_completed == 1:
1097 iface.Scan({'Type': 'passive',
1098 'AllowRoam': True,
1099 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1100 elif self.scan_completed == 2:
1101 iface.Scan({'Type': 'passive',
1102 'AllowRoam': False})
1103 elif self.bss_added and self.scan_completed == 3:
1104 self.loop.quit()
1105
1106 def bssAdded(self, bss, properties):
1107 logger.debug("bssAdded: %s" % bss)
1108 logger.debug(str(properties))
1d0a917f
JM
1109 if 'WPS' in properties:
1110 if 'Type' in properties['WPS']:
1111 self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
1112 self.loop.quit()
2ec82e67
JM
1113 self.bss_added = True
1114 if self.scan_completed == 3:
1115 self.loop.quit()
1116
1117 def bssRemoved(self, bss):
1118 logger.debug("bssRemoved: %s" % bss)
1119
1120 def run_scan(self, *args):
1121 logger.debug("run_scan")
1122 iface.Scan({'Type': 'active',
1123 'SSIDs': [ dbus.ByteArray("open"),
1124 dbus.ByteArray() ],
1125 'IEs': [ dbus.ByteArray("\xdd\x00"),
1126 dbus.ByteArray() ],
1127 'AllowRoam': False,
1128 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1129 return False
1130
1131 def success(self):
1132 return self.scan_completed == 3 and self.bss_added
1133
1134 with TestDbusScan(bus) as t:
1d0a917f
JM
1135 if t.fail_reason:
1136 raise Exception(t.fail_reason)
2ec82e67
JM
1137 if not t.success():
1138 raise Exception("Expected signals not seen")
1139
1140 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1141 dbus_interface=dbus.PROPERTIES_IFACE)
1142 if len(res) < 1:
1143 raise Exception("Scan result not in BSSs property")
1144 iface.FlushBSS(0)
1145 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1146 dbus_interface=dbus.PROPERTIES_IFACE)
1147 if len(res) != 0:
1148 raise Exception("FlushBSS() did not remove scan results from BSSs property")
1149 iface.FlushBSS(1)
1150
1151def test_dbus_scan_busy(dev, apdev):
1152 """D-Bus scan trigger rejection when busy with previous scan"""
910eb5b5 1153 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1154 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1155
1156 if "OK" not in dev[0].request("SCAN freq=2412-2462"):
1157 raise Exception("Failed to start scan")
1158 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1159 if ev is None:
1160 raise Exception("Scan start timed out")
1161
1162 try:
1163 iface.Scan({'Type': 'active', 'AllowRoam': False})
1164 raise Exception("Scan() accepted when busy")
bab493b9 1165 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1166 if "ScanError: Scan request reject" not in str(e):
1167 raise Exception("Unexpected error message: " + str(e))
1168
1169 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1170 if ev is None:
1171 raise Exception("Scan timed out")
1172
0238e8ba
JM
1173def test_dbus_scan_abort(dev, apdev):
1174 """D-Bus scan trigger and abort"""
1175 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1176 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1177
1178 iface.Scan({'Type': 'active', 'AllowRoam': False})
1179 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1180 if ev is None:
1181 raise Exception("Scan start timed out")
1182
1183 iface.AbortScan()
f41f04d0
JM
1184 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1185 if ev is None:
1186 raise Exception("Scan abort result timed out")
1187 dev[0].dump_monitor()
0238e8ba
JM
1188 iface.Scan({'Type': 'active', 'AllowRoam': False})
1189 iface.AbortScan()
1190
1191 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1192 if ev is None:
1193 raise Exception("Scan timed out")
1194
2ec82e67
JM
1195def test_dbus_connect(dev, apdev):
1196 """D-Bus AddNetwork and connect"""
910eb5b5 1197 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1198 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1199
1200 ssid = "test-wpa2-psk"
1201 passphrase = 'qwertyuiop'
1202 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
8b8a1864 1203 hapd = hostapd.add_ap(apdev[0], params)
2ec82e67
JM
1204
1205 class TestDbusConnect(TestDbus):
1206 def __init__(self, bus):
1207 TestDbus.__init__(self, bus)
1208 self.network_added = False
1209 self.network_selected = False
1210 self.network_removed = False
1211 self.state = 0
1212
1213 def __enter__(self):
1214 gobject.timeout_add(1, self.run_connect)
1215 gobject.timeout_add(15000, self.timeout)
1216 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1217 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1218 "NetworkRemoved")
1219 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1220 "NetworkSelected")
1221 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1222 "PropertiesChanged")
1223 self.loop.run()
1224 return self
1225
1226 def networkAdded(self, network, properties):
1227 logger.debug("networkAdded: %s" % str(network))
1228 logger.debug(str(properties))
1229 self.network_added = True
1230
1231 def networkRemoved(self, network):
1232 logger.debug("networkRemoved: %s" % str(network))
1233 self.network_removed = True
1234
1235 def networkSelected(self, network):
1236 logger.debug("networkSelected: %s" % str(network))
1237 self.network_selected = True
1238
1239 def propertiesChanged(self, properties):
1240 logger.debug("propertiesChanged: %s" % str(properties))
1241 if 'State' in properties and properties['State'] == "completed":
1242 if self.state == 0:
1243 self.state = 1
1244 iface.Disconnect()
1245 elif self.state == 2:
1246 self.state = 3
1247 iface.Disconnect()
1248 elif self.state == 4:
1249 self.state = 5
1250 iface.Reattach()
1251 elif self.state == 5:
1252 self.state = 6
13b34972
JM
1253 iface.Disconnect()
1254 elif self.state == 7:
1255 self.state = 8
2ec82e67
JM
1256 res = iface.SignalPoll()
1257 logger.debug("SignalPoll: " + str(res))
1258 if 'frequency' not in res or res['frequency'] != 2412:
1259 self.state = -1
1260 logger.info("Unexpected SignalPoll result")
1261 iface.RemoveNetwork(self.netw)
1262 if 'State' in properties and properties['State'] == "disconnected":
1263 if self.state == 1:
1264 self.state = 2
1265 iface.SelectNetwork(self.netw)
1266 elif self.state == 3:
1267 self.state = 4
1268 iface.Reassociate()
1269 elif self.state == 6:
1270 self.state = 7
13b34972
JM
1271 iface.Reconnect()
1272 elif self.state == 8:
1273 self.state = 9
2ec82e67
JM
1274 self.loop.quit()
1275
1276 def run_connect(self, *args):
1277 logger.debug("run_connect")
1278 args = dbus.Dictionary({ 'ssid': ssid,
1279 'key_mgmt': 'WPA-PSK',
1280 'psk': passphrase,
1281 'scan_freq': 2412 },
1282 signature='sv')
1283 self.netw = iface.AddNetwork(args)
1284 iface.SelectNetwork(self.netw)
1285 return False
1286
1287 def success(self):
1288 if not self.network_added or \
1289 not self.network_removed or \
1290 not self.network_selected:
1291 return False
13b34972 1292 return self.state == 9
2ec82e67
JM
1293
1294 with TestDbusConnect(bus) as t:
1295 if not t.success():
1296 raise Exception("Expected signals not seen")
1297
53f4ed68
JM
1298def test_dbus_connect_psk_mem(dev, apdev):
1299 """D-Bus AddNetwork and connect with memory-only PSK"""
1300 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1301 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1302
1303 ssid = "test-wpa2-psk"
1304 passphrase = 'qwertyuiop'
1305 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
8b8a1864 1306 hapd = hostapd.add_ap(apdev[0], params)
53f4ed68
JM
1307
1308 class TestDbusConnect(TestDbus):
1309 def __init__(self, bus):
1310 TestDbus.__init__(self, bus)
1311 self.connected = False
1312
1313 def __enter__(self):
1314 gobject.timeout_add(1, self.run_connect)
1315 gobject.timeout_add(15000, self.timeout)
1316 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1317 "PropertiesChanged")
1318 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1319 "NetworkRequest")
1320 self.loop.run()
1321 return self
1322
1323 def propertiesChanged(self, properties):
1324 logger.debug("propertiesChanged: %s" % str(properties))
1325 if 'State' in properties and properties['State'] == "completed":
1326 self.connected = True
1327 self.loop.quit()
1328
1329 def networkRequest(self, path, field, txt):
1330 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1331 if field == "PSK_PASSPHRASE":
1332 iface.NetworkReply(path, field, '"' + passphrase + '"')
1333
1334 def run_connect(self, *args):
1335 logger.debug("run_connect")
1336 args = dbus.Dictionary({ 'ssid': ssid,
1337 'key_mgmt': 'WPA-PSK',
1338 'mem_only_psk': 1,
1339 'scan_freq': 2412 },
1340 signature='sv')
1341 self.netw = iface.AddNetwork(args)
1342 iface.SelectNetwork(self.netw)
1343 return False
1344
1345 def success(self):
1346 return self.connected
1347
1348 with TestDbusConnect(bus) as t:
1349 if not t.success():
1350 raise Exception("Expected signals not seen")
1351
0e126c6d
JM
1352def test_dbus_connect_oom(dev, apdev):
1353 """D-Bus AddNetwork and connect when out-of-memory"""
1354 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1355 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1356
1357 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
1358 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1359
1360 ssid = "test-wpa2-psk"
1361 passphrase = 'qwertyuiop'
1362 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
8b8a1864 1363 hapd = hostapd.add_ap(apdev[0], params)
0e126c6d
JM
1364
1365 class TestDbusConnect(TestDbus):
1366 def __init__(self, bus):
1367 TestDbus.__init__(self, bus)
1368 self.network_added = False
1369 self.network_selected = False
1370 self.network_removed = False
1371 self.state = 0
1372
1373 def __enter__(self):
1374 gobject.timeout_add(1, self.run_connect)
1375 gobject.timeout_add(1500, self.timeout)
1376 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1377 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1378 "NetworkRemoved")
1379 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1380 "NetworkSelected")
1381 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1382 "PropertiesChanged")
1383 self.loop.run()
1384 return self
1385
1386 def networkAdded(self, network, properties):
1387 logger.debug("networkAdded: %s" % str(network))
1388 logger.debug(str(properties))
1389 self.network_added = True
1390
1391 def networkRemoved(self, network):
1392 logger.debug("networkRemoved: %s" % str(network))
1393 self.network_removed = True
1394
1395 def networkSelected(self, network):
1396 logger.debug("networkSelected: %s" % str(network))
1397 self.network_selected = True
1398
1399 def propertiesChanged(self, properties):
1400 logger.debug("propertiesChanged: %s" % str(properties))
1401 if 'State' in properties and properties['State'] == "completed":
1402 if self.state == 0:
1403 self.state = 1
1404 iface.Disconnect()
1405 elif self.state == 2:
1406 self.state = 3
1407 iface.Disconnect()
1408 elif self.state == 4:
1409 self.state = 5
1410 iface.Reattach()
1411 elif self.state == 5:
1412 self.state = 6
1413 res = iface.SignalPoll()
1414 logger.debug("SignalPoll: " + str(res))
1415 if 'frequency' not in res or res['frequency'] != 2412:
1416 self.state = -1
1417 logger.info("Unexpected SignalPoll result")
1418 iface.RemoveNetwork(self.netw)
1419 if 'State' in properties and properties['State'] == "disconnected":
1420 if self.state == 1:
1421 self.state = 2
1422 iface.SelectNetwork(self.netw)
1423 elif self.state == 3:
1424 self.state = 4
1425 iface.Reassociate()
1426 elif self.state == 6:
1427 self.state = 7
1428 self.loop.quit()
1429
1430 def run_connect(self, *args):
1431 logger.debug("run_connect")
1432 args = dbus.Dictionary({ 'ssid': ssid,
1433 'key_mgmt': 'WPA-PSK',
1434 'psk': passphrase,
1435 'scan_freq': 2412 },
1436 signature='sv')
1437 try:
1438 self.netw = iface.AddNetwork(args)
bab493b9 1439 except Exception as e:
0e126c6d
JM
1440 logger.info("Exception on AddNetwork: " + str(e))
1441 self.loop.quit()
1442 return False
1443 try:
1444 iface.SelectNetwork(self.netw)
bab493b9 1445 except Exception as e:
0e126c6d
JM
1446 logger.info("Exception on SelectNetwork: " + str(e))
1447 self.loop.quit()
1448
1449 return False
1450
1451 def success(self):
1452 if not self.network_added or \
1453 not self.network_removed or \
1454 not self.network_selected:
1455 return False
1456 return self.state == 7
1457
1458 count = 0
1459 for i in range(1, 1000):
1460 for j in range(3):
1461 dev[j].dump_monitor()
1462 dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
1463 try:
1464 with TestDbusConnect(bus) as t:
1465 if not t.success():
1466 logger.info("Iteration %d - Expected signals not seen" % i)
1467 else:
1468 logger.info("Iteration %d - success" % i)
1469
1470 state = dev[0].request('GET_ALLOC_FAIL')
1471 logger.info("GET_ALLOC_FAIL: " + state)
1472 dev[0].dump_monitor()
1473 dev[0].request("TEST_ALLOC_FAIL 0:")
1474 if i < 3:
1475 raise Exception("Connection succeeded during out-of-memory")
1476 if not state.startswith('0:'):
1477 count += 1
1478 if count == 5:
1479 break
1480 except:
1481 pass
2ec82e67 1482
89a79ca2
JM
1483 # Force regulatory update to re-fetch hw capabilities for the following
1484 # test cases.
1485 try:
1486 dev[0].dump_monitor()
1487 subprocess.call(['iw', 'reg', 'set', 'US'])
1488 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1489 finally:
1490 dev[0].dump_monitor()
1491 subprocess.call(['iw', 'reg', 'set', '00'])
1492 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1493
2ec82e67
JM
1494def test_dbus_while_not_connected(dev, apdev):
1495 """D-Bus invalid operations while not connected"""
910eb5b5 1496 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1497 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1498
1499 try:
1500 iface.Disconnect()
1501 raise Exception("Disconnect() accepted when not connected")
bab493b9 1502 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1503 if "NotConnected" not in str(e):
1504 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
1505
1506 try:
1507 iface.Reattach()
1508 raise Exception("Reattach() accepted when not connected")
bab493b9 1509 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1510 if "NotConnected" not in str(e):
1511 raise Exception("Unexpected error message for invalid Reattach: " + str(e))
1512
1513def test_dbus_connect_eap(dev, apdev):
1514 """D-Bus AddNetwork and connect to EAP network"""
089e7ca3 1515 check_altsubject_match_support(dev[0])
910eb5b5 1516 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1517 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1518
1519 ssid = "ieee8021x-open"
1520 params = hostapd.radius_params()
1521 params["ssid"] = ssid
1522 params["ieee8021x"] = "1"
8b8a1864 1523 hapd = hostapd.add_ap(apdev[0], params)
2ec82e67
JM
1524
1525 class TestDbusConnect(TestDbus):
1526 def __init__(self, bus):
1527 TestDbus.__init__(self, bus)
1528 self.certification_received = False
1529 self.eap_status = False
1530 self.state = 0
1531
1532 def __enter__(self):
1533 gobject.timeout_add(1, self.run_connect)
1534 gobject.timeout_add(15000, self.timeout)
1535 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1536 "PropertiesChanged")
1537 self.add_signal(self.certification, WPAS_DBUS_IFACE,
2099fed4 1538 "Certification", byte_arrays=True)
2ec82e67
JM
1539 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1540 "NetworkRequest")
1541 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
1542 self.loop.run()
1543 return self
1544
1545 def propertiesChanged(self, properties):
1546 logger.debug("propertiesChanged: %s" % str(properties))
1547 if 'State' in properties and properties['State'] == "completed":
1548 if self.state == 0:
1549 self.state = 1
1550 iface.EAPLogoff()
2099fed4
JM
1551 logger.info("Set dNSName constraint")
1552 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1553 args = dbus.Dictionary({ 'altsubject_match':
1554 self.server_dnsname },
1555 signature='sv')
1556 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1557 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
1558 elif self.state == 2:
1559 self.state = 3
2099fed4
JM
1560 iface.Disconnect()
1561 logger.info("Set non-matching dNSName constraint")
1562 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1563 args = dbus.Dictionary({ 'altsubject_match':
1564 self.server_dnsname + "FOO" },
1565 signature='sv')
1566 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1567 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
1568 if 'State' in properties and properties['State'] == "disconnected":
1569 if self.state == 1:
1570 self.state = 2
1571 iface.EAPLogon()
1572 iface.SelectNetwork(self.netw)
2099fed4
JM
1573 if self.state == 3:
1574 self.state = 4
1575 iface.SelectNetwork(self.netw)
2ec82e67
JM
1576
1577 def certification(self, args):
1578 logger.debug("certification: %s" % str(args))
1579 self.certification_received = True
2099fed4
JM
1580 if args['depth'] == 0:
1581 # The test server certificate is supposed to have dNSName
1582 if len(args['altsubject']) < 1:
1583 raise Exception("Missing dNSName")
1584 dnsname = args['altsubject'][0]
1585 if not dnsname.startswith("DNS:"):
1586 raise Exception("Expected dNSName not found: " + dnsname)
1587 logger.info("altsubject: " + dnsname)
1588 self.server_dnsname = dnsname
2ec82e67
JM
1589
1590 def eap(self, status, parameter):
1591 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
1592 if status == 'completion' and parameter == 'success':
1593 self.eap_status = True
2099fed4
JM
1594 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch':
1595 self.state = 5
1596 self.loop.quit()
2ec82e67
JM
1597
1598 def networkRequest(self, path, field, txt):
1599 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1600 if field == "PASSWORD":
1601 iface.NetworkReply(path, field, "password")
1602
1603 def run_connect(self, *args):
1604 logger.debug("run_connect")
1605 args = dbus.Dictionary({ 'ssid': ssid,
1606 'key_mgmt': 'IEEE8021X',
1607 'eapol_flags': 0,
1608 'eap': 'TTLS',
1609 'anonymous_identity': 'ttls',
1610 'identity': 'pap user',
1611 'ca_cert': 'auth_serv/ca.pem',
1612 'phase2': 'auth=PAP',
1613 'scan_freq': 2412 },
1614 signature='sv')
1615 self.netw = iface.AddNetwork(args)
1616 iface.SelectNetwork(self.netw)
1617 return False
1618
1619 def success(self):
1620 if not self.eap_status or not self.certification_received:
1621 return False
2099fed4 1622 return self.state == 5
2ec82e67
JM
1623
1624 with TestDbusConnect(bus) as t:
1625 if not t.success():
1626 raise Exception("Expected signals not seen")
1627
1628def test_dbus_network(dev, apdev):
1629 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
910eb5b5 1630 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1631 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1632
1633 args = dbus.Dictionary({ 'ssid': "foo",
1634 'key_mgmt': 'WPA-PSK',
1635 'psk': "12345678",
1636 'identity': dbus.ByteArray([ 1, 2 ]),
1637 'priority': dbus.Int32(0),
1638 'scan_freq': dbus.UInt32(2412) },
1639 signature='sv')
1640 netw = iface.AddNetwork(args)
5a233fbd
JM
1641 id = int(dev[0].list_networks()[0]['id'])
1642 val = dev[0].get_network(id, "scan_freq")
1643 if val != "2412":
1644 raise Exception("Invalid scan_freq value: " + str(val))
1645 iface.RemoveNetwork(netw)
1646
1647 args = dbus.Dictionary({ 'ssid': "foo",
1648 'key_mgmt': 'NONE',
1649 'scan_freq': "2412 2432",
1650 'freq_list': "2412 2417 2432" },
1651 signature='sv')
1652 netw = iface.AddNetwork(args)
1653 id = int(dev[0].list_networks()[0]['id'])
1654 val = dev[0].get_network(id, "scan_freq")
1655 if val != "2412 2432":
1656 raise Exception("Invalid scan_freq value (2): " + str(val))
1657 val = dev[0].get_network(id, "freq_list")
1658 if val != "2412 2417 2432":
1659 raise Exception("Invalid freq_list value: " + str(val))
2ec82e67
JM
1660 iface.RemoveNetwork(netw)
1661 try:
1662 iface.RemoveNetwork(netw)
1663 raise Exception("Invalid RemoveNetwork() accepted")
bab493b9 1664 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1665 if "NetworkUnknown" not in str(e):
1666 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1667 try:
1668 iface.SelectNetwork(netw)
1669 raise Exception("Invalid SelectNetwork() accepted")
bab493b9 1670 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1671 if "NetworkUnknown" not in str(e):
1672 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1673
1674 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1675 'identity': "testuser", 'scan_freq': '2412' },
1676 signature='sv')
1677 netw1 = iface.AddNetwork(args)
1678 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1679 signature='sv')
1680 netw2 = iface.AddNetwork(args)
1681 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1682 dbus_interface=dbus.PROPERTIES_IFACE)
1683 if len(res) != 2:
1684 raise Exception("Unexpected number of networks")
1685
1686 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1687 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1688 dbus_interface=dbus.PROPERTIES_IFACE)
1689 if res != False:
1690 raise Exception("Added network was unexpectedly enabled by default")
1691 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
1692 dbus_interface=dbus.PROPERTIES_IFACE)
1693 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1694 dbus_interface=dbus.PROPERTIES_IFACE)
1695 if res != True:
1696 raise Exception("Set(Enabled,True) did not seem to change property value")
1697 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
1698 dbus_interface=dbus.PROPERTIES_IFACE)
1699 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1700 dbus_interface=dbus.PROPERTIES_IFACE)
1701 if res != False:
1702 raise Exception("Set(Enabled,False) did not seem to change property value")
1703 try:
1704 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
1705 dbus_interface=dbus.PROPERTIES_IFACE)
1706 raise Exception("Invalid Set(Enabled,1) accepted")
bab493b9 1707 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1708 if "Error.Failed: wrong property type" not in str(e):
1709 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
1710
1711 args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
1712 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1713 dbus_interface=dbus.PROPERTIES_IFACE)
1714 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1715 dbus_interface=dbus.PROPERTIES_IFACE)
1716 if res['ssid'] != '"foo1new"':
1717 raise Exception("Set(Properties) failed to update ssid")
1718 if res['identity'] != '"testuser"':
1719 raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1720
1721 iface.RemoveAllNetworks()
1722 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1723 dbus_interface=dbus.PROPERTIES_IFACE)
1724 if len(res) != 0:
1725 raise Exception("Unexpected number of networks")
1726 iface.RemoveAllNetworks()
1727
1728 tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
1729 dbus.Dictionary({ 'identity': dbus.ByteArray() },
1730 signature='sv'),
1731 dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
1732 dbus.Dictionary({ 'identity': "" }, signature='sv') ]
1733 for args in tests:
1734 try:
1735 iface.AddNetwork(args)
1736 raise Exception("Invalid AddNetwork args accepted: " + str(args))
bab493b9 1737 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1738 if "InvalidArgs" not in str(e):
1739 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1740
0e126c6d
JM
1741def test_dbus_network_oom(dev, apdev):
1742 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1743 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1744 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1745
1746 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1747 'identity': "testuser", 'scan_freq': '2412' },
1748 signature='sv')
1749 netw1 = iface.AddNetwork(args)
1750 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1751
1752 with alloc_fail_dbus(dev[0], 1,
1753 "wpa_config_get_all;wpas_dbus_getter_network_properties",
1754 "Get"):
1755 net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1756 dbus_interface=dbus.PROPERTIES_IFACE)
1757
1758 iface.RemoveAllNetworks()
1759
1760 with alloc_fail_dbus(dev[0], 1,
1761 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1762 "RemoveNetwork", "InvalidArgs"):
1763 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1764
1765 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1766 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1767 signature='sv')
1768 try:
1769 netw = iface.AddNetwork(args)
1770 # Currently, AddNetwork() succeeds even if os_strdup() for path
1771 # fails, so remove the network if that occurs.
1772 iface.RemoveNetwork(netw)
bab493b9 1773 except dbus.exceptions.DBusException as e:
0e126c6d
JM
1774 pass
1775
1776 for i in range(1, 3):
1777 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
1778 try:
1779 netw = iface.AddNetwork(args)
1780 # Currently, AddNetwork() succeeds even if network registration
1781 # fails, so remove the network if that occurs.
1782 iface.RemoveNetwork(netw)
bab493b9 1783 except dbus.exceptions.DBusException as e:
0e126c6d
JM
1784 pass
1785
1786 with alloc_fail_dbus(dev[0], 1,
1787 "=wpa_config_add_network;wpas_dbus_handler_add_network",
1788 "AddNetwork",
1789 "UnknownError: wpa_supplicant could not add a network"):
1790 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1791 signature='sv')
1792 netw = iface.AddNetwork(args)
1793
1794 tests = [ (1,
1795 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1796 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1797 signature='sv')),
1798 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1799 dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
1800 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1801 dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
1802 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1803 dbus.Dictionary({ 'priority': dbus.UInt32(1) },
1804 signature='sv')),
1805 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1806 dbus.Dictionary({ 'priority': dbus.Int32(1) },
1807 signature='sv')),
1808 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1809 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1810 signature='sv')) ]
1811 for (count,funcs,args) in tests:
1812 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
1813 netw = iface.AddNetwork(args)
1814
1815 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
1816 dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
1817 raise Exception("Unexpected network block added")
1818 if len(dev[0].list_networks()) > 0:
1819 raise Exception("Unexpected network block visible")
1820
2ec82e67
JM
1821def test_dbus_interface(dev, apdev):
1822 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
7966674d
JM
1823 try:
1824 _test_dbus_interface(dev, apdev)
1825 finally:
1826 # Need to force P2P channel list update since the 'lo' interface
1827 # with driver=none ends up configuring default dualband channels.
1828 dev[0].request("SET country US")
1829 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1830 if ev is None:
1831 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1832 timeout=1)
1833 dev[0].request("SET country 00")
1834 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1835 if ev is None:
1836 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1837 timeout=1)
1838 subprocess.call(['iw', 'reg', 'set', '00'])
1839
1840def _test_dbus_interface(dev, apdev):
910eb5b5 1841 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1842 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1843
1844 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1845 signature='sv')
1846 path = wpas.CreateInterface(params)
1847 logger.debug("New interface path: " + str(path))
1848 path2 = wpas.GetInterface("lo")
1849 if path != path2:
1850 raise Exception("Interface object mismatch")
1851
1852 params = dbus.Dictionary({ 'Ifname': 'lo',
1853 'Driver': 'none',
1854 'ConfigFile': 'foo',
1855 'BridgeIfname': 'foo', },
1856 signature='sv')
1857 try:
1858 wpas.CreateInterface(params)
1859 raise Exception("Invalid CreateInterface() accepted")
bab493b9 1860 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1861 if "InterfaceExists" not in str(e):
1862 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1863
1864 wpas.RemoveInterface(path)
1865 try:
1866 wpas.RemoveInterface(path)
1867 raise Exception("Invalid RemoveInterface() accepted")
bab493b9 1868 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1869 if "InterfaceUnknown" not in str(e):
1870 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1871
1872 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
1873 'Foo': 123 },
1874 signature='sv')
1875 try:
1876 wpas.CreateInterface(params)
1877 raise Exception("Invalid CreateInterface() accepted")
bab493b9 1878 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1879 if "InvalidArgs" not in str(e):
1880 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1881
1882 params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
1883 try:
1884 wpas.CreateInterface(params)
1885 raise Exception("Invalid CreateInterface() accepted")
bab493b9 1886 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1887 if "InvalidArgs" not in str(e):
1888 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1889
1890 try:
1891 wpas.GetInterface("lo")
1892 raise Exception("Invalid GetInterface() accepted")
bab493b9 1893 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1894 if "InterfaceUnknown" not in str(e):
1895 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1896
0e126c6d
JM
1897def test_dbus_interface_oom(dev, apdev):
1898 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
1899 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1900 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1901
1902 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
1903 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1904 signature='sv')
1905 wpas.CreateInterface(params)
1906
1907 for i in range(1, 1000):
1908 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
1909 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1910 signature='sv')
1911 try:
1912 npath = wpas.CreateInterface(params)
1913 wpas.RemoveInterface(npath)
1914 logger.info("CreateInterface succeeds after %d allocation failures" % i)
1915 state = dev[0].request('GET_ALLOC_FAIL')
1916 logger.info("GET_ALLOC_FAIL: " + state)
1917 dev[0].dump_monitor()
1918 dev[0].request("TEST_ALLOC_FAIL 0:")
1919 if i < 5:
1920 raise Exception("CreateInterface succeeded during out-of-memory")
1921 if not state.startswith('0:'):
1922 break
bab493b9 1923 except dbus.exceptions.DBusException as e:
0e126c6d
JM
1924 pass
1925
1926 for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
1927 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
1928 "CreateInterface"):
1929 params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
1930 wpas.CreateInterface(params)
1931
2ec82e67
JM
1932def test_dbus_blob(dev, apdev):
1933 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
910eb5b5 1934 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1935 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1936
1937 blob = dbus.ByteArray("\x01\x02\x03")
1938 iface.AddBlob('blob1', blob)
1939 try:
1940 iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
1941 raise Exception("Invalid AddBlob() accepted")
bab493b9 1942 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1943 if "BlobExists" not in str(e):
1944 raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
1945 res = iface.GetBlob('blob1')
1946 if len(res) != len(blob):
1947 raise Exception("Unexpected blob data length")
1948 for i in range(len(res)):
1949 if res[i] != dbus.Byte(blob[i]):
1950 raise Exception("Unexpected blob data")
1951 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
1952 dbus_interface=dbus.PROPERTIES_IFACE)
1953 if 'blob1' not in res:
1954 raise Exception("Added blob missing from Blobs property")
1955 iface.RemoveBlob('blob1')
1956 try:
1957 iface.RemoveBlob('blob1')
1958 raise Exception("Invalid RemoveBlob() accepted")
bab493b9 1959 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1960 if "BlobUnknown" not in str(e):
1961 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
1962 try:
1963 iface.GetBlob('blob1')
1964 raise Exception("Invalid GetBlob() accepted")
bab493b9 1965 except dbus.exceptions.DBusException as e:
2ec82e67
JM
1966 if "BlobUnknown" not in str(e):
1967 raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
1968
1969 class TestDbusBlob(TestDbus):
1970 def __init__(self, bus):
1971 TestDbus.__init__(self, bus)
1972 self.blob_added = False
1973 self.blob_removed = False
1974
1975 def __enter__(self):
1976 gobject.timeout_add(1, self.run_blob)
1977 gobject.timeout_add(15000, self.timeout)
1978 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
1979 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
1980 self.loop.run()
1981 return self
1982
1983 def blobAdded(self, blobName):
1984 logger.debug("blobAdded: %s" % blobName)
1985 if blobName == 'blob2':
1986 self.blob_added = True
1987
1988 def blobRemoved(self, blobName):
1989 logger.debug("blobRemoved: %s" % blobName)
1990 if blobName == 'blob2':
1991 self.blob_removed = True
1992 self.loop.quit()
1993
1994 def run_blob(self, *args):
1995 logger.debug("run_blob")
1996 iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
1997 iface.RemoveBlob('blob2')
1998 return False
1999
2000 def success(self):
2001 return self.blob_added and self.blob_removed
2002
2003 with TestDbusBlob(bus) as t:
2004 if not t.success():
2005 raise Exception("Expected signals not seen")
2006
0e126c6d
JM
2007def test_dbus_blob_oom(dev, apdev):
2008 """D-Bus AddNetwork/RemoveNetwork OOM error cases"""
2009 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2010 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2011
2012 for i in range(1, 4):
2013 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
2014 "AddBlob"):
2015 iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
2016
2ec82e67
JM
2017def test_dbus_autoscan(dev, apdev):
2018 """D-Bus Autoscan()"""
910eb5b5 2019 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2020 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2021
2022 iface.AutoScan("foo")
2023 iface.AutoScan("periodic:1")
2024 iface.AutoScan("")
2025 dev[0].request("AUTOSCAN ")
2026
0e126c6d
JM
2027def test_dbus_autoscan_oom(dev, apdev):
2028 """D-Bus Autoscan() OOM"""
2029 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2030 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2031
2032 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
2033 iface.AutoScan("foo")
2034 dev[0].request("AUTOSCAN ")
2035
2ec82e67
JM
2036def test_dbus_tdls_invalid(dev, apdev):
2037 """D-Bus invalid TDLS operations"""
910eb5b5 2038 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2039 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2040
8b8a1864 2041 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2ec82e67
JM
2042 connect_2sta_open(dev, hapd)
2043 addr1 = dev[1].p2p_interface_addr()
2044
2045 try:
2046 iface.TDLSDiscover("foo")
2047 raise Exception("Invalid TDLSDiscover() accepted")
bab493b9 2048 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2049 if "InvalidArgs" not in str(e):
2050 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e))
2051
2052 try:
2053 iface.TDLSStatus("foo")
2054 raise Exception("Invalid TDLSStatus() accepted")
bab493b9 2055 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2056 if "InvalidArgs" not in str(e):
2057 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e))
2058
2059 res = iface.TDLSStatus(addr1)
2060 if res != "peer does not exist":
2061 raise Exception("Unexpected TDLSStatus response")
2062
2063 try:
2064 iface.TDLSSetup("foo")
2065 raise Exception("Invalid TDLSSetup() accepted")
bab493b9 2066 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2067 if "InvalidArgs" not in str(e):
2068 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e))
2069
2070 try:
2071 iface.TDLSTeardown("foo")
2072 raise Exception("Invalid TDLSTeardown() accepted")
bab493b9 2073 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2074 if "InvalidArgs" not in str(e):
2075 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e))
2076
795b6f57
JM
2077 try:
2078 iface.TDLSTeardown("00:11:22:33:44:55")
2079 raise Exception("TDLSTeardown accepted for unknown peer")
bab493b9 2080 except dbus.exceptions.DBusException as e:
795b6f57
JM
2081 if "UnknownError: error performing TDLS teardown" not in str(e):
2082 raise Exception("Unexpected error message: " + str(e))
2083
82aea622
JM
2084 try:
2085 iface.TDLSChannelSwitch({})
2086 raise Exception("Invalid TDLSChannelSwitch() accepted")
bab493b9 2087 except dbus.exceptions.DBusException as e:
82aea622
JM
2088 if "InvalidArgs" not in str(e):
2089 raise Exception("Unexpected error message for invalid TDLSChannelSwitch: " + str(e))
2090
2091 try:
2092 iface.TDLSCancelChannelSwitch("foo")
2093 raise Exception("Invalid TDLSCancelChannelSwitch() accepted")
bab493b9 2094 except dbus.exceptions.DBusException as e:
82aea622
JM
2095 if "InvalidArgs" not in str(e):
2096 raise Exception("Unexpected error message for invalid TDLSCancelChannelSwitch: " + str(e))
2097
0e126c6d
JM
2098def test_dbus_tdls_oom(dev, apdev):
2099 """D-Bus TDLS operations during OOM"""
2100 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2101 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2102
2103 with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
2104 "UnknownError: error performing TDLS setup"):
2105 iface.TDLSSetup("00:11:22:33:44:55")
2106
2ec82e67
JM
2107def test_dbus_tdls(dev, apdev):
2108 """D-Bus TDLS"""
910eb5b5 2109 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2110 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2111
8b8a1864 2112 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2ec82e67
JM
2113 connect_2sta_open(dev, hapd)
2114
2115 addr1 = dev[1].p2p_interface_addr()
2116
2117 class TestDbusTdls(TestDbus):
2118 def __init__(self, bus):
2119 TestDbus.__init__(self, bus)
2120 self.tdls_setup = False
2121 self.tdls_teardown = False
2122
2123 def __enter__(self):
2124 gobject.timeout_add(1, self.run_tdls)
2125 gobject.timeout_add(15000, self.timeout)
2126 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2127 "PropertiesChanged")
2128 self.loop.run()
2129 return self
2130
2131 def propertiesChanged(self, properties):
2132 logger.debug("propertiesChanged: %s" % str(properties))
2133
2134 def run_tdls(self, *args):
2135 logger.debug("run_tdls")
2136 iface.TDLSDiscover(addr1)
2137 gobject.timeout_add(100, self.run_tdls2)
2138 return False
2139
2140 def run_tdls2(self, *args):
2141 logger.debug("run_tdls2")
2142 iface.TDLSSetup(addr1)
2143 gobject.timeout_add(500, self.run_tdls3)
2144 return False
2145
2146 def run_tdls3(self, *args):
2147 logger.debug("run_tdls3")
2148 res = iface.TDLSStatus(addr1)
2149 if res == "connected":
2150 self.tdls_setup = True
2151 else:
2152 logger.info("Unexpected TDLSStatus: " + res)
2153 iface.TDLSTeardown(addr1)
2154 gobject.timeout_add(200, self.run_tdls4)
2155 return False
2156
2157 def run_tdls4(self, *args):
2158 logger.debug("run_tdls4")
2159 res = iface.TDLSStatus(addr1)
2160 if res == "peer does not exist":
2161 self.tdls_teardown = True
2162 else:
2163 logger.info("Unexpected TDLSStatus: " + res)
2164 self.loop.quit()
2165 return False
2166
2167 def success(self):
2168 return self.tdls_setup and self.tdls_teardown
2169
2170 with TestDbusTdls(bus) as t:
2171 if not t.success():
2172 raise Exception("Expected signals not seen")
2173
82aea622
JM
2174def test_dbus_tdls_channel_switch(dev, apdev):
2175 """D-Bus TDLS channel switch configuration"""
79467d74
JM
2176 flags = int(dev[0].get_driver_status_field('capa.flags'), 16)
2177 if flags & 0x800000000 == 0:
2178 raise HwsimSkip("Driver does not support TDLS channel switching")
2179
82aea622
JM
2180 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2181 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2182
2183 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2184 connect_2sta_open(dev, hapd)
2185
2186 addr1 = dev[1].p2p_interface_addr()
2187
2188 class TestDbusTdls(TestDbus):
2189 def __init__(self, bus):
2190 TestDbus.__init__(self, bus)
2191 self.tdls_setup = False
2192 self.tdls_done = False
2193
2194 def __enter__(self):
2195 gobject.timeout_add(1, self.run_tdls)
2196 gobject.timeout_add(15000, self.timeout)
2197 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2198 "PropertiesChanged")
2199 self.loop.run()
2200 return self
2201
2202 def propertiesChanged(self, properties):
2203 logger.debug("propertiesChanged: %s" % str(properties))
2204
2205 def run_tdls(self, *args):
2206 logger.debug("run_tdls")
2207 iface.TDLSDiscover(addr1)
2208 gobject.timeout_add(100, self.run_tdls2)
2209 return False
2210
2211 def run_tdls2(self, *args):
2212 logger.debug("run_tdls2")
2213 iface.TDLSSetup(addr1)
2214 gobject.timeout_add(500, self.run_tdls3)
2215 return False
2216
2217 def run_tdls3(self, *args):
2218 logger.debug("run_tdls3")
2219 res = iface.TDLSStatus(addr1)
2220 if res == "connected":
2221 self.tdls_setup = True
2222 else:
2223 logger.info("Unexpected TDLSStatus: " + res)
2224
2225 # Unknown dict entry
2226 args = dbus.Dictionary({ 'Foobar': dbus.Byte(1) },
2227 signature='sv')
2228 try:
2229 iface.TDLSChannelSwitch(args)
bab493b9 2230 except Exception as e:
82aea622
JM
2231 if "InvalidArgs" not in str(e):
2232 raise Exception("Unexpected exception")
2233
2234 # Missing OperClass
2235 args = dbus.Dictionary({}, signature='sv')
2236 try:
2237 iface.TDLSChannelSwitch(args)
bab493b9 2238 except Exception as e:
82aea622
JM
2239 if "InvalidArgs" not in str(e):
2240 raise Exception("Unexpected exception")
2241
2242 # Missing Frequency
2243 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1) },
2244 signature='sv')
2245 try:
2246 iface.TDLSChannelSwitch(args)
bab493b9 2247 except Exception as e:
82aea622
JM
2248 if "InvalidArgs" not in str(e):
2249 raise Exception("Unexpected exception")
2250
2251 # Missing PeerAddress
2252 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1),
2253 'Frequency': dbus.UInt32(2417) },
2254 signature='sv')
2255 try:
2256 iface.TDLSChannelSwitch(args)
bab493b9 2257 except Exception as e:
82aea622
JM
2258 if "InvalidArgs" not in str(e):
2259 raise Exception("Unexpected exception")
2260
2261 # Valid parameters
2262 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1),
2263 'Frequency': dbus.UInt32(2417),
2264 'PeerAddress': addr1,
2265 'SecChannelOffset': dbus.UInt32(0),
2266 'CenterFrequency1': dbus.UInt32(0),
2267 'CenterFrequency2': dbus.UInt32(0),
2268 'Bandwidth': dbus.UInt32(20),
2269 'HT': dbus.Boolean(False),
2270 'VHT': dbus.Boolean(False) },
2271 signature='sv')
2272 iface.TDLSChannelSwitch(args)
2273
2274 gobject.timeout_add(200, self.run_tdls4)
2275 return False
2276
2277 def run_tdls4(self, *args):
2278 logger.debug("run_tdls4")
2279 iface.TDLSCancelChannelSwitch(addr1)
2280 self.tdls_done = True
2281 self.loop.quit()
2282 return False
2283
2284 def success(self):
2285 return self.tdls_setup and self.tdls_done
2286
2287 with TestDbusTdls(bus) as t:
2288 if not t.success():
2289 raise Exception("Expected signals not seen")
2290
2ec82e67
JM
2291def test_dbus_pkcs11(dev, apdev):
2292 """D-Bus SetPKCS11EngineAndModulePath()"""
910eb5b5 2293 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2294 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2295
2296 try:
2297 iface.SetPKCS11EngineAndModulePath("foo", "bar")
bab493b9 2298 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2299 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2300 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2301
2302 try:
2303 iface.SetPKCS11EngineAndModulePath("foo", "")
bab493b9 2304 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2305 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2306 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2307
2308 iface.SetPKCS11EngineAndModulePath("", "bar")
2309 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2310 dbus_interface=dbus.PROPERTIES_IFACE)
2311 if res != "":
2312 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2313 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2314 dbus_interface=dbus.PROPERTIES_IFACE)
2315 if res != "bar":
2316 raise Exception("Unexpected PKCS11ModulePath value: " + res)
2317
2318 iface.SetPKCS11EngineAndModulePath("", "")
2319 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2320 dbus_interface=dbus.PROPERTIES_IFACE)
2321 if res != "":
2322 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2323 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2324 dbus_interface=dbus.PROPERTIES_IFACE)
2325 if res != "":
2326 raise Exception("Unexpected PKCS11ModulePath value: " + res)
2327
2328def test_dbus_apscan(dev, apdev):
2329 """D-Bus Get/Set ApScan"""
2330 try:
81e787b7 2331 _test_dbus_apscan(dev, apdev)
2ec82e67
JM
2332 finally:
2333 dev[0].request("AP_SCAN 1")
2334
2335def _test_dbus_apscan(dev, apdev):
910eb5b5 2336 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2337
2338 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2339 dbus_interface=dbus.PROPERTIES_IFACE)
2340 if res != 1:
2341 raise Exception("Unexpected initial ApScan value: %d" % res)
2342
2343 for i in range(3):
2344 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i),
2345 dbus_interface=dbus.PROPERTIES_IFACE)
2346 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2347 dbus_interface=dbus.PROPERTIES_IFACE)
2348 if res != i:
2349 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i))
2350
2351 try:
2352 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
2353 dbus_interface=dbus.PROPERTIES_IFACE)
2354 raise Exception("Invalid Set(ApScan,-1) accepted")
bab493b9 2355 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2356 if "Error.Failed: wrong property type" not in str(e):
2357 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2358
2359 try:
2360 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
2361 dbus_interface=dbus.PROPERTIES_IFACE)
2362 raise Exception("Invalid Set(ApScan,123) accepted")
bab493b9 2363 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2364 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
2365 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
2366
2367 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
2368 dbus_interface=dbus.PROPERTIES_IFACE)
2369
8b67f649
LR
2370def test_dbus_pmf(dev, apdev):
2371 """D-Bus Get/Set Pmf"""
2372 try:
2373 _test_dbus_pmf(dev, apdev)
2374 finally:
2375 dev[0].request("SET pmf 0")
2376
2377def _test_dbus_pmf(dev, apdev):
2378 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2379
2380 dev[0].set("pmf", "0")
2381 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
2382 dbus_interface=dbus.PROPERTIES_IFACE)
2383 if res != "0":
2384 raise Exception("Unexpected initial Pmf value: %s" % res)
2385
2386 for i in range(3):
2387 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", str(i),
2388 dbus_interface=dbus.PROPERTIES_IFACE)
2389 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
2390 dbus_interface=dbus.PROPERTIES_IFACE)
2391 if res != str(i):
2392 raise Exception("Unexpected Pmf value %s (expected %d)" % (res, i))
2393
2394 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", "1",
2395 dbus_interface=dbus.PROPERTIES_IFACE)
2396
2ec82e67
JM
2397def test_dbus_fastreauth(dev, apdev):
2398 """D-Bus Get/Set FastReauth"""
910eb5b5 2399 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2400
2401 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2402 dbus_interface=dbus.PROPERTIES_IFACE)
2403 if res != True:
2404 raise Exception("Unexpected initial FastReauth value: " + str(res))
2405
2406 for i in [ False, True ]:
2407 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
2408 dbus_interface=dbus.PROPERTIES_IFACE)
2409 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2410 dbus_interface=dbus.PROPERTIES_IFACE)
2411 if res != i:
2412 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))
2413
2414 try:
2415 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
2416 dbus_interface=dbus.PROPERTIES_IFACE)
2417 raise Exception("Invalid Set(FastReauth,-1) accepted")
bab493b9 2418 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2419 if "Error.Failed: wrong property type" not in str(e):
2420 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2421
2422 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
2423 dbus_interface=dbus.PROPERTIES_IFACE)
2424
2425def test_dbus_bss_expire(dev, apdev):
2426 """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
910eb5b5 2427 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2428
2429 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179),
2430 dbus_interface=dbus.PROPERTIES_IFACE)
2431 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
2432 dbus_interface=dbus.PROPERTIES_IFACE)
2433 if res != 179:
2434 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i))
2435
2436 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3),
2437 dbus_interface=dbus.PROPERTIES_IFACE)
2438 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
2439 dbus_interface=dbus.PROPERTIES_IFACE)
2440 if res != 3:
2441 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i))
2442
2443 try:
2444 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
2445 dbus_interface=dbus.PROPERTIES_IFACE)
2446 raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
bab493b9 2447 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2448 if "Error.Failed: wrong property type" not in str(e):
2449 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))
2450
2451 try:
2452 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
2453 dbus_interface=dbus.PROPERTIES_IFACE)
2454 raise Exception("Invalid Set(BSSExpireAge,9) accepted")
bab493b9 2455 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2456 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
2457 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))
2458
2459 try:
2460 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
2461 dbus_interface=dbus.PROPERTIES_IFACE)
2462 raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
bab493b9 2463 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2464 if "Error.Failed: wrong property type" not in str(e):
2465 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))
2466
2467 try:
2468 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
2469 dbus_interface=dbus.PROPERTIES_IFACE)
2470 raise Exception("Invalid Set(BSSExpireCount,0) accepted")
bab493b9 2471 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2472 if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
2473 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))
2474
2475 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
2476 dbus_interface=dbus.PROPERTIES_IFACE)
2477 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
2478 dbus_interface=dbus.PROPERTIES_IFACE)
2479
2480def test_dbus_country(dev, apdev):
2481 """D-Bus Get/Set Country"""
2482 try:
81e787b7 2483 _test_dbus_country(dev, apdev)
2ec82e67
JM
2484 finally:
2485 dev[0].request("SET country 00")
2486 subprocess.call(['iw', 'reg', 'set', '00'])
2487
2488def _test_dbus_country(dev, apdev):
910eb5b5 2489 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2490
2491 # work around issues with possible pending regdom event from the end of
2492 # the previous test case
2493 time.sleep(0.2)
2494 dev[0].dump_monitor()
2495
2496 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
2497 dbus_interface=dbus.PROPERTIES_IFACE)
2498 res = if_obj.Get(WPAS_DBUS_IFACE, "Country",
2499 dbus_interface=dbus.PROPERTIES_IFACE)
2500 if res != "FI":
2501 raise Exception("Unexpected Country value %s (expected FI)" % res)
2502
2503 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2504 if ev is None:
cb346b49
JM
2505 # For now, work around separate P2P Device interface event delivery
2506 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2507 if ev is None:
2508 raise Exception("regdom change event not seen")
2ec82e67
JM
2509 if "init=USER type=COUNTRY alpha2=FI" not in ev:
2510 raise Exception("Unexpected event contents: " + ev)
2511
2512 try:
2513 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
2514 dbus_interface=dbus.PROPERTIES_IFACE)
2515 raise Exception("Invalid Set(Country,-1) accepted")
bab493b9 2516 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2517 if "Error.Failed: wrong property type" not in str(e):
2518 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
2519
2520 try:
2521 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
2522 dbus_interface=dbus.PROPERTIES_IFACE)
2523 raise Exception("Invalid Set(Country,F) accepted")
bab493b9 2524 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2525 if "Error.Failed: invalid country code" not in str(e):
2526 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
2527
2528 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
2529 dbus_interface=dbus.PROPERTIES_IFACE)
2530
2531 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2532 if ev is None:
cb346b49
JM
2533 # For now, work around separate P2P Device interface event delivery
2534 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2535 if ev is None:
2536 raise Exception("regdom change event not seen")
be90370b
JM
2537 # init=CORE was previously used due to invalid db.txt data for 00. For
2538 # now, allow both it and the new init=USER after fixed db.txt.
2539 if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev:
2ec82e67
JM
2540 raise Exception("Unexpected event contents: " + ev)
2541
2542def test_dbus_scan_interval(dev, apdev):
2543 """D-Bus Get/Set ScanInterval"""
2544 try:
2545 _test_dbus_scan_interval(dev, apdev)
2546 finally:
2547 dev[0].request("SCAN_INTERVAL 5")
2548
2549def _test_dbus_scan_interval(dev, apdev):
910eb5b5 2550 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2551
2552 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3),
2553 dbus_interface=dbus.PROPERTIES_IFACE)
2554 res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
2555 dbus_interface=dbus.PROPERTIES_IFACE)
2556 if res != 3:
2557 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i))
2558
2559 try:
2560 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
2561 dbus_interface=dbus.PROPERTIES_IFACE)
2562 raise Exception("Invalid Set(ScanInterval,100) accepted")
bab493b9 2563 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2564 if "Error.Failed: wrong property type" not in str(e):
2565 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
2566
2567 try:
2568 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
2569 dbus_interface=dbus.PROPERTIES_IFACE)
2570 raise Exception("Invalid Set(ScanInterval,-1) accepted")
bab493b9 2571 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2572 if "Error.Failed: scan_interval must be >= 0" not in str(e):
2573 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
2574
2575 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
2576 dbus_interface=dbus.PROPERTIES_IFACE)
2577
2578def test_dbus_probe_req_reporting(dev, apdev):
2579 """D-Bus Probe Request reporting"""
910eb5b5 2580 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 2581
2ec82e67
JM
2582 dev[1].p2p_find(social=True)
2583
2584 class TestDbusProbe(TestDbus):
2585 def __init__(self, bus):
2586 TestDbus.__init__(self, bus)
2587 self.reported = False
2588
2589 def __enter__(self):
2590 gobject.timeout_add(1, self.run_test)
2591 gobject.timeout_add(15000, self.timeout)
cb346b49
JM
2592 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
2593 "GroupStarted")
2ec82e67
JM
2594 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
2595 byte_arrays=True)
2ec82e67
JM
2596 self.loop.run()
2597 return self
2598
cb346b49
JM
2599 def groupStarted(self, properties):
2600 logger.debug("groupStarted: " + str(properties))
2601 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
2602 properties['interface_object'])
2603 self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
2604 self.iface.SubscribeProbeReq()
2605 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2606
2ec82e67
JM
2607 def probeRequest(self, args):
2608 logger.debug("probeRequest: args=%s" % str(args))
2609 self.reported = True
2610 self.loop.quit()
2611
2612 def run_test(self, *args):
2613 logger.debug("run_test")
cb346b49
JM
2614 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2615 params = dbus.Dictionary({ 'frequency': 2412 })
2616 p2p.GroupAdd(params)
2ec82e67
JM
2617 return False
2618
2619 def success(self):
2620 return self.reported
2621
2622 with TestDbusProbe(bus) as t:
2623 if not t.success():
2624 raise Exception("Expected signals not seen")
cb346b49
JM
2625 t.iface.UnsubscribeProbeReq()
2626 try:
2627 t.iface.UnsubscribeProbeReq()
2628 raise Exception("Invalid UnsubscribeProbeReq() accepted")
bab493b9 2629 except dbus.exceptions.DBusException as e:
cb346b49
JM
2630 if "NoSubscription" not in str(e):
2631 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
2632 t.group_p2p.Disconnect()
2ec82e67
JM
2633
2634 with TestDbusProbe(bus) as t:
2635 if not t.success():
2636 raise Exception("Expected signals not seen")
2637 # On purpose, leave ProbeReq subscription in place to test automatic
2638 # cleanup.
2639
2640 dev[1].p2p_stop_find()
2ec82e67 2641
0e126c6d
JM
2642def test_dbus_probe_req_reporting_oom(dev, apdev):
2643 """D-Bus Probe Request reporting (OOM)"""
2644 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2645 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2646
fb9adae4
JM
2647 # Need to make sure this process has not already subscribed to avoid false
2648 # failures due to the operation succeeding due to os_strdup() not even
2649 # getting called.
2650 try:
2651 iface.UnsubscribeProbeReq()
2652 was_subscribed = True
bab493b9 2653 except dbus.exceptions.DBusException as e:
fb9adae4
JM
2654 was_subscribed = False
2655 pass
2656
0e126c6d
JM
2657 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
2658 "SubscribeProbeReq"):
2659 iface.SubscribeProbeReq()
2660
fb9adae4
JM
2661 if was_subscribed:
2662 # On purpose, leave ProbeReq subscription in place to test automatic
2663 # cleanup.
2664 iface.SubscribeProbeReq()
2665
2ec82e67
JM
2666def test_dbus_p2p_invalid(dev, apdev):
2667 """D-Bus invalid P2P operations"""
910eb5b5 2668 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2669 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2670
2671 try:
2672 p2p.RejectPeer(path + "/Peers/00112233445566")
2673 raise Exception("Invalid RejectPeer accepted")
bab493b9 2674 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2675 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2676 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2677
2678 try:
2679 p2p.RejectPeer("/foo")
2680 raise Exception("Invalid RejectPeer accepted")
bab493b9 2681 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2682 if "InvalidArgs" not in str(e):
2683 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2684
001c4bf5
JM
2685 tests = [ { },
2686 { 'peer': 'foo' },
2687 { 'foo': "bar" },
2688 { 'iface': "abc" },
2689 { 'iface': 123 } ]
2690 for t in tests:
2691 try:
2692 p2p.RemoveClient(t)
2693 raise Exception("Invalid RemoveClient accepted")
bab493b9 2694 except dbus.exceptions.DBusException as e:
001c4bf5
JM
2695 if "InvalidArgs" not in str(e):
2696 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
2697
2ec82e67
JM
2698 tests = [ {'DiscoveryType': 'foo'},
2699 {'RequestedDeviceTypes': 'foo'},
2700 {'RequestedDeviceTypes': ['foo']},
2701 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2702 '10','11','12','13','14','15','16',
2703 '17']},
2704 {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2705 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2706 {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2707 {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
2708 dbus.ByteArray('1234567')]},
2709 {'Foo': dbus.Int16(1)},
2710 {'Foo': dbus.UInt16(1)},
2711 {'Foo': dbus.Int64(1)},
2712 {'Foo': dbus.UInt64(1)},
2713 {'Foo': dbus.Double(1.23)},
2714 {'Foo': dbus.Signature('s')},
2715 {'Foo': 'bar'}]
2716 for t in tests:
2717 try:
2718 p2p.Find(dbus.Dictionary(t))
2719 raise Exception("Invalid Find accepted")
bab493b9 2720 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2721 if "InvalidArgs" not in str(e):
2722 raise Exception("Unexpected error message for invalid Find(): " + str(e))
2723
2724 for p in [ "/foo",
2725 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2726 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2727 try:
2728 p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2729 raise Exception("Invalid RemovePersistentGroup accepted")
bab493b9 2730 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2731 if "InvalidArgs" not in str(e):
2732 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
2733
2734 try:
2735 dev[0].request("P2P_SET disabled 1")
2736 p2p.Listen(5)
2737 raise Exception("Invalid Listen accepted")
bab493b9 2738 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2739 if "UnknownError: Could not start P2P listen" not in str(e):
2740 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2741 finally:
2742 dev[0].request("P2P_SET disabled 0")
2743
2744 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2745 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2746 try:
2747 test_p2p.Listen("foo")
2748 raise Exception("Invalid Listen accepted")
bab493b9 2749 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2750 if "InvalidArgs" not in str(e):
2751 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2752
2753 try:
2754 dev[0].request("P2P_SET disabled 1")
2755 p2p.ExtendedListen(dbus.Dictionary({}))
2756 raise Exception("Invalid ExtendedListen accepted")
bab493b9 2757 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2758 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2759 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2760 finally:
2761 dev[0].request("P2P_SET disabled 0")
2762
2763 try:
2764 dev[0].request("P2P_SET disabled 1")
2765 args = { 'duration1': 30000, 'interval1': 102400,
2766 'duration2': 20000, 'interval2': 102400 }
2767 p2p.PresenceRequest(args)
2768 raise Exception("Invalid PresenceRequest accepted")
bab493b9 2769 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2770 if "UnknownError: Failed to invoke presence request" not in str(e):
2771 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2772 finally:
2773 dev[0].request("P2P_SET disabled 0")
2774
2775 try:
2776 params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
2777 p2p.GroupAdd(params)
2778 raise Exception("Invalid GroupAdd accepted")
bab493b9 2779 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2780 if "InvalidArgs" not in str(e):
2781 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2782
2783 try:
2784 params = dbus.Dictionary({'persistent_group_object':
2785 dbus.ObjectPath(path),
2786 'frequency': 2412})
2787 p2p.GroupAdd(params)
2788 raise Exception("Invalid GroupAdd accepted")
bab493b9 2789 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2790 if "InvalidArgs" not in str(e):
2791 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2792
2793 try:
2794 p2p.Disconnect()
2795 raise Exception("Invalid Disconnect accepted")
bab493b9 2796 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2797 if "UnknownError: failed to disconnect" not in str(e):
2798 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
2799
2800 try:
2801 dev[0].request("P2P_SET disabled 1")
2802 p2p.Flush()
2803 raise Exception("Invalid Flush accepted")
bab493b9 2804 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2805 if "Error.Failed: P2P is not available for this interface" not in str(e):
2806 raise Exception("Unexpected error message for invalid Flush: " + str(e))
2807 finally:
2808 dev[0].request("P2P_SET disabled 0")
2809
2810 try:
2811 dev[0].request("P2P_SET disabled 1")
2812 args = { 'peer': path,
2813 'join': True,
2814 'wps_method': 'pbc',
2815 'frequency': 2412 }
2816 pin = p2p.Connect(args)
2817 raise Exception("Invalid Connect accepted")
bab493b9 2818 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2819 if "Error.Failed: P2P is not available for this interface" not in str(e):
2820 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2821 finally:
2822 dev[0].request("P2P_SET disabled 0")
2823
2824 tests = [ { 'frequency': dbus.Int32(-1) },
2825 { 'wps_method': 'pbc' },
2826 { 'wps_method': 'foo' } ]
2827 for args in tests:
2828 try:
2829 pin = p2p.Connect(args)
2830 raise Exception("Invalid Connect accepted")
bab493b9 2831 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2832 if "InvalidArgs" not in str(e):
2833 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2834
2835 try:
2836 dev[0].request("P2P_SET disabled 1")
2837 args = { 'peer': path }
2838 pin = p2p.Invite(args)
2839 raise Exception("Invalid Invite accepted")
bab493b9 2840 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2841 if "Error.Failed: P2P is not available for this interface" not in str(e):
2842 raise Exception("Unexpected error message for invalid Invite: " + str(e))
2843 finally:
2844 dev[0].request("P2P_SET disabled 0")
2845
2846 try:
2847 args = { 'foo': 'bar' }
2848 pin = p2p.Invite(args)
2849 raise Exception("Invalid Invite accepted")
bab493b9 2850 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2851 if "InvalidArgs" not in str(e):
2852 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2853
2854 tests = [ (path, 'display', "InvalidArgs"),
2855 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2856 'display',
2857 "UnknownError: Failed to send provision discovery request"),
2858 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2859 'keypad',
2860 "UnknownError: Failed to send provision discovery request"),
2861 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2862 'pbc',
2863 "UnknownError: Failed to send provision discovery request"),
2864 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2865 'pushbutton',
2866 "UnknownError: Failed to send provision discovery request"),
2867 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2868 'foo', "InvalidArgs") ]
2869 for (p,method,err) in tests:
2870 try:
2871 p2p.ProvisionDiscoveryRequest(p, method)
2872 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
bab493b9 2873 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2874 if err not in str(e):
2875 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
2876
2877 try:
2878 dev[0].request("P2P_SET disabled 1")
2879 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2880 dbus_interface=dbus.PROPERTIES_IFACE)
2881 raise Exception("Invalid Get(Peers) accepted")
bab493b9 2882 except dbus.exceptions.DBusException as e:
2ec82e67
JM
2883 if "Error.Failed: P2P is not available for this interface" not in str(e):
2884 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
2885 finally:
2886 dev[0].request("P2P_SET disabled 0")
2887
0e126c6d
JM
2888def test_dbus_p2p_oom(dev, apdev):
2889 """D-Bus P2P operations and OOM"""
2890 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2891 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2892
2893 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array",
2894 "Find", "InvalidArgs"):
2895 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2896
2897 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array",
2898 "Find", "InvalidArgs"):
2899 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2900
2901 with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array",
2902 "Find", "InvalidArgs"):
2903 p2p.Find(dbus.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }))
2904
2905 with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
2906 "Find", "InvalidArgs"):
2907 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2908
2909 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
2910 "Find", "InvalidArgs"):
2911 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2912
2913 with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
2914 "Find", "InvalidArgs"):
2915 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'),
2916 dbus.ByteArray('123'),
2917 dbus.ByteArray('123'),
2918 dbus.ByteArray('123'),
2919 dbus.ByteArray('123'),
2920 dbus.ByteArray('123'),
2921 dbus.ByteArray('123'),
2922 dbus.ByteArray('123'),
2923 dbus.ByteArray('123'),
2924 dbus.ByteArray('123'),
2925 dbus.ByteArray('123') ] }))
2926
2927 with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
2928 "Find", "InvalidArgs"):
2929 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2930
2931 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
2932 "Find", "InvalidArgs"):
2933 p2p.Find(dbus.Dictionary({ 'Foo': path }))
2934
2935 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
2936 "AddService", "InvalidArgs"):
2937 args = { 'service_type': 'bonjour',
2938 'response': dbus.ByteArray(500*'b') }
2939 p2p.AddService(args)
2940
2941 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
2942 "AddService", "InvalidArgs"):
2943 p2p.AddService(args)
2944
2ec82e67
JM
2945def test_dbus_p2p_discovery(dev, apdev):
2946 """D-Bus P2P discovery"""
6a94fdf2
JM
2947 try:
2948 run_dbus_p2p_discovery(dev, apdev)
2949 finally:
2950 dev[1].request("VENDOR_ELEM_REMOVE 1 *")
2951
2952def run_dbus_p2p_discovery(dev, apdev):
910eb5b5 2953 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2954 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2955
2956 addr0 = dev[0].p2p_dev_addr()
2957
2958 dev[1].request("SET sec_device_type 1-0050F204-2")
2959 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
6a94fdf2 2960 dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566")
2ec82e67
JM
2961 dev[1].p2p_listen()
2962 addr1 = dev[1].p2p_dev_addr()
2963 a1 = binascii.unhexlify(addr1.replace(':',''))
2964
2965 wfd_devinfo = "00001c440028"
2966 dev[2].request("SET wifi_display 1")
2967 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
2968 wfd = binascii.unhexlify('000006' + wfd_devinfo)
2969 dev[2].p2p_listen()
2970 addr2 = dev[2].p2p_dev_addr()
2971 a2 = binascii.unhexlify(addr2.replace(':',''))
2972
2973 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
2974 dbus_interface=dbus.PROPERTIES_IFACE)
2975 if 'Peers' not in res:
2976 raise Exception("GetAll result missing Peers")
2977 if len(res['Peers']) != 0:
2978 raise Exception("Unexpected peer(s) in the list")
2979
2980 args = {'DiscoveryType': 'social',
2981 'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
2982 'Timeout': dbus.Int32(1) }
2983 p2p.Find(dbus.Dictionary(args))
2984 p2p.StopFind()
2985
2986 class TestDbusP2p(TestDbus):
2987 def __init__(self, bus):
2988 TestDbus.__init__(self, bus)
2989 self.found = False
2990 self.found2 = False
e7d454bb 2991 self.found_prop = False
2ec82e67 2992 self.lost = False
571a1af2 2993 self.find_stopped = False
2ec82e67
JM
2994
2995 def __enter__(self):
2996 gobject.timeout_add(1, self.run_test)
2997 gobject.timeout_add(15000, self.timeout)
2998 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2999 "DeviceFound")
e7d454bb
JM
3000 self.add_signal(self.deviceFoundProperties,
3001 WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
2ec82e67
JM
3002 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
3003 "DeviceLost")
3004 self.add_signal(self.provisionDiscoveryResponseEnterPin,
3005 WPAS_DBUS_IFACE_P2PDEVICE,
3006 "ProvisionDiscoveryResponseEnterPin")
571a1af2
JM
3007 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
3008 "FindStopped")
2ec82e67
JM
3009 self.loop.run()
3010 return self
3011
3012 def deviceFound(self, path):
3013 logger.debug("deviceFound: path=%s" % path)
3014 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
3015 dbus_interface=dbus.PROPERTIES_IFACE)
3016 if len(res) < 1:
3017 raise Exception("Unexpected number of peers")
3018 if path not in res:
3019 raise Exception("Mismatch in peer object path")
3020 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3021 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3022 dbus_interface=dbus.PROPERTIES_IFACE,
3023 byte_arrays=True)
3024 logger.debug("peer properties: " + str(res))
3025
3026 if res['DeviceAddress'] == a1:
3027 if 'SecondaryDeviceTypes' not in res:
3028 raise Exception("Missing SecondaryDeviceTypes")
3029 sec = res['SecondaryDeviceTypes']
3030 if len(sec) < 1:
3031 raise Exception("Secondary device type missing")
3032 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
3033 raise Exception("Secondary device type mismatch")
3034
3035 if 'VendorExtension' not in res:
3036 raise Exception("Missing VendorExtension")
3037 vendor = res['VendorExtension']
3038 if len(vendor) < 1:
3039 raise Exception("Vendor extension missing")
3040 if "\x11\x22\x33\x44" not in vendor:
3041 raise Exception("Secondary device type mismatch")
3042
6a94fdf2
JM
3043 if 'VSIE' not in res:
3044 raise Exception("Missing VSIE")
3045 vendor = res['VSIE']
3046 if len(vendor) < 1:
3047 raise Exception("VSIE missing")
3048 if vendor != "\xdd\x06\x00\x11\x22\x33\x55\x66":
3049 raise Exception("VSIE mismatch")
3050
2ec82e67
JM
3051 self.found = True
3052 elif res['DeviceAddress'] == a2:
3053 if 'IEs' not in res:
3054 raise Exception("IEs missing")
3055 if res['IEs'] != wfd:
3056 raise Exception("IEs mismatch")
3057 self.found2 = True
3058 else:
3059 raise Exception("Unexpected peer device address")
3060
3061 if self.found and self.found2:
3062 p2p.StopFind()
3063 p2p.RejectPeer(path)
3064 p2p.ProvisionDiscoveryRequest(path, 'display')
3065
3066 def deviceLost(self, path):
3067 logger.debug("deviceLost: path=%s" % path)
50d7cded
JM
3068 if not self.found or not self.found2:
3069 # This may happen if a previous test case ended up scheduling
3070 # deviceLost event and that event did not get delivered before
3071 # starting the next test execution.
3072 logger.debug("Ignore deviceLost before the deviceFound events")
3073 return
2ec82e67
JM
3074 self.lost = True
3075 try:
3076 p2p.RejectPeer(path)
3077 raise Exception("Invalid RejectPeer accepted")
bab493b9 3078 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3079 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
3080 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
3081 self.loop.quit()
3082
e7d454bb
JM
3083 def deviceFoundProperties(self, path, properties):
3084 logger.debug("deviceFoundProperties: path=%s" % path)
3085 logger.debug("peer properties: " + str(properties))
3086 if properties['DeviceAddress'] == a1:
3087 self.found_prop = True
3088
2ec82e67
JM
3089 def provisionDiscoveryResponseEnterPin(self, peer_object):
3090 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
3091 p2p.Flush()
3092
571a1af2
JM
3093 def findStopped(self):
3094 logger.debug("findStopped")
3095 self.find_stopped = True
3096
2ec82e67
JM
3097 def run_test(self, *args):
3098 logger.debug("run_test")
3099 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3100 'Timeout': dbus.Int32(10)}))
3101 return False
3102
3103 def success(self):
571a1af2 3104 return self.found and self.lost and self.found2 and self.find_stopped
2ec82e67
JM
3105
3106 with TestDbusP2p(bus) as t:
3107 if not t.success():
3108 raise Exception("Expected signals not seen")
3109
3110 dev[1].request("VENDOR_ELEM_REMOVE 1 *")
3111 dev[1].p2p_stop_find()
3112
3113 p2p.Listen(1)
3114 dev[2].p2p_stop_find()
3115 dev[2].request("P2P_FLUSH")
3116 if not dev[2].discover_peer(addr0):
3117 raise Exception("Peer not found")
3118 p2p.StopFind()
3119 dev[2].p2p_stop_find()
3120
3121 try:
3122 p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
3123 raise Exception("Invalid ExtendedListen accepted")
bab493b9 3124 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3125 if "InvalidArgs" not in str(e):
3126 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
3127
3128 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
3129 p2p.ExtendedListen(dbus.Dictionary({}))
3130 dev[0].global_request("P2P_EXT_LISTEN")
3131
6c44d97b
JM
3132def test_dbus_p2p_discovery_freq(dev, apdev):
3133 """D-Bus P2P discovery on a specific non-social channel"""
3134 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3135 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3136
3137 addr1 = dev[1].p2p_dev_addr()
3138 autogo(dev[1], freq=2422)
3139
3140 class TestDbusP2p(TestDbus):
3141 def __init__(self, bus):
3142 TestDbus.__init__(self, bus)
3143 self.found = False
3144
3145 def __enter__(self):
3146 gobject.timeout_add(1, self.run_test)
3147 gobject.timeout_add(5000, self.timeout)
3148 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3149 "DeviceFound")
3150 self.loop.run()
3151 return self
3152
3153 def deviceFound(self, path):
3154 logger.debug("deviceFound: path=%s" % path)
3155 self.found = True
3156 self.loop.quit()
3157
3158 def run_test(self, *args):
3159 logger.debug("run_test")
3160 p2p.Find(dbus.Dictionary({'freq': 2422}))
3161 return False
3162
3163 def success(self):
3164 return self.found
3165
3166 with TestDbusP2p(bus) as t:
3167 if not t.success():
3168 raise Exception("Expected signals not seen")
3169
3170 dev[1].remove_group()
3171 p2p.StopFind()
3172
2ec82e67
JM
3173def test_dbus_p2p_service_discovery(dev, apdev):
3174 """D-Bus P2P service discovery"""
910eb5b5 3175 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3176 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3177
3178 addr0 = dev[0].p2p_dev_addr()
3179 addr1 = dev[1].p2p_dev_addr()
3180
3181 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
3182 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
db98b587 3183
2ec82e67
JM
3184 args = { 'service_type': 'bonjour',
3185 'query': bonjour_query,
3186 'response': bonjour_response }
3187 p2p.AddService(args)
3188 p2p.FlushService()
3189 p2p.AddService(args)
3190
3191 try:
3192 p2p.DeleteService(args)
3193 raise Exception("Invalid DeleteService() accepted")
bab493b9 3194 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3195 if "InvalidArgs" not in str(e):
3196 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3197
3198 args = { 'service_type': 'bonjour',
3199 'query': bonjour_query }
3200 p2p.DeleteService(args)
3201 try:
3202 p2p.DeleteService(args)
3203 raise Exception("Invalid DeleteService() accepted")
bab493b9 3204 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3205 if "InvalidArgs" not in str(e):
3206 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3207
3208 args = { 'service_type': 'upnp',
3209 'version': 0x10,
3210 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
3211 p2p.AddService(args)
3212 p2p.DeleteService(args)
3213 try:
3214 p2p.DeleteService(args)
3215 raise Exception("Invalid DeleteService() accepted")
bab493b9 3216 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3217 if "InvalidArgs" not in str(e):
3218 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3219
3220 tests = [ { 'service_type': 'foo' },
3221 { 'service_type': 'foo', 'query': bonjour_query },
3222 { 'service_type': 'upnp' },
3223 { 'service_type': 'upnp', 'version': 0x10 },
3224 { 'service_type': 'upnp',
3225 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3226 { 'version': 0x10,
3227 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3228 { 'service_type': 'upnp', 'foo': 'bar' },
3229 { 'service_type': 'bonjour' },
3230 { 'service_type': 'bonjour', 'query': 'foo' },
3231 { 'service_type': 'bonjour', 'foo': 'bar' } ]
3232 for args in tests:
3233 try:
3234 p2p.DeleteService(args)
3235 raise Exception("Invalid DeleteService() accepted")
bab493b9 3236 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3237 if "InvalidArgs" not in str(e):
3238 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3239
3240 tests = [ { 'service_type': 'foo' },
3241 { 'service_type': 'upnp' },
3242 { 'service_type': 'upnp', 'version': 0x10 },
3243 { 'service_type': 'upnp',
3244 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3245 { 'version': 0x10,
3246 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3247 { 'service_type': 'upnp', 'foo': 'bar' },
3248 { 'service_type': 'bonjour' },
3249 { 'service_type': 'bonjour', 'query': 'foo' },
3250 { 'service_type': 'bonjour', 'response': 'foo' },
3251 { 'service_type': 'bonjour', 'query': bonjour_query },
3252 { 'service_type': 'bonjour', 'response': bonjour_response },
3253 { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
3254 { 'service_type': 'bonjour', 'foo': 'bar' } ]
3255 for args in tests:
3256 try:
3257 p2p.AddService(args)
3258 raise Exception("Invalid AddService() accepted")
bab493b9 3259 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3260 if "InvalidArgs" not in str(e):
3261 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3262
3263 args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
3264 ref = p2p.ServiceDiscoveryRequest(args)
3265 p2p.ServiceDiscoveryCancelRequest(ref)
3266 try:
3267 p2p.ServiceDiscoveryCancelRequest(ref)
3268 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
bab493b9 3269 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3270 if "InvalidArgs" not in str(e):
3271 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3272 try:
3273 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
3274 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
bab493b9 3275 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3276 if "InvalidArgs" not in str(e):
3277 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3278
3279 args = { 'service_type': 'upnp',
3280 'version': 0x10,
3281 'service': 'ssdp:foo' }
3282 ref = p2p.ServiceDiscoveryRequest(args)
3283 p2p.ServiceDiscoveryCancelRequest(ref)
3284
3285 tests = [ { 'service_type': 'foo' },
3286 { 'foo': 'bar' },
3287 { 'tlv': 'foo' },
3288 { },
3289 { 'version': 0 },
3290 { 'service_type': 'upnp',
3291 'service': 'ssdp:foo' },
3292 { 'service_type': 'upnp',
3293 'version': 0x10 },
3294 { 'service_type': 'upnp',
3295 'version': 0x10,
3296 'service': 'ssdp:foo',
3297 'peer_object': dbus.ObjectPath(path + "/Peers") },
3298 { 'service_type': 'upnp',
3299 'version': 0x10,
3300 'service': 'ssdp:foo',
3301 'peer_object': path + "/Peers" },
3302 { 'service_type': 'upnp',
3303 'version': 0x10,
3304 'service': 'ssdp:foo',
3305 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
3306 for args in tests:
3307 try:
3308 p2p.ServiceDiscoveryRequest(args)
3309 raise Exception("Invalid ServiceDiscoveryRequest accepted")
bab493b9 3310 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3311 if "InvalidArgs" not in str(e):
3312 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))
3313
3314 args = { 'foo': 'bar' }
3315 try:
3316 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3317 raise Exception("Invalid ServiceDiscoveryResponse accepted")
bab493b9 3318 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3319 if "InvalidArgs" not in str(e):
3320 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
3321
3322def test_dbus_p2p_service_discovery_query(dev, apdev):
3323 """D-Bus P2P service discovery query"""
910eb5b5 3324 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3325 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3326
3327 addr0 = dev[0].p2p_dev_addr()
3328 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
3329 dev[1].p2p_listen()
3330 addr1 = dev[1].p2p_dev_addr()
3331
3332 class TestDbusP2p(TestDbus):
3333 def __init__(self, bus):
3334 TestDbus.__init__(self, bus)
3335 self.done = False
3336
3337 def __enter__(self):
3338 gobject.timeout_add(1, self.run_test)
3339 gobject.timeout_add(15000, self.timeout)
3340 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3341 "DeviceFound")
3342 self.add_signal(self.serviceDiscoveryResponse,
3343 WPAS_DBUS_IFACE_P2PDEVICE,
3344 "ServiceDiscoveryResponse", byte_arrays=True)
3345 self.loop.run()
3346 return self
3347
3348 def deviceFound(self, path):
3349 logger.debug("deviceFound: path=%s" % path)
3350 args = { 'peer_object': path,
3351 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
3352 p2p.ServiceDiscoveryRequest(args)
3353
3354 def serviceDiscoveryResponse(self, sd_request):
3355 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
3356 self.done = True
3357 self.loop.quit()
3358
3359 def run_test(self, *args):
3360 logger.debug("run_test")
3361 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3362 'Timeout': dbus.Int32(10)}))
3363 return False
3364
3365 def success(self):
3366 return self.done
3367
3368 with TestDbusP2p(bus) as t:
3369 if not t.success():
3370 raise Exception("Expected signals not seen")
3371
3372 dev[1].p2p_stop_find()
3373
3374def test_dbus_p2p_service_discovery_external(dev, apdev):
3375 """D-Bus P2P service discovery with external response"""
3376 try:
3377 _test_dbus_p2p_service_discovery_external(dev, apdev)
3378 finally:
3379 dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3380
3381def _test_dbus_p2p_service_discovery_external(dev, apdev):
910eb5b5 3382 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3383 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3384
3385 addr0 = dev[0].p2p_dev_addr()
3386 addr1 = dev[1].p2p_dev_addr()
3387 resp = "0300000101"
3388
3389 dev[1].request("P2P_FLUSH")
3390 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
3391 dev[1].p2p_find(social=True)
3392
3393 class TestDbusP2p(TestDbus):
3394 def __init__(self, bus):
3395 TestDbus.__init__(self, bus)
3396 self.sd = False
3397
3398 def __enter__(self):
3399 gobject.timeout_add(1, self.run_test)
3400 gobject.timeout_add(15000, self.timeout)
3401 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3402 "DeviceFound")
3403 self.add_signal(self.serviceDiscoveryRequest,
3404 WPAS_DBUS_IFACE_P2PDEVICE,
3405 "ServiceDiscoveryRequest")
3406 self.loop.run()
3407 return self
3408
3409 def deviceFound(self, path):
3410 logger.debug("deviceFound: path=%s" % path)
3411
3412 def serviceDiscoveryRequest(self, sd_request):
3413 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
3414 self.sd = True
3415 args = { 'peer_object': sd_request['peer_object'],
3416 'frequency': sd_request['frequency'],
3417 'dialog_token': sd_request['dialog_token'],
3418 'tlvs': dbus.ByteArray(binascii.unhexlify(resp)) }
3419 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3420 self.loop.quit()
3421
3422 def run_test(self, *args):
3423 logger.debug("run_test")
3424 p2p.ServiceDiscoveryExternal(1)
3425 p2p.ServiceUpdate()
3426 p2p.Listen(15)
3427 return False
3428
3429 def success(self):
3430 return self.sd
3431
3432 with TestDbusP2p(bus) as t:
3433 if not t.success():
3434 raise Exception("Expected signals not seen")
3435
3436 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
3437 if ev is None:
3438 raise Exception("Service discovery timed out")
3439 if addr0 not in ev:
3440 raise Exception("Unexpected address in SD Response: " + ev)
3441 if ev.split(' ')[4] != resp:
3442 raise Exception("Unexpected response data SD Response: " + ev)
3443 dev[1].p2p_stop_find()
3444
3445 p2p.StopFind()
3446 p2p.ServiceDiscoveryExternal(0)
3447
3448def test_dbus_p2p_autogo(dev, apdev):
3449 """D-Bus P2P autonomous GO"""
910eb5b5 3450 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3451 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3452
3453 addr0 = dev[0].p2p_dev_addr()
3454
3455 class TestDbusP2p(TestDbus):
3456 def __init__(self, bus):
3457 TestDbus.__init__(self, bus)
3458 self.first = True
3459 self.waiting_end = False
035efb2c 3460 self.exceptions = False
001c4bf5 3461 self.deauthorized = False
2ec82e67
JM
3462 self.done = False
3463
3464 def __enter__(self):
3465 gobject.timeout_add(1, self.run_test)
3466 gobject.timeout_add(15000, self.timeout)
3467 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3468 "DeviceFound")
3469 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3470 "GroupStarted")
3471 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3472 "GroupFinished")
3473 self.add_signal(self.persistentGroupAdded,
3474 WPAS_DBUS_IFACE_P2PDEVICE,
3475 "PersistentGroupAdded")
3476 self.add_signal(self.persistentGroupRemoved,
3477 WPAS_DBUS_IFACE_P2PDEVICE,
3478 "PersistentGroupRemoved")
3479 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3480 WPAS_DBUS_IFACE_P2PDEVICE,
3481 "ProvisionDiscoveryRequestDisplayPin")
3482 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3483 "StaAuthorized")
001c4bf5
JM
3484 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
3485 "StaDeauthorized")
2ec82e67
JM
3486 self.loop.run()
3487 return self
3488
3489 def groupStarted(self, properties):
3490 logger.debug("groupStarted: " + str(properties))
3491 self.group = properties['group_object']
cb346b49
JM
3492 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3493 properties['interface_object'])
3494 role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3495 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3496 if role != "GO":
035efb2c 3497 self.exceptions = True
2ec82e67 3498 raise Exception("Unexpected role reported: " + role)
cb346b49
JM
3499 group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3500 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3501 if group != properties['group_object']:
035efb2c 3502 self.exceptions = True
2ec82e67 3503 raise Exception("Unexpected Group reported: " + str(group))
cb346b49
JM
3504 go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3505 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3506 if go != '/':
035efb2c 3507 self.exceptions = True
2ec82e67
JM
3508 raise Exception("Unexpected PeerGO value: " + str(go))
3509 if self.first:
3510 self.first = False
3511 logger.info("Remove persistent group instance")
cb346b49
JM
3512 group_p2p = dbus.Interface(self.g_if_obj,
3513 WPAS_DBUS_IFACE_P2PDEVICE)
3514 group_p2p.Disconnect()
2ec82e67
JM
3515 else:
3516 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3517 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
3518
3519 def groupFinished(self, properties):
3520 logger.debug("groupFinished: " + str(properties))
3521 if self.waiting_end:
3522 logger.info("Remove persistent group")
3523 p2p.RemovePersistentGroup(self.persistent)
3524 else:
3525 logger.info("Re-start persistent group")
3526 params = dbus.Dictionary({'persistent_group_object':
3527 self.persistent,
3528 'frequency': 2412})
3529 p2p.GroupAdd(params)
3530
3531 def persistentGroupAdded(self, path, properties):
3532 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3533 self.persistent = path
3534
3535 def persistentGroupRemoved(self, path):
3536 logger.debug("persistentGroupRemoved: %s" % path)
3537 self.done = True
3538 self.loop.quit()
3539
3540 def deviceFound(self, path):
3541 logger.debug("deviceFound: path=%s" % path)
3542 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3543 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3544 dbus_interface=dbus.PROPERTIES_IFACE,
3545 byte_arrays=True)
3546 logger.debug('peer properties: ' + str(self.peer))
3547
3548 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3549 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3550 self.peer_path = peer_object
3551 peer = binascii.unhexlify(peer_object.split('/')[-1])
3552 addr = ""
3553 for p in peer:
3554 if len(addr) > 0:
3555 addr += ':'
3556 addr += '%02x' % ord(p)
795b6f57
JM
3557
3558 params = { 'Role': 'registrar',
3559 'P2PDeviceAddress': self.peer['DeviceAddress'],
3560 'Bssid': self.peer['DeviceAddress'],
3561 'Type': 'pin' }
cb346b49 3562 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
795b6f57
JM
3563 try:
3564 wps.Start(params)
035efb2c 3565 self.exceptions = True
795b6f57 3566 raise Exception("Invalid WPS.Start() accepted")
bab493b9 3567 except dbus.exceptions.DBusException as e:
795b6f57 3568 if "InvalidArgs" not in str(e):
035efb2c 3569 self.exceptions = True
795b6f57 3570 raise Exception("Unexpected error message: " + str(e))
2ec82e67
JM
3571 params = { 'Role': 'registrar',
3572 'P2PDeviceAddress': self.peer['DeviceAddress'],
2ec82e67
JM
3573 'Type': 'pin',
3574 'Pin': '12345670' }
3575 logger.info("Authorize peer to connect to the group")
3576 wps.Start(params)
3577
3578 def staAuthorized(self, name):
3579 logger.debug("staAuthorized: " + name)
3580 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
3581 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3582 dbus_interface=dbus.PROPERTIES_IFACE,
3583 byte_arrays=True)
035efb2c 3584 logger.debug("Peer properties: " + str(res))
2ec82e67 3585 if 'Groups' not in res or len(res['Groups']) != 1:
035efb2c 3586 self.exceptions = True
2ec82e67
JM
3587 raise Exception("Unexpected number of peer Groups entries")
3588 if res['Groups'][0] != self.group:
035efb2c 3589 self.exceptions = True
2ec82e67
JM
3590 raise Exception("Unexpected peer Groups[0] value")
3591
3592 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
3593 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3594 dbus_interface=dbus.PROPERTIES_IFACE,
3595 byte_arrays=True)
3596 logger.debug("Group properties: " + str(res))
3597 if 'Members' not in res or len(res['Members']) != 1:
035efb2c 3598 self.exceptions = True
2ec82e67
JM
3599 raise Exception("Unexpected number of group members")
3600
3601 ext = dbus.ByteArray("\x11\x22\x33\x44")
3602 # Earlier implementation of this interface was a bit strange. The
3603 # property is defined to have aay signature and that is what the
3604 # getter returned. However, the setter expected there to be a
3605 # dictionary with 'WPSVendorExtensions' as the key surrounding these
3606 # values.. The current implementations maintains support for that
3607 # for backwards compability reasons. Verify that encoding first.
3608 vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
3609 signature='sv')
3610 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3611 dbus_interface=dbus.PROPERTIES_IFACE)
3612 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3613 dbus_interface=dbus.PROPERTIES_IFACE,
3614 byte_arrays=True)
3615 if len(res) != 1:
035efb2c 3616 self.exceptions = True
2ec82e67
JM
3617 raise Exception("Unexpected number of vendor extensions")
3618 if res[0] != ext:
035efb2c 3619 self.exceptions = True
2ec82e67
JM
3620 raise Exception("Vendor extension value changed")
3621
3622 # And now verify that the more appropriate encoding is accepted as
3623 # well.
3624 res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
3625 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3626 dbus_interface=dbus.PROPERTIES_IFACE)
3627 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3628 dbus_interface=dbus.PROPERTIES_IFACE,
3629 byte_arrays=True)
3630 if len(res) != 2:
035efb2c 3631 self.exceptions = True
2ec82e67
JM
3632 raise Exception("Unexpected number of vendor extensions")
3633 if res[0] != res2[0] or res[1] != res2[1]:
035efb2c 3634 self.exceptions = True
2ec82e67
JM
3635 raise Exception("Vendor extension value changed")
3636
3637 for i in range(10):
3638 res.append(dbus.ByteArray('\xaa\xbb'))
3639 try:
3640 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3641 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3642 self.exceptions = True
2ec82e67 3643 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
bab493b9 3644 except dbus.exceptions.DBusException as e:
2ec82e67 3645 if "Error.Failed" not in str(e):
035efb2c 3646 self.exceptions = True
2ec82e67
JM
3647 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3648
3649 vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
3650 try:
3651 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3652 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3653 self.exceptions = True
2ec82e67 3654 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
bab493b9 3655 except dbus.exceptions.DBusException as e:
2ec82e67 3656 if "InvalidArgs" not in str(e):
035efb2c 3657 self.exceptions = True
2ec82e67
JM
3658 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3659
3660 vals = [ "foo" ]
3661 try:
3662 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3663 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3664 self.exceptions = True
2ec82e67 3665 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
bab493b9 3666 except dbus.exceptions.DBusException as e:
2ec82e67 3667 if "Error.Failed" not in str(e):
035efb2c 3668 self.exceptions = True
2ec82e67
JM
3669 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3670
3671 vals = [ [ "foo" ] ]
3672 try:
3673 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3674 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3675 self.exceptions = True
2ec82e67 3676 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
bab493b9 3677 except dbus.exceptions.DBusException as e:
2ec82e67 3678 if "Error.Failed" not in str(e):
035efb2c 3679 self.exceptions = True
2ec82e67
JM
3680 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3681
001c4bf5
JM
3682 p2p.RemoveClient({ 'peer': self.peer_path })
3683
2ec82e67 3684 self.waiting_end = True
cb346b49
JM
3685 group_p2p = dbus.Interface(self.g_if_obj,
3686 WPAS_DBUS_IFACE_P2PDEVICE)
3687 group_p2p.Disconnect()
2ec82e67 3688
001c4bf5
JM
3689 def staDeauthorized(self, name):
3690 logger.debug("staDeauthorized: " + name)
3691 self.deauthorized = True
3692
2ec82e67
JM
3693 def run_test(self, *args):
3694 logger.debug("run_test")
3695 params = dbus.Dictionary({'persistent': True,
3696 'frequency': 2412})
3697 logger.info("Add a persistent group")
3698 p2p.GroupAdd(params)
3699 return False
3700
3701 def success(self):
035efb2c 3702 return self.done and self.deauthorized and not self.exceptions
2ec82e67
JM
3703
3704 with TestDbusP2p(bus) as t:
3705 if not t.success():
3706 raise Exception("Expected signals not seen")
3707
3708 dev[1].wait_go_ending_session()
3709
3710def test_dbus_p2p_autogo_pbc(dev, apdev):
3711 """D-Bus P2P autonomous GO and PBC"""
910eb5b5 3712 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3713 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3714
3715 addr0 = dev[0].p2p_dev_addr()
3716
3717 class TestDbusP2p(TestDbus):
3718 def __init__(self, bus):
3719 TestDbus.__init__(self, bus)
3720 self.first = True
3721 self.waiting_end = False
3722 self.done = False
3723
3724 def __enter__(self):
3725 gobject.timeout_add(1, self.run_test)
3726 gobject.timeout_add(15000, self.timeout)
3727 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3728 "DeviceFound")
3729 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3730 "GroupStarted")
3731 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3732 "GroupFinished")
3733 self.add_signal(self.provisionDiscoveryPBCRequest,
3734 WPAS_DBUS_IFACE_P2PDEVICE,
3735 "ProvisionDiscoveryPBCRequest")
3736 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3737 "StaAuthorized")
3738 self.loop.run()
3739 return self
3740
3741 def groupStarted(self, properties):
3742 logger.debug("groupStarted: " + str(properties))
3743 self.group = properties['group_object']
cb346b49
JM
3744 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3745 properties['interface_object'])
2ec82e67
JM
3746 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3747 dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
3748
3749 def groupFinished(self, properties):
3750 logger.debug("groupFinished: " + str(properties))
3751 self.done = True
3752 self.loop.quit()
3753
3754 def deviceFound(self, path):
3755 logger.debug("deviceFound: path=%s" % path)
3756 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3757 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3758 dbus_interface=dbus.PROPERTIES_IFACE,
3759 byte_arrays=True)
3760 logger.debug('peer properties: ' + str(self.peer))
3761
3762 def provisionDiscoveryPBCRequest(self, peer_object):
3763 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
3764 self.peer_path = peer_object
3765 peer = binascii.unhexlify(peer_object.split('/')[-1])
3766 addr = ""
3767 for p in peer:
3768 if len(addr) > 0:
3769 addr += ':'
3770 addr += '%02x' % ord(p)
3771 params = { 'Role': 'registrar',
3772 'P2PDeviceAddress': self.peer['DeviceAddress'],
2ec82e67
JM
3773 'Type': 'pbc' }
3774 logger.info("Authorize peer to connect to the group")
cb346b49 3775 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67
JM
3776 wps.Start(params)
3777
3778 def staAuthorized(self, name):
3779 logger.debug("staAuthorized: " + name)
cb346b49
JM
3780 group_p2p = dbus.Interface(self.g_if_obj,
3781 WPAS_DBUS_IFACE_P2PDEVICE)
3782 group_p2p.Disconnect()
2ec82e67
JM
3783
3784 def run_test(self, *args):
3785 logger.debug("run_test")
3786 params = dbus.Dictionary({'frequency': 2412})
3787 p2p.GroupAdd(params)
3788 return False
3789
3790 def success(self):
3791 return self.done
3792
3793 with TestDbusP2p(bus) as t:
3794 if not t.success():
3795 raise Exception("Expected signals not seen")
3796
3797 dev[1].wait_go_ending_session()
3798 dev[1].flush_scan_cache()
3799
3800def test_dbus_p2p_autogo_legacy(dev, apdev):
3801 """D-Bus P2P autonomous GO and legacy STA"""
910eb5b5 3802 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3803 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3804
3805 addr0 = dev[0].p2p_dev_addr()
3806
3807 class TestDbusP2p(TestDbus):
3808 def __init__(self, bus):
3809 TestDbus.__init__(self, bus)
3810 self.done = False
3811
3812 def __enter__(self):
3813 gobject.timeout_add(1, self.run_test)
3814 gobject.timeout_add(15000, self.timeout)
3815 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3816 "GroupStarted")
3817 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3818 "GroupFinished")
3819 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3820 "StaAuthorized")
3821 self.loop.run()
3822 return self
3823
3824 def groupStarted(self, properties):
3825 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
3826 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3827 properties['group_object'])
3828 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3829 dbus_interface=dbus.PROPERTIES_IFACE,
3830 byte_arrays=True)
3831 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
3832
2ec82e67
JM
3833 pin = '12345670'
3834 params = { 'Role': 'enrollee',
3835 'Type': 'pin',
3836 'Pin': pin }
cb346b49
JM
3837 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3838 properties['interface_object'])
3839 wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67
JM
3840 wps.Start(params)
3841 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
3842 dev1.scan_for_bss(bssid, freq=2412)
3843 dev1.request("WPS_PIN " + bssid + " " + pin)
3844 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3845
3846 def groupFinished(self, properties):
3847 logger.debug("groupFinished: " + str(properties))
3848 self.done = True
3849 self.loop.quit()
3850
3851 def staAuthorized(self, name):
3852 logger.debug("staAuthorized: " + name)
3853 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3854 dev1.request("DISCONNECT")
cb346b49 3855 self.group_p2p.Disconnect()
2ec82e67
JM
3856
3857 def run_test(self, *args):
3858 logger.debug("run_test")
3859 params = dbus.Dictionary({'frequency': 2412})
3860 p2p.GroupAdd(params)
3861 return False
3862
3863 def success(self):
3864 return self.done
3865
3866 with TestDbusP2p(bus) as t:
3867 if not t.success():
3868 raise Exception("Expected signals not seen")
3869
3870def test_dbus_p2p_join(dev, apdev):
3871 """D-Bus P2P join an autonomous GO"""
910eb5b5 3872 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3873 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3874
3875 addr1 = dev[1].p2p_dev_addr()
3876 addr2 = dev[2].p2p_dev_addr()
3877 dev[1].p2p_start_go(freq=2412)
cb346b49 3878 dev1_group_ifname = dev[1].group_ifname
2ec82e67
JM
3879 dev[2].p2p_listen()
3880
3881 class TestDbusP2p(TestDbus):
3882 def __init__(self, bus):
3883 TestDbus.__init__(self, bus)
3884 self.done = False
3885 self.peer = None
3886 self.go = None
3887
3888 def __enter__(self):
3889 gobject.timeout_add(1, self.run_test)
3890 gobject.timeout_add(15000, self.timeout)
3891 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3892 "DeviceFound")
3893 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3894 "GroupStarted")
3895 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3896 "GroupFinished")
3897 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
3898 "InvitationResult")
3899 self.loop.run()
3900 return self
3901
3902 def deviceFound(self, path):
3903 logger.debug("deviceFound: path=%s" % path)
3904 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3905 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3906 dbus_interface=dbus.PROPERTIES_IFACE,
3907 byte_arrays=True)
3908 logger.debug('peer properties: ' + str(res))
3909 if addr2.replace(':','') in path:
3910 self.peer = path
3911 elif addr1.replace(':','') in path:
3912 self.go = path
3913 if self.peer and self.go:
3914 logger.info("Join the group")
3915 p2p.StopFind()
3916 args = { 'peer': self.go,
3917 'join': True,
3918 'wps_method': 'pin',
3919 'frequency': 2412 }
3920 pin = p2p.Connect(args)
3921
3922 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
3923 dev1.group_ifname = dev1_group_ifname
3924 dev1.group_request("WPS_PIN any " + pin)
2ec82e67
JM
3925
3926 def groupStarted(self, properties):
3927 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
3928 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3929 properties['interface_object'])
3930 role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3931 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3932 if role != "client":
3933 raise Exception("Unexpected role reported: " + role)
cb346b49
JM
3934 group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3935 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3936 if group != properties['group_object']:
3937 raise Exception("Unexpected Group reported: " + str(group))
cb346b49
JM
3938 go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3939 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3940 if go != self.go:
3941 raise Exception("Unexpected PeerGO value: " + str(go))
3942
3943 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3944 properties['group_object'])
3945 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3946 dbus_interface=dbus.PROPERTIES_IFACE,
3947 byte_arrays=True)
3948 logger.debug("Group properties: " + str(res))
3949
3950 ext = dbus.ByteArray("\x11\x22\x33\x44")
3951 try:
3952 # Set(WPSVendorExtensions) not allowed for P2P Client
3953 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3954 dbus_interface=dbus.PROPERTIES_IFACE)
3955 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
bab493b9 3956 except dbus.exceptions.DBusException as e:
2ec82e67
JM
3957 if "Error.Failed: Failed to set property" not in str(e):
3958 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3959
cb346b49 3960 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3961 args = { 'duration1': 30000, 'interval1': 102400,
3962 'duration2': 20000, 'interval2': 102400 }
cb346b49 3963 group_p2p.PresenceRequest(args)
2ec82e67
JM
3964
3965 args = { 'peer': self.peer }
cb346b49 3966 group_p2p.Invite(args)
2ec82e67
JM
3967
3968 def groupFinished(self, properties):
3969 logger.debug("groupFinished: " + str(properties))
3970 self.done = True
3971 self.loop.quit()
3972
3973 def invitationResult(self, result):
3974 logger.debug("invitationResult: " + str(result))
3975 if result['status'] != 1:
3976 raise Exception("Unexpected invitation result: " + str(result))
3977 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 3978 dev1.group_ifname = dev1_group_ifname
2ec82e67
JM
3979 dev1.remove_group()
3980
3981 def run_test(self, *args):
3982 logger.debug("run_test")
3983 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
3984 return False
3985
3986 def success(self):
3987 return self.done
3988
3989 with TestDbusP2p(bus) as t:
3990 if not t.success():
3991 raise Exception("Expected signals not seen")
3992
3993 dev[2].p2p_stop_find()
3994
1992af11
JM
3995def test_dbus_p2p_invitation_received(dev, apdev):
3996 """D-Bus P2P and InvitationReceived"""
3997 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3998 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3999
4000 form(dev[0], dev[1])
4001 addr0 = dev[0].p2p_dev_addr()
4002 dev[0].p2p_listen()
4003 dev[0].global_request("SET persistent_reconnect 0")
4004
4005 if not dev[1].discover_peer(addr0, social=True):
4006 raise Exception("Peer " + addr0 + " not found")
4007 peer = dev[1].get_peer(addr0)
4008
4009 class TestDbusP2p(TestDbus):
4010 def __init__(self, bus):
4011 TestDbus.__init__(self, bus)
4012 self.done = False
4013
4014 def __enter__(self):
4015 gobject.timeout_add(1, self.run_test)
4016 gobject.timeout_add(15000, self.timeout)
4017 self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
4018 "InvitationReceived")
4019 self.loop.run()
4020 return self
4021
4022 def invitationReceived(self, result):
4023 logger.debug("invitationReceived: " + str(result))
4024 self.done = True
4025 self.loop.quit()
4026
4027 def run_test(self, *args):
4028 logger.debug("run_test")
4029 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4030 cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
4031 dev1.global_request(cmd)
4032 return False
4033
4034 def success(self):
4035 return self.done
4036
4037 with TestDbusP2p(bus) as t:
4038 if not t.success():
4039 raise Exception("Expected signals not seen")
4040
4041 dev[0].p2p_stop_find()
4042 dev[1].p2p_stop_find()
4043
2ec82e67
JM
4044def test_dbus_p2p_config(dev, apdev):
4045 """D-Bus Get/Set P2PDeviceConfig"""
4046 try:
4047 _test_dbus_p2p_config(dev, apdev)
4048 finally:
4049 dev[0].request("P2P_SET ssid_postfix ")
4050
4051def _test_dbus_p2p_config(dev, apdev):
910eb5b5 4052 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4053 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4054
4055 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4056 dbus_interface=dbus.PROPERTIES_IFACE,
4057 byte_arrays=True)
4058 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
4059 dbus_interface=dbus.PROPERTIES_IFACE)
4060 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4061 dbus_interface=dbus.PROPERTIES_IFACE,
4062 byte_arrays=True)
4063
4064 if len(res) != len(res2):
4065 raise Exception("Different number of parameters")
4066 for k in res:
4067 if res[k] != res2[k]:
4068 raise Exception("Parameter %s value changes" % k)
4069
4070 changes = { 'SsidPostfix': 'foo',
4071 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
4072 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
4073 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4074 dbus.Dictionary(changes, signature='sv'),
4075 dbus_interface=dbus.PROPERTIES_IFACE)
4076
4077 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4078 dbus_interface=dbus.PROPERTIES_IFACE,
4079 byte_arrays=True)
4080 logger.debug("P2PDeviceConfig: " + str(res2))
4081 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
4082 raise Exception("VendorExtension does not match")
4083 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
4084 raise Exception("SecondaryDeviceType does not match")
4085
4086 changes = { 'SsidPostfix': '',
4087 'VendorExtension': dbus.Array([], signature="ay"),
4088 'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
4089 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4090 dbus.Dictionary(changes, signature='sv'),
4091 dbus_interface=dbus.PROPERTIES_IFACE)
4092
4093 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4094 dbus_interface=dbus.PROPERTIES_IFACE,
4095 byte_arrays=True)
4096 logger.debug("P2PDeviceConfig: " + str(res3))
4097 if 'VendorExtension' in res3:
4098 raise Exception("VendorExtension not removed")
4099 if 'SecondaryDeviceTypes' in res3:
4100 raise Exception("SecondaryDeviceType not removed")
4101
4102 try:
4103 dev[0].request("P2P_SET disabled 1")
4104 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4105 dbus_interface=dbus.PROPERTIES_IFACE,
4106 byte_arrays=True)
4107 raise Exception("Invalid Get(P2PDeviceConfig) accepted")
bab493b9 4108 except dbus.exceptions.DBusException as e:
2ec82e67
JM
4109 if "Error.Failed: P2P is not available for this interface" not in str(e):
4110 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4111 finally:
4112 dev[0].request("P2P_SET disabled 0")
4113
4114 try:
4115 dev[0].request("P2P_SET disabled 1")
4116 changes = { 'SsidPostfix': 'foo' }
4117 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4118 dbus.Dictionary(changes, signature='sv'),
4119 dbus_interface=dbus.PROPERTIES_IFACE)
4120 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
bab493b9 4121 except dbus.exceptions.DBusException as e:
2ec82e67
JM
4122 if "Error.Failed: P2P is not available for this interface" not in str(e):
4123 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4124 finally:
4125 dev[0].request("P2P_SET disabled 0")
4126
4127 tests = [ { 'DeviceName': 123 },
4128 { 'SsidPostfix': 123 },
4129 { 'Foo': 'Bar' } ]
4130 for changes in tests:
4131 try:
4132 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4133 dbus.Dictionary(changes, signature='sv'),
4134 dbus_interface=dbus.PROPERTIES_IFACE)
4135 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
bab493b9 4136 except dbus.exceptions.DBusException as e:
2ec82e67
JM
4137 if "InvalidArgs" not in str(e):
4138 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4139
4140def test_dbus_p2p_persistent(dev, apdev):
4141 """D-Bus P2P persistent group"""
910eb5b5 4142 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4143 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4144
4145 class TestDbusP2p(TestDbus):
4146 def __init__(self, bus):
4147 TestDbus.__init__(self, bus)
4148
4149 def __enter__(self):
4150 gobject.timeout_add(1, self.run_test)
4151 gobject.timeout_add(15000, self.timeout)
4152 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4153 "GroupStarted")
4154 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4155 "GroupFinished")
4156 self.add_signal(self.persistentGroupAdded,
4157 WPAS_DBUS_IFACE_P2PDEVICE,
4158 "PersistentGroupAdded")
4159 self.loop.run()
4160 return self
4161
4162 def groupStarted(self, properties):
4163 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4164 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4165 properties['interface_object'])
4166 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4167 group_p2p.Disconnect()
2ec82e67
JM
4168
4169 def groupFinished(self, properties):
4170 logger.debug("groupFinished: " + str(properties))
4171 self.loop.quit()
4172
4173 def persistentGroupAdded(self, path, properties):
4174 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
4175 self.persistent = path
4176
4177 def run_test(self, *args):
4178 logger.debug("run_test")
4179 params = dbus.Dictionary({'persistent': True,
4180 'frequency': 2412})
4181 logger.info("Add a persistent group")
4182 p2p.GroupAdd(params)
4183 return False
4184
4185 def success(self):
4186 return True
4187
4188 with TestDbusP2p(bus) as t:
4189 if not t.success():
4190 raise Exception("Expected signals not seen")
4191 persistent = t.persistent
4192
4193 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
4194 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
4195 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
4196 logger.info("Persistent group Properties: " + str(res))
4197 vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
4198 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
4199 dbus_interface=dbus.PROPERTIES_IFACE)
4200 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
4201 dbus_interface=dbus.PROPERTIES_IFACE)
4202 if len(res) != len(res2):
4203 raise Exception("Different number of parameters")
4204 for k in res:
4205 if k != 'ssid' and res[k] != res2[k]:
4206 raise Exception("Parameter %s value changes" % k)
4207 if res2['ssid'] != '"DIRECT-foo"':
4208 raise Exception("Unexpected ssid")
4209
4210 args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
4211 'psk': '1234567890' }, signature='sv')
4212 group = p2p.AddPersistentGroup(args)
4213
4214 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
4215 dbus_interface=dbus.PROPERTIES_IFACE)
4216 if len(groups) != 2:
4217 raise Exception("Unexpected number of persistent groups: " + str(groups))
4218
4219 p2p.RemoveAllPersistentGroups()
4220
4221 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
4222 dbus_interface=dbus.PROPERTIES_IFACE)
4223 if len(groups) != 0:
4224 raise Exception("Unexpected number of persistent groups: " + str(groups))
4225
4226 try:
4227 p2p.RemovePersistentGroup(persistent)
4228 raise Exception("Invalid RemovePersistentGroup accepted")
bab493b9 4229 except dbus.exceptions.DBusException as e:
2ec82e67
JM
4230 if "NetworkUnknown: There is no such persistent group" not in str(e):
4231 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
4232
4233def test_dbus_p2p_reinvoke_persistent(dev, apdev):
4234 """D-Bus P2P reinvoke persistent group"""
910eb5b5 4235 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 4236 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
4237
4238 addr0 = dev[0].p2p_dev_addr()
4239
4240 class TestDbusP2p(TestDbus):
4241 def __init__(self, bus):
4242 TestDbus.__init__(self, bus)
4243 self.first = True
4244 self.waiting_end = False
4245 self.done = False
4246 self.invited = False
4247
4248 def __enter__(self):
4249 gobject.timeout_add(1, self.run_test)
4250 gobject.timeout_add(15000, self.timeout)
4251 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4252 "DeviceFound")
4253 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4254 "GroupStarted")
4255 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4256 "GroupFinished")
4257 self.add_signal(self.persistentGroupAdded,
4258 WPAS_DBUS_IFACE_P2PDEVICE,
4259 "PersistentGroupAdded")
4260 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
4261 WPAS_DBUS_IFACE_P2PDEVICE,
4262 "ProvisionDiscoveryRequestDisplayPin")
4263 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
4264 "StaAuthorized")
4265 self.loop.run()
4266 return self
4267
4268 def groupStarted(self, properties):
4269 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4270 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4271 properties['interface_object'])
2ec82e67 4272 if not self.invited:
cb346b49
JM
4273 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4274 properties['group_object'])
4275 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4276 dbus_interface=dbus.PROPERTIES_IFACE,
4277 byte_arrays=True)
4278 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
2ec82e67 4279 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4280 dev1.scan_for_bss(bssid, freq=2412)
2ec82e67
JM
4281 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
4282
4283 def groupFinished(self, properties):
4284 logger.debug("groupFinished: " + str(properties))
4285 if self.invited:
4286 self.done = True
4287 self.loop.quit()
4288 else:
4289 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4290 dev1.global_request("SET persistent_reconnect 1")
2ec82e67
JM
4291 dev1.p2p_listen()
4292
4293 args = { 'persistent_group_object': dbus.ObjectPath(path),
4294 'peer': self.peer_path }
4295 try:
4296 pin = p2p.Invite(args)
4297 raise Exception("Invalid Invite accepted")
bab493b9 4298 except dbus.exceptions.DBusException as e:
2ec82e67
JM
4299 if "InvalidArgs" not in str(e):
4300 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4301
4302 args = { 'persistent_group_object': self.persistent,
4303 'peer': self.peer_path }
4304 pin = p2p.Invite(args)
4305 self.invited = True
4306
cb346b49
JM
4307 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
4308 timeout=15)
4309 if self.sta_group_ev is None:
4310 raise Exception("P2P-GROUP-STARTED event not seen")
4311
2ec82e67
JM
4312 def persistentGroupAdded(self, path, properties):
4313 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
4314 self.persistent = path
4315
4316 def deviceFound(self, path):
4317 logger.debug("deviceFound: path=%s" % path)
4318 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
4319 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
4320 dbus_interface=dbus.PROPERTIES_IFACE,
4321 byte_arrays=True)
4322
4323 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
4324 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
4325 self.peer_path = peer_object
4326 peer = binascii.unhexlify(peer_object.split('/')[-1])
4327 addr = ""
4328 for p in peer:
4329 if len(addr) > 0:
4330 addr += ':'
4331 addr += '%02x' % ord(p)
4332 params = { 'Role': 'registrar',
4333 'P2PDeviceAddress': self.peer['DeviceAddress'],
4334 'Bssid': self.peer['DeviceAddress'],
4335 'Type': 'pin',
4336 'Pin': '12345670' }
4337 logger.info("Authorize peer to connect to the group")
cb346b49
JM
4338 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4339 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67 4340 wps.Start(params)
cb346b49
JM
4341 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
4342 timeout=15)
4343 if self.sta_group_ev is None:
4344 raise Exception("P2P-GROUP-STARTED event not seen")
2ec82e67
JM
4345
4346 def staAuthorized(self, name):
4347 logger.debug("staAuthorized: " + name)
4348 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4349 dev1.group_form_result(self.sta_group_ev)
2ec82e67 4350 dev1.remove_group()
cb346b49 4351 ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
2ec82e67
JM
4352 if ev is None:
4353 raise Exception("Group removal timed out")
cb346b49
JM
4354 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4355 group_p2p.Disconnect()
2ec82e67
JM
4356
4357 def run_test(self, *args):
4358 logger.debug("run_test")
4359 params = dbus.Dictionary({'persistent': True,
4360 'frequency': 2412})
4361 logger.info("Add a persistent group")
4362 p2p.GroupAdd(params)
4363 return False
4364
4365 def success(self):
4366 return self.done
4367
4368 with TestDbusP2p(bus) as t:
4369 if not t.success():
4370 raise Exception("Expected signals not seen")
4371
4372def test_dbus_p2p_go_neg_rx(dev, apdev):
4373 """D-Bus P2P GO Negotiation receive"""
910eb5b5 4374 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4375 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4376 addr0 = dev[0].p2p_dev_addr()
4377
4378 class TestDbusP2p(TestDbus):
4379 def __init__(self, bus):
4380 TestDbus.__init__(self, bus)
4381 self.done = False
4382
4383 def __enter__(self):
4384 gobject.timeout_add(1, self.run_test)
4385 gobject.timeout_add(15000, self.timeout)
4386 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4387 "DeviceFound")
4388 self.add_signal(self.goNegotiationRequest,
4389 WPAS_DBUS_IFACE_P2PDEVICE,
4390 "GONegotiationRequest",
4391 byte_arrays=True)
4392 self.add_signal(self.goNegotiationSuccess,
4393 WPAS_DBUS_IFACE_P2PDEVICE,
4394 "GONegotiationSuccess",
4395 byte_arrays=True)
4396 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4397 "GroupStarted")
4398 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4399 "GroupFinished")
4400 self.loop.run()
4401 return self
4402
4403 def deviceFound(self, path):
4404 logger.debug("deviceFound: path=%s" % path)
4405
f5d5161d
JM
4406 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4407 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
2ec82e67
JM
4408 if dev_passwd_id != 1:
4409 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4410 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4411 'go_intent': 15, 'persistent': False, 'frequency': 5175 }
4412 try:
4413 p2p.Connect(args)
4414 raise Exception("Invalid Connect accepted")
bab493b9 4415 except dbus.exceptions.DBusException as e:
2ec82e67
JM
4416 if "ConnectChannelUnsupported" not in str(e):
4417 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4418
4419 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4420 'go_intent': 15, 'persistent': False }
4421 p2p.Connect(args)
4422
4423 def goNegotiationSuccess(self, properties):
4424 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4425
4426 def groupStarted(self, properties):
4427 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4428 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4429 properties['interface_object'])
4430 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4431 group_p2p.Disconnect()
2ec82e67
JM
4432
4433 def groupFinished(self, properties):
4434 logger.debug("groupFinished: " + str(properties))
4435 self.done = True
4436 self.loop.quit()
4437
4438 def run_test(self, *args):
4439 logger.debug("run_test")
4440 p2p.Listen(10)
4441 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4442 if not dev1.discover_peer(addr0):
4443 raise Exception("Peer not found")
4444 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
4445 return False
4446
4447 def success(self):
4448 return self.done
4449
4450 with TestDbusP2p(bus) as t:
4451 if not t.success():
4452 raise Exception("Expected signals not seen")
4453
4454def test_dbus_p2p_go_neg_auth(dev, apdev):
4455 """D-Bus P2P GO Negotiation authorized"""
910eb5b5 4456 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4457 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4458 addr0 = dev[0].p2p_dev_addr()
4459 dev[1].p2p_listen()
4460
4461 class TestDbusP2p(TestDbus):
4462 def __init__(self, bus):
4463 TestDbus.__init__(self, bus)
4464 self.done = False
4465 self.peer_joined = False
4466 self.peer_disconnected = False
4467
4468 def __enter__(self):
4469 gobject.timeout_add(1, self.run_test)
4470 gobject.timeout_add(15000, self.timeout)
4471 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4472 "DeviceFound")
4473 self.add_signal(self.goNegotiationSuccess,
4474 WPAS_DBUS_IFACE_P2PDEVICE,
4475 "GONegotiationSuccess",
4476 byte_arrays=True)
4477 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4478 "GroupStarted")
4479 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4480 "GroupFinished")
4481 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
4482 "StaDeauthorized")
4483 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4484 "PeerJoined")
4485 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
4486 "PeerDisconnected")
4487 self.loop.run()
4488 return self
4489
4490 def deviceFound(self, path):
4491 logger.debug("deviceFound: path=%s" % path)
4492 args = { 'peer': path, 'wps_method': 'keypad',
4493 'go_intent': 15, 'authorize_only': True }
4494 try:
4495 p2p.Connect(args)
4496 raise Exception("Invalid Connect accepted")
bab493b9 4497 except dbus.exceptions.DBusException as e:
2ec82e67
JM
4498 if "InvalidArgs" not in str(e):
4499 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4500
4501 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4502 'go_intent': 15, 'authorize_only': True }
4503 p2p.Connect(args)
4504 p2p.Listen(10)
4505 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4506 if not dev1.discover_peer(addr0):
4507 raise Exception("Peer not found")
4508 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
bc6e3288 4509 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
2ec82e67
JM
4510 if ev is None:
4511 raise Exception("Group formation timed out")
cb346b49 4512 self.sta_group_ev = ev
2ec82e67
JM
4513
4514 def goNegotiationSuccess(self, properties):
4515 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4516
4517 def groupStarted(self, properties):
4518 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4519 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4520 properties['interface_object'])
2ec82e67 4521 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4522 dev1.group_form_result(self.sta_group_ev)
2ec82e67
JM
4523 dev1.remove_group()
4524
4525 def staDeauthorized(self, name):
4526 logger.debug("staDeuthorized: " + name)
cb346b49
JM
4527 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4528 group_p2p.Disconnect()
2ec82e67
JM
4529
4530 def peerJoined(self, peer):
4531 logger.debug("peerJoined: " + peer)
4532 self.peer_joined = True
4533
4534 def peerDisconnected(self, peer):
4535 logger.debug("peerDisconnected: " + peer)
4536 self.peer_disconnected = True
4537
4538 def groupFinished(self, properties):
4539 logger.debug("groupFinished: " + str(properties))
4540 self.done = True
4541 self.loop.quit()
4542
4543 def run_test(self, *args):
4544 logger.debug("run_test")
4545 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4546 return False
4547
4548 def success(self):
4549 return self.done and self.peer_joined and self.peer_disconnected
4550
4551 with TestDbusP2p(bus) as t:
4552 if not t.success():
4553 raise Exception("Expected signals not seen")
4554
4555def test_dbus_p2p_go_neg_init(dev, apdev):
4556 """D-Bus P2P GO Negotiation initiation"""
910eb5b5 4557 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4558 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4559 addr0 = dev[0].p2p_dev_addr()
4560 dev[1].p2p_listen()
4561
4562 class TestDbusP2p(TestDbus):
4563 def __init__(self, bus):
4564 TestDbus.__init__(self, bus)
4565 self.done = False
20fd8def
JM
4566 self.peer_group_added = False
4567 self.peer_group_removed = False
2ec82e67
JM
4568
4569 def __enter__(self):
4570 gobject.timeout_add(1, self.run_test)
4571 gobject.timeout_add(15000, self.timeout)
4572 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4573 "DeviceFound")
4574 self.add_signal(self.goNegotiationSuccess,
4575 WPAS_DBUS_IFACE_P2PDEVICE,
4576 "GONegotiationSuccess",
4577 byte_arrays=True)
4578 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4579 "GroupStarted")
4580 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4581 "GroupFinished")
20fd8def
JM
4582 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4583 "PropertiesChanged")
2ec82e67
JM
4584 self.loop.run()
4585 return self
4586
4587 def deviceFound(self, path):
4588 logger.debug("deviceFound: path=%s" % path)
4589 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4590 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4591 'go_intent': 0 }
4592 p2p.Connect(args)
4593
4594 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4595 if ev is None:
4596 raise Exception("Timeout while waiting for GO Neg Request")
4597 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
bc6e3288 4598 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
2ec82e67
JM
4599 if ev is None:
4600 raise Exception("Group formation timed out")
cb346b49 4601 self.sta_group_ev = ev
2ec82e67
JM
4602
4603 def goNegotiationSuccess(self, properties):
4604 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4605
4606 def groupStarted(self, properties):
4607 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4608 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4609 properties['interface_object'])
4610 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4611 group_p2p.Disconnect()
2ec82e67 4612 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4613 dev1.group_form_result(self.sta_group_ev)
2ec82e67
JM
4614 dev1.remove_group()
4615
4616 def groupFinished(self, properties):
4617 logger.debug("groupFinished: " + str(properties))
4618 self.done = True
20fd8def
JM
4619
4620 def propertiesChanged(self, interface_name, changed_properties,
4621 invalidated_properties):
4622 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4623 if interface_name != WPAS_DBUS_P2P_PEER:
4624 return
4625 if "Groups" not in changed_properties:
4626 return
4627 if len(changed_properties["Groups"]) > 0:
4628 self.peer_group_added = True
4629 if len(changed_properties["Groups"]) == 0:
3301e925
JM
4630 if not self.peer_group_added:
4631 # This is likely a leftover event from an earlier test case,
4632 # ignore it to allow this test case to go through its steps.
4633 logger.info("Ignore propertiesChanged indicating group removal before group has been added")
4634 return
20fd8def
JM
4635 self.peer_group_removed = True
4636 self.loop.quit()
2ec82e67
JM
4637
4638 def run_test(self, *args):
4639 logger.debug("run_test")
4640 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4641 return False
4642
4643 def success(self):
20fd8def
JM
4644 return self.done and self.peer_group_added and self.peer_group_removed
4645
4646 with TestDbusP2p(bus) as t:
4647 if not t.success():
4648 raise Exception("Expected signals not seen")
4649
4650def test_dbus_p2p_group_termination_by_go(dev, apdev):
4651 """D-Bus P2P group removal on GO terminating the group"""
4652 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4653 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4654 addr0 = dev[0].p2p_dev_addr()
4655 dev[1].p2p_listen()
4656
4657 class TestDbusP2p(TestDbus):
4658 def __init__(self, bus):
4659 TestDbus.__init__(self, bus)
4660 self.done = False
4661 self.peer_group_added = False
4662 self.peer_group_removed = False
4663
4664 def __enter__(self):
4665 gobject.timeout_add(1, self.run_test)
4666 gobject.timeout_add(15000, self.timeout)
4667 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4668 "DeviceFound")
4669 self.add_signal(self.goNegotiationSuccess,
4670 WPAS_DBUS_IFACE_P2PDEVICE,
4671 "GONegotiationSuccess",
4672 byte_arrays=True)
4673 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4674 "GroupStarted")
4675 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4676 "GroupFinished")
4677 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4678 "PropertiesChanged")
4679 self.loop.run()
4680 return self
4681
4682 def deviceFound(self, path):
4683 logger.debug("deviceFound: path=%s" % path)
4684 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4685 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4686 'go_intent': 0 }
4687 p2p.Connect(args)
4688
4689 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4690 if ev is None:
4691 raise Exception("Timeout while waiting for GO Neg Request")
4692 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
bc6e3288 4693 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
20fd8def
JM
4694 if ev is None:
4695 raise Exception("Group formation timed out")
4696 self.sta_group_ev = ev
4697
4698 def goNegotiationSuccess(self, properties):
4699 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4700
4701 def groupStarted(self, properties):
4702 logger.debug("groupStarted: " + str(properties))
4703 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4704 properties['interface_object'])
4705 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4706 dev1.group_form_result(self.sta_group_ev)
4707 dev1.remove_group()
4708
4709 def groupFinished(self, properties):
4710 logger.debug("groupFinished: " + str(properties))
4711 self.done = True
4712
4713 def propertiesChanged(self, interface_name, changed_properties,
4714 invalidated_properties):
4715 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4716 if interface_name != WPAS_DBUS_P2P_PEER:
4717 return
4718 if "Groups" not in changed_properties:
4719 return
4720 if len(changed_properties["Groups"]) > 0:
4721 self.peer_group_added = True
6fad40df 4722 if len(changed_properties["Groups"]) == 0 and self.peer_group_added:
20fd8def
JM
4723 self.peer_group_removed = True
4724 self.loop.quit()
4725
4726 def run_test(self, *args):
4727 logger.debug("run_test")
4728 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4729 return False
4730
4731 def success(self):
4732 return self.done and self.peer_group_added and self.peer_group_removed
4733
4734 with TestDbusP2p(bus) as t:
4735 if not t.success():
4736 raise Exception("Expected signals not seen")
4737
4738def test_dbus_p2p_group_idle_timeout(dev, apdev):
4739 """D-Bus P2P group removal on idle timeout"""
4740 try:
4741 dev[0].global_request("SET p2p_group_idle 1")
4742 _test_dbus_p2p_group_idle_timeout(dev, apdev)
4743 finally:
4744 dev[0].global_request("SET p2p_group_idle 0")
4745
4746def _test_dbus_p2p_group_idle_timeout(dev, apdev):
4747 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4748 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4749 addr0 = dev[0].p2p_dev_addr()
4750 dev[1].p2p_listen()
4751
4752 class TestDbusP2p(TestDbus):
4753 def __init__(self, bus):
4754 TestDbus.__init__(self, bus)
4755 self.done = False
845d48c1 4756 self.group_started = False
20fd8def
JM
4757 self.peer_group_added = False
4758 self.peer_group_removed = False
4759
4760 def __enter__(self):
4761 gobject.timeout_add(1, self.run_test)
4762 gobject.timeout_add(15000, self.timeout)
4763 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4764 "DeviceFound")
4765 self.add_signal(self.goNegotiationSuccess,
4766 WPAS_DBUS_IFACE_P2PDEVICE,
4767 "GONegotiationSuccess",
4768 byte_arrays=True)
4769 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4770 "GroupStarted")
4771 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4772 "GroupFinished")
4773 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4774 "PropertiesChanged")
4775 self.loop.run()
4776 return self
4777
4778 def deviceFound(self, path):
4779 logger.debug("deviceFound: path=%s" % path)
4780 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4781 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4782 'go_intent': 0 }
4783 p2p.Connect(args)
4784
4785 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4786 if ev is None:
4787 raise Exception("Timeout while waiting for GO Neg Request")
4788 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
bc6e3288 4789 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
20fd8def
JM
4790 if ev is None:
4791 raise Exception("Group formation timed out")
4792 self.sta_group_ev = ev
4793
4794 def goNegotiationSuccess(self, properties):
4795 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4796
4797 def groupStarted(self, properties):
4798 logger.debug("groupStarted: " + str(properties))
845d48c1 4799 self.group_started = True
20fd8def
JM
4800 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4801 properties['interface_object'])
4802 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4803 dev1.group_form_result(self.sta_group_ev)
4804 ifaddr = dev1.group_request("STA-FIRST").splitlines()[0]
4805 # Force disassociation with different reason code so that the
4806 # P2P Client using D-Bus does not get normal group termination event
4807 # from the GO.
4808 dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
4809 dev1.remove_group()
4810
4811 def groupFinished(self, properties):
4812 logger.debug("groupFinished: " + str(properties))
4813 self.done = True
4814
4815 def propertiesChanged(self, interface_name, changed_properties,
4816 invalidated_properties):
4817 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4818 if interface_name != WPAS_DBUS_P2P_PEER:
4819 return
845d48c1
JM
4820 if not self.group_started:
4821 return
20fd8def
JM
4822 if "Groups" not in changed_properties:
4823 return
4824 if len(changed_properties["Groups"]) > 0:
4825 self.peer_group_added = True
4826 if len(changed_properties["Groups"]) == 0:
4827 self.peer_group_removed = True
4828 self.loop.quit()
4829
4830 def run_test(self, *args):
4831 logger.debug("run_test")
4832 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4833 return False
4834
4835 def success(self):
4836 return self.done and self.peer_group_added and self.peer_group_removed
2ec82e67
JM
4837
4838 with TestDbusP2p(bus) as t:
4839 if not t.success():
4840 raise Exception("Expected signals not seen")
4841
4842def test_dbus_p2p_wps_failure(dev, apdev):
4843 """D-Bus P2P WPS failure"""
910eb5b5 4844 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4845 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4846 addr0 = dev[0].p2p_dev_addr()
4847
4848 class TestDbusP2p(TestDbus):
4849 def __init__(self, bus):
4850 TestDbus.__init__(self, bus)
084780f1
JM
4851 self.wps_failed = False
4852 self.formation_failure = False
2ec82e67
JM
4853
4854 def __enter__(self):
4855 gobject.timeout_add(1, self.run_test)
4856 gobject.timeout_add(15000, self.timeout)
4857 self.add_signal(self.goNegotiationRequest,
4858 WPAS_DBUS_IFACE_P2PDEVICE,
4859 "GONegotiationRequest",
4860 byte_arrays=True)
4861 self.add_signal(self.goNegotiationSuccess,
4862 WPAS_DBUS_IFACE_P2PDEVICE,
4863 "GONegotiationSuccess",
4864 byte_arrays=True)
4865 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4866 "GroupStarted")
4867 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
4868 "WpsFailed")
084780f1
JM
4869 self.add_signal(self.groupFormationFailure,
4870 WPAS_DBUS_IFACE_P2PDEVICE,
4871 "GroupFormationFailure")
2ec82e67
JM
4872 self.loop.run()
4873 return self
4874
f5d5161d
JM
4875 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4876 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
2ec82e67
JM
4877 if dev_passwd_id != 1:
4878 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4879 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4880 'go_intent': 15 }
4881 p2p.Connect(args)
4882
4883 def goNegotiationSuccess(self, properties):
4884 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4885
4886 def groupStarted(self, properties):
4887 logger.debug("groupStarted: " + str(properties))
4888 raise Exception("Unexpected GroupStarted")
4889
4890 def wpsFailed(self, name, args):
4891 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
084780f1
JM
4892 self.wps_failed = True
4893 if self.formation_failure:
4894 self.loop.quit()
4895
4896 def groupFormationFailure(self, reason):
4897 logger.debug("groupFormationFailure - reason=%s" % reason)
4898 self.formation_failure = True
4899 if self.wps_failed:
4900 self.loop.quit()
2ec82e67
JM
4901
4902 def run_test(self, *args):
4903 logger.debug("run_test")
4904 p2p.Listen(10)
4905 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4906 if not dev1.discover_peer(addr0):
4907 raise Exception("Peer not found")
4908 dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
4909 return False
4910
4911 def success(self):
084780f1 4912 return self.wps_failed and self.formation_failure
2ec82e67
JM
4913
4914 with TestDbusP2p(bus) as t:
4915 if not t.success():
4916 raise Exception("Expected signals not seen")
4917
4918def test_dbus_p2p_two_groups(dev, apdev):
4919 """D-Bus P2P with two concurrent groups"""
910eb5b5 4920 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4921 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4922
4923 dev[0].request("SET p2p_no_group_iface 0")
4924 addr0 = dev[0].p2p_dev_addr()
4925 addr1 = dev[1].p2p_dev_addr()
4926 addr2 = dev[2].p2p_dev_addr()
4927 dev[1].p2p_start_go(freq=2412)
cb346b49 4928 dev1_group_ifname = dev[1].group_ifname
2ec82e67
JM
4929
4930 class TestDbusP2p(TestDbus):
4931 def __init__(self, bus):
4932 TestDbus.__init__(self, bus)
4933 self.done = False
4934 self.peer = None
4935 self.go = None
4936 self.group1 = None
4937 self.group2 = None
5a217649 4938 self.groups_removed = False
2ec82e67
JM
4939
4940 def __enter__(self):
4941 gobject.timeout_add(1, self.run_test)
4942 gobject.timeout_add(15000, self.timeout)
4943 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4944 "PropertiesChanged", byte_arrays=True)
4945 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4946 "DeviceFound")
4947 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4948 "GroupStarted")
4949 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4950 "GroupFinished")
4951 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4952 "PeerJoined")
4953 self.loop.run()
4954 return self
4955
4956 def propertiesChanged(self, interface_name, changed_properties,
4957 invalidated_properties):
4958 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4959
4960 def deviceFound(self, path):
4961 logger.debug("deviceFound: path=%s" % path)
4962 if addr2.replace(':','') in path:
4963 self.peer = path
4964 elif addr1.replace(':','') in path:
4965 self.go = path
4966 if self.go and not self.group1:
4967 logger.info("Join the group")
4968 p2p.StopFind()
4969 pin = '12345670'
4970 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
4971 dev1.group_ifname = dev1_group_ifname
4972 dev1.group_request("WPS_PIN any " + pin)
2ec82e67
JM
4973 args = { 'peer': self.go,
4974 'join': True,
4975 'wps_method': 'pin',
4976 'pin': pin,
4977 'frequency': 2412 }
4978 p2p.Connect(args)
4979
4980 def groupStarted(self, properties):
4981 logger.debug("groupStarted: " + str(properties))
4982 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4983 dbus_interface=dbus.PROPERTIES_IFACE)
4984 logger.debug("p2pdevice properties: " + str(prop))
4985
4986 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4987 properties['group_object'])
4988 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4989 dbus_interface=dbus.PROPERTIES_IFACE,
4990 byte_arrays=True)
4991 logger.debug("Group properties: " + str(res))
4992
4993 if not self.group1:
4994 self.group1 = properties['group_object']
4995 self.group1iface = properties['interface_object']
4996 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4997 self.group1iface)
4998
4999 logger.info("Start autonomous GO")
5000 params = dbus.Dictionary({ 'frequency': 2412 })
5001 p2p.GroupAdd(params)
5002 elif not self.group2:
5003 self.group2 = properties['group_object']
5004 self.group2iface = properties['interface_object']
5005 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
5006 self.group2iface)
5007 self.g2_bssid = res['BSSID']
5008
5009 if self.group1 and self.group2:
5010 logger.info("Authorize peer to join the group")
5011 a2 = binascii.unhexlify(addr2.replace(':',''))
5012 params = { 'Role': 'enrollee',
5013 'P2PDeviceAddress': dbus.ByteArray(a2),
5014 'Bssid': dbus.ByteArray(a2),
5015 'Type': 'pin',
5016 'Pin': '12345670' }
5017 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
5018 g_wps.Start(params)
5019
5020 bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
5021 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
5022 dev2.scan_for_bss(bssid, freq=2412)
5023 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
bc6e3288 5024 ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
cb346b49
JM
5025 if ev is None:
5026 raise Exception("Group join timed out")
5027 self.dev2_group_ev = ev
2ec82e67
JM
5028
5029 def groupFinished(self, properties):
5030 logger.debug("groupFinished: " + str(properties))
5031
5032 if self.group1 == properties['group_object']:
5033 self.group1 = None
5034 elif self.group2 == properties['group_object']:
5035 self.group2 = None
5036
5037 if not self.group1 and not self.group2:
5038 self.done = True
5039 self.loop.quit()
5040
5041 def peerJoined(self, peer):
5042 logger.debug("peerJoined: " + peer)
5a217649
JM
5043 if self.groups_removed:
5044 return
2ec82e67
JM
5045 self.check_results()
5046
5047 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
cb346b49 5048 dev2.group_form_result(self.dev2_group_ev)
2ec82e67
JM
5049 dev2.remove_group()
5050
5051 logger.info("Disconnect group2")
5052 group_p2p = dbus.Interface(self.g2_if_obj,
5053 WPAS_DBUS_IFACE_P2PDEVICE)
5054 group_p2p.Disconnect()
5055
5056 logger.info("Disconnect group1")
5057 group_p2p = dbus.Interface(self.g1_if_obj,
5058 WPAS_DBUS_IFACE_P2PDEVICE)
5059 group_p2p.Disconnect()
5a217649 5060 self.groups_removed = True
2ec82e67
JM
5061
5062 def check_results(self):
5063 logger.info("Check results with two concurrent groups in operation")
5064
5065 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
5066 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
5067 dbus_interface=dbus.PROPERTIES_IFACE,
5068 byte_arrays=True)
5069
5070 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
5071 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
5072 dbus_interface=dbus.PROPERTIES_IFACE,
5073 byte_arrays=True)
5074
5075 logger.info("group1 = " + self.group1)
5076 logger.debug("Group properties: " + str(res1))
5077
5078 logger.info("group2 = " + self.group2)
5079 logger.debug("Group properties: " + str(res2))
5080
5081 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
5082 dbus_interface=dbus.PROPERTIES_IFACE)
5083 logger.debug("p2pdevice properties: " + str(prop))
5084
5085 if res1['Role'] != 'client':
5086 raise Exception("Group1 role reported incorrectly: " + res1['Role'])
5087 if res2['Role'] != 'GO':
5088 raise Exception("Group2 role reported incorrectly: " + res2['Role'])
5089 if prop['Role'] != 'device':
5090 raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])
5091
5092 if len(res2['Members']) != 1:
5093 raise Exception("Unexpected Members value for group 2")
5094
5095 def run_test(self, *args):
5096 logger.debug("run_test")
5097 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5098 return False
5099
5100 def success(self):
5101 return self.done
5102
5103 with TestDbusP2p(bus) as t:
5104 if not t.success():
5105 raise Exception("Expected signals not seen")
5106
5107 dev[1].remove_group()
0e126c6d 5108
f572ae80
JM
5109def test_dbus_p2p_cancel(dev, apdev):
5110 """D-Bus P2P Cancel"""
5111 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5112 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5113 try:
5114 p2p.Cancel()
5115 raise Exception("Unexpected p2p.Cancel() success")
bab493b9 5116 except dbus.exceptions.DBusException as e:
f572ae80
JM
5117 pass
5118
5119 addr0 = dev[0].p2p_dev_addr()
5120 dev[1].p2p_listen()
5121
5122 class TestDbusP2p(TestDbus):
5123 def __init__(self, bus):
5124 TestDbus.__init__(self, bus)
5125 self.done = False
5126
5127 def __enter__(self):
5128 gobject.timeout_add(1, self.run_test)
5129 gobject.timeout_add(15000, self.timeout)
5130 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
5131 "DeviceFound")
5132 self.loop.run()
5133 return self
5134
5135 def deviceFound(self, path):
5136 logger.debug("deviceFound: path=%s" % path)
5137 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
5138 'go_intent': 0 }
5139 p2p.Connect(args)
5140 p2p.Cancel()
5141 self.done = True
5142 self.loop.quit()
5143
5144 def run_test(self, *args):
5145 logger.debug("run_test")
5146 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5147 return False
5148
5149 def success(self):
5150 return self.done
5151
5152 with TestDbusP2p(bus) as t:
5153 if not t.success():
5154 raise Exception("Expected signals not seen")
5155
afe28053
JM
5156def test_dbus_p2p_ip_addr(dev, apdev):
5157 """D-Bus P2P and IP address parameters"""
5158 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5159 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5160
5161 vals = [ ("IpAddrGo", "192.168.43.1"),
5162 ("IpAddrMask", "255.255.255.0"),
5163 ("IpAddrStart", "192.168.43.100"),
5164 ("IpAddrEnd", "192.168.43.199") ]
5165 for field, value in vals:
5166 if_obj.Set(WPAS_DBUS_IFACE, field, value,
5167 dbus_interface=dbus.PROPERTIES_IFACE)
5168 val = if_obj.Get(WPAS_DBUS_IFACE, field,
5169 dbus_interface=dbus.PROPERTIES_IFACE)
5170 if val != value:
5171 raise Exception("Unexpected %s value: %s" % (field, val))
5172
5173 set_ip_addr_info(dev[1])
5174
5175 dev[0].global_request("SET p2p_go_intent 0")
5176
5177 req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
5178 if "FAIL" in req:
5179 raise Exception("Failed to generate NFC connection handover request")
5180 sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
5181 if "FAIL" in sel:
5182 raise Exception("Failed to generate NFC connection handover select")
5183 dev[0].dump_monitor()
5184 dev[1].dump_monitor()
5185 res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel)
5186 if "FAIL" in res:
5187 raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
5188 res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel)
5189 if "FAIL" in res:
5190 raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")
5191
5192 class TestDbusP2p(TestDbus):
5193 def __init__(self, bus):
5194 TestDbus.__init__(self, bus)
5195 self.done = False
5196
5197 def __enter__(self):
5198 gobject.timeout_add(1, self.run_test)
5199 gobject.timeout_add(15000, self.timeout)
5200 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
5201 "GroupStarted")
5202 self.loop.run()
5203 return self
5204
5205 def groupStarted(self, properties):
5206 logger.debug("groupStarted: " + str(properties))
5207 self.loop.quit()
5208
5209 if 'IpAddrGo' not in properties:
5210 logger.info("IpAddrGo missing from GroupStarted")
5211 ip_addr_go = properties['IpAddrGo']
5212 addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3])
5213 if addr != "192.168.42.1":
5214 logger.info("Unexpected IpAddrGo value: " + addr)
5215 self.done = True
5216
5217 def run_test(self, *args):
5218 logger.debug("run_test")
5219 return False
5220
5221 def success(self):
5222 return self.done
5223
5224 with TestDbusP2p(bus) as t:
5225 if not t.success():
5226 raise Exception("Expected signals not seen")
5227
0e126c6d
JM
5228def test_dbus_introspect(dev, apdev):
5229 """D-Bus introspection"""
5230 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5231
5232 res = if_obj.Introspect(WPAS_DBUS_IFACE,
5233 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5234 logger.info("Initial Introspect: " + str(res))
5235 if res is None or "Introspectable" not in res or "GroupStarted" not in res:
5236 raise Exception("Unexpected initial Introspect response: " + str(res))
f0ef6889
DW
5237 if "FastReauth" not in res or "PassiveScan" not in res:
5238 raise Exception("Unexpected initial Introspect response: " + str(res))
0e126c6d
JM
5239
5240 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
5241 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5242 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5243 logger.info("Introspect: " + str(res2))
5244 if res2 is not None:
5245 raise Exception("Unexpected Introspect response")
5246
5247 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"):
5248 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5249 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5250 logger.info("Introspect: " + str(res2))
5251 if res2 is None:
5252 raise Exception("No Introspect response")
5253 if len(res2) >= len(res):
5254 raise Exception("Unexpected Introspect response")
5255
5256 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
5257 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5258 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5259 logger.info("Introspect: " + str(res2))
5260 if res2 is None:
5261 raise Exception("No Introspect response")
5262 if len(res2) >= len(res):
5263 raise Exception("Unexpected Introspect response")
5264
5265 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"):
5266 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5267 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5268 logger.info("Introspect: " + str(res2))
5269 if res2 is None:
5270 raise Exception("No Introspect response")
5271 if len(res2) >= len(res):
5272 raise Exception("Unexpected Introspect response")
9bbce257 5273
a21eadcf
JM
5274def run_busctl(service, obj):
5275 logger.info("busctl introspect %s %s" % (service, obj))
5276 cmd = subprocess.Popen([ 'busctl', 'introspect', service, obj ],
5277 stdout=subprocess.PIPE,
5278 stderr=subprocess.PIPE)
5279 out = cmd.communicate()
5280 cmd.wait()
5281 logger.info("busctl stdout:\n%s" % out[0].strip())
5282 if len(out[1]) > 0:
04fa9fc7
MH
5283 logger.info("busctl stderr: %s" % out[1].decode().strip())
5284 if "Duplicate property" in out[1].decode():
a21eadcf
JM
5285 raise Exception("Duplicate property")
5286
5287def test_dbus_introspect_busctl(dev, apdev):
5288 """D-Bus introspection with busctl"""
5289 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5290 ifaces = dbus_get(dbus, wpas_obj, "Interfaces")
5291 run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
5292 run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH + "/Interfaces")
5293 run_busctl(WPAS_DBUS_SERVICE, ifaces[0])
5294
5295 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
5296 bssid = apdev[0]['bssid']
5297 dev[0].scan_for_bss(bssid, freq=2412)
5298 id = dev[0].add_network()
5299 dev[0].set_network(id, "disabled", "0")
5300 dev[0].set_network_quoted(id, "ssid", "test")
5301
5302 run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/BSSs/0")
5303 run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/Networks/0")
5304
9bbce257
JM
5305def test_dbus_ap(dev, apdev):
5306 """D-Bus AddNetwork for AP mode"""
5307 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5308 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5309
5310 ssid = "test-wpa2-psk"
5311 passphrase = 'qwertyuiop'
5312
5313 class TestDbusConnect(TestDbus):
5314 def __init__(self, bus):
5315 TestDbus.__init__(self, bus)
5316 self.started = False
345d8b5e
JM
5317 self.sta_added = False
5318 self.sta_removed = False
5319 self.authorized = False
5320 self.deauthorized = False
5321 self.stations = False
9bbce257
JM
5322
5323 def __enter__(self):
5324 gobject.timeout_add(1, self.run_connect)
5325 gobject.timeout_add(15000, self.timeout)
5326 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
5327 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
5328 "NetworkSelected")
5329 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5330 "PropertiesChanged")
345d8b5e
JM
5331 self.add_signal(self.stationAdded, WPAS_DBUS_IFACE, "StationAdded")
5332 self.add_signal(self.stationRemoved, WPAS_DBUS_IFACE,
5333 "StationRemoved")
5334 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
5335 "StaAuthorized")
5336 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
5337 "StaDeauthorized")
9bbce257
JM
5338 self.loop.run()
5339 return self
5340
5341 def networkAdded(self, network, properties):
5342 logger.debug("networkAdded: %s" % str(network))
5343 logger.debug(str(properties))
5344
5345 def networkSelected(self, network):
5346 logger.debug("networkSelected: %s" % str(network))
5347 self.network_selected = True
5348
5349 def propertiesChanged(self, properties):
5350 logger.debug("propertiesChanged: %s" % str(properties))
5351 if 'State' in properties and properties['State'] == "completed":
5352 self.started = True
345d8b5e
JM
5353 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
5354 dev1.connect(ssid, psk=passphrase, scan_freq="2412")
5355
5356 def stationAdded(self, station, properties):
5357 logger.debug("stationAdded: %s" % str(station))
5358 logger.debug(str(properties))
5359 self.sta_added = True
5360 res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
5361 dbus_interface=dbus.PROPERTIES_IFACE)
5362 logger.info("Stations: " + str(res))
5363 if len(res) == 1:
5364 self.stations = True
5365 else:
5366 raise Exception("Missing Stations entry: " + str(res))
5367
5368 def stationRemoved(self, station):
5369 logger.debug("stationRemoved: %s" % str(station))
5370 self.sta_removed = True
5371 res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
5372 dbus_interface=dbus.PROPERTIES_IFACE)
5373 logger.info("Stations: " + str(res))
5374 if len(res) != 0:
5375 self.stations = False
5376 raise Exception("Unexpected Stations entry: " + str(res))
5377 self.loop.quit()
5378
5379 def staAuthorized(self, name):
5380 logger.debug("staAuthorized: " + name)
5381 self.authorized = True
5382 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
5383 dev1.request("DISCONNECT")
5384
5385 def staDeauthorized(self, name):
5386 logger.debug("staDeauthorized: " + name)
5387 self.deauthorized = True
9bbce257
JM
5388
5389 def run_connect(self, *args):
5390 logger.debug("run_connect")
5391 args = dbus.Dictionary({ 'ssid': ssid,
5392 'key_mgmt': 'WPA-PSK',
5393 'psk': passphrase,
5394 'mode': 2,
345d8b5e
JM
5395 'frequency': 2412,
5396 'scan_freq': 2412 },
9bbce257
JM
5397 signature='sv')
5398 self.netw = iface.AddNetwork(args)
5399 iface.SelectNetwork(self.netw)
5400 return False
5401
5402 def success(self):
345d8b5e
JM
5403 return self.started and self.sta_added and self.sta_removed and \
5404 self.authorized and self.deauthorized
9bbce257
JM
5405
5406 with TestDbusConnect(bus) as t:
5407 if not t.success():
5408 raise Exception("Expected signals not seen")
d9e2a057
JM
5409
5410def test_dbus_connect_wpa_eap(dev, apdev):
5411 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
5412 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5413 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5414
5415 ssid = "test-wpa-eap"
5416 params = hostapd.wpa_eap_params(ssid=ssid)
5417 params["wpa"] = "3"
5418 params["rsn_pairwise"] = "CCMP"
8b8a1864 5419 hapd = hostapd.add_ap(apdev[0], params)
d9e2a057
JM
5420
5421 class TestDbusConnect(TestDbus):
5422 def __init__(self, bus):
5423 TestDbus.__init__(self, bus)
5424 self.done = False
5425
5426 def __enter__(self):
5427 gobject.timeout_add(1, self.run_connect)
5428 gobject.timeout_add(15000, self.timeout)
5429 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5430 "PropertiesChanged")
5431 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
5432 self.loop.run()
5433 return self
5434
5435 def propertiesChanged(self, properties):
5436 logger.debug("propertiesChanged: %s" % str(properties))
5437 if 'State' in properties and properties['State'] == "completed":
5438 self.done = True
5439 self.loop.quit()
5440
5441 def eap(self, status, parameter):
5442 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
5443
5444 def run_connect(self, *args):
5445 logger.debug("run_connect")
5446 args = dbus.Dictionary({ 'ssid': ssid,
5447 'key_mgmt': 'WPA-EAP',
5448 'eap': 'PEAP',
5449 'identity': 'user',
5450 'password': 'password',
5451 'ca_cert': 'auth_serv/ca.pem',
5452 'phase2': 'auth=MSCHAPV2',
5453 'scan_freq': 2412 },
5454 signature='sv')
5455 self.netw = iface.AddNetwork(args)
5456 iface.SelectNetwork(self.netw)
5457 return False
5458
5459 def success(self):
5460 return self.done
5461
5462 with TestDbusConnect(bus) as t:
5463 if not t.success():
5464 raise Exception("Expected signals not seen")
708ec753
JM
5465
5466def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
5467 """AP_SCAN 2 AP mode and D-Bus Scan()"""
5468 try:
5469 _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
5470 finally:
5471 dev[0].request("AP_SCAN 1")
5472
5473def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
5474 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5475 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5476
5477 if "OK" not in dev[0].request("AP_SCAN 2"):
5478 raise Exception("Failed to set AP_SCAN 2")
5479
5480 id = dev[0].add_network()
5481 dev[0].set_network(id, "mode", "2")
5482 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open")
5483 dev[0].set_network(id, "key_mgmt", "NONE")
5484 dev[0].set_network(id, "frequency", "2412")
5485 dev[0].set_network(id, "scan_freq", "2412")
5486 dev[0].set_network(id, "disabled", "0")
5487 dev[0].select_network(id)
5488 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5)
5489 if ev is None:
5490 raise Exception("AP failed to start")
5491
5492 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
5493 iface.Scan({'Type': 'active',
5494 'AllowRoam': True,
5495 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
5496 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5497 "AP-DISABLED"], timeout=5)
5498 if ev is None:
5499 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
5500 if "AP-DISABLED" in ev:
5501 raise Exception("Unexpected AP-DISABLED event")
5502 if "retry=1" in ev:
5503 # Wait for the retry to scan happen
5504 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5505 "AP-DISABLED"], timeout=5)
5506 if ev is None:
5507 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5508 if "AP-DISABLED" in ev:
5509 raise Exception("Unexpected AP-DISABLED event - retry")
5510
5511 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
5512 dev[1].request("DISCONNECT")
5513 dev[1].wait_disconnected()
5514 dev[0].request("DISCONNECT")
5515 dev[0].wait_disconnected()
d679ab74
JM
5516
5517def test_dbus_expectdisconnect(dev, apdev):
5518 """D-Bus ExpectDisconnect"""
5519 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5520 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
5521
5522 params = { "ssid": "test-open" }
8b8a1864 5523 hapd = hostapd.add_ap(apdev[0], params)
d679ab74
JM
5524 dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")
5525
5526 # This does not really verify the behavior other than by going through the
5527 # code path for additional coverage.
5528 wpas.ExpectDisconnect()
5529 dev[0].request("DISCONNECT")
5530 dev[0].wait_disconnected()
2299b543
JM
5531
5532def test_dbus_save_config(dev, apdev):
5533 """D-Bus SaveConfig"""
5534 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5535 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5536 try:
5537 iface.SaveConfig()
5538 raise Exception("SaveConfig() accepted unexpectedly")
bab493b9 5539 except dbus.exceptions.DBusException as e:
2299b543
JM
5540 if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
5541 raise Exception("Unexpected error message for SaveConfig(): " + str(e))
ce96e65c
JM
5542
5543def test_dbus_vendor_elem(dev, apdev):
5544 """D-Bus vendor element operations"""
5545 try:
5546 _test_dbus_vendor_elem(dev, apdev)
5547 finally:
5548 dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5549
5550def _test_dbus_vendor_elem(dev, apdev):
5551 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5552 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5553
5554 dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5555
5556 try:
5557 ie = dbus.ByteArray("\x00\x00")
5558 iface.VendorElemAdd(-1, ie)
5559 raise Exception("Invalid VendorElemAdd() accepted")
bab493b9 5560 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5561 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5562 raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))
5563
5564 try:
5565 ie = dbus.ByteArray("")
5566 iface.VendorElemAdd(1, ie)
5567 raise Exception("Invalid VendorElemAdd() accepted")
bab493b9 5568 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5569 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5570 raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))
5571
5572 try:
5573 ie = dbus.ByteArray("\x00\x01")
5574 iface.VendorElemAdd(1, ie)
5575 raise Exception("Invalid VendorElemAdd() accepted")
bab493b9 5576 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5577 if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5578 raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))
5579
5580 try:
5581 iface.VendorElemGet(-1)
5582 raise Exception("Invalid VendorElemGet() accepted")
bab493b9 5583 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5584 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5585 raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))
5586
5587 try:
5588 iface.VendorElemGet(1)
5589 raise Exception("Invalid VendorElemGet() accepted")
bab493b9 5590 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5591 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5592 raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))
5593
5594 try:
5595 ie = dbus.ByteArray("\x00\x00")
5596 iface.VendorElemRem(-1, ie)
5597 raise Exception("Invalid VendorElemRemove() accepted")
bab493b9 5598 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5599 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5600 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5601
5602 try:
5603 ie = dbus.ByteArray("")
5604 iface.VendorElemRem(1, ie)
5605 raise Exception("Invalid VendorElemRemove() accepted")
bab493b9 5606 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5607 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5608 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5609
5610 iface.VendorElemRem(1, "*")
5611
5612 ie = dbus.ByteArray("\x00\x01\x00")
5613 iface.VendorElemAdd(1, ie)
5614
5615 val = iface.VendorElemGet(1)
5616 if len(val) != len(ie):
5617 raise Exception("Unexpected VendorElemGet length")
5618 for i in range(len(val)):
5619 if val[i] != dbus.Byte(ie[i]):
5620 raise Exception("Unexpected VendorElemGet data")
5621
5622 ie2 = dbus.ByteArray("\xe0\x00")
5623 iface.VendorElemAdd(1, ie2)
5624
5625 ies = ie + ie2
5626 val = iface.VendorElemGet(1)
5627 if len(val) != len(ies):
5628 raise Exception("Unexpected VendorElemGet length[2]")
5629 for i in range(len(val)):
5630 if val[i] != dbus.Byte(ies[i]):
5631 raise Exception("Unexpected VendorElemGet data[2]")
5632
5633 try:
5634 test_ie = dbus.ByteArray("\x01\x01")
5635 iface.VendorElemRem(1, test_ie)
5636 raise Exception("Invalid VendorElemRemove() accepted")
bab493b9 5637 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5638 if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5639 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5640
5641 iface.VendorElemRem(1, ie)
5642 val = iface.VendorElemGet(1)
5643 if len(val) != len(ie2):
5644 raise Exception("Unexpected VendorElemGet length[3]")
5645
5646 iface.VendorElemRem(1, "*")
5647 try:
5648 iface.VendorElemGet(1)
5649 raise Exception("Invalid VendorElemGet() accepted after removal")
bab493b9 5650 except dbus.exceptions.DBusException as e:
ce96e65c
JM
5651 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5652 raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))
dbd183c7
JM
5653
5654def test_dbus_assoc_reject(dev, apdev):
5655 """D-Bus AssocStatusCode"""
5656 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5657 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5658
5659 ssid = "test-open"
5660 params = { "ssid": ssid,
5661 "max_listen_interval": "1" }
8b8a1864 5662 hapd = hostapd.add_ap(apdev[0], params)
dbd183c7
JM
5663
5664 class TestDbusConnect(TestDbus):
5665 def __init__(self, bus):
5666 TestDbus.__init__(self, bus)
5667 self.assoc_status_seen = False
5668 self.state = 0
5669
5670 def __enter__(self):
5671 gobject.timeout_add(1, self.run_connect)
5672 gobject.timeout_add(15000, self.timeout)
5673 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5674 "PropertiesChanged")
5675 self.loop.run()
5676 return self
5677
5678 def propertiesChanged(self, properties):
5679 logger.debug("propertiesChanged: %s" % str(properties))
5680 if 'AssocStatusCode' in properties:
5681 status = properties['AssocStatusCode']
5682 if status != 51:
5683 logger.info("Unexpected status code: " + str(status))
5684 else:
5685 self.assoc_status_seen = True
5686 iface.Disconnect()
5687 self.loop.quit()
5688
5689 def run_connect(self, *args):
5690 args = dbus.Dictionary({ 'ssid': ssid,
5691 'key_mgmt': 'NONE',
5692 'scan_freq': 2412 },
5693 signature='sv')
5694 self.netw = iface.AddNetwork(args)
5695 iface.SelectNetwork(self.netw)
5696 return False
5697
5698 def success(self):
5699 return self.assoc_status_seen
5700
5701 with TestDbusConnect(bus) as t:
5702 if not t.success():
5703 raise Exception("Expected signals not seen")
504c7ffd
JM
5704
5705def test_dbus_mesh(dev, apdev):
5706 """D-Bus mesh"""
5707 check_mesh_support(dev[0])
5708 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5709 mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH)
5710
5711 add_open_mesh_network(dev[1])
5712 addr1 = dev[1].own_addr()
5713
5714 class TestDbusMesh(TestDbus):
5715 def __init__(self, bus):
5716 TestDbus.__init__(self, bus)
5717 self.done = False
5718
5719 def __enter__(self):
5720 gobject.timeout_add(1, self.run_test)
5721 gobject.timeout_add(15000, self.timeout)
5722 self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH,
5723 "MeshGroupStarted")
5724 self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH,
5725 "MeshGroupRemoved")
5726 self.add_signal(self.meshPeerConnected, WPAS_DBUS_IFACE_MESH,
5727 "MeshPeerConnected")
5728 self.add_signal(self.meshPeerDisconnected, WPAS_DBUS_IFACE_MESH,
5729 "MeshPeerDisconnected")
5730 self.loop.run()
5731 return self
5732
5733 def meshGroupStarted(self, args):
5734 logger.debug("MeshGroupStarted: " + str(args))
5735
5736 def meshGroupRemoved(self, args):
5737 logger.debug("MeshGroupRemoved: " + str(args))
5738 self.done = True
5739 self.loop.quit()
5740
5741 def meshPeerConnected(self, args):
5742 logger.debug("MeshPeerConnected: " + str(args))
5743
5744 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers',
5745 dbus_interface=dbus.PROPERTIES_IFACE,
5746 byte_arrays=True)
5747 logger.debug("MeshPeers: " + str(res))
5748 if len(res) != 1:
5749 raise Exception("Unexpected number of MeshPeer values")
7ab74770 5750 if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''):
504c7ffd
JM
5751 raise Exception("Unexpected peer address")
5752
5753 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup',
5754 dbus_interface=dbus.PROPERTIES_IFACE,
5755 byte_arrays=True)
5756 logger.debug("MeshGroup: " + str(res))
5757 if res != "wpas-mesh-open":
5758 raise Exception("Unexpected MeshGroup")
5759 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
5760 dev1.mesh_group_remove()
5761
5762 def meshPeerDisconnected(self, args):
5763 logger.debug("MeshPeerDisconnected: " + str(args))
5764 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
5765 dev0.mesh_group_remove()
5766
5767 def run_test(self, *args):
5768 logger.debug("run_test")
5769 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
5770 add_open_mesh_network(dev0)
5771 return False
5772
5773 def success(self):
5774 return self.done
5775
5776 with TestDbusMesh(bus) as t:
5777 if not t.success():
5778 raise Exception("Expected signals not seen")