]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_dbus.py
tests: Do not fail if driver supports power saving
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
CommitLineData
2ec82e67 1# wpa_supplicant D-Bus interface tests
910eb5b5 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
2ec82e67
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
2ec82e67
JM
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12
910eb5b5 13try:
b4de353c 14 import gobject
910eb5b5
JM
15 import dbus
16 dbus_imported = True
17except ImportError:
18 dbus_imported = False
19
2ec82e67
JM
20import hostapd
21from wpasupplicant import WpaSupplicant
708ec753 22from utils import HwsimSkip, alloc_fail, fail_test
1992af11 23from p2p_utils import *
2ec82e67 24from test_ap_tdls import connect_2sta_open
089e7ca3 25from test_ap_eap import check_altsubject_match_support
afe28053 26from test_nfc_p2p import set_ip_addr_info
504c7ffd 27from test_wpas_mesh import check_mesh_support, add_open_mesh_network
2ec82e67
JM
28
29WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
30WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
31WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
32WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
33WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
34WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
35WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
36WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
37WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
38WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
504c7ffd 39WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh"
2ec82e67
JM
40
41def prepare_dbus(dev):
910eb5b5
JM
42 if not dbus_imported:
43 logger.info("No dbus module available")
51c5aeb4 44 raise HwsimSkip("No dbus module available")
2ec82e67 45 try:
2ec82e67
JM
46 from dbus.mainloop.glib import DBusGMainLoop
47 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
48 bus = dbus.SystemBus()
49 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
50 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
51 path = wpas.GetInterface(dev.ifname)
52 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
910eb5b5 53 return (bus,wpas_obj,path,if_obj)
2ec82e67 54 except Exception, e:
51c5aeb4 55 raise HwsimSkip("Could not connect to D-Bus: %s" % e)
2ec82e67
JM
56
57class TestDbus(object):
58 def __init__(self, bus):
59 self.loop = gobject.MainLoop()
60 self.signals = []
61 self.bus = bus
62
63 def __exit__(self, type, value, traceback):
64 for s in self.signals:
65 s.remove()
66
67 def add_signal(self, handler, interface, name, byte_arrays=False):
68 s = self.bus.add_signal_receiver(handler, dbus_interface=interface,
69 signal_name=name,
70 byte_arrays=byte_arrays)
71 self.signals.append(s)
72
73 def timeout(self, *args):
74 logger.debug("timeout")
75 self.loop.quit()
76 return False
77
0e126c6d
JM
78class alloc_fail_dbus(object):
79 def __init__(self, dev, count, funcs, operation="Operation",
80 expected="NoMemory"):
81 self._dev = dev
82 self._count = count
83 self._funcs = funcs
84 self._operation = operation
85 self._expected = expected
86 def __enter__(self):
87 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
88 if "OK" not in self._dev.request(cmd):
89 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
90 def __exit__(self, type, value, traceback):
91 if type is None:
92 raise Exception("%s succeeded during out-of-memory" % self._operation)
93 if type == dbus.exceptions.DBusException and self._expected in str(value):
94 return True
95 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
96 raise Exception("%s did not trigger allocation failure" % self._operation)
97 return False
98
3a59cda1
JM
99def start_ap(ap, ssid="test-wps",
100 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
2ec82e67
JM
101 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
102 "wpa_passphrase": "12345678", "wpa": "2",
103 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
3a59cda1 104 "ap_pin": "12345670", "uuid": ap_uuid}
afc26df2 105 return hostapd.add_ap(ap, params)
2ec82e67
JM
106
107def test_dbus_getall(dev, apdev):
108 """D-Bus GetAll"""
910eb5b5 109 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
110
111 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
112 dbus_interface=dbus.PROPERTIES_IFACE)
113 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))
114
115 props = if_obj.GetAll(WPAS_DBUS_IFACE,
116 dbus_interface=dbus.PROPERTIES_IFACE)
117 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))
118
119 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
120 dbus_interface=dbus.PROPERTIES_IFACE)
121 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))
122
123 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
124 dbus_interface=dbus.PROPERTIES_IFACE)
125 if len(res) != 0:
126 raise Exception("Unexpected BSSs entry: " + str(res))
127
128 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
129 dbus_interface=dbus.PROPERTIES_IFACE)
130 if len(res) != 0:
131 raise Exception("Unexpected Networks entry: " + str(res))
132
8b8a1864 133 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
2ec82e67
JM
134 bssid = apdev[0]['bssid']
135 dev[0].scan_for_bss(bssid, freq=2412)
136 id = dev[0].add_network()
137 dev[0].set_network(id, "disabled", "0")
138 dev[0].set_network_quoted(id, "ssid", "test")
139
140 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
141 dbus_interface=dbus.PROPERTIES_IFACE)
142 if len(res) != 1:
143 raise Exception("Missing BSSs entry: " + str(res))
144 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
145 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
146 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
147 bssid_str = ''
148 for item in props['BSSID']:
149 if len(bssid_str) > 0:
150 bssid_str += ':'
151 bssid_str += '%02x' % item
152 if bssid_str != bssid:
153 raise Exception("Unexpected BSSID in BSSs entry")
154
155 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
156 dbus_interface=dbus.PROPERTIES_IFACE)
157 if len(res) != 1:
158 raise Exception("Missing Networks entry: " + str(res))
159 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
160 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
161 dbus_interface=dbus.PROPERTIES_IFACE)
162 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
163 ssid = props['Properties']['ssid']
164 if ssid != '"test"':
165 raise Exception("Unexpected SSID in network entry")
166
73ece491
JM
167def test_dbus_getall_oom(dev, apdev):
168 """D-Bus GetAll wpa_config_get_all() OOM"""
169 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
170
171 id = dev[0].add_network()
172 dev[0].set_network(id, "disabled", "0")
173 dev[0].set_network_quoted(id, "ssid", "test")
174
175 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
176 dbus_interface=dbus.PROPERTIES_IFACE)
177 if len(res) != 1:
178 raise Exception("Missing Networks entry: " + str(res))
179 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
180 for i in range(1, 50):
181 with alloc_fail(dev[0], i, "wpa_config_get_all"):
182 try:
183 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
184 dbus_interface=dbus.PROPERTIES_IFACE)
185 except dbus.exceptions.DBusException, e:
186 pass
187
2ec82e67
JM
188def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
189 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
190 dbus_interface=dbus.PROPERTIES_IFACE,
191 byte_arrays=byte_arrays)
192 if expect is not None and val != expect:
193 raise Exception("Unexpected %s: %s (expected: %s)" %
194 (prop, str(val), str(expect)))
195 return val
196
197def dbus_set(dbus, wpas_obj, prop, val):
198 wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
199 dbus_interface=dbus.PROPERTIES_IFACE)
200
201def test_dbus_properties(dev, apdev):
202 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
910eb5b5 203 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
204
205 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
206 dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
207 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
208 for (val,err) in [ (3, "Error.Failed: wrong property type"),
209 ("foo", "Error.Failed: wrong debug level value") ]:
210 try:
211 dbus_set(dbus, wpas_obj, "DebugLevel", val)
212 raise Exception("Invalid DebugLevel value accepted: " + str(val))
213 except dbus.exceptions.DBusException, e:
214 if err not in str(e):
215 raise Exception("Unexpected error message: " + str(e))
216 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
217 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
218
219 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
220 dbus_set(dbus, wpas_obj, "DebugTimestamp", False)
221 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False)
222 try:
223 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo")
224 raise Exception("Invalid DebugTimestamp value accepted")
225 except dbus.exceptions.DBusException, e:
226 if "Error.Failed: wrong property type" not in str(e):
227 raise Exception("Unexpected error message: " + str(e))
228 dbus_set(dbus, wpas_obj, "DebugTimestamp", True)
229 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
230
231 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
232 dbus_set(dbus, wpas_obj, "DebugShowKeys", False)
233 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False)
234 try:
235 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo")
236 raise Exception("Invalid DebugShowKeys value accepted")
237 except dbus.exceptions.DBusException, e:
238 if "Error.Failed: wrong property type" not in str(e):
239 raise Exception("Unexpected error message: " + str(e))
240 dbus_set(dbus, wpas_obj, "DebugShowKeys", True)
241 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
242
243 res = dbus_get(dbus, wpas_obj, "Interfaces")
244 if len(res) != 1:
245 raise Exception("Unexpected Interfaces value: " + str(res))
246
247 res = dbus_get(dbus, wpas_obj, "EapMethods")
248 if len(res) < 5 or "TTLS" not in res:
249 raise Exception("Unexpected EapMethods value: " + str(res))
250
251 res = dbus_get(dbus, wpas_obj, "Capabilities")
252 if len(res) < 2 or "p2p" not in res:
253 raise Exception("Unexpected Capabilities value: " + str(res))
254
255 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
256 val = binascii.unhexlify("010006020304050608")
257 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
258 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
259 if val != res:
260 raise Exception("WFDIEs value changed")
261 try:
262 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
263 raise Exception("Invalid WFDIEs value accepted")
264 except dbus.exceptions.DBusException, e:
265 if "InvalidArgs" not in str(e):
266 raise Exception("Unexpected error message: " + str(e))
267 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
268 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
269 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
270 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
271 if len(res) != 0:
272 raise Exception("WFDIEs not cleared properly")
273
274 res = dbus_get(dbus, wpas_obj, "EapMethods")
275 try:
276 dbus_set(dbus, wpas_obj, "EapMethods", res)
277 raise Exception("Invalid Set accepted")
278 except dbus.exceptions.DBusException, e:
279 if "InvalidArgs: Property is read-only" not in str(e):
280 raise Exception("Unexpected error message: " + str(e))
281
282 try:
283 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
284 dbus_interface=dbus.PROPERTIES_IFACE)
285 raise Exception("Unknown method accepted")
286 except dbus.exceptions.DBusException, e:
287 if "UnknownMethod" not in str(e):
288 raise Exception("Unexpected error message: " + str(e))
289
290 try:
291 wpas_obj.Get("foo", "DebugShowKeys",
292 dbus_interface=dbus.PROPERTIES_IFACE)
293 raise Exception("Invalid Get accepted")
294 except dbus.exceptions.DBusException, e:
295 if "InvalidArgs: No such property" not in str(e):
296 raise Exception("Unexpected error message: " + str(e))
297
298 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
299 introspect=False)
300 try:
301 test_obj.Get(123, "DebugShowKeys",
302 dbus_interface=dbus.PROPERTIES_IFACE)
303 raise Exception("Invalid Get accepted")
304 except dbus.exceptions.DBusException, e:
305 if "InvalidArgs: Invalid arguments" not in str(e):
306 raise Exception("Unexpected error message: " + str(e))
307 try:
308 test_obj.Get(WPAS_DBUS_SERVICE, 123,
309 dbus_interface=dbus.PROPERTIES_IFACE)
310 raise Exception("Invalid Get accepted")
311 except dbus.exceptions.DBusException, e:
312 if "InvalidArgs: Invalid arguments" not in str(e):
313 raise Exception("Unexpected error message: " + str(e))
314
315 try:
316 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
317 dbus.ByteArray('', variant_level=2),
318 dbus_interface=dbus.PROPERTIES_IFACE)
319 raise Exception("Invalid Set accepted")
320 except dbus.exceptions.DBusException, e:
321 if "InvalidArgs: invalid message format" not in str(e):
322 raise Exception("Unexpected error message: " + str(e))
323
f0ef6889
DW
324def test_dbus_set_global_properties(dev, apdev):
325 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
326 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
327
e51e49fc 328 dev[0].set("model_name", "")
f0ef6889
DW
329 props = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]
330
331 for p in props:
332 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
333 dbus_interface=dbus.PROPERTIES_IFACE)
334 if res != p[1]:
335 raise Exception("Unexpected " + p[0] + " value: " + str(res))
336
337 if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2],
338 dbus_interface=dbus.PROPERTIES_IFACE)
339
340 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
341 dbus_interface=dbus.PROPERTIES_IFACE)
342 if res != p[2]:
343 raise Exception("Unexpected " + p[0] + " value after set: " + str(res))
e51e49fc 344 dev[0].set("model_name", "")
f0ef6889 345
2ec82e67
JM
346def test_dbus_invalid_method(dev, apdev):
347 """D-Bus invalid method"""
910eb5b5 348 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
349 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
350
351 try:
352 wps.Foo()
353 raise Exception("Unknown method accepted")
354 except dbus.exceptions.DBusException, e:
355 if "UnknownMethod" not in str(e):
356 raise Exception("Unexpected error message: " + str(e))
357
358 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
359 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
360 try:
361 test_wps.Start(123)
362 raise Exception("WPS.Start with incorrect signature accepted")
363 except dbus.exceptions.DBusException, e:
364 if "InvalidArgs: Invalid arg" not in str(e):
365 raise Exception("Unexpected error message: " + str(e))
366
367def test_dbus_get_set_wps(dev, apdev):
368 """D-Bus Get/Set for WPS properties"""
369 try:
81e787b7 370 _test_dbus_get_set_wps(dev, apdev)
2ec82e67
JM
371 finally:
372 dev[0].request("SET wps_cred_processing 0")
364e28c9 373 dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
4e6bd667
JM
374 dev[0].set("device_name", "Device A")
375 dev[0].set("manufacturer", "")
376 dev[0].set("model_name", "")
377 dev[0].set("model_number", "")
378 dev[0].set("serial_number", "")
379 dev[0].set("device_type", "0-00000000-0")
2ec82e67
JM
380
381def _test_dbus_get_set_wps(dev, apdev):
910eb5b5 382 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
383
384 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
385 dbus_interface=dbus.PROPERTIES_IFACE)
386
387 val = "display keypad virtual_display nfc_interface"
388 dev[0].request("SET config_methods " + val)
389
390 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
391 dbus_interface=dbus.PROPERTIES_IFACE)
392 if config != val:
393 raise Exception("Unexpected Get(ConfigMethods) result: " + config)
394
395 val2 = "push_button display"
396 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
397 dbus_interface=dbus.PROPERTIES_IFACE)
398 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
399 dbus_interface=dbus.PROPERTIES_IFACE)
400 if config != val2:
401 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)
402
403 dev[0].request("SET config_methods " + val)
404
405 for i in range(3):
406 dev[0].request("SET wps_cred_processing " + str(i))
407 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
408 dbus_interface=dbus.PROPERTIES_IFACE)
409 expected_val = False if i == 1 else True
410 if val != expected_val:
411 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))
412
4e6bd667
JM
413 tests = [ ("device_name", "DeviceName"),
414 ("manufacturer", "Manufacturer"),
415 ("model_name", "ModelName"),
416 ("model_number", "ModelNumber"),
417 ("serial_number", "SerialNumber") ]
418
419 for f1,f2 in tests:
420 val2 = "test-value-test"
421 dev[0].set(f1, val2)
422 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
423 dbus_interface=dbus.PROPERTIES_IFACE)
424 if val != val2:
425 raise Exception("Get(%s) returned unexpected value" % f2)
426 val2 = "TEST-value"
427 if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2,
428 dbus_interface=dbus.PROPERTIES_IFACE)
429 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
430 dbus_interface=dbus.PROPERTIES_IFACE)
431 if val != val2:
432 raise Exception("Get(%s) returned unexpected value after Set" % f2)
433
434 dev[0].set("device_type", "5-0050F204-1")
435 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
436 dbus_interface=dbus.PROPERTIES_IFACE)
437 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
438 raise Exception("DeviceType mismatch")
439 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val,
440 dbus_interface=dbus.PROPERTIES_IFACE)
441 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
442 dbus_interface=dbus.PROPERTIES_IFACE)
443 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
444 raise Exception("DeviceType mismatch after Set")
445
446 val2 = '\x01\x02\x03\x04\x05\x06\x07\x08'
447 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2),
448 dbus_interface=dbus.PROPERTIES_IFACE)
449 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
450 dbus_interface=dbus.PROPERTIES_IFACE,
451 byte_arrays=True)
452 if val != val2:
453 raise Exception("DeviceType mismatch after Set (2)")
454
2ec82e67
JM
455 class TestDbusGetSet(TestDbus):
456 def __init__(self, bus):
457 TestDbus.__init__(self, bus)
458 self.signal_received = False
459 self.signal_received_deprecated = False
460 self.sets_done = False
461
462 def __enter__(self):
463 gobject.timeout_add(1, self.run_sets)
464 gobject.timeout_add(1000, self.timeout)
465 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
466 "PropertiesChanged")
467 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
468 "PropertiesChanged")
469 self.loop.run()
470 return self
471
472 def propertiesChanged(self, properties):
473 logger.debug("PropertiesChanged: " + str(properties))
474 if properties.has_key("ProcessCredentials"):
475 self.signal_received_deprecated = True
476 if self.sets_done and self.signal_received:
477 self.loop.quit()
478
479 def propertiesChanged2(self, interface_name, changed_properties,
480 invalidated_properties):
481 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
482 if interface_name != WPAS_DBUS_IFACE_WPS:
483 return
484 if changed_properties.has_key("ProcessCredentials"):
485 self.signal_received = True
486 if self.sets_done and self.signal_received_deprecated:
487 self.loop.quit()
488
489 def run_sets(self, *args):
490 logger.debug("run_sets")
491 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
492 dbus.Boolean(1),
493 dbus_interface=dbus.PROPERTIES_IFACE)
494 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
495 dbus_interface=dbus.PROPERTIES_IFACE) != True:
bc6e3288 496 raise Exception("Unexpected Get(ProcessCredentials) result after Set")
2ec82e67
JM
497 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
498 dbus.Boolean(0),
499 dbus_interface=dbus.PROPERTIES_IFACE)
500 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
501 dbus_interface=dbus.PROPERTIES_IFACE) != False:
bc6e3288 502 raise Exception("Unexpected Get(ProcessCredentials) result after Set")
2ec82e67
JM
503
504 self.dbus_sets_done = True
505 return False
506
507 def success(self):
508 return self.signal_received and self.signal_received_deprecated
509
510 with TestDbusGetSet(bus) as t:
511 if not t.success():
512 raise Exception("No signal received for ProcessCredentials change")
513
514def test_dbus_wps_invalid(dev, apdev):
515 """D-Bus invaldi WPS operation"""
910eb5b5 516 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
517 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
518
519 failures = [ {'Role': 'foo', 'Type': 'pbc'},
520 {'Role': 123, 'Type': 'pbc'},
521 {'Type': 'pbc'},
522 {'Role': 'enrollee'},
523 {'Role': 'registrar'},
524 {'Role': 'enrollee', 'Type': 123},
525 {'Role': 'enrollee', 'Type': 'foo'},
526 {'Role': 'enrollee', 'Type': 'pbc',
527 'Bssid': '02:33:44:55:66:77'},
528 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
529 {'Role': 'enrollee', 'Type': 'pbc',
530 'Bssid': dbus.ByteArray('12345')},
531 {'Role': 'enrollee', 'Type': 'pbc',
532 'P2PDeviceAddress': 12345},
533 {'Role': 'enrollee', 'Type': 'pbc',
534 'P2PDeviceAddress': dbus.ByteArray('12345')},
535 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
536 for args in failures:
537 try:
538 wps.Start(args)
539 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
540 except dbus.exceptions.DBusException, e:
541 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
542 raise Exception("Unexpected error message: " + str(e))
543
0e126c6d
JM
544def test_dbus_wps_oom(dev, apdev):
545 """D-Bus WPS operation (OOM)"""
546 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
547 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
548
549 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
550 if_obj.Get(WPAS_DBUS_IFACE, "State",
551 dbus_interface=dbus.PROPERTIES_IFACE)
552
8b8a1864 553 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
0e126c6d
JM
554 bssid = apdev[0]['bssid']
555 dev[0].scan_for_bss(bssid, freq=2412)
556
1d32bc2c 557 time.sleep(0.05)
0e126c6d
JM
558 for i in range(1, 3):
559 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
560 if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
561 dbus_interface=dbus.PROPERTIES_IFACE)
562
563 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
564 dbus_interface=dbus.PROPERTIES_IFACE)
565 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
566 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
567 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
568 dbus_interface=dbus.PROPERTIES_IFACE)
20c7d26f
JM
569 with alloc_fail(dev[0], 1,
570 "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
571 try:
572 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
573 dbus_interface=dbus.PROPERTIES_IFACE)
574 except dbus.exceptions.DBusException, e:
575 pass
0e126c6d
JM
576
577 id = dev[0].add_network()
578 dev[0].set_network(id, "disabled", "0")
579 dev[0].set_network_quoted(id, "ssid", "test")
580
581 for i in range(1, 3):
582 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
583 if_obj.Get(WPAS_DBUS_IFACE, "Networks",
584 dbus_interface=dbus.PROPERTIES_IFACE)
585
586 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
587 dbus_get(dbus, wpas_obj, "Interfaces")
588
589 for i in range(1, 6):
590 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
591 dbus_get(dbus, wpas_obj, "EapMethods")
592
593 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
594 expected="Error.Failed: Failed to set property"):
595 val2 = "push_button display"
596 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
597 dbus_interface=dbus.PROPERTIES_IFACE)
598
599 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
600 "WPS.Start",
601 expected="UnknownError: WPS start failed"):
602 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
603
2ec82e67
JM
604def test_dbus_wps_pbc(dev, apdev):
605 """D-Bus WPS/PBC operation and signals"""
606 try:
81e787b7 607 _test_dbus_wps_pbc(dev, apdev)
2ec82e67
JM
608 finally:
609 dev[0].request("SET wps_cred_processing 0")
610
611def _test_dbus_wps_pbc(dev, apdev):
910eb5b5 612 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
613 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
614
615 hapd = start_ap(apdev[0])
616 hapd.request("WPS_PBC")
617 bssid = apdev[0]['bssid']
618 dev[0].scan_for_bss(bssid, freq="2412")
619 dev[0].request("SET wps_cred_processing 2")
620
1d0a917f
JM
621 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
622 dbus_interface=dbus.PROPERTIES_IFACE)
623 if len(res) != 1:
624 raise Exception("Missing BSSs entry: " + str(res))
625 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
626 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
627 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
628 if 'WPS' not in props:
629 raise Exception("No WPS information in the BSS entry")
630 if 'Type' not in props['WPS']:
631 raise Exception("No Type field in the WPS dictionary")
632 if props['WPS']['Type'] != 'pbc':
633 raise Exception("Unexpected WPS Type: " + props['WPS']['Type'])
634
2ec82e67
JM
635 class TestDbusWps(TestDbus):
636 def __init__(self, bus, wps):
637 TestDbus.__init__(self, bus)
638 self.success_seen = False
639 self.credentials_received = False
640 self.wps = wps
641
642 def __enter__(self):
643 gobject.timeout_add(1, self.start_pbc)
644 gobject.timeout_add(15000, self.timeout)
645 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
646 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
647 "Credentials")
648 self.loop.run()
649 return self
650
651 def wpsEvent(self, name, args):
652 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
653 if name == "success":
654 self.success_seen = True
655 if self.credentials_received:
656 self.loop.quit()
657
658 def credentials(self, args):
659 logger.debug("credentials: " + str(args))
660 self.credentials_received = True
661 if self.success_seen:
662 self.loop.quit()
663
664 def start_pbc(self, *args):
665 logger.debug("start_pbc")
666 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
667 return False
668
669 def success(self):
670 return self.success_seen and self.credentials_received
671
672 with TestDbusWps(bus, wps) as t:
673 if not t.success():
674 raise Exception("Failure in D-Bus operations")
675
676 dev[0].wait_connected(timeout=10)
677 dev[0].request("DISCONNECT")
678 hapd.disable()
679 dev[0].flush_scan_cache()
680
3a59cda1
JM
681def test_dbus_wps_pbc_overlap(dev, apdev):
682 """D-Bus WPS/PBC operation and signal for PBC overlap"""
683 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
684 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
685
686 hapd = start_ap(apdev[0])
687 hapd2 = start_ap(apdev[1], ssid="test-wps2",
688 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
689 hapd.request("WPS_PBC")
690 hapd2.request("WPS_PBC")
691 bssid = apdev[0]['bssid']
692 dev[0].scan_for_bss(bssid, freq="2412")
693 bssid2 = apdev[1]['bssid']
694 dev[0].scan_for_bss(bssid2, freq="2412")
695
696 class TestDbusWps(TestDbus):
697 def __init__(self, bus, wps):
698 TestDbus.__init__(self, bus)
699 self.overlap_seen = False
700 self.wps = wps
701
702 def __enter__(self):
703 gobject.timeout_add(1, self.start_pbc)
704 gobject.timeout_add(15000, self.timeout)
705 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
706 self.loop.run()
707 return self
708
709 def wpsEvent(self, name, args):
710 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
711 if name == "pbc-overlap":
712 self.overlap_seen = True
713 self.loop.quit()
714
715 def start_pbc(self, *args):
716 logger.debug("start_pbc")
717 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
718 return False
719
720 def success(self):
721 return self.overlap_seen
722
723 with TestDbusWps(bus, wps) as t:
724 if not t.success():
725 raise Exception("Failure in D-Bus operations")
726
727 dev[0].request("WPS_CANCEL")
728 dev[0].request("DISCONNECT")
729 hapd.disable()
730 dev[0].flush_scan_cache()
731
2ec82e67
JM
732def test_dbus_wps_pin(dev, apdev):
733 """D-Bus WPS/PIN operation and signals"""
734 try:
81e787b7 735 _test_dbus_wps_pin(dev, apdev)
2ec82e67
JM
736 finally:
737 dev[0].request("SET wps_cred_processing 0")
738
739def _test_dbus_wps_pin(dev, apdev):
910eb5b5 740 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
741 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
742
743 hapd = start_ap(apdev[0])
744 hapd.request("WPS_PIN any 12345670")
745 bssid = apdev[0]['bssid']
746 dev[0].scan_for_bss(bssid, freq="2412")
747 dev[0].request("SET wps_cred_processing 2")
748
749 class TestDbusWps(TestDbus):
750 def __init__(self, bus):
751 TestDbus.__init__(self, bus)
752 self.success_seen = False
753 self.credentials_received = False
754
755 def __enter__(self):
756 gobject.timeout_add(1, self.start_pin)
757 gobject.timeout_add(15000, self.timeout)
758 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
759 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
760 "Credentials")
761 self.loop.run()
762 return self
763
764 def wpsEvent(self, name, args):
765 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
766 if name == "success":
767 self.success_seen = True
768 if self.credentials_received:
769 self.loop.quit()
770
771 def credentials(self, args):
772 logger.debug("credentials: " + str(args))
773 self.credentials_received = True
774 if self.success_seen:
775 self.loop.quit()
776
777 def start_pin(self, *args):
778 logger.debug("start_pin")
779 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
780 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
781 'Bssid': bssid_ay})
782 return False
783
784 def success(self):
785 return self.success_seen and self.credentials_received
786
787 with TestDbusWps(bus) as t:
788 if not t.success():
789 raise Exception("Failure in D-Bus operations")
790
791 dev[0].wait_connected(timeout=10)
792
793def test_dbus_wps_pin2(dev, apdev):
794 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
795 try:
81e787b7 796 _test_dbus_wps_pin2(dev, apdev)
2ec82e67
JM
797 finally:
798 dev[0].request("SET wps_cred_processing 0")
799
800def _test_dbus_wps_pin2(dev, apdev):
910eb5b5 801 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
802 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
803
804 hapd = start_ap(apdev[0])
805 bssid = apdev[0]['bssid']
806 dev[0].scan_for_bss(bssid, freq="2412")
807 dev[0].request("SET wps_cred_processing 2")
808
809 class TestDbusWps(TestDbus):
810 def __init__(self, bus):
811 TestDbus.__init__(self, bus)
812 self.success_seen = False
813 self.failed = False
814
815 def __enter__(self):
816 gobject.timeout_add(1, self.start_pin)
817 gobject.timeout_add(15000, self.timeout)
818 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
819 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
820 "Credentials")
821 self.loop.run()
822 return self
823
824 def wpsEvent(self, name, args):
825 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
826 if name == "success":
827 self.success_seen = True
828 if self.credentials_received:
829 self.loop.quit()
830
831 def credentials(self, args):
832 logger.debug("credentials: " + str(args))
833 self.credentials_received = True
834 if self.success_seen:
835 self.loop.quit()
836
837 def start_pin(self, *args):
838 logger.debug("start_pin")
839 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
840 res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
841 'Bssid': bssid_ay})
842 pin = res['Pin']
843 h = hostapd.Hostapd(apdev[0]['ifname'])
844 h.request("WPS_PIN any " + pin)
845 return False
846
847 def success(self):
848 return self.success_seen and self.credentials_received
849
850 with TestDbusWps(bus) as t:
851 if not t.success():
852 raise Exception("Failure in D-Bus operations")
853
854 dev[0].wait_connected(timeout=10)
855
856def test_dbus_wps_pin_m2d(dev, apdev):
857 """D-Bus WPS/PIN operation and signals with M2D"""
858 try:
81e787b7 859 _test_dbus_wps_pin_m2d(dev, apdev)
2ec82e67
JM
860 finally:
861 dev[0].request("SET wps_cred_processing 0")
862
863def _test_dbus_wps_pin_m2d(dev, apdev):
910eb5b5 864 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
865 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
866
867 hapd = start_ap(apdev[0])
868 bssid = apdev[0]['bssid']
869 dev[0].scan_for_bss(bssid, freq="2412")
870 dev[0].request("SET wps_cred_processing 2")
871
872 class TestDbusWps(TestDbus):
873 def __init__(self, bus):
874 TestDbus.__init__(self, bus)
875 self.success_seen = False
876 self.credentials_received = False
877
878 def __enter__(self):
879 gobject.timeout_add(1, self.start_pin)
880 gobject.timeout_add(15000, self.timeout)
881 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
882 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
883 "Credentials")
884 self.loop.run()
885 return self
886
887 def wpsEvent(self, name, args):
888 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
889 if name == "success":
890 self.success_seen = True
891 if self.credentials_received:
892 self.loop.quit()
893 elif name == "m2d":
894 h = hostapd.Hostapd(apdev[0]['ifname'])
895 h.request("WPS_PIN any 12345670")
896
897 def credentials(self, args):
898 logger.debug("credentials: " + str(args))
899 self.credentials_received = True
900 if self.success_seen:
901 self.loop.quit()
902
903 def start_pin(self, *args):
904 logger.debug("start_pin")
905 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
906 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
907 'Bssid': bssid_ay})
908 return False
909
910 def success(self):
911 return self.success_seen and self.credentials_received
912
913 with TestDbusWps(bus) as t:
914 if not t.success():
915 raise Exception("Failure in D-Bus operations")
916
917 dev[0].wait_connected(timeout=10)
918
919def test_dbus_wps_reg(dev, apdev):
920 """D-Bus WPS/Registrar operation and signals"""
921 try:
81e787b7 922 _test_dbus_wps_reg(dev, apdev)
2ec82e67
JM
923 finally:
924 dev[0].request("SET wps_cred_processing 0")
925
926def _test_dbus_wps_reg(dev, apdev):
910eb5b5 927 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
928 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
929
930 hapd = start_ap(apdev[0])
931 hapd.request("WPS_PIN any 12345670")
932 bssid = apdev[0]['bssid']
933 dev[0].scan_for_bss(bssid, freq="2412")
934 dev[0].request("SET wps_cred_processing 2")
935
936 class TestDbusWps(TestDbus):
937 def __init__(self, bus):
938 TestDbus.__init__(self, bus)
939 self.credentials_received = False
940
941 def __enter__(self):
942 gobject.timeout_add(100, self.start_reg)
943 gobject.timeout_add(15000, self.timeout)
944 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
945 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
946 "Credentials")
947 self.loop.run()
948 return self
949
950 def wpsEvent(self, name, args):
951 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
952
953 def credentials(self, args):
954 logger.debug("credentials: " + str(args))
955 self.credentials_received = True
956 self.loop.quit()
957
958 def start_reg(self, *args):
959 logger.debug("start_reg")
960 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
961 wps.Start({'Role': 'registrar', 'Type': 'pin',
962 'Pin': '12345670', 'Bssid': bssid_ay})
963 return False
964
965 def success(self):
966 return self.credentials_received
967
968 with TestDbusWps(bus) as t:
969 if not t.success():
970 raise Exception("Failure in D-Bus operations")
971
972 dev[0].wait_connected(timeout=10)
973
2a8e3f35
JM
974def test_dbus_wps_cancel(dev, apdev):
975 """D-Bus WPS Cancel operation"""
976 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
977 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
978
979 hapd = start_ap(apdev[0])
980 bssid = apdev[0]['bssid']
981
982 wps.Cancel()
983 dev[0].scan_for_bss(bssid, freq="2412")
984 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
985 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
986 'Bssid': bssid_ay})
987 wps.Cancel()
988 dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
989
2ec82e67
JM
990def test_dbus_scan_invalid(dev, apdev):
991 """D-Bus invalid scan method"""
910eb5b5 992 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
993 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
994
995 tests = [ ({}, "InvalidArgs"),
996 ({'Type': 123}, "InvalidArgs"),
997 ({'Type': 'foo'}, "InvalidArgs"),
998 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
999 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
1000 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
1001 ({'Type': 'active',
1002 'SSIDs': [ dbus.ByteArray("1"), dbus.ByteArray("2"),
1003 dbus.ByteArray("3"), dbus.ByteArray("4"),
1004 dbus.ByteArray("5"), dbus.ByteArray("6"),
1005 dbus.ByteArray("7"), dbus.ByteArray("8"),
1006 dbus.ByteArray("9"), dbus.ByteArray("10"),
1007 dbus.ByteArray("11"), dbus.ByteArray("12"),
1008 dbus.ByteArray("13"), dbus.ByteArray("14"),
1009 dbus.ByteArray("15"), dbus.ByteArray("16"),
1010 dbus.ByteArray("17") ]},
1011 "InvalidArgs"),
1012 ({'Type': 'active',
1013 'SSIDs': [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]},
1014 "InvalidArgs"),
1015 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
1016 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
1017 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
1018 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
1019 ({'Type': 'active',
1020 'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
1021 "InvalidArgs"),
1022 ({'Type': 'active',
1023 'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
1024 "InvalidArgs"),
1025 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
1026 ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
1027 "InvalidArgs"),
1028 ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
1029 "InvalidArgs")]
1030 for (t,err) in tests:
1031 try:
1032 iface.Scan(t)
1033 raise Exception("Invalid Scan() arguments accepted: " + str(t))
1034 except dbus.exceptions.DBusException, e:
1035 if err not in str(e):
1036 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
1037
0e126c6d
JM
1038def test_dbus_scan_oom(dev, apdev):
1039 """D-Bus scan method and OOM"""
1040 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1041 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1042
1043 with alloc_fail_dbus(dev[0], 1,
1044 "wpa_scan_clone_params;wpas_dbus_handler_scan",
1045 "Scan", expected="ScanError: Scan request rejected"):
1046 iface.Scan({ 'Type': 'passive',
1047 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1048
1049 with alloc_fail_dbus(dev[0], 1,
1050 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
1051 "Scan"):
1052 iface.Scan({ 'Type': 'passive',
1053 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1054
1055 with alloc_fail_dbus(dev[0], 1,
1056 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
1057 "Scan"):
1058 iface.Scan({ 'Type': 'active',
1059 'IEs': [ dbus.ByteArray("\xdd\x00") ],
1060 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1061
1062 with alloc_fail_dbus(dev[0], 1,
1063 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
1064 "Scan"):
1065 iface.Scan({ 'Type': 'active',
1066 'SSIDs': [ dbus.ByteArray("open"),
1067 dbus.ByteArray() ],
1068 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1069
2ec82e67
JM
1070def test_dbus_scan(dev, apdev):
1071 """D-Bus scan and related signals"""
910eb5b5 1072 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1073 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1074
8b8a1864 1075 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
2ec82e67
JM
1076
1077 class TestDbusScan(TestDbus):
1078 def __init__(self, bus):
1079 TestDbus.__init__(self, bus)
1080 self.scan_completed = 0
1081 self.bss_added = False
1d0a917f 1082 self.fail_reason = None
2ec82e67
JM
1083
1084 def __enter__(self):
1085 gobject.timeout_add(1, self.run_scan)
1086 gobject.timeout_add(15000, self.timeout)
1087 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
1088 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
1089 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
1090 self.loop.run()
1091 return self
1092
1093 def scanDone(self, success):
1094 logger.debug("scanDone: success=%s" % success)
1095 self.scan_completed += 1
1096 if self.scan_completed == 1:
1097 iface.Scan({'Type': 'passive',
1098 'AllowRoam': True,
1099 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1100 elif self.scan_completed == 2:
1101 iface.Scan({'Type': 'passive',
1102 'AllowRoam': False})
1103 elif self.bss_added and self.scan_completed == 3:
1104 self.loop.quit()
1105
1106 def bssAdded(self, bss, properties):
1107 logger.debug("bssAdded: %s" % bss)
1108 logger.debug(str(properties))
1d0a917f
JM
1109 if 'WPS' in properties:
1110 if 'Type' in properties['WPS']:
1111 self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
1112 self.loop.quit()
2ec82e67
JM
1113 self.bss_added = True
1114 if self.scan_completed == 3:
1115 self.loop.quit()
1116
1117 def bssRemoved(self, bss):
1118 logger.debug("bssRemoved: %s" % bss)
1119
1120 def run_scan(self, *args):
1121 logger.debug("run_scan")
1122 iface.Scan({'Type': 'active',
1123 'SSIDs': [ dbus.ByteArray("open"),
1124 dbus.ByteArray() ],
1125 'IEs': [ dbus.ByteArray("\xdd\x00"),
1126 dbus.ByteArray() ],
1127 'AllowRoam': False,
1128 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1129 return False
1130
1131 def success(self):
1132 return self.scan_completed == 3 and self.bss_added
1133
1134 with TestDbusScan(bus) as t:
1d0a917f
JM
1135 if t.fail_reason:
1136 raise Exception(t.fail_reason)
2ec82e67
JM
1137 if not t.success():
1138 raise Exception("Expected signals not seen")
1139
1140 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1141 dbus_interface=dbus.PROPERTIES_IFACE)
1142 if len(res) < 1:
1143 raise Exception("Scan result not in BSSs property")
1144 iface.FlushBSS(0)
1145 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1146 dbus_interface=dbus.PROPERTIES_IFACE)
1147 if len(res) != 0:
1148 raise Exception("FlushBSS() did not remove scan results from BSSs property")
1149 iface.FlushBSS(1)
1150
1151def test_dbus_scan_busy(dev, apdev):
1152 """D-Bus scan trigger rejection when busy with previous scan"""
910eb5b5 1153 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1154 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1155
1156 if "OK" not in dev[0].request("SCAN freq=2412-2462"):
1157 raise Exception("Failed to start scan")
1158 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1159 if ev is None:
1160 raise Exception("Scan start timed out")
1161
1162 try:
1163 iface.Scan({'Type': 'active', 'AllowRoam': False})
1164 raise Exception("Scan() accepted when busy")
1165 except dbus.exceptions.DBusException, e:
1166 if "ScanError: Scan request reject" not in str(e):
1167 raise Exception("Unexpected error message: " + str(e))
1168
1169 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1170 if ev is None:
1171 raise Exception("Scan timed out")
1172
0238e8ba
JM
1173def test_dbus_scan_abort(dev, apdev):
1174 """D-Bus scan trigger and abort"""
1175 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1176 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1177
1178 iface.Scan({'Type': 'active', 'AllowRoam': False})
1179 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1180 if ev is None:
1181 raise Exception("Scan start timed out")
1182
1183 iface.AbortScan()
f41f04d0
JM
1184 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1185 if ev is None:
1186 raise Exception("Scan abort result timed out")
1187 dev[0].dump_monitor()
0238e8ba
JM
1188 iface.Scan({'Type': 'active', 'AllowRoam': False})
1189 iface.AbortScan()
1190
1191 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1192 if ev is None:
1193 raise Exception("Scan timed out")
1194
2ec82e67
JM
1195def test_dbus_connect(dev, apdev):
1196 """D-Bus AddNetwork and connect"""
910eb5b5 1197 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1198 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1199
1200 ssid = "test-wpa2-psk"
1201 passphrase = 'qwertyuiop'
1202 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
8b8a1864 1203 hapd = hostapd.add_ap(apdev[0], params)
2ec82e67
JM
1204
1205 class TestDbusConnect(TestDbus):
1206 def __init__(self, bus):
1207 TestDbus.__init__(self, bus)
1208 self.network_added = False
1209 self.network_selected = False
1210 self.network_removed = False
1211 self.state = 0
1212
1213 def __enter__(self):
1214 gobject.timeout_add(1, self.run_connect)
1215 gobject.timeout_add(15000, self.timeout)
1216 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1217 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1218 "NetworkRemoved")
1219 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1220 "NetworkSelected")
1221 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1222 "PropertiesChanged")
1223 self.loop.run()
1224 return self
1225
1226 def networkAdded(self, network, properties):
1227 logger.debug("networkAdded: %s" % str(network))
1228 logger.debug(str(properties))
1229 self.network_added = True
1230
1231 def networkRemoved(self, network):
1232 logger.debug("networkRemoved: %s" % str(network))
1233 self.network_removed = True
1234
1235 def networkSelected(self, network):
1236 logger.debug("networkSelected: %s" % str(network))
1237 self.network_selected = True
1238
1239 def propertiesChanged(self, properties):
1240 logger.debug("propertiesChanged: %s" % str(properties))
1241 if 'State' in properties and properties['State'] == "completed":
1242 if self.state == 0:
1243 self.state = 1
1244 iface.Disconnect()
1245 elif self.state == 2:
1246 self.state = 3
1247 iface.Disconnect()
1248 elif self.state == 4:
1249 self.state = 5
1250 iface.Reattach()
1251 elif self.state == 5:
1252 self.state = 6
13b34972
JM
1253 iface.Disconnect()
1254 elif self.state == 7:
1255 self.state = 8
2ec82e67
JM
1256 res = iface.SignalPoll()
1257 logger.debug("SignalPoll: " + str(res))
1258 if 'frequency' not in res or res['frequency'] != 2412:
1259 self.state = -1
1260 logger.info("Unexpected SignalPoll result")
1261 iface.RemoveNetwork(self.netw)
1262 if 'State' in properties and properties['State'] == "disconnected":
1263 if self.state == 1:
1264 self.state = 2
1265 iface.SelectNetwork(self.netw)
1266 elif self.state == 3:
1267 self.state = 4
1268 iface.Reassociate()
1269 elif self.state == 6:
1270 self.state = 7
13b34972
JM
1271 iface.Reconnect()
1272 elif self.state == 8:
1273 self.state = 9
2ec82e67
JM
1274 self.loop.quit()
1275
1276 def run_connect(self, *args):
1277 logger.debug("run_connect")
1278 args = dbus.Dictionary({ 'ssid': ssid,
1279 'key_mgmt': 'WPA-PSK',
1280 'psk': passphrase,
1281 'scan_freq': 2412 },
1282 signature='sv')
1283 self.netw = iface.AddNetwork(args)
1284 iface.SelectNetwork(self.netw)
1285 return False
1286
1287 def success(self):
1288 if not self.network_added or \
1289 not self.network_removed or \
1290 not self.network_selected:
1291 return False
13b34972 1292 return self.state == 9
2ec82e67
JM
1293
1294 with TestDbusConnect(bus) as t:
1295 if not t.success():
1296 raise Exception("Expected signals not seen")
1297
53f4ed68
JM
1298def test_dbus_connect_psk_mem(dev, apdev):
1299 """D-Bus AddNetwork and connect with memory-only PSK"""
1300 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1301 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1302
1303 ssid = "test-wpa2-psk"
1304 passphrase = 'qwertyuiop'
1305 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
8b8a1864 1306 hapd = hostapd.add_ap(apdev[0], params)
53f4ed68
JM
1307
1308 class TestDbusConnect(TestDbus):
1309 def __init__(self, bus):
1310 TestDbus.__init__(self, bus)
1311 self.connected = False
1312
1313 def __enter__(self):
1314 gobject.timeout_add(1, self.run_connect)
1315 gobject.timeout_add(15000, self.timeout)
1316 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1317 "PropertiesChanged")
1318 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1319 "NetworkRequest")
1320 self.loop.run()
1321 return self
1322
1323 def propertiesChanged(self, properties):
1324 logger.debug("propertiesChanged: %s" % str(properties))
1325 if 'State' in properties and properties['State'] == "completed":
1326 self.connected = True
1327 self.loop.quit()
1328
1329 def networkRequest(self, path, field, txt):
1330 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1331 if field == "PSK_PASSPHRASE":
1332 iface.NetworkReply(path, field, '"' + passphrase + '"')
1333
1334 def run_connect(self, *args):
1335 logger.debug("run_connect")
1336 args = dbus.Dictionary({ 'ssid': ssid,
1337 'key_mgmt': 'WPA-PSK',
1338 'mem_only_psk': 1,
1339 'scan_freq': 2412 },
1340 signature='sv')
1341 self.netw = iface.AddNetwork(args)
1342 iface.SelectNetwork(self.netw)
1343 return False
1344
1345 def success(self):
1346 return self.connected
1347
1348 with TestDbusConnect(bus) as t:
1349 if not t.success():
1350 raise Exception("Expected signals not seen")
1351
0e126c6d
JM
1352def test_dbus_connect_oom(dev, apdev):
1353 """D-Bus AddNetwork and connect when out-of-memory"""
1354 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1355 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1356
1357 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
1358 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1359
1360 ssid = "test-wpa2-psk"
1361 passphrase = 'qwertyuiop'
1362 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
8b8a1864 1363 hapd = hostapd.add_ap(apdev[0], params)
0e126c6d
JM
1364
1365 class TestDbusConnect(TestDbus):
1366 def __init__(self, bus):
1367 TestDbus.__init__(self, bus)
1368 self.network_added = False
1369 self.network_selected = False
1370 self.network_removed = False
1371 self.state = 0
1372
1373 def __enter__(self):
1374 gobject.timeout_add(1, self.run_connect)
1375 gobject.timeout_add(1500, self.timeout)
1376 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1377 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1378 "NetworkRemoved")
1379 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1380 "NetworkSelected")
1381 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1382 "PropertiesChanged")
1383 self.loop.run()
1384 return self
1385
1386 def networkAdded(self, network, properties):
1387 logger.debug("networkAdded: %s" % str(network))
1388 logger.debug(str(properties))
1389 self.network_added = True
1390
1391 def networkRemoved(self, network):
1392 logger.debug("networkRemoved: %s" % str(network))
1393 self.network_removed = True
1394
1395 def networkSelected(self, network):
1396 logger.debug("networkSelected: %s" % str(network))
1397 self.network_selected = True
1398
1399 def propertiesChanged(self, properties):
1400 logger.debug("propertiesChanged: %s" % str(properties))
1401 if 'State' in properties and properties['State'] == "completed":
1402 if self.state == 0:
1403 self.state = 1
1404 iface.Disconnect()
1405 elif self.state == 2:
1406 self.state = 3
1407 iface.Disconnect()
1408 elif self.state == 4:
1409 self.state = 5
1410 iface.Reattach()
1411 elif self.state == 5:
1412 self.state = 6
1413 res = iface.SignalPoll()
1414 logger.debug("SignalPoll: " + str(res))
1415 if 'frequency' not in res or res['frequency'] != 2412:
1416 self.state = -1
1417 logger.info("Unexpected SignalPoll result")
1418 iface.RemoveNetwork(self.netw)
1419 if 'State' in properties and properties['State'] == "disconnected":
1420 if self.state == 1:
1421 self.state = 2
1422 iface.SelectNetwork(self.netw)
1423 elif self.state == 3:
1424 self.state = 4
1425 iface.Reassociate()
1426 elif self.state == 6:
1427 self.state = 7
1428 self.loop.quit()
1429
1430 def run_connect(self, *args):
1431 logger.debug("run_connect")
1432 args = dbus.Dictionary({ 'ssid': ssid,
1433 'key_mgmt': 'WPA-PSK',
1434 'psk': passphrase,
1435 'scan_freq': 2412 },
1436 signature='sv')
1437 try:
1438 self.netw = iface.AddNetwork(args)
1439 except Exception, e:
1440 logger.info("Exception on AddNetwork: " + str(e))
1441 self.loop.quit()
1442 return False
1443 try:
1444 iface.SelectNetwork(self.netw)
1445 except Exception, e:
1446 logger.info("Exception on SelectNetwork: " + str(e))
1447 self.loop.quit()
1448
1449 return False
1450
1451 def success(self):
1452 if not self.network_added or \
1453 not self.network_removed or \
1454 not self.network_selected:
1455 return False
1456 return self.state == 7
1457
1458 count = 0
1459 for i in range(1, 1000):
1460 for j in range(3):
1461 dev[j].dump_monitor()
1462 dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
1463 try:
1464 with TestDbusConnect(bus) as t:
1465 if not t.success():
1466 logger.info("Iteration %d - Expected signals not seen" % i)
1467 else:
1468 logger.info("Iteration %d - success" % i)
1469
1470 state = dev[0].request('GET_ALLOC_FAIL')
1471 logger.info("GET_ALLOC_FAIL: " + state)
1472 dev[0].dump_monitor()
1473 dev[0].request("TEST_ALLOC_FAIL 0:")
1474 if i < 3:
1475 raise Exception("Connection succeeded during out-of-memory")
1476 if not state.startswith('0:'):
1477 count += 1
1478 if count == 5:
1479 break
1480 except:
1481 pass
2ec82e67 1482
89a79ca2
JM
1483 # Force regulatory update to re-fetch hw capabilities for the following
1484 # test cases.
1485 try:
1486 dev[0].dump_monitor()
1487 subprocess.call(['iw', 'reg', 'set', 'US'])
1488 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1489 finally:
1490 dev[0].dump_monitor()
1491 subprocess.call(['iw', 'reg', 'set', '00'])
1492 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1493
2ec82e67
JM
1494def test_dbus_while_not_connected(dev, apdev):
1495 """D-Bus invalid operations while not connected"""
910eb5b5 1496 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1497 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1498
1499 try:
1500 iface.Disconnect()
1501 raise Exception("Disconnect() accepted when not connected")
1502 except dbus.exceptions.DBusException, e:
1503 if "NotConnected" not in str(e):
1504 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
1505
1506 try:
1507 iface.Reattach()
1508 raise Exception("Reattach() accepted when not connected")
1509 except dbus.exceptions.DBusException, e:
1510 if "NotConnected" not in str(e):
1511 raise Exception("Unexpected error message for invalid Reattach: " + str(e))
1512
1513def test_dbus_connect_eap(dev, apdev):
1514 """D-Bus AddNetwork and connect to EAP network"""
089e7ca3 1515 check_altsubject_match_support(dev[0])
910eb5b5 1516 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1517 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1518
1519 ssid = "ieee8021x-open"
1520 params = hostapd.radius_params()
1521 params["ssid"] = ssid
1522 params["ieee8021x"] = "1"
8b8a1864 1523 hapd = hostapd.add_ap(apdev[0], params)
2ec82e67
JM
1524
1525 class TestDbusConnect(TestDbus):
1526 def __init__(self, bus):
1527 TestDbus.__init__(self, bus)
1528 self.certification_received = False
1529 self.eap_status = False
1530 self.state = 0
1531
1532 def __enter__(self):
1533 gobject.timeout_add(1, self.run_connect)
1534 gobject.timeout_add(15000, self.timeout)
1535 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1536 "PropertiesChanged")
1537 self.add_signal(self.certification, WPAS_DBUS_IFACE,
2099fed4 1538 "Certification", byte_arrays=True)
2ec82e67
JM
1539 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1540 "NetworkRequest")
1541 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
1542 self.loop.run()
1543 return self
1544
1545 def propertiesChanged(self, properties):
1546 logger.debug("propertiesChanged: %s" % str(properties))
1547 if 'State' in properties and properties['State'] == "completed":
1548 if self.state == 0:
1549 self.state = 1
1550 iface.EAPLogoff()
2099fed4
JM
1551 logger.info("Set dNSName constraint")
1552 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1553 args = dbus.Dictionary({ 'altsubject_match':
1554 self.server_dnsname },
1555 signature='sv')
1556 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1557 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
1558 elif self.state == 2:
1559 self.state = 3
2099fed4
JM
1560 iface.Disconnect()
1561 logger.info("Set non-matching dNSName constraint")
1562 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1563 args = dbus.Dictionary({ 'altsubject_match':
1564 self.server_dnsname + "FOO" },
1565 signature='sv')
1566 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1567 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
1568 if 'State' in properties and properties['State'] == "disconnected":
1569 if self.state == 1:
1570 self.state = 2
1571 iface.EAPLogon()
1572 iface.SelectNetwork(self.netw)
2099fed4
JM
1573 if self.state == 3:
1574 self.state = 4
1575 iface.SelectNetwork(self.netw)
2ec82e67
JM
1576
1577 def certification(self, args):
1578 logger.debug("certification: %s" % str(args))
1579 self.certification_received = True
2099fed4
JM
1580 if args['depth'] == 0:
1581 # The test server certificate is supposed to have dNSName
1582 if len(args['altsubject']) < 1:
1583 raise Exception("Missing dNSName")
1584 dnsname = args['altsubject'][0]
1585 if not dnsname.startswith("DNS:"):
1586 raise Exception("Expected dNSName not found: " + dnsname)
1587 logger.info("altsubject: " + dnsname)
1588 self.server_dnsname = dnsname
2ec82e67
JM
1589
1590 def eap(self, status, parameter):
1591 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
1592 if status == 'completion' and parameter == 'success':
1593 self.eap_status = True
2099fed4
JM
1594 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch':
1595 self.state = 5
1596 self.loop.quit()
2ec82e67
JM
1597
1598 def networkRequest(self, path, field, txt):
1599 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1600 if field == "PASSWORD":
1601 iface.NetworkReply(path, field, "password")
1602
1603 def run_connect(self, *args):
1604 logger.debug("run_connect")
1605 args = dbus.Dictionary({ 'ssid': ssid,
1606 'key_mgmt': 'IEEE8021X',
1607 'eapol_flags': 0,
1608 'eap': 'TTLS',
1609 'anonymous_identity': 'ttls',
1610 'identity': 'pap user',
1611 'ca_cert': 'auth_serv/ca.pem',
1612 'phase2': 'auth=PAP',
1613 'scan_freq': 2412 },
1614 signature='sv')
1615 self.netw = iface.AddNetwork(args)
1616 iface.SelectNetwork(self.netw)
1617 return False
1618
1619 def success(self):
1620 if not self.eap_status or not self.certification_received:
1621 return False
2099fed4 1622 return self.state == 5
2ec82e67
JM
1623
1624 with TestDbusConnect(bus) as t:
1625 if not t.success():
1626 raise Exception("Expected signals not seen")
1627
1628def test_dbus_network(dev, apdev):
1629 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
910eb5b5 1630 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1631 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1632
1633 args = dbus.Dictionary({ 'ssid': "foo",
1634 'key_mgmt': 'WPA-PSK',
1635 'psk': "12345678",
1636 'identity': dbus.ByteArray([ 1, 2 ]),
1637 'priority': dbus.Int32(0),
1638 'scan_freq': dbus.UInt32(2412) },
1639 signature='sv')
1640 netw = iface.AddNetwork(args)
5a233fbd
JM
1641 id = int(dev[0].list_networks()[0]['id'])
1642 val = dev[0].get_network(id, "scan_freq")
1643 if val != "2412":
1644 raise Exception("Invalid scan_freq value: " + str(val))
1645 iface.RemoveNetwork(netw)
1646
1647 args = dbus.Dictionary({ 'ssid': "foo",
1648 'key_mgmt': 'NONE',
1649 'scan_freq': "2412 2432",
1650 'freq_list': "2412 2417 2432" },
1651 signature='sv')
1652 netw = iface.AddNetwork(args)
1653 id = int(dev[0].list_networks()[0]['id'])
1654 val = dev[0].get_network(id, "scan_freq")
1655 if val != "2412 2432":
1656 raise Exception("Invalid scan_freq value (2): " + str(val))
1657 val = dev[0].get_network(id, "freq_list")
1658 if val != "2412 2417 2432":
1659 raise Exception("Invalid freq_list value: " + str(val))
2ec82e67
JM
1660 iface.RemoveNetwork(netw)
1661 try:
1662 iface.RemoveNetwork(netw)
1663 raise Exception("Invalid RemoveNetwork() accepted")
1664 except dbus.exceptions.DBusException, e:
1665 if "NetworkUnknown" not in str(e):
1666 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1667 try:
1668 iface.SelectNetwork(netw)
1669 raise Exception("Invalid SelectNetwork() accepted")
1670 except dbus.exceptions.DBusException, e:
1671 if "NetworkUnknown" not in str(e):
1672 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1673
1674 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1675 'identity': "testuser", 'scan_freq': '2412' },
1676 signature='sv')
1677 netw1 = iface.AddNetwork(args)
1678 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1679 signature='sv')
1680 netw2 = iface.AddNetwork(args)
1681 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1682 dbus_interface=dbus.PROPERTIES_IFACE)
1683 if len(res) != 2:
1684 raise Exception("Unexpected number of networks")
1685
1686 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1687 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1688 dbus_interface=dbus.PROPERTIES_IFACE)
1689 if res != False:
1690 raise Exception("Added network was unexpectedly enabled by default")
1691 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
1692 dbus_interface=dbus.PROPERTIES_IFACE)
1693 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1694 dbus_interface=dbus.PROPERTIES_IFACE)
1695 if res != True:
1696 raise Exception("Set(Enabled,True) did not seem to change property value")
1697 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
1698 dbus_interface=dbus.PROPERTIES_IFACE)
1699 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1700 dbus_interface=dbus.PROPERTIES_IFACE)
1701 if res != False:
1702 raise Exception("Set(Enabled,False) did not seem to change property value")
1703 try:
1704 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
1705 dbus_interface=dbus.PROPERTIES_IFACE)
1706 raise Exception("Invalid Set(Enabled,1) accepted")
1707 except dbus.exceptions.DBusException, e:
1708 if "Error.Failed: wrong property type" not in str(e):
1709 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
1710
1711 args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
1712 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1713 dbus_interface=dbus.PROPERTIES_IFACE)
1714 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1715 dbus_interface=dbus.PROPERTIES_IFACE)
1716 if res['ssid'] != '"foo1new"':
1717 raise Exception("Set(Properties) failed to update ssid")
1718 if res['identity'] != '"testuser"':
1719 raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1720
1721 iface.RemoveAllNetworks()
1722 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1723 dbus_interface=dbus.PROPERTIES_IFACE)
1724 if len(res) != 0:
1725 raise Exception("Unexpected number of networks")
1726 iface.RemoveAllNetworks()
1727
1728 tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
1729 dbus.Dictionary({ 'identity': dbus.ByteArray() },
1730 signature='sv'),
1731 dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
1732 dbus.Dictionary({ 'identity': "" }, signature='sv') ]
1733 for args in tests:
1734 try:
1735 iface.AddNetwork(args)
1736 raise Exception("Invalid AddNetwork args accepted: " + str(args))
1737 except dbus.exceptions.DBusException, e:
1738 if "InvalidArgs" not in str(e):
1739 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1740
0e126c6d
JM
1741def test_dbus_network_oom(dev, apdev):
1742 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1743 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1744 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1745
1746 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1747 'identity': "testuser", 'scan_freq': '2412' },
1748 signature='sv')
1749 netw1 = iface.AddNetwork(args)
1750 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1751
1752 with alloc_fail_dbus(dev[0], 1,
1753 "wpa_config_get_all;wpas_dbus_getter_network_properties",
1754 "Get"):
1755 net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1756 dbus_interface=dbus.PROPERTIES_IFACE)
1757
1758 iface.RemoveAllNetworks()
1759
1760 with alloc_fail_dbus(dev[0], 1,
1761 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1762 "RemoveNetwork", "InvalidArgs"):
1763 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1764
1765 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1766 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1767 signature='sv')
1768 try:
1769 netw = iface.AddNetwork(args)
1770 # Currently, AddNetwork() succeeds even if os_strdup() for path
1771 # fails, so remove the network if that occurs.
1772 iface.RemoveNetwork(netw)
1773 except dbus.exceptions.DBusException, e:
1774 pass
1775
1776 for i in range(1, 3):
1777 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
1778 try:
1779 netw = iface.AddNetwork(args)
1780 # Currently, AddNetwork() succeeds even if network registration
1781 # fails, so remove the network if that occurs.
1782 iface.RemoveNetwork(netw)
1783 except dbus.exceptions.DBusException, e:
1784 pass
1785
1786 with alloc_fail_dbus(dev[0], 1,
1787 "=wpa_config_add_network;wpas_dbus_handler_add_network",
1788 "AddNetwork",
1789 "UnknownError: wpa_supplicant could not add a network"):
1790 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1791 signature='sv')
1792 netw = iface.AddNetwork(args)
1793
1794 tests = [ (1,
1795 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1796 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1797 signature='sv')),
1798 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1799 dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
1800 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1801 dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
1802 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1803 dbus.Dictionary({ 'priority': dbus.UInt32(1) },
1804 signature='sv')),
1805 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1806 dbus.Dictionary({ 'priority': dbus.Int32(1) },
1807 signature='sv')),
1808 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1809 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1810 signature='sv')) ]
1811 for (count,funcs,args) in tests:
1812 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
1813 netw = iface.AddNetwork(args)
1814
1815 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
1816 dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
1817 raise Exception("Unexpected network block added")
1818 if len(dev[0].list_networks()) > 0:
1819 raise Exception("Unexpected network block visible")
1820
2ec82e67
JM
1821def test_dbus_interface(dev, apdev):
1822 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
7966674d
JM
1823 try:
1824 _test_dbus_interface(dev, apdev)
1825 finally:
1826 # Need to force P2P channel list update since the 'lo' interface
1827 # with driver=none ends up configuring default dualband channels.
1828 dev[0].request("SET country US")
1829 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1830 if ev is None:
1831 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1832 timeout=1)
1833 dev[0].request("SET country 00")
1834 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1835 if ev is None:
1836 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1837 timeout=1)
1838 subprocess.call(['iw', 'reg', 'set', '00'])
1839
1840def _test_dbus_interface(dev, apdev):
910eb5b5 1841 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1842 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1843
1844 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1845 signature='sv')
1846 path = wpas.CreateInterface(params)
1847 logger.debug("New interface path: " + str(path))
1848 path2 = wpas.GetInterface("lo")
1849 if path != path2:
1850 raise Exception("Interface object mismatch")
1851
1852 params = dbus.Dictionary({ 'Ifname': 'lo',
1853 'Driver': 'none',
1854 'ConfigFile': 'foo',
1855 'BridgeIfname': 'foo', },
1856 signature='sv')
1857 try:
1858 wpas.CreateInterface(params)
1859 raise Exception("Invalid CreateInterface() accepted")
1860 except dbus.exceptions.DBusException, e:
1861 if "InterfaceExists" not in str(e):
1862 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1863
1864 wpas.RemoveInterface(path)
1865 try:
1866 wpas.RemoveInterface(path)
1867 raise Exception("Invalid RemoveInterface() accepted")
1868 except dbus.exceptions.DBusException, e:
1869 if "InterfaceUnknown" not in str(e):
1870 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1871
1872 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
1873 'Foo': 123 },
1874 signature='sv')
1875 try:
1876 wpas.CreateInterface(params)
1877 raise Exception("Invalid CreateInterface() accepted")
1878 except dbus.exceptions.DBusException, e:
1879 if "InvalidArgs" not in str(e):
1880 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1881
1882 params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
1883 try:
1884 wpas.CreateInterface(params)
1885 raise Exception("Invalid CreateInterface() accepted")
1886 except dbus.exceptions.DBusException, e:
1887 if "InvalidArgs" not in str(e):
1888 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1889
1890 try:
1891 wpas.GetInterface("lo")
1892 raise Exception("Invalid GetInterface() accepted")
1893 except dbus.exceptions.DBusException, e:
1894 if "InterfaceUnknown" not in str(e):
1895 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1896
0e126c6d
JM
1897def test_dbus_interface_oom(dev, apdev):
1898 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
1899 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1900 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1901
1902 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
1903 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1904 signature='sv')
1905 wpas.CreateInterface(params)
1906
1907 for i in range(1, 1000):
1908 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
1909 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1910 signature='sv')
1911 try:
1912 npath = wpas.CreateInterface(params)
1913 wpas.RemoveInterface(npath)
1914 logger.info("CreateInterface succeeds after %d allocation failures" % i)
1915 state = dev[0].request('GET_ALLOC_FAIL')
1916 logger.info("GET_ALLOC_FAIL: " + state)
1917 dev[0].dump_monitor()
1918 dev[0].request("TEST_ALLOC_FAIL 0:")
1919 if i < 5:
1920 raise Exception("CreateInterface succeeded during out-of-memory")
1921 if not state.startswith('0:'):
1922 break
1923 except dbus.exceptions.DBusException, e:
1924 pass
1925
1926 for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
1927 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
1928 "CreateInterface"):
1929 params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
1930 wpas.CreateInterface(params)
1931
2ec82e67
JM
1932def test_dbus_blob(dev, apdev):
1933 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
910eb5b5 1934 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
1935 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1936
1937 blob = dbus.ByteArray("\x01\x02\x03")
1938 iface.AddBlob('blob1', blob)
1939 try:
1940 iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
1941 raise Exception("Invalid AddBlob() accepted")
1942 except dbus.exceptions.DBusException, e:
1943 if "BlobExists" not in str(e):
1944 raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
1945 res = iface.GetBlob('blob1')
1946 if len(res) != len(blob):
1947 raise Exception("Unexpected blob data length")
1948 for i in range(len(res)):
1949 if res[i] != dbus.Byte(blob[i]):
1950 raise Exception("Unexpected blob data")
1951 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
1952 dbus_interface=dbus.PROPERTIES_IFACE)
1953 if 'blob1' not in res:
1954 raise Exception("Added blob missing from Blobs property")
1955 iface.RemoveBlob('blob1')
1956 try:
1957 iface.RemoveBlob('blob1')
1958 raise Exception("Invalid RemoveBlob() accepted")
1959 except dbus.exceptions.DBusException, e:
1960 if "BlobUnknown" not in str(e):
1961 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
1962 try:
1963 iface.GetBlob('blob1')
1964 raise Exception("Invalid GetBlob() accepted")
1965 except dbus.exceptions.DBusException, e:
1966 if "BlobUnknown" not in str(e):
1967 raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
1968
1969 class TestDbusBlob(TestDbus):
1970 def __init__(self, bus):
1971 TestDbus.__init__(self, bus)
1972 self.blob_added = False
1973 self.blob_removed = False
1974
1975 def __enter__(self):
1976 gobject.timeout_add(1, self.run_blob)
1977 gobject.timeout_add(15000, self.timeout)
1978 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
1979 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
1980 self.loop.run()
1981 return self
1982
1983 def blobAdded(self, blobName):
1984 logger.debug("blobAdded: %s" % blobName)
1985 if blobName == 'blob2':
1986 self.blob_added = True
1987
1988 def blobRemoved(self, blobName):
1989 logger.debug("blobRemoved: %s" % blobName)
1990 if blobName == 'blob2':
1991 self.blob_removed = True
1992 self.loop.quit()
1993
1994 def run_blob(self, *args):
1995 logger.debug("run_blob")
1996 iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
1997 iface.RemoveBlob('blob2')
1998 return False
1999
2000 def success(self):
2001 return self.blob_added and self.blob_removed
2002
2003 with TestDbusBlob(bus) as t:
2004 if not t.success():
2005 raise Exception("Expected signals not seen")
2006
0e126c6d
JM
2007def test_dbus_blob_oom(dev, apdev):
2008 """D-Bus AddNetwork/RemoveNetwork OOM error cases"""
2009 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2010 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2011
2012 for i in range(1, 4):
2013 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
2014 "AddBlob"):
2015 iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
2016
2ec82e67
JM
2017def test_dbus_autoscan(dev, apdev):
2018 """D-Bus Autoscan()"""
910eb5b5 2019 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2020 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2021
2022 iface.AutoScan("foo")
2023 iface.AutoScan("periodic:1")
2024 iface.AutoScan("")
2025 dev[0].request("AUTOSCAN ")
2026
0e126c6d
JM
2027def test_dbus_autoscan_oom(dev, apdev):
2028 """D-Bus Autoscan() OOM"""
2029 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2030 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2031
2032 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
2033 iface.AutoScan("foo")
2034 dev[0].request("AUTOSCAN ")
2035
2ec82e67
JM
2036def test_dbus_tdls_invalid(dev, apdev):
2037 """D-Bus invalid TDLS operations"""
910eb5b5 2038 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2039 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2040
8b8a1864 2041 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2ec82e67
JM
2042 connect_2sta_open(dev, hapd)
2043 addr1 = dev[1].p2p_interface_addr()
2044
2045 try:
2046 iface.TDLSDiscover("foo")
2047 raise Exception("Invalid TDLSDiscover() accepted")
2048 except dbus.exceptions.DBusException, e:
2049 if "InvalidArgs" not in str(e):
2050 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e))
2051
2052 try:
2053 iface.TDLSStatus("foo")
2054 raise Exception("Invalid TDLSStatus() accepted")
2055 except dbus.exceptions.DBusException, e:
2056 if "InvalidArgs" not in str(e):
2057 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e))
2058
2059 res = iface.TDLSStatus(addr1)
2060 if res != "peer does not exist":
2061 raise Exception("Unexpected TDLSStatus response")
2062
2063 try:
2064 iface.TDLSSetup("foo")
2065 raise Exception("Invalid TDLSSetup() accepted")
2066 except dbus.exceptions.DBusException, e:
2067 if "InvalidArgs" not in str(e):
2068 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e))
2069
2070 try:
2071 iface.TDLSTeardown("foo")
2072 raise Exception("Invalid TDLSTeardown() accepted")
2073 except dbus.exceptions.DBusException, e:
2074 if "InvalidArgs" not in str(e):
2075 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e))
2076
795b6f57
JM
2077 try:
2078 iface.TDLSTeardown("00:11:22:33:44:55")
2079 raise Exception("TDLSTeardown accepted for unknown peer")
2080 except dbus.exceptions.DBusException, e:
2081 if "UnknownError: error performing TDLS teardown" not in str(e):
2082 raise Exception("Unexpected error message: " + str(e))
2083
82aea622
JM
2084 try:
2085 iface.TDLSChannelSwitch({})
2086 raise Exception("Invalid TDLSChannelSwitch() accepted")
2087 except dbus.exceptions.DBusException, e:
2088 if "InvalidArgs" not in str(e):
2089 raise Exception("Unexpected error message for invalid TDLSChannelSwitch: " + str(e))
2090
2091 try:
2092 iface.TDLSCancelChannelSwitch("foo")
2093 raise Exception("Invalid TDLSCancelChannelSwitch() accepted")
2094 except dbus.exceptions.DBusException, e:
2095 if "InvalidArgs" not in str(e):
2096 raise Exception("Unexpected error message for invalid TDLSCancelChannelSwitch: " + str(e))
2097
0e126c6d
JM
2098def test_dbus_tdls_oom(dev, apdev):
2099 """D-Bus TDLS operations during OOM"""
2100 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2101 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2102
2103 with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
2104 "UnknownError: error performing TDLS setup"):
2105 iface.TDLSSetup("00:11:22:33:44:55")
2106
2ec82e67
JM
2107def test_dbus_tdls(dev, apdev):
2108 """D-Bus TDLS"""
910eb5b5 2109 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2110 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2111
8b8a1864 2112 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2ec82e67
JM
2113 connect_2sta_open(dev, hapd)
2114
2115 addr1 = dev[1].p2p_interface_addr()
2116
2117 class TestDbusTdls(TestDbus):
2118 def __init__(self, bus):
2119 TestDbus.__init__(self, bus)
2120 self.tdls_setup = False
2121 self.tdls_teardown = False
2122
2123 def __enter__(self):
2124 gobject.timeout_add(1, self.run_tdls)
2125 gobject.timeout_add(15000, self.timeout)
2126 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2127 "PropertiesChanged")
2128 self.loop.run()
2129 return self
2130
2131 def propertiesChanged(self, properties):
2132 logger.debug("propertiesChanged: %s" % str(properties))
2133
2134 def run_tdls(self, *args):
2135 logger.debug("run_tdls")
2136 iface.TDLSDiscover(addr1)
2137 gobject.timeout_add(100, self.run_tdls2)
2138 return False
2139
2140 def run_tdls2(self, *args):
2141 logger.debug("run_tdls2")
2142 iface.TDLSSetup(addr1)
2143 gobject.timeout_add(500, self.run_tdls3)
2144 return False
2145
2146 def run_tdls3(self, *args):
2147 logger.debug("run_tdls3")
2148 res = iface.TDLSStatus(addr1)
2149 if res == "connected":
2150 self.tdls_setup = True
2151 else:
2152 logger.info("Unexpected TDLSStatus: " + res)
2153 iface.TDLSTeardown(addr1)
2154 gobject.timeout_add(200, self.run_tdls4)
2155 return False
2156
2157 def run_tdls4(self, *args):
2158 logger.debug("run_tdls4")
2159 res = iface.TDLSStatus(addr1)
2160 if res == "peer does not exist":
2161 self.tdls_teardown = True
2162 else:
2163 logger.info("Unexpected TDLSStatus: " + res)
2164 self.loop.quit()
2165 return False
2166
2167 def success(self):
2168 return self.tdls_setup and self.tdls_teardown
2169
2170 with TestDbusTdls(bus) as t:
2171 if not t.success():
2172 raise Exception("Expected signals not seen")
2173
82aea622
JM
2174def test_dbus_tdls_channel_switch(dev, apdev):
2175 """D-Bus TDLS channel switch configuration"""
2176 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2177 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2178
2179 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2180 connect_2sta_open(dev, hapd)
2181
2182 addr1 = dev[1].p2p_interface_addr()
2183
2184 class TestDbusTdls(TestDbus):
2185 def __init__(self, bus):
2186 TestDbus.__init__(self, bus)
2187 self.tdls_setup = False
2188 self.tdls_done = False
2189
2190 def __enter__(self):
2191 gobject.timeout_add(1, self.run_tdls)
2192 gobject.timeout_add(15000, self.timeout)
2193 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2194 "PropertiesChanged")
2195 self.loop.run()
2196 return self
2197
2198 def propertiesChanged(self, properties):
2199 logger.debug("propertiesChanged: %s" % str(properties))
2200
2201 def run_tdls(self, *args):
2202 logger.debug("run_tdls")
2203 iface.TDLSDiscover(addr1)
2204 gobject.timeout_add(100, self.run_tdls2)
2205 return False
2206
2207 def run_tdls2(self, *args):
2208 logger.debug("run_tdls2")
2209 iface.TDLSSetup(addr1)
2210 gobject.timeout_add(500, self.run_tdls3)
2211 return False
2212
2213 def run_tdls3(self, *args):
2214 logger.debug("run_tdls3")
2215 res = iface.TDLSStatus(addr1)
2216 if res == "connected":
2217 self.tdls_setup = True
2218 else:
2219 logger.info("Unexpected TDLSStatus: " + res)
2220
2221 # Unknown dict entry
2222 args = dbus.Dictionary({ 'Foobar': dbus.Byte(1) },
2223 signature='sv')
2224 try:
2225 iface.TDLSChannelSwitch(args)
2226 except Exception, e:
2227 if "InvalidArgs" not in str(e):
2228 raise Exception("Unexpected exception")
2229
2230 # Missing OperClass
2231 args = dbus.Dictionary({}, signature='sv')
2232 try:
2233 iface.TDLSChannelSwitch(args)
2234 except Exception, e:
2235 if "InvalidArgs" not in str(e):
2236 raise Exception("Unexpected exception")
2237
2238 # Missing Frequency
2239 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1) },
2240 signature='sv')
2241 try:
2242 iface.TDLSChannelSwitch(args)
2243 except Exception, e:
2244 if "InvalidArgs" not in str(e):
2245 raise Exception("Unexpected exception")
2246
2247 # Missing PeerAddress
2248 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1),
2249 'Frequency': dbus.UInt32(2417) },
2250 signature='sv')
2251 try:
2252 iface.TDLSChannelSwitch(args)
2253 except Exception, e:
2254 if "InvalidArgs" not in str(e):
2255 raise Exception("Unexpected exception")
2256
2257 # Valid parameters
2258 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1),
2259 'Frequency': dbus.UInt32(2417),
2260 'PeerAddress': addr1,
2261 'SecChannelOffset': dbus.UInt32(0),
2262 'CenterFrequency1': dbus.UInt32(0),
2263 'CenterFrequency2': dbus.UInt32(0),
2264 'Bandwidth': dbus.UInt32(20),
2265 'HT': dbus.Boolean(False),
2266 'VHT': dbus.Boolean(False) },
2267 signature='sv')
2268 iface.TDLSChannelSwitch(args)
2269
2270 gobject.timeout_add(200, self.run_tdls4)
2271 return False
2272
2273 def run_tdls4(self, *args):
2274 logger.debug("run_tdls4")
2275 iface.TDLSCancelChannelSwitch(addr1)
2276 self.tdls_done = True
2277 self.loop.quit()
2278 return False
2279
2280 def success(self):
2281 return self.tdls_setup and self.tdls_done
2282
2283 with TestDbusTdls(bus) as t:
2284 if not t.success():
2285 raise Exception("Expected signals not seen")
2286
2ec82e67
JM
2287def test_dbus_pkcs11(dev, apdev):
2288 """D-Bus SetPKCS11EngineAndModulePath()"""
910eb5b5 2289 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2290 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2291
2292 try:
2293 iface.SetPKCS11EngineAndModulePath("foo", "bar")
2294 except dbus.exceptions.DBusException, e:
2295 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2296 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2297
2298 try:
2299 iface.SetPKCS11EngineAndModulePath("foo", "")
2300 except dbus.exceptions.DBusException, e:
2301 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2302 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2303
2304 iface.SetPKCS11EngineAndModulePath("", "bar")
2305 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2306 dbus_interface=dbus.PROPERTIES_IFACE)
2307 if res != "":
2308 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2309 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2310 dbus_interface=dbus.PROPERTIES_IFACE)
2311 if res != "bar":
2312 raise Exception("Unexpected PKCS11ModulePath value: " + res)
2313
2314 iface.SetPKCS11EngineAndModulePath("", "")
2315 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2316 dbus_interface=dbus.PROPERTIES_IFACE)
2317 if res != "":
2318 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2319 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2320 dbus_interface=dbus.PROPERTIES_IFACE)
2321 if res != "":
2322 raise Exception("Unexpected PKCS11ModulePath value: " + res)
2323
2324def test_dbus_apscan(dev, apdev):
2325 """D-Bus Get/Set ApScan"""
2326 try:
81e787b7 2327 _test_dbus_apscan(dev, apdev)
2ec82e67
JM
2328 finally:
2329 dev[0].request("AP_SCAN 1")
2330
2331def _test_dbus_apscan(dev, apdev):
910eb5b5 2332 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2333
2334 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2335 dbus_interface=dbus.PROPERTIES_IFACE)
2336 if res != 1:
2337 raise Exception("Unexpected initial ApScan value: %d" % res)
2338
2339 for i in range(3):
2340 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i),
2341 dbus_interface=dbus.PROPERTIES_IFACE)
2342 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2343 dbus_interface=dbus.PROPERTIES_IFACE)
2344 if res != i:
2345 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i))
2346
2347 try:
2348 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
2349 dbus_interface=dbus.PROPERTIES_IFACE)
2350 raise Exception("Invalid Set(ApScan,-1) accepted")
2351 except dbus.exceptions.DBusException, e:
2352 if "Error.Failed: wrong property type" not in str(e):
2353 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2354
2355 try:
2356 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
2357 dbus_interface=dbus.PROPERTIES_IFACE)
2358 raise Exception("Invalid Set(ApScan,123) accepted")
2359 except dbus.exceptions.DBusException, e:
2360 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
2361 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
2362
2363 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
2364 dbus_interface=dbus.PROPERTIES_IFACE)
2365
76055b4c
JM
2366def test_dbus_pmf(dev, apdev):
2367 """D-Bus Get/Set Pmf"""
2368 try:
2369 _test_dbus_pmf(dev, apdev)
2370 finally:
2371 dev[0].request("SET pmf 0")
2372
2373def _test_dbus_pmf(dev, apdev):
2374 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2375
2376 dev[0].set("pmf", "0")
2377 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
2378 dbus_interface=dbus.PROPERTIES_IFACE)
2379 if res != 0:
2380 raise Exception("Unexpected initial Pmf value: %d" % res)
2381
2382 for i in range(3):
2383 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", dbus.UInt32(i),
2384 dbus_interface=dbus.PROPERTIES_IFACE)
2385 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
2386 dbus_interface=dbus.PROPERTIES_IFACE)
2387 if res != i:
2388 raise Exception("Unexpected Pmf value %d (expected %d)" % (res, i))
2389
2390 try:
2391 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", dbus.Int16(-1),
2392 dbus_interface=dbus.PROPERTIES_IFACE)
2393 raise Exception("Invalid Set(Pmf,-1) accepted")
2394 except dbus.exceptions.DBusException, e:
2395 if "Error.Failed: wrong property type" not in str(e):
2396 raise Exception("Unexpected error message for invalid Set(Pmf,-1): " + str(e))
2397
2398 try:
2399 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", dbus.UInt32(123),
2400 dbus_interface=dbus.PROPERTIES_IFACE)
2401 raise Exception("Invalid Set(Pmf,123) accepted")
2402 except dbus.exceptions.DBusException, e:
2403 if "Error.Failed: Pmf must be 0, 1, or 2" not in str(e):
2404 raise Exception("Unexpected error message for invalid Set(Pmf,123): " + str(e))
2405
2406 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", dbus.UInt32(1),
2407 dbus_interface=dbus.PROPERTIES_IFACE)
2408
2ec82e67
JM
2409def test_dbus_fastreauth(dev, apdev):
2410 """D-Bus Get/Set FastReauth"""
910eb5b5 2411 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2412
2413 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2414 dbus_interface=dbus.PROPERTIES_IFACE)
2415 if res != True:
2416 raise Exception("Unexpected initial FastReauth value: " + str(res))
2417
2418 for i in [ False, True ]:
2419 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
2420 dbus_interface=dbus.PROPERTIES_IFACE)
2421 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2422 dbus_interface=dbus.PROPERTIES_IFACE)
2423 if res != i:
2424 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))
2425
2426 try:
2427 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
2428 dbus_interface=dbus.PROPERTIES_IFACE)
2429 raise Exception("Invalid Set(FastReauth,-1) accepted")
2430 except dbus.exceptions.DBusException, e:
2431 if "Error.Failed: wrong property type" not in str(e):
2432 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2433
2434 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
2435 dbus_interface=dbus.PROPERTIES_IFACE)
2436
2437def test_dbus_bss_expire(dev, apdev):
2438 """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
910eb5b5 2439 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2440
2441 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179),
2442 dbus_interface=dbus.PROPERTIES_IFACE)
2443 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
2444 dbus_interface=dbus.PROPERTIES_IFACE)
2445 if res != 179:
2446 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i))
2447
2448 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3),
2449 dbus_interface=dbus.PROPERTIES_IFACE)
2450 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
2451 dbus_interface=dbus.PROPERTIES_IFACE)
2452 if res != 3:
2453 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i))
2454
2455 try:
2456 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
2457 dbus_interface=dbus.PROPERTIES_IFACE)
2458 raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
2459 except dbus.exceptions.DBusException, e:
2460 if "Error.Failed: wrong property type" not in str(e):
2461 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))
2462
2463 try:
2464 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
2465 dbus_interface=dbus.PROPERTIES_IFACE)
2466 raise Exception("Invalid Set(BSSExpireAge,9) accepted")
2467 except dbus.exceptions.DBusException, e:
2468 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
2469 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))
2470
2471 try:
2472 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
2473 dbus_interface=dbus.PROPERTIES_IFACE)
2474 raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
2475 except dbus.exceptions.DBusException, e:
2476 if "Error.Failed: wrong property type" not in str(e):
2477 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))
2478
2479 try:
2480 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
2481 dbus_interface=dbus.PROPERTIES_IFACE)
2482 raise Exception("Invalid Set(BSSExpireCount,0) accepted")
2483 except dbus.exceptions.DBusException, e:
2484 if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
2485 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))
2486
2487 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
2488 dbus_interface=dbus.PROPERTIES_IFACE)
2489 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
2490 dbus_interface=dbus.PROPERTIES_IFACE)
2491
2492def test_dbus_country(dev, apdev):
2493 """D-Bus Get/Set Country"""
2494 try:
81e787b7 2495 _test_dbus_country(dev, apdev)
2ec82e67
JM
2496 finally:
2497 dev[0].request("SET country 00")
2498 subprocess.call(['iw', 'reg', 'set', '00'])
2499
2500def _test_dbus_country(dev, apdev):
910eb5b5 2501 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2502
2503 # work around issues with possible pending regdom event from the end of
2504 # the previous test case
2505 time.sleep(0.2)
2506 dev[0].dump_monitor()
2507
2508 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
2509 dbus_interface=dbus.PROPERTIES_IFACE)
2510 res = if_obj.Get(WPAS_DBUS_IFACE, "Country",
2511 dbus_interface=dbus.PROPERTIES_IFACE)
2512 if res != "FI":
2513 raise Exception("Unexpected Country value %s (expected FI)" % res)
2514
2515 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2516 if ev is None:
cb346b49
JM
2517 # For now, work around separate P2P Device interface event delivery
2518 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2519 if ev is None:
2520 raise Exception("regdom change event not seen")
2ec82e67
JM
2521 if "init=USER type=COUNTRY alpha2=FI" not in ev:
2522 raise Exception("Unexpected event contents: " + ev)
2523
2524 try:
2525 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
2526 dbus_interface=dbus.PROPERTIES_IFACE)
2527 raise Exception("Invalid Set(Country,-1) accepted")
2528 except dbus.exceptions.DBusException, e:
2529 if "Error.Failed: wrong property type" not in str(e):
2530 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
2531
2532 try:
2533 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
2534 dbus_interface=dbus.PROPERTIES_IFACE)
2535 raise Exception("Invalid Set(Country,F) accepted")
2536 except dbus.exceptions.DBusException, e:
2537 if "Error.Failed: invalid country code" not in str(e):
2538 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
2539
2540 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
2541 dbus_interface=dbus.PROPERTIES_IFACE)
2542
2543 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2544 if ev is None:
cb346b49
JM
2545 # For now, work around separate P2P Device interface event delivery
2546 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2547 if ev is None:
2548 raise Exception("regdom change event not seen")
be90370b
JM
2549 # init=CORE was previously used due to invalid db.txt data for 00. For
2550 # now, allow both it and the new init=USER after fixed db.txt.
2551 if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev:
2ec82e67
JM
2552 raise Exception("Unexpected event contents: " + ev)
2553
2554def test_dbus_scan_interval(dev, apdev):
2555 """D-Bus Get/Set ScanInterval"""
2556 try:
2557 _test_dbus_scan_interval(dev, apdev)
2558 finally:
2559 dev[0].request("SCAN_INTERVAL 5")
2560
2561def _test_dbus_scan_interval(dev, apdev):
910eb5b5 2562 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2563
2564 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3),
2565 dbus_interface=dbus.PROPERTIES_IFACE)
2566 res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
2567 dbus_interface=dbus.PROPERTIES_IFACE)
2568 if res != 3:
2569 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i))
2570
2571 try:
2572 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
2573 dbus_interface=dbus.PROPERTIES_IFACE)
2574 raise Exception("Invalid Set(ScanInterval,100) accepted")
2575 except dbus.exceptions.DBusException, e:
2576 if "Error.Failed: wrong property type" not in str(e):
2577 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
2578
2579 try:
2580 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
2581 dbus_interface=dbus.PROPERTIES_IFACE)
2582 raise Exception("Invalid Set(ScanInterval,-1) accepted")
2583 except dbus.exceptions.DBusException, e:
2584 if "Error.Failed: scan_interval must be >= 0" not in str(e):
2585 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
2586
2587 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
2588 dbus_interface=dbus.PROPERTIES_IFACE)
2589
2590def test_dbus_probe_req_reporting(dev, apdev):
2591 """D-Bus Probe Request reporting"""
910eb5b5 2592 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 2593
2ec82e67
JM
2594 dev[1].p2p_find(social=True)
2595
2596 class TestDbusProbe(TestDbus):
2597 def __init__(self, bus):
2598 TestDbus.__init__(self, bus)
2599 self.reported = False
2600
2601 def __enter__(self):
2602 gobject.timeout_add(1, self.run_test)
2603 gobject.timeout_add(15000, self.timeout)
cb346b49
JM
2604 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
2605 "GroupStarted")
2ec82e67
JM
2606 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
2607 byte_arrays=True)
2ec82e67
JM
2608 self.loop.run()
2609 return self
2610
cb346b49
JM
2611 def groupStarted(self, properties):
2612 logger.debug("groupStarted: " + str(properties))
2613 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
2614 properties['interface_object'])
2615 self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
2616 self.iface.SubscribeProbeReq()
2617 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2618
2ec82e67
JM
2619 def probeRequest(self, args):
2620 logger.debug("probeRequest: args=%s" % str(args))
2621 self.reported = True
2622 self.loop.quit()
2623
2624 def run_test(self, *args):
2625 logger.debug("run_test")
cb346b49
JM
2626 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2627 params = dbus.Dictionary({ 'frequency': 2412 })
2628 p2p.GroupAdd(params)
2ec82e67
JM
2629 return False
2630
2631 def success(self):
2632 return self.reported
2633
2634 with TestDbusProbe(bus) as t:
2635 if not t.success():
2636 raise Exception("Expected signals not seen")
cb346b49
JM
2637 t.iface.UnsubscribeProbeReq()
2638 try:
2639 t.iface.UnsubscribeProbeReq()
2640 raise Exception("Invalid UnsubscribeProbeReq() accepted")
2641 except dbus.exceptions.DBusException, e:
2642 if "NoSubscription" not in str(e):
2643 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
2644 t.group_p2p.Disconnect()
2ec82e67
JM
2645
2646 with TestDbusProbe(bus) as t:
2647 if not t.success():
2648 raise Exception("Expected signals not seen")
2649 # On purpose, leave ProbeReq subscription in place to test automatic
2650 # cleanup.
2651
2652 dev[1].p2p_stop_find()
2ec82e67 2653
0e126c6d
JM
2654def test_dbus_probe_req_reporting_oom(dev, apdev):
2655 """D-Bus Probe Request reporting (OOM)"""
2656 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2657 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2658
fb9adae4
JM
2659 # Need to make sure this process has not already subscribed to avoid false
2660 # failures due to the operation succeeding due to os_strdup() not even
2661 # getting called.
2662 try:
2663 iface.UnsubscribeProbeReq()
2664 was_subscribed = True
2665 except dbus.exceptions.DBusException, e:
2666 was_subscribed = False
2667 pass
2668
0e126c6d
JM
2669 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
2670 "SubscribeProbeReq"):
2671 iface.SubscribeProbeReq()
2672
fb9adae4
JM
2673 if was_subscribed:
2674 # On purpose, leave ProbeReq subscription in place to test automatic
2675 # cleanup.
2676 iface.SubscribeProbeReq()
2677
2ec82e67
JM
2678def test_dbus_p2p_invalid(dev, apdev):
2679 """D-Bus invalid P2P operations"""
910eb5b5 2680 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2681 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2682
2683 try:
2684 p2p.RejectPeer(path + "/Peers/00112233445566")
2685 raise Exception("Invalid RejectPeer accepted")
2686 except dbus.exceptions.DBusException, e:
2687 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2688 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2689
2690 try:
2691 p2p.RejectPeer("/foo")
2692 raise Exception("Invalid RejectPeer accepted")
2693 except dbus.exceptions.DBusException, e:
2694 if "InvalidArgs" not in str(e):
2695 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2696
001c4bf5
JM
2697 tests = [ { },
2698 { 'peer': 'foo' },
2699 { 'foo': "bar" },
2700 { 'iface': "abc" },
2701 { 'iface': 123 } ]
2702 for t in tests:
2703 try:
2704 p2p.RemoveClient(t)
2705 raise Exception("Invalid RemoveClient accepted")
2706 except dbus.exceptions.DBusException, e:
2707 if "InvalidArgs" not in str(e):
2708 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
2709
2ec82e67
JM
2710 tests = [ {'DiscoveryType': 'foo'},
2711 {'RequestedDeviceTypes': 'foo'},
2712 {'RequestedDeviceTypes': ['foo']},
2713 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2714 '10','11','12','13','14','15','16',
2715 '17']},
2716 {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2717 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2718 {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2719 {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
2720 dbus.ByteArray('1234567')]},
2721 {'Foo': dbus.Int16(1)},
2722 {'Foo': dbus.UInt16(1)},
2723 {'Foo': dbus.Int64(1)},
2724 {'Foo': dbus.UInt64(1)},
2725 {'Foo': dbus.Double(1.23)},
2726 {'Foo': dbus.Signature('s')},
2727 {'Foo': 'bar'}]
2728 for t in tests:
2729 try:
2730 p2p.Find(dbus.Dictionary(t))
2731 raise Exception("Invalid Find accepted")
2732 except dbus.exceptions.DBusException, e:
2733 if "InvalidArgs" not in str(e):
2734 raise Exception("Unexpected error message for invalid Find(): " + str(e))
2735
2736 for p in [ "/foo",
2737 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2738 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2739 try:
2740 p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2741 raise Exception("Invalid RemovePersistentGroup accepted")
2742 except dbus.exceptions.DBusException, e:
2743 if "InvalidArgs" not in str(e):
2744 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
2745
2746 try:
2747 dev[0].request("P2P_SET disabled 1")
2748 p2p.Listen(5)
2749 raise Exception("Invalid Listen accepted")
2750 except dbus.exceptions.DBusException, e:
2751 if "UnknownError: Could not start P2P listen" not in str(e):
2752 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2753 finally:
2754 dev[0].request("P2P_SET disabled 0")
2755
2756 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2757 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2758 try:
2759 test_p2p.Listen("foo")
2760 raise Exception("Invalid Listen accepted")
2761 except dbus.exceptions.DBusException, e:
2762 if "InvalidArgs" not in str(e):
2763 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2764
2765 try:
2766 dev[0].request("P2P_SET disabled 1")
2767 p2p.ExtendedListen(dbus.Dictionary({}))
2768 raise Exception("Invalid ExtendedListen accepted")
2769 except dbus.exceptions.DBusException, e:
2770 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2771 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2772 finally:
2773 dev[0].request("P2P_SET disabled 0")
2774
2775 try:
2776 dev[0].request("P2P_SET disabled 1")
2777 args = { 'duration1': 30000, 'interval1': 102400,
2778 'duration2': 20000, 'interval2': 102400 }
2779 p2p.PresenceRequest(args)
2780 raise Exception("Invalid PresenceRequest accepted")
2781 except dbus.exceptions.DBusException, e:
2782 if "UnknownError: Failed to invoke presence request" not in str(e):
2783 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2784 finally:
2785 dev[0].request("P2P_SET disabled 0")
2786
2787 try:
2788 params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
2789 p2p.GroupAdd(params)
2790 raise Exception("Invalid GroupAdd accepted")
2791 except dbus.exceptions.DBusException, e:
2792 if "InvalidArgs" not in str(e):
2793 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2794
2795 try:
2796 params = dbus.Dictionary({'persistent_group_object':
2797 dbus.ObjectPath(path),
2798 'frequency': 2412})
2799 p2p.GroupAdd(params)
2800 raise Exception("Invalid GroupAdd accepted")
2801 except dbus.exceptions.DBusException, e:
2802 if "InvalidArgs" not in str(e):
2803 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2804
2805 try:
2806 p2p.Disconnect()
2807 raise Exception("Invalid Disconnect accepted")
2808 except dbus.exceptions.DBusException, e:
2809 if "UnknownError: failed to disconnect" not in str(e):
2810 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
2811
2812 try:
2813 dev[0].request("P2P_SET disabled 1")
2814 p2p.Flush()
2815 raise Exception("Invalid Flush accepted")
2816 except dbus.exceptions.DBusException, e:
2817 if "Error.Failed: P2P is not available for this interface" not in str(e):
2818 raise Exception("Unexpected error message for invalid Flush: " + str(e))
2819 finally:
2820 dev[0].request("P2P_SET disabled 0")
2821
2822 try:
2823 dev[0].request("P2P_SET disabled 1")
2824 args = { 'peer': path,
2825 'join': True,
2826 'wps_method': 'pbc',
2827 'frequency': 2412 }
2828 pin = p2p.Connect(args)
2829 raise Exception("Invalid Connect accepted")
2830 except dbus.exceptions.DBusException, e:
2831 if "Error.Failed: P2P is not available for this interface" not in str(e):
2832 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2833 finally:
2834 dev[0].request("P2P_SET disabled 0")
2835
2836 tests = [ { 'frequency': dbus.Int32(-1) },
2837 { 'wps_method': 'pbc' },
2838 { 'wps_method': 'foo' } ]
2839 for args in tests:
2840 try:
2841 pin = p2p.Connect(args)
2842 raise Exception("Invalid Connect accepted")
2843 except dbus.exceptions.DBusException, e:
2844 if "InvalidArgs" not in str(e):
2845 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2846
2847 try:
2848 dev[0].request("P2P_SET disabled 1")
2849 args = { 'peer': path }
2850 pin = p2p.Invite(args)
2851 raise Exception("Invalid Invite accepted")
2852 except dbus.exceptions.DBusException, e:
2853 if "Error.Failed: P2P is not available for this interface" not in str(e):
2854 raise Exception("Unexpected error message for invalid Invite: " + str(e))
2855 finally:
2856 dev[0].request("P2P_SET disabled 0")
2857
2858 try:
2859 args = { 'foo': 'bar' }
2860 pin = p2p.Invite(args)
2861 raise Exception("Invalid Invite accepted")
2862 except dbus.exceptions.DBusException, e:
2863 if "InvalidArgs" not in str(e):
2864 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2865
2866 tests = [ (path, 'display', "InvalidArgs"),
2867 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2868 'display',
2869 "UnknownError: Failed to send provision discovery request"),
2870 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2871 'keypad',
2872 "UnknownError: Failed to send provision discovery request"),
2873 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2874 'pbc',
2875 "UnknownError: Failed to send provision discovery request"),
2876 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2877 'pushbutton',
2878 "UnknownError: Failed to send provision discovery request"),
2879 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2880 'foo', "InvalidArgs") ]
2881 for (p,method,err) in tests:
2882 try:
2883 p2p.ProvisionDiscoveryRequest(p, method)
2884 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2885 except dbus.exceptions.DBusException, e:
2886 if err not in str(e):
2887 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
2888
2889 try:
2890 dev[0].request("P2P_SET disabled 1")
2891 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2892 dbus_interface=dbus.PROPERTIES_IFACE)
2893 raise Exception("Invalid Get(Peers) accepted")
2894 except dbus.exceptions.DBusException, e:
2895 if "Error.Failed: P2P is not available for this interface" not in str(e):
2896 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
2897 finally:
2898 dev[0].request("P2P_SET disabled 0")
2899
0e126c6d
JM
2900def test_dbus_p2p_oom(dev, apdev):
2901 """D-Bus P2P operations and OOM"""
2902 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2903 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2904
2905 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array",
2906 "Find", "InvalidArgs"):
2907 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2908
2909 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array",
2910 "Find", "InvalidArgs"):
2911 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2912
2913 with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array",
2914 "Find", "InvalidArgs"):
2915 p2p.Find(dbus.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }))
2916
2917 with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
2918 "Find", "InvalidArgs"):
2919 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2920
2921 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
2922 "Find", "InvalidArgs"):
2923 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2924
2925 with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
2926 "Find", "InvalidArgs"):
2927 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'),
2928 dbus.ByteArray('123'),
2929 dbus.ByteArray('123'),
2930 dbus.ByteArray('123'),
2931 dbus.ByteArray('123'),
2932 dbus.ByteArray('123'),
2933 dbus.ByteArray('123'),
2934 dbus.ByteArray('123'),
2935 dbus.ByteArray('123'),
2936 dbus.ByteArray('123'),
2937 dbus.ByteArray('123') ] }))
2938
2939 with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
2940 "Find", "InvalidArgs"):
2941 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2942
2943 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
2944 "Find", "InvalidArgs"):
2945 p2p.Find(dbus.Dictionary({ 'Foo': path }))
2946
2947 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
2948 "AddService", "InvalidArgs"):
2949 args = { 'service_type': 'bonjour',
2950 'response': dbus.ByteArray(500*'b') }
2951 p2p.AddService(args)
2952
2953 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
2954 "AddService", "InvalidArgs"):
2955 p2p.AddService(args)
2956
2ec82e67
JM
2957def test_dbus_p2p_discovery(dev, apdev):
2958 """D-Bus P2P discovery"""
910eb5b5 2959 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
2960 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2961
2962 addr0 = dev[0].p2p_dev_addr()
2963
2964 dev[1].request("SET sec_device_type 1-0050F204-2")
2965 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2966 dev[1].p2p_listen()
2967 addr1 = dev[1].p2p_dev_addr()
2968 a1 = binascii.unhexlify(addr1.replace(':',''))
2969
2970 wfd_devinfo = "00001c440028"
2971 dev[2].request("SET wifi_display 1")
2972 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
2973 wfd = binascii.unhexlify('000006' + wfd_devinfo)
2974 dev[2].p2p_listen()
2975 addr2 = dev[2].p2p_dev_addr()
2976 a2 = binascii.unhexlify(addr2.replace(':',''))
2977
2978 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
2979 dbus_interface=dbus.PROPERTIES_IFACE)
2980 if 'Peers' not in res:
2981 raise Exception("GetAll result missing Peers")
2982 if len(res['Peers']) != 0:
2983 raise Exception("Unexpected peer(s) in the list")
2984
2985 args = {'DiscoveryType': 'social',
2986 'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
2987 'Timeout': dbus.Int32(1) }
2988 p2p.Find(dbus.Dictionary(args))
2989 p2p.StopFind()
2990
2991 class TestDbusP2p(TestDbus):
2992 def __init__(self, bus):
2993 TestDbus.__init__(self, bus)
2994 self.found = False
2995 self.found2 = False
e7d454bb 2996 self.found_prop = False
2ec82e67 2997 self.lost = False
571a1af2 2998 self.find_stopped = False
2ec82e67
JM
2999
3000 def __enter__(self):
3001 gobject.timeout_add(1, self.run_test)
3002 gobject.timeout_add(15000, self.timeout)
3003 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3004 "DeviceFound")
e7d454bb
JM
3005 self.add_signal(self.deviceFoundProperties,
3006 WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
2ec82e67
JM
3007 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
3008 "DeviceLost")
3009 self.add_signal(self.provisionDiscoveryResponseEnterPin,
3010 WPAS_DBUS_IFACE_P2PDEVICE,
3011 "ProvisionDiscoveryResponseEnterPin")
571a1af2
JM
3012 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
3013 "FindStopped")
2ec82e67
JM
3014 self.loop.run()
3015 return self
3016
3017 def deviceFound(self, path):
3018 logger.debug("deviceFound: path=%s" % path)
3019 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
3020 dbus_interface=dbus.PROPERTIES_IFACE)
3021 if len(res) < 1:
3022 raise Exception("Unexpected number of peers")
3023 if path not in res:
3024 raise Exception("Mismatch in peer object path")
3025 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3026 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3027 dbus_interface=dbus.PROPERTIES_IFACE,
3028 byte_arrays=True)
3029 logger.debug("peer properties: " + str(res))
3030
3031 if res['DeviceAddress'] == a1:
3032 if 'SecondaryDeviceTypes' not in res:
3033 raise Exception("Missing SecondaryDeviceTypes")
3034 sec = res['SecondaryDeviceTypes']
3035 if len(sec) < 1:
3036 raise Exception("Secondary device type missing")
3037 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
3038 raise Exception("Secondary device type mismatch")
3039
3040 if 'VendorExtension' not in res:
3041 raise Exception("Missing VendorExtension")
3042 vendor = res['VendorExtension']
3043 if len(vendor) < 1:
3044 raise Exception("Vendor extension missing")
3045 if "\x11\x22\x33\x44" not in vendor:
3046 raise Exception("Secondary device type mismatch")
3047
3048 self.found = True
3049 elif res['DeviceAddress'] == a2:
3050 if 'IEs' not in res:
3051 raise Exception("IEs missing")
3052 if res['IEs'] != wfd:
3053 raise Exception("IEs mismatch")
3054 self.found2 = True
3055 else:
3056 raise Exception("Unexpected peer device address")
3057
3058 if self.found and self.found2:
3059 p2p.StopFind()
3060 p2p.RejectPeer(path)
3061 p2p.ProvisionDiscoveryRequest(path, 'display')
3062
3063 def deviceLost(self, path):
3064 logger.debug("deviceLost: path=%s" % path)
50d7cded
JM
3065 if not self.found or not self.found2:
3066 # This may happen if a previous test case ended up scheduling
3067 # deviceLost event and that event did not get delivered before
3068 # starting the next test execution.
3069 logger.debug("Ignore deviceLost before the deviceFound events")
3070 return
2ec82e67
JM
3071 self.lost = True
3072 try:
3073 p2p.RejectPeer(path)
3074 raise Exception("Invalid RejectPeer accepted")
3075 except dbus.exceptions.DBusException, e:
3076 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
3077 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
3078 self.loop.quit()
3079
e7d454bb
JM
3080 def deviceFoundProperties(self, path, properties):
3081 logger.debug("deviceFoundProperties: path=%s" % path)
3082 logger.debug("peer properties: " + str(properties))
3083 if properties['DeviceAddress'] == a1:
3084 self.found_prop = True
3085
2ec82e67
JM
3086 def provisionDiscoveryResponseEnterPin(self, peer_object):
3087 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
3088 p2p.Flush()
3089
571a1af2
JM
3090 def findStopped(self):
3091 logger.debug("findStopped")
3092 self.find_stopped = True
3093
2ec82e67
JM
3094 def run_test(self, *args):
3095 logger.debug("run_test")
3096 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3097 'Timeout': dbus.Int32(10)}))
3098 return False
3099
3100 def success(self):
571a1af2 3101 return self.found and self.lost and self.found2 and self.find_stopped
2ec82e67
JM
3102
3103 with TestDbusP2p(bus) as t:
3104 if not t.success():
3105 raise Exception("Expected signals not seen")
3106
3107 dev[1].request("VENDOR_ELEM_REMOVE 1 *")
3108 dev[1].p2p_stop_find()
3109
3110 p2p.Listen(1)
3111 dev[2].p2p_stop_find()
3112 dev[2].request("P2P_FLUSH")
3113 if not dev[2].discover_peer(addr0):
3114 raise Exception("Peer not found")
3115 p2p.StopFind()
3116 dev[2].p2p_stop_find()
3117
3118 try:
3119 p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
3120 raise Exception("Invalid ExtendedListen accepted")
3121 except dbus.exceptions.DBusException, e:
3122 if "InvalidArgs" not in str(e):
3123 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
3124
3125 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
3126 p2p.ExtendedListen(dbus.Dictionary({}))
3127 dev[0].global_request("P2P_EXT_LISTEN")
3128
6c44d97b
JM
3129def test_dbus_p2p_discovery_freq(dev, apdev):
3130 """D-Bus P2P discovery on a specific non-social channel"""
3131 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3132 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3133
3134 addr1 = dev[1].p2p_dev_addr()
3135 autogo(dev[1], freq=2422)
3136
3137 class TestDbusP2p(TestDbus):
3138 def __init__(self, bus):
3139 TestDbus.__init__(self, bus)
3140 self.found = False
3141
3142 def __enter__(self):
3143 gobject.timeout_add(1, self.run_test)
3144 gobject.timeout_add(5000, self.timeout)
3145 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3146 "DeviceFound")
3147 self.loop.run()
3148 return self
3149
3150 def deviceFound(self, path):
3151 logger.debug("deviceFound: path=%s" % path)
3152 self.found = True
3153 self.loop.quit()
3154
3155 def run_test(self, *args):
3156 logger.debug("run_test")
3157 p2p.Find(dbus.Dictionary({'freq': 2422}))
3158 return False
3159
3160 def success(self):
3161 return self.found
3162
3163 with TestDbusP2p(bus) as t:
3164 if not t.success():
3165 raise Exception("Expected signals not seen")
3166
3167 dev[1].remove_group()
3168 p2p.StopFind()
3169
2ec82e67
JM
3170def test_dbus_p2p_service_discovery(dev, apdev):
3171 """D-Bus P2P service discovery"""
910eb5b5 3172 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3173 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3174
3175 addr0 = dev[0].p2p_dev_addr()
3176 addr1 = dev[1].p2p_dev_addr()
3177
3178 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
3179 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
db98b587 3180
2ec82e67
JM
3181 args = { 'service_type': 'bonjour',
3182 'query': bonjour_query,
3183 'response': bonjour_response }
3184 p2p.AddService(args)
3185 p2p.FlushService()
3186 p2p.AddService(args)
3187
3188 try:
3189 p2p.DeleteService(args)
3190 raise Exception("Invalid DeleteService() accepted")
3191 except dbus.exceptions.DBusException, e:
3192 if "InvalidArgs" not in str(e):
3193 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3194
3195 args = { 'service_type': 'bonjour',
3196 'query': bonjour_query }
3197 p2p.DeleteService(args)
3198 try:
3199 p2p.DeleteService(args)
3200 raise Exception("Invalid DeleteService() accepted")
3201 except dbus.exceptions.DBusException, e:
3202 if "InvalidArgs" not in str(e):
3203 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3204
3205 args = { 'service_type': 'upnp',
3206 'version': 0x10,
3207 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
3208 p2p.AddService(args)
3209 p2p.DeleteService(args)
3210 try:
3211 p2p.DeleteService(args)
3212 raise Exception("Invalid DeleteService() accepted")
3213 except dbus.exceptions.DBusException, e:
3214 if "InvalidArgs" not in str(e):
3215 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3216
3217 tests = [ { 'service_type': 'foo' },
3218 { 'service_type': 'foo', 'query': bonjour_query },
3219 { 'service_type': 'upnp' },
3220 { 'service_type': 'upnp', 'version': 0x10 },
3221 { 'service_type': 'upnp',
3222 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3223 { 'version': 0x10,
3224 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3225 { 'service_type': 'upnp', 'foo': 'bar' },
3226 { 'service_type': 'bonjour' },
3227 { 'service_type': 'bonjour', 'query': 'foo' },
3228 { 'service_type': 'bonjour', 'foo': 'bar' } ]
3229 for args in tests:
3230 try:
3231 p2p.DeleteService(args)
3232 raise Exception("Invalid DeleteService() accepted")
3233 except dbus.exceptions.DBusException, e:
3234 if "InvalidArgs" not in str(e):
3235 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3236
3237 tests = [ { 'service_type': 'foo' },
3238 { 'service_type': 'upnp' },
3239 { 'service_type': 'upnp', 'version': 0x10 },
3240 { 'service_type': 'upnp',
3241 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3242 { 'version': 0x10,
3243 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3244 { 'service_type': 'upnp', 'foo': 'bar' },
3245 { 'service_type': 'bonjour' },
3246 { 'service_type': 'bonjour', 'query': 'foo' },
3247 { 'service_type': 'bonjour', 'response': 'foo' },
3248 { 'service_type': 'bonjour', 'query': bonjour_query },
3249 { 'service_type': 'bonjour', 'response': bonjour_response },
3250 { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
3251 { 'service_type': 'bonjour', 'foo': 'bar' } ]
3252 for args in tests:
3253 try:
3254 p2p.AddService(args)
3255 raise Exception("Invalid AddService() accepted")
3256 except dbus.exceptions.DBusException, e:
3257 if "InvalidArgs" not in str(e):
3258 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3259
3260 args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
3261 ref = p2p.ServiceDiscoveryRequest(args)
3262 p2p.ServiceDiscoveryCancelRequest(ref)
3263 try:
3264 p2p.ServiceDiscoveryCancelRequest(ref)
3265 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
3266 except dbus.exceptions.DBusException, e:
3267 if "InvalidArgs" not in str(e):
3268 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3269 try:
3270 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
3271 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
3272 except dbus.exceptions.DBusException, e:
3273 if "InvalidArgs" not in str(e):
3274 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3275
3276 args = { 'service_type': 'upnp',
3277 'version': 0x10,
3278 'service': 'ssdp:foo' }
3279 ref = p2p.ServiceDiscoveryRequest(args)
3280 p2p.ServiceDiscoveryCancelRequest(ref)
3281
3282 tests = [ { 'service_type': 'foo' },
3283 { 'foo': 'bar' },
3284 { 'tlv': 'foo' },
3285 { },
3286 { 'version': 0 },
3287 { 'service_type': 'upnp',
3288 'service': 'ssdp:foo' },
3289 { 'service_type': 'upnp',
3290 'version': 0x10 },
3291 { 'service_type': 'upnp',
3292 'version': 0x10,
3293 'service': 'ssdp:foo',
3294 'peer_object': dbus.ObjectPath(path + "/Peers") },
3295 { 'service_type': 'upnp',
3296 'version': 0x10,
3297 'service': 'ssdp:foo',
3298 'peer_object': path + "/Peers" },
3299 { 'service_type': 'upnp',
3300 'version': 0x10,
3301 'service': 'ssdp:foo',
3302 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
3303 for args in tests:
3304 try:
3305 p2p.ServiceDiscoveryRequest(args)
3306 raise Exception("Invalid ServiceDiscoveryRequest accepted")
3307 except dbus.exceptions.DBusException, e:
3308 if "InvalidArgs" not in str(e):
3309 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))
3310
3311 args = { 'foo': 'bar' }
3312 try:
3313 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3314 raise Exception("Invalid ServiceDiscoveryResponse accepted")
3315 except dbus.exceptions.DBusException, e:
3316 if "InvalidArgs" not in str(e):
3317 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
3318
3319def test_dbus_p2p_service_discovery_query(dev, apdev):
3320 """D-Bus P2P service discovery query"""
910eb5b5 3321 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3322 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3323
3324 addr0 = dev[0].p2p_dev_addr()
3325 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
3326 dev[1].p2p_listen()
3327 addr1 = dev[1].p2p_dev_addr()
3328
3329 class TestDbusP2p(TestDbus):
3330 def __init__(self, bus):
3331 TestDbus.__init__(self, bus)
3332 self.done = False
3333
3334 def __enter__(self):
3335 gobject.timeout_add(1, self.run_test)
3336 gobject.timeout_add(15000, self.timeout)
3337 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3338 "DeviceFound")
3339 self.add_signal(self.serviceDiscoveryResponse,
3340 WPAS_DBUS_IFACE_P2PDEVICE,
3341 "ServiceDiscoveryResponse", byte_arrays=True)
3342 self.loop.run()
3343 return self
3344
3345 def deviceFound(self, path):
3346 logger.debug("deviceFound: path=%s" % path)
3347 args = { 'peer_object': path,
3348 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
3349 p2p.ServiceDiscoveryRequest(args)
3350
3351 def serviceDiscoveryResponse(self, sd_request):
3352 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
3353 self.done = True
3354 self.loop.quit()
3355
3356 def run_test(self, *args):
3357 logger.debug("run_test")
3358 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3359 'Timeout': dbus.Int32(10)}))
3360 return False
3361
3362 def success(self):
3363 return self.done
3364
3365 with TestDbusP2p(bus) as t:
3366 if not t.success():
3367 raise Exception("Expected signals not seen")
3368
3369 dev[1].p2p_stop_find()
3370
3371def test_dbus_p2p_service_discovery_external(dev, apdev):
3372 """D-Bus P2P service discovery with external response"""
3373 try:
3374 _test_dbus_p2p_service_discovery_external(dev, apdev)
3375 finally:
3376 dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3377
3378def _test_dbus_p2p_service_discovery_external(dev, apdev):
910eb5b5 3379 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3380 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3381
3382 addr0 = dev[0].p2p_dev_addr()
3383 addr1 = dev[1].p2p_dev_addr()
3384 resp = "0300000101"
3385
3386 dev[1].request("P2P_FLUSH")
3387 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
3388 dev[1].p2p_find(social=True)
3389
3390 class TestDbusP2p(TestDbus):
3391 def __init__(self, bus):
3392 TestDbus.__init__(self, bus)
3393 self.sd = False
3394
3395 def __enter__(self):
3396 gobject.timeout_add(1, self.run_test)
3397 gobject.timeout_add(15000, self.timeout)
3398 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3399 "DeviceFound")
3400 self.add_signal(self.serviceDiscoveryRequest,
3401 WPAS_DBUS_IFACE_P2PDEVICE,
3402 "ServiceDiscoveryRequest")
3403 self.loop.run()
3404 return self
3405
3406 def deviceFound(self, path):
3407 logger.debug("deviceFound: path=%s" % path)
3408
3409 def serviceDiscoveryRequest(self, sd_request):
3410 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
3411 self.sd = True
3412 args = { 'peer_object': sd_request['peer_object'],
3413 'frequency': sd_request['frequency'],
3414 'dialog_token': sd_request['dialog_token'],
3415 'tlvs': dbus.ByteArray(binascii.unhexlify(resp)) }
3416 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3417 self.loop.quit()
3418
3419 def run_test(self, *args):
3420 logger.debug("run_test")
3421 p2p.ServiceDiscoveryExternal(1)
3422 p2p.ServiceUpdate()
3423 p2p.Listen(15)
3424 return False
3425
3426 def success(self):
3427 return self.sd
3428
3429 with TestDbusP2p(bus) as t:
3430 if not t.success():
3431 raise Exception("Expected signals not seen")
3432
3433 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
3434 if ev is None:
3435 raise Exception("Service discovery timed out")
3436 if addr0 not in ev:
3437 raise Exception("Unexpected address in SD Response: " + ev)
3438 if ev.split(' ')[4] != resp:
3439 raise Exception("Unexpected response data SD Response: " + ev)
3440 dev[1].p2p_stop_find()
3441
3442 p2p.StopFind()
3443 p2p.ServiceDiscoveryExternal(0)
3444
3445def test_dbus_p2p_autogo(dev, apdev):
3446 """D-Bus P2P autonomous GO"""
910eb5b5 3447 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3448 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3449
3450 addr0 = dev[0].p2p_dev_addr()
3451
3452 class TestDbusP2p(TestDbus):
3453 def __init__(self, bus):
3454 TestDbus.__init__(self, bus)
3455 self.first = True
3456 self.waiting_end = False
035efb2c 3457 self.exceptions = False
001c4bf5 3458 self.deauthorized = False
2ec82e67
JM
3459 self.done = False
3460
3461 def __enter__(self):
3462 gobject.timeout_add(1, self.run_test)
3463 gobject.timeout_add(15000, self.timeout)
3464 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3465 "DeviceFound")
3466 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3467 "GroupStarted")
3468 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3469 "GroupFinished")
3470 self.add_signal(self.persistentGroupAdded,
3471 WPAS_DBUS_IFACE_P2PDEVICE,
3472 "PersistentGroupAdded")
3473 self.add_signal(self.persistentGroupRemoved,
3474 WPAS_DBUS_IFACE_P2PDEVICE,
3475 "PersistentGroupRemoved")
3476 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3477 WPAS_DBUS_IFACE_P2PDEVICE,
3478 "ProvisionDiscoveryRequestDisplayPin")
3479 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3480 "StaAuthorized")
001c4bf5
JM
3481 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
3482 "StaDeauthorized")
2ec82e67
JM
3483 self.loop.run()
3484 return self
3485
3486 def groupStarted(self, properties):
3487 logger.debug("groupStarted: " + str(properties))
3488 self.group = properties['group_object']
cb346b49
JM
3489 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3490 properties['interface_object'])
3491 role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3492 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3493 if role != "GO":
035efb2c 3494 self.exceptions = True
2ec82e67 3495 raise Exception("Unexpected role reported: " + role)
cb346b49
JM
3496 group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3497 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3498 if group != properties['group_object']:
035efb2c 3499 self.exceptions = True
2ec82e67 3500 raise Exception("Unexpected Group reported: " + str(group))
cb346b49
JM
3501 go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3502 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67 3503 if go != '/':
035efb2c 3504 self.exceptions = True
2ec82e67
JM
3505 raise Exception("Unexpected PeerGO value: " + str(go))
3506 if self.first:
3507 self.first = False
3508 logger.info("Remove persistent group instance")
cb346b49
JM
3509 group_p2p = dbus.Interface(self.g_if_obj,
3510 WPAS_DBUS_IFACE_P2PDEVICE)
3511 group_p2p.Disconnect()
2ec82e67
JM
3512 else:
3513 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3514 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
3515
3516 def groupFinished(self, properties):
3517 logger.debug("groupFinished: " + str(properties))
3518 if self.waiting_end:
3519 logger.info("Remove persistent group")
3520 p2p.RemovePersistentGroup(self.persistent)
3521 else:
3522 logger.info("Re-start persistent group")
3523 params = dbus.Dictionary({'persistent_group_object':
3524 self.persistent,
3525 'frequency': 2412})
3526 p2p.GroupAdd(params)
3527
3528 def persistentGroupAdded(self, path, properties):
3529 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3530 self.persistent = path
3531
3532 def persistentGroupRemoved(self, path):
3533 logger.debug("persistentGroupRemoved: %s" % path)
3534 self.done = True
3535 self.loop.quit()
3536
3537 def deviceFound(self, path):
3538 logger.debug("deviceFound: path=%s" % path)
3539 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3540 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3541 dbus_interface=dbus.PROPERTIES_IFACE,
3542 byte_arrays=True)
3543 logger.debug('peer properties: ' + str(self.peer))
3544
3545 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3546 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3547 self.peer_path = peer_object
3548 peer = binascii.unhexlify(peer_object.split('/')[-1])
3549 addr = ""
3550 for p in peer:
3551 if len(addr) > 0:
3552 addr += ':'
3553 addr += '%02x' % ord(p)
795b6f57
JM
3554
3555 params = { 'Role': 'registrar',
3556 'P2PDeviceAddress': self.peer['DeviceAddress'],
3557 'Bssid': self.peer['DeviceAddress'],
3558 'Type': 'pin' }
cb346b49 3559 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
795b6f57
JM
3560 try:
3561 wps.Start(params)
035efb2c 3562 self.exceptions = True
795b6f57
JM
3563 raise Exception("Invalid WPS.Start() accepted")
3564 except dbus.exceptions.DBusException, e:
3565 if "InvalidArgs" not in str(e):
035efb2c 3566 self.exceptions = True
795b6f57 3567 raise Exception("Unexpected error message: " + str(e))
2ec82e67
JM
3568 params = { 'Role': 'registrar',
3569 'P2PDeviceAddress': self.peer['DeviceAddress'],
2ec82e67
JM
3570 'Type': 'pin',
3571 'Pin': '12345670' }
3572 logger.info("Authorize peer to connect to the group")
3573 wps.Start(params)
3574
3575 def staAuthorized(self, name):
3576 logger.debug("staAuthorized: " + name)
3577 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
3578 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3579 dbus_interface=dbus.PROPERTIES_IFACE,
3580 byte_arrays=True)
035efb2c 3581 logger.debug("Peer properties: " + str(res))
2ec82e67 3582 if 'Groups' not in res or len(res['Groups']) != 1:
035efb2c 3583 self.exceptions = True
2ec82e67
JM
3584 raise Exception("Unexpected number of peer Groups entries")
3585 if res['Groups'][0] != self.group:
035efb2c 3586 self.exceptions = True
2ec82e67
JM
3587 raise Exception("Unexpected peer Groups[0] value")
3588
3589 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
3590 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3591 dbus_interface=dbus.PROPERTIES_IFACE,
3592 byte_arrays=True)
3593 logger.debug("Group properties: " + str(res))
3594 if 'Members' not in res or len(res['Members']) != 1:
035efb2c 3595 self.exceptions = True
2ec82e67
JM
3596 raise Exception("Unexpected number of group members")
3597
3598 ext = dbus.ByteArray("\x11\x22\x33\x44")
3599 # Earlier implementation of this interface was a bit strange. The
3600 # property is defined to have aay signature and that is what the
3601 # getter returned. However, the setter expected there to be a
3602 # dictionary with 'WPSVendorExtensions' as the key surrounding these
3603 # values.. The current implementations maintains support for that
3604 # for backwards compability reasons. Verify that encoding first.
3605 vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
3606 signature='sv')
3607 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3608 dbus_interface=dbus.PROPERTIES_IFACE)
3609 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3610 dbus_interface=dbus.PROPERTIES_IFACE,
3611 byte_arrays=True)
3612 if len(res) != 1:
035efb2c 3613 self.exceptions = True
2ec82e67
JM
3614 raise Exception("Unexpected number of vendor extensions")
3615 if res[0] != ext:
035efb2c 3616 self.exceptions = True
2ec82e67
JM
3617 raise Exception("Vendor extension value changed")
3618
3619 # And now verify that the more appropriate encoding is accepted as
3620 # well.
3621 res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
3622 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3623 dbus_interface=dbus.PROPERTIES_IFACE)
3624 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3625 dbus_interface=dbus.PROPERTIES_IFACE,
3626 byte_arrays=True)
3627 if len(res) != 2:
035efb2c 3628 self.exceptions = True
2ec82e67
JM
3629 raise Exception("Unexpected number of vendor extensions")
3630 if res[0] != res2[0] or res[1] != res2[1]:
035efb2c 3631 self.exceptions = True
2ec82e67
JM
3632 raise Exception("Vendor extension value changed")
3633
3634 for i in range(10):
3635 res.append(dbus.ByteArray('\xaa\xbb'))
3636 try:
3637 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3638 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3639 self.exceptions = True
2ec82e67
JM
3640 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3641 except dbus.exceptions.DBusException, e:
3642 if "Error.Failed" not in str(e):
035efb2c 3643 self.exceptions = True
2ec82e67
JM
3644 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3645
3646 vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
3647 try:
3648 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3649 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3650 self.exceptions = True
2ec82e67
JM
3651 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3652 except dbus.exceptions.DBusException, e:
3653 if "InvalidArgs" not in str(e):
035efb2c 3654 self.exceptions = True
2ec82e67
JM
3655 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3656
3657 vals = [ "foo" ]
3658 try:
3659 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3660 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3661 self.exceptions = True
2ec82e67
JM
3662 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3663 except dbus.exceptions.DBusException, e:
3664 if "Error.Failed" not in str(e):
035efb2c 3665 self.exceptions = True
2ec82e67
JM
3666 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3667
3668 vals = [ [ "foo" ] ]
3669 try:
3670 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3671 dbus_interface=dbus.PROPERTIES_IFACE)
035efb2c 3672 self.exceptions = True
2ec82e67
JM
3673 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3674 except dbus.exceptions.DBusException, e:
3675 if "Error.Failed" not in str(e):
035efb2c 3676 self.exceptions = True
2ec82e67
JM
3677 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3678
001c4bf5
JM
3679 p2p.RemoveClient({ 'peer': self.peer_path })
3680
2ec82e67 3681 self.waiting_end = True
cb346b49
JM
3682 group_p2p = dbus.Interface(self.g_if_obj,
3683 WPAS_DBUS_IFACE_P2PDEVICE)
3684 group_p2p.Disconnect()
2ec82e67 3685
001c4bf5
JM
3686 def staDeauthorized(self, name):
3687 logger.debug("staDeauthorized: " + name)
3688 self.deauthorized = True
3689
2ec82e67
JM
3690 def run_test(self, *args):
3691 logger.debug("run_test")
3692 params = dbus.Dictionary({'persistent': True,
3693 'frequency': 2412})
3694 logger.info("Add a persistent group")
3695 p2p.GroupAdd(params)
3696 return False
3697
3698 def success(self):
035efb2c 3699 return self.done and self.deauthorized and not self.exceptions
2ec82e67
JM
3700
3701 with TestDbusP2p(bus) as t:
3702 if not t.success():
3703 raise Exception("Expected signals not seen")
3704
3705 dev[1].wait_go_ending_session()
3706
3707def test_dbus_p2p_autogo_pbc(dev, apdev):
3708 """D-Bus P2P autonomous GO and PBC"""
910eb5b5 3709 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3710 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3711
3712 addr0 = dev[0].p2p_dev_addr()
3713
3714 class TestDbusP2p(TestDbus):
3715 def __init__(self, bus):
3716 TestDbus.__init__(self, bus)
3717 self.first = True
3718 self.waiting_end = False
3719 self.done = False
3720
3721 def __enter__(self):
3722 gobject.timeout_add(1, self.run_test)
3723 gobject.timeout_add(15000, self.timeout)
3724 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3725 "DeviceFound")
3726 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3727 "GroupStarted")
3728 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3729 "GroupFinished")
3730 self.add_signal(self.provisionDiscoveryPBCRequest,
3731 WPAS_DBUS_IFACE_P2PDEVICE,
3732 "ProvisionDiscoveryPBCRequest")
3733 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3734 "StaAuthorized")
3735 self.loop.run()
3736 return self
3737
3738 def groupStarted(self, properties):
3739 logger.debug("groupStarted: " + str(properties))
3740 self.group = properties['group_object']
cb346b49
JM
3741 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3742 properties['interface_object'])
2ec82e67
JM
3743 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3744 dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
3745
3746 def groupFinished(self, properties):
3747 logger.debug("groupFinished: " + str(properties))
3748 self.done = True
3749 self.loop.quit()
3750
3751 def deviceFound(self, path):
3752 logger.debug("deviceFound: path=%s" % path)
3753 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3754 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3755 dbus_interface=dbus.PROPERTIES_IFACE,
3756 byte_arrays=True)
3757 logger.debug('peer properties: ' + str(self.peer))
3758
3759 def provisionDiscoveryPBCRequest(self, peer_object):
3760 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
3761 self.peer_path = peer_object
3762 peer = binascii.unhexlify(peer_object.split('/')[-1])
3763 addr = ""
3764 for p in peer:
3765 if len(addr) > 0:
3766 addr += ':'
3767 addr += '%02x' % ord(p)
3768 params = { 'Role': 'registrar',
3769 'P2PDeviceAddress': self.peer['DeviceAddress'],
2ec82e67
JM
3770 'Type': 'pbc' }
3771 logger.info("Authorize peer to connect to the group")
cb346b49 3772 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67
JM
3773 wps.Start(params)
3774
3775 def staAuthorized(self, name):
3776 logger.debug("staAuthorized: " + name)
cb346b49
JM
3777 group_p2p = dbus.Interface(self.g_if_obj,
3778 WPAS_DBUS_IFACE_P2PDEVICE)
3779 group_p2p.Disconnect()
2ec82e67
JM
3780
3781 def run_test(self, *args):
3782 logger.debug("run_test")
3783 params = dbus.Dictionary({'frequency': 2412})
3784 p2p.GroupAdd(params)
3785 return False
3786
3787 def success(self):
3788 return self.done
3789
3790 with TestDbusP2p(bus) as t:
3791 if not t.success():
3792 raise Exception("Expected signals not seen")
3793
3794 dev[1].wait_go_ending_session()
3795 dev[1].flush_scan_cache()
3796
3797def test_dbus_p2p_autogo_legacy(dev, apdev):
3798 """D-Bus P2P autonomous GO and legacy STA"""
910eb5b5 3799 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 3800 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3801
3802 addr0 = dev[0].p2p_dev_addr()
3803
3804 class TestDbusP2p(TestDbus):
3805 def __init__(self, bus):
3806 TestDbus.__init__(self, bus)
3807 self.done = False
3808
3809 def __enter__(self):
3810 gobject.timeout_add(1, self.run_test)
3811 gobject.timeout_add(15000, self.timeout)
3812 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3813 "GroupStarted")
3814 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3815 "GroupFinished")
3816 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3817 "StaAuthorized")
3818 self.loop.run()
3819 return self
3820
3821 def groupStarted(self, properties):
3822 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
3823 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3824 properties['group_object'])
3825 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3826 dbus_interface=dbus.PROPERTIES_IFACE,
3827 byte_arrays=True)
3828 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
3829
2ec82e67
JM
3830 pin = '12345670'
3831 params = { 'Role': 'enrollee',
3832 'Type': 'pin',
3833 'Pin': pin }
cb346b49
JM
3834 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3835 properties['interface_object'])
3836 wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67
JM
3837 wps.Start(params)
3838 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
3839 dev1.scan_for_bss(bssid, freq=2412)
3840 dev1.request("WPS_PIN " + bssid + " " + pin)
3841 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3842
3843 def groupFinished(self, properties):
3844 logger.debug("groupFinished: " + str(properties))
3845 self.done = True
3846 self.loop.quit()
3847
3848 def staAuthorized(self, name):
3849 logger.debug("staAuthorized: " + name)
3850 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3851 dev1.request("DISCONNECT")
cb346b49 3852 self.group_p2p.Disconnect()
2ec82e67
JM
3853
3854 def run_test(self, *args):
3855 logger.debug("run_test")
3856 params = dbus.Dictionary({'frequency': 2412})
3857 p2p.GroupAdd(params)
3858 return False
3859
3860 def success(self):
3861 return self.done
3862
3863 with TestDbusP2p(bus) as t:
3864 if not t.success():
3865 raise Exception("Expected signals not seen")
3866
3867def test_dbus_p2p_join(dev, apdev):
3868 """D-Bus P2P join an autonomous GO"""
910eb5b5 3869 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
3870 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3871
3872 addr1 = dev[1].p2p_dev_addr()
3873 addr2 = dev[2].p2p_dev_addr()
3874 dev[1].p2p_start_go(freq=2412)
cb346b49 3875 dev1_group_ifname = dev[1].group_ifname
2ec82e67
JM
3876 dev[2].p2p_listen()
3877
3878 class TestDbusP2p(TestDbus):
3879 def __init__(self, bus):
3880 TestDbus.__init__(self, bus)
3881 self.done = False
3882 self.peer = None
3883 self.go = None
3884
3885 def __enter__(self):
3886 gobject.timeout_add(1, self.run_test)
3887 gobject.timeout_add(15000, self.timeout)
3888 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3889 "DeviceFound")
3890 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3891 "GroupStarted")
3892 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3893 "GroupFinished")
3894 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
3895 "InvitationResult")
3896 self.loop.run()
3897 return self
3898
3899 def deviceFound(self, path):
3900 logger.debug("deviceFound: path=%s" % path)
3901 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3902 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3903 dbus_interface=dbus.PROPERTIES_IFACE,
3904 byte_arrays=True)
3905 logger.debug('peer properties: ' + str(res))
3906 if addr2.replace(':','') in path:
3907 self.peer = path
3908 elif addr1.replace(':','') in path:
3909 self.go = path
3910 if self.peer and self.go:
3911 logger.info("Join the group")
3912 p2p.StopFind()
3913 args = { 'peer': self.go,
3914 'join': True,
3915 'wps_method': 'pin',
3916 'frequency': 2412 }
3917 pin = p2p.Connect(args)
3918
3919 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
3920 dev1.group_ifname = dev1_group_ifname
3921 dev1.group_request("WPS_PIN any " + pin)
2ec82e67
JM
3922
3923 def groupStarted(self, properties):
3924 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
3925 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3926 properties['interface_object'])
3927 role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3928 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3929 if role != "client":
3930 raise Exception("Unexpected role reported: " + role)
cb346b49
JM
3931 group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3932 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3933 if group != properties['group_object']:
3934 raise Exception("Unexpected Group reported: " + str(group))
cb346b49
JM
3935 go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3936 dbus_interface=dbus.PROPERTIES_IFACE)
2ec82e67
JM
3937 if go != self.go:
3938 raise Exception("Unexpected PeerGO value: " + str(go))
3939
3940 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3941 properties['group_object'])
3942 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3943 dbus_interface=dbus.PROPERTIES_IFACE,
3944 byte_arrays=True)
3945 logger.debug("Group properties: " + str(res))
3946
3947 ext = dbus.ByteArray("\x11\x22\x33\x44")
3948 try:
3949 # Set(WPSVendorExtensions) not allowed for P2P Client
3950 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3951 dbus_interface=dbus.PROPERTIES_IFACE)
3952 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3953 except dbus.exceptions.DBusException, e:
3954 if "Error.Failed: Failed to set property" not in str(e):
3955 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3956
cb346b49 3957 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
3958 args = { 'duration1': 30000, 'interval1': 102400,
3959 'duration2': 20000, 'interval2': 102400 }
cb346b49 3960 group_p2p.PresenceRequest(args)
2ec82e67
JM
3961
3962 args = { 'peer': self.peer }
cb346b49 3963 group_p2p.Invite(args)
2ec82e67
JM
3964
3965 def groupFinished(self, properties):
3966 logger.debug("groupFinished: " + str(properties))
3967 self.done = True
3968 self.loop.quit()
3969
3970 def invitationResult(self, result):
3971 logger.debug("invitationResult: " + str(result))
3972 if result['status'] != 1:
3973 raise Exception("Unexpected invitation result: " + str(result))
3974 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 3975 dev1.group_ifname = dev1_group_ifname
2ec82e67
JM
3976 dev1.remove_group()
3977
3978 def run_test(self, *args):
3979 logger.debug("run_test")
3980 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
3981 return False
3982
3983 def success(self):
3984 return self.done
3985
3986 with TestDbusP2p(bus) as t:
3987 if not t.success():
3988 raise Exception("Expected signals not seen")
3989
3990 dev[2].p2p_stop_find()
3991
1992af11
JM
3992def test_dbus_p2p_invitation_received(dev, apdev):
3993 """D-Bus P2P and InvitationReceived"""
3994 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3995 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3996
3997 form(dev[0], dev[1])
3998 addr0 = dev[0].p2p_dev_addr()
3999 dev[0].p2p_listen()
4000 dev[0].global_request("SET persistent_reconnect 0")
4001
4002 if not dev[1].discover_peer(addr0, social=True):
4003 raise Exception("Peer " + addr0 + " not found")
4004 peer = dev[1].get_peer(addr0)
4005
4006 class TestDbusP2p(TestDbus):
4007 def __init__(self, bus):
4008 TestDbus.__init__(self, bus)
4009 self.done = False
4010
4011 def __enter__(self):
4012 gobject.timeout_add(1, self.run_test)
4013 gobject.timeout_add(15000, self.timeout)
4014 self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
4015 "InvitationReceived")
4016 self.loop.run()
4017 return self
4018
4019 def invitationReceived(self, result):
4020 logger.debug("invitationReceived: " + str(result))
4021 self.done = True
4022 self.loop.quit()
4023
4024 def run_test(self, *args):
4025 logger.debug("run_test")
4026 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4027 cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
4028 dev1.global_request(cmd)
4029 return False
4030
4031 def success(self):
4032 return self.done
4033
4034 with TestDbusP2p(bus) as t:
4035 if not t.success():
4036 raise Exception("Expected signals not seen")
4037
4038 dev[0].p2p_stop_find()
4039 dev[1].p2p_stop_find()
4040
2ec82e67
JM
4041def test_dbus_p2p_config(dev, apdev):
4042 """D-Bus Get/Set P2PDeviceConfig"""
4043 try:
4044 _test_dbus_p2p_config(dev, apdev)
4045 finally:
4046 dev[0].request("P2P_SET ssid_postfix ")
4047
4048def _test_dbus_p2p_config(dev, apdev):
910eb5b5 4049 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4050 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4051
4052 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4053 dbus_interface=dbus.PROPERTIES_IFACE,
4054 byte_arrays=True)
4055 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
4056 dbus_interface=dbus.PROPERTIES_IFACE)
4057 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4058 dbus_interface=dbus.PROPERTIES_IFACE,
4059 byte_arrays=True)
4060
4061 if len(res) != len(res2):
4062 raise Exception("Different number of parameters")
4063 for k in res:
4064 if res[k] != res2[k]:
4065 raise Exception("Parameter %s value changes" % k)
4066
4067 changes = { 'SsidPostfix': 'foo',
4068 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
4069 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
4070 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4071 dbus.Dictionary(changes, signature='sv'),
4072 dbus_interface=dbus.PROPERTIES_IFACE)
4073
4074 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4075 dbus_interface=dbus.PROPERTIES_IFACE,
4076 byte_arrays=True)
4077 logger.debug("P2PDeviceConfig: " + str(res2))
4078 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
4079 raise Exception("VendorExtension does not match")
4080 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
4081 raise Exception("SecondaryDeviceType does not match")
4082
4083 changes = { 'SsidPostfix': '',
4084 'VendorExtension': dbus.Array([], signature="ay"),
4085 'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
4086 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4087 dbus.Dictionary(changes, signature='sv'),
4088 dbus_interface=dbus.PROPERTIES_IFACE)
4089
4090 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4091 dbus_interface=dbus.PROPERTIES_IFACE,
4092 byte_arrays=True)
4093 logger.debug("P2PDeviceConfig: " + str(res3))
4094 if 'VendorExtension' in res3:
4095 raise Exception("VendorExtension not removed")
4096 if 'SecondaryDeviceTypes' in res3:
4097 raise Exception("SecondaryDeviceType not removed")
4098
4099 try:
4100 dev[0].request("P2P_SET disabled 1")
4101 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4102 dbus_interface=dbus.PROPERTIES_IFACE,
4103 byte_arrays=True)
4104 raise Exception("Invalid Get(P2PDeviceConfig) accepted")
4105 except dbus.exceptions.DBusException, e:
4106 if "Error.Failed: P2P is not available for this interface" not in str(e):
4107 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4108 finally:
4109 dev[0].request("P2P_SET disabled 0")
4110
4111 try:
4112 dev[0].request("P2P_SET disabled 1")
4113 changes = { 'SsidPostfix': 'foo' }
4114 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4115 dbus.Dictionary(changes, signature='sv'),
4116 dbus_interface=dbus.PROPERTIES_IFACE)
4117 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
4118 except dbus.exceptions.DBusException, e:
4119 if "Error.Failed: P2P is not available for this interface" not in str(e):
4120 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4121 finally:
4122 dev[0].request("P2P_SET disabled 0")
4123
4124 tests = [ { 'DeviceName': 123 },
4125 { 'SsidPostfix': 123 },
4126 { 'Foo': 'Bar' } ]
4127 for changes in tests:
4128 try:
4129 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4130 dbus.Dictionary(changes, signature='sv'),
4131 dbus_interface=dbus.PROPERTIES_IFACE)
4132 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
4133 except dbus.exceptions.DBusException, e:
4134 if "InvalidArgs" not in str(e):
4135 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4136
4137def test_dbus_p2p_persistent(dev, apdev):
4138 """D-Bus P2P persistent group"""
910eb5b5 4139 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4140 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4141
4142 class TestDbusP2p(TestDbus):
4143 def __init__(self, bus):
4144 TestDbus.__init__(self, bus)
4145
4146 def __enter__(self):
4147 gobject.timeout_add(1, self.run_test)
4148 gobject.timeout_add(15000, self.timeout)
4149 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4150 "GroupStarted")
4151 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4152 "GroupFinished")
4153 self.add_signal(self.persistentGroupAdded,
4154 WPAS_DBUS_IFACE_P2PDEVICE,
4155 "PersistentGroupAdded")
4156 self.loop.run()
4157 return self
4158
4159 def groupStarted(self, properties):
4160 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4161 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4162 properties['interface_object'])
4163 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4164 group_p2p.Disconnect()
2ec82e67
JM
4165
4166 def groupFinished(self, properties):
4167 logger.debug("groupFinished: " + str(properties))
4168 self.loop.quit()
4169
4170 def persistentGroupAdded(self, path, properties):
4171 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
4172 self.persistent = path
4173
4174 def run_test(self, *args):
4175 logger.debug("run_test")
4176 params = dbus.Dictionary({'persistent': True,
4177 'frequency': 2412})
4178 logger.info("Add a persistent group")
4179 p2p.GroupAdd(params)
4180 return False
4181
4182 def success(self):
4183 return True
4184
4185 with TestDbusP2p(bus) as t:
4186 if not t.success():
4187 raise Exception("Expected signals not seen")
4188 persistent = t.persistent
4189
4190 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
4191 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
4192 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
4193 logger.info("Persistent group Properties: " + str(res))
4194 vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
4195 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
4196 dbus_interface=dbus.PROPERTIES_IFACE)
4197 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
4198 dbus_interface=dbus.PROPERTIES_IFACE)
4199 if len(res) != len(res2):
4200 raise Exception("Different number of parameters")
4201 for k in res:
4202 if k != 'ssid' and res[k] != res2[k]:
4203 raise Exception("Parameter %s value changes" % k)
4204 if res2['ssid'] != '"DIRECT-foo"':
4205 raise Exception("Unexpected ssid")
4206
4207 args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
4208 'psk': '1234567890' }, signature='sv')
4209 group = p2p.AddPersistentGroup(args)
4210
4211 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
4212 dbus_interface=dbus.PROPERTIES_IFACE)
4213 if len(groups) != 2:
4214 raise Exception("Unexpected number of persistent groups: " + str(groups))
4215
4216 p2p.RemoveAllPersistentGroups()
4217
4218 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
4219 dbus_interface=dbus.PROPERTIES_IFACE)
4220 if len(groups) != 0:
4221 raise Exception("Unexpected number of persistent groups: " + str(groups))
4222
4223 try:
4224 p2p.RemovePersistentGroup(persistent)
4225 raise Exception("Invalid RemovePersistentGroup accepted")
4226 except dbus.exceptions.DBusException, e:
4227 if "NetworkUnknown: There is no such persistent group" not in str(e):
4228 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
4229
4230def test_dbus_p2p_reinvoke_persistent(dev, apdev):
4231 """D-Bus P2P reinvoke persistent group"""
910eb5b5 4232 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67 4233 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2ec82e67
JM
4234
4235 addr0 = dev[0].p2p_dev_addr()
4236
4237 class TestDbusP2p(TestDbus):
4238 def __init__(self, bus):
4239 TestDbus.__init__(self, bus)
4240 self.first = True
4241 self.waiting_end = False
4242 self.done = False
4243 self.invited = False
4244
4245 def __enter__(self):
4246 gobject.timeout_add(1, self.run_test)
4247 gobject.timeout_add(15000, self.timeout)
4248 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4249 "DeviceFound")
4250 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4251 "GroupStarted")
4252 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4253 "GroupFinished")
4254 self.add_signal(self.persistentGroupAdded,
4255 WPAS_DBUS_IFACE_P2PDEVICE,
4256 "PersistentGroupAdded")
4257 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
4258 WPAS_DBUS_IFACE_P2PDEVICE,
4259 "ProvisionDiscoveryRequestDisplayPin")
4260 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
4261 "StaAuthorized")
4262 self.loop.run()
4263 return self
4264
4265 def groupStarted(self, properties):
4266 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4267 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4268 properties['interface_object'])
2ec82e67 4269 if not self.invited:
cb346b49
JM
4270 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4271 properties['group_object'])
4272 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4273 dbus_interface=dbus.PROPERTIES_IFACE,
4274 byte_arrays=True)
4275 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
2ec82e67 4276 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4277 dev1.scan_for_bss(bssid, freq=2412)
2ec82e67
JM
4278 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
4279
4280 def groupFinished(self, properties):
4281 logger.debug("groupFinished: " + str(properties))
4282 if self.invited:
4283 self.done = True
4284 self.loop.quit()
4285 else:
4286 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4287 dev1.global_request("SET persistent_reconnect 1")
2ec82e67
JM
4288 dev1.p2p_listen()
4289
4290 args = { 'persistent_group_object': dbus.ObjectPath(path),
4291 'peer': self.peer_path }
4292 try:
4293 pin = p2p.Invite(args)
4294 raise Exception("Invalid Invite accepted")
4295 except dbus.exceptions.DBusException, e:
4296 if "InvalidArgs" not in str(e):
4297 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4298
4299 args = { 'persistent_group_object': self.persistent,
4300 'peer': self.peer_path }
4301 pin = p2p.Invite(args)
4302 self.invited = True
4303
cb346b49
JM
4304 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
4305 timeout=15)
4306 if self.sta_group_ev is None:
4307 raise Exception("P2P-GROUP-STARTED event not seen")
4308
2ec82e67
JM
4309 def persistentGroupAdded(self, path, properties):
4310 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
4311 self.persistent = path
4312
4313 def deviceFound(self, path):
4314 logger.debug("deviceFound: path=%s" % path)
4315 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
4316 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
4317 dbus_interface=dbus.PROPERTIES_IFACE,
4318 byte_arrays=True)
4319
4320 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
4321 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
4322 self.peer_path = peer_object
4323 peer = binascii.unhexlify(peer_object.split('/')[-1])
4324 addr = ""
4325 for p in peer:
4326 if len(addr) > 0:
4327 addr += ':'
4328 addr += '%02x' % ord(p)
4329 params = { 'Role': 'registrar',
4330 'P2PDeviceAddress': self.peer['DeviceAddress'],
4331 'Bssid': self.peer['DeviceAddress'],
4332 'Type': 'pin',
4333 'Pin': '12345670' }
4334 logger.info("Authorize peer to connect to the group")
cb346b49
JM
4335 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4336 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
2ec82e67 4337 wps.Start(params)
cb346b49
JM
4338 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
4339 timeout=15)
4340 if self.sta_group_ev is None:
4341 raise Exception("P2P-GROUP-STARTED event not seen")
2ec82e67
JM
4342
4343 def staAuthorized(self, name):
4344 logger.debug("staAuthorized: " + name)
4345 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4346 dev1.group_form_result(self.sta_group_ev)
2ec82e67 4347 dev1.remove_group()
cb346b49 4348 ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
2ec82e67
JM
4349 if ev is None:
4350 raise Exception("Group removal timed out")
cb346b49
JM
4351 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4352 group_p2p.Disconnect()
2ec82e67
JM
4353
4354 def run_test(self, *args):
4355 logger.debug("run_test")
4356 params = dbus.Dictionary({'persistent': True,
4357 'frequency': 2412})
4358 logger.info("Add a persistent group")
4359 p2p.GroupAdd(params)
4360 return False
4361
4362 def success(self):
4363 return self.done
4364
4365 with TestDbusP2p(bus) as t:
4366 if not t.success():
4367 raise Exception("Expected signals not seen")
4368
4369def test_dbus_p2p_go_neg_rx(dev, apdev):
4370 """D-Bus P2P GO Negotiation receive"""
910eb5b5 4371 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4372 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4373 addr0 = dev[0].p2p_dev_addr()
4374
4375 class TestDbusP2p(TestDbus):
4376 def __init__(self, bus):
4377 TestDbus.__init__(self, bus)
4378 self.done = False
4379
4380 def __enter__(self):
4381 gobject.timeout_add(1, self.run_test)
4382 gobject.timeout_add(15000, self.timeout)
4383 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4384 "DeviceFound")
4385 self.add_signal(self.goNegotiationRequest,
4386 WPAS_DBUS_IFACE_P2PDEVICE,
4387 "GONegotiationRequest",
4388 byte_arrays=True)
4389 self.add_signal(self.goNegotiationSuccess,
4390 WPAS_DBUS_IFACE_P2PDEVICE,
4391 "GONegotiationSuccess",
4392 byte_arrays=True)
4393 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4394 "GroupStarted")
4395 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4396 "GroupFinished")
4397 self.loop.run()
4398 return self
4399
4400 def deviceFound(self, path):
4401 logger.debug("deviceFound: path=%s" % path)
4402
f5d5161d
JM
4403 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4404 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
2ec82e67
JM
4405 if dev_passwd_id != 1:
4406 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4407 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4408 'go_intent': 15, 'persistent': False, 'frequency': 5175 }
4409 try:
4410 p2p.Connect(args)
4411 raise Exception("Invalid Connect accepted")
4412 except dbus.exceptions.DBusException, e:
4413 if "ConnectChannelUnsupported" not in str(e):
4414 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4415
4416 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4417 'go_intent': 15, 'persistent': False }
4418 p2p.Connect(args)
4419
4420 def goNegotiationSuccess(self, properties):
4421 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4422
4423 def groupStarted(self, properties):
4424 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4425 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4426 properties['interface_object'])
4427 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4428 group_p2p.Disconnect()
2ec82e67
JM
4429
4430 def groupFinished(self, properties):
4431 logger.debug("groupFinished: " + str(properties))
4432 self.done = True
4433 self.loop.quit()
4434
4435 def run_test(self, *args):
4436 logger.debug("run_test")
4437 p2p.Listen(10)
4438 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4439 if not dev1.discover_peer(addr0):
4440 raise Exception("Peer not found")
4441 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
4442 return False
4443
4444 def success(self):
4445 return self.done
4446
4447 with TestDbusP2p(bus) as t:
4448 if not t.success():
4449 raise Exception("Expected signals not seen")
4450
4451def test_dbus_p2p_go_neg_auth(dev, apdev):
4452 """D-Bus P2P GO Negotiation authorized"""
910eb5b5 4453 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4454 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4455 addr0 = dev[0].p2p_dev_addr()
4456 dev[1].p2p_listen()
4457
4458 class TestDbusP2p(TestDbus):
4459 def __init__(self, bus):
4460 TestDbus.__init__(self, bus)
4461 self.done = False
4462 self.peer_joined = False
4463 self.peer_disconnected = False
4464
4465 def __enter__(self):
4466 gobject.timeout_add(1, self.run_test)
4467 gobject.timeout_add(15000, self.timeout)
4468 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4469 "DeviceFound")
4470 self.add_signal(self.goNegotiationSuccess,
4471 WPAS_DBUS_IFACE_P2PDEVICE,
4472 "GONegotiationSuccess",
4473 byte_arrays=True)
4474 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4475 "GroupStarted")
4476 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4477 "GroupFinished")
4478 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
4479 "StaDeauthorized")
4480 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4481 "PeerJoined")
4482 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
4483 "PeerDisconnected")
4484 self.loop.run()
4485 return self
4486
4487 def deviceFound(self, path):
4488 logger.debug("deviceFound: path=%s" % path)
4489 args = { 'peer': path, 'wps_method': 'keypad',
4490 'go_intent': 15, 'authorize_only': True }
4491 try:
4492 p2p.Connect(args)
4493 raise Exception("Invalid Connect accepted")
4494 except dbus.exceptions.DBusException, e:
4495 if "InvalidArgs" not in str(e):
4496 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4497
4498 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4499 'go_intent': 15, 'authorize_only': True }
4500 p2p.Connect(args)
4501 p2p.Listen(10)
4502 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4503 if not dev1.discover_peer(addr0):
4504 raise Exception("Peer not found")
4505 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
bc6e3288 4506 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
2ec82e67
JM
4507 if ev is None:
4508 raise Exception("Group formation timed out")
cb346b49 4509 self.sta_group_ev = ev
2ec82e67
JM
4510
4511 def goNegotiationSuccess(self, properties):
4512 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4513
4514 def groupStarted(self, properties):
4515 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4516 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4517 properties['interface_object'])
2ec82e67 4518 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4519 dev1.group_form_result(self.sta_group_ev)
2ec82e67
JM
4520 dev1.remove_group()
4521
4522 def staDeauthorized(self, name):
4523 logger.debug("staDeuthorized: " + name)
cb346b49
JM
4524 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4525 group_p2p.Disconnect()
2ec82e67
JM
4526
4527 def peerJoined(self, peer):
4528 logger.debug("peerJoined: " + peer)
4529 self.peer_joined = True
4530
4531 def peerDisconnected(self, peer):
4532 logger.debug("peerDisconnected: " + peer)
4533 self.peer_disconnected = True
4534
4535 def groupFinished(self, properties):
4536 logger.debug("groupFinished: " + str(properties))
4537 self.done = True
4538 self.loop.quit()
4539
4540 def run_test(self, *args):
4541 logger.debug("run_test")
4542 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4543 return False
4544
4545 def success(self):
4546 return self.done and self.peer_joined and self.peer_disconnected
4547
4548 with TestDbusP2p(bus) as t:
4549 if not t.success():
4550 raise Exception("Expected signals not seen")
4551
4552def test_dbus_p2p_go_neg_init(dev, apdev):
4553 """D-Bus P2P GO Negotiation initiation"""
910eb5b5 4554 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4555 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4556 addr0 = dev[0].p2p_dev_addr()
4557 dev[1].p2p_listen()
4558
4559 class TestDbusP2p(TestDbus):
4560 def __init__(self, bus):
4561 TestDbus.__init__(self, bus)
4562 self.done = False
20fd8def
JM
4563 self.peer_group_added = False
4564 self.peer_group_removed = False
2ec82e67
JM
4565
4566 def __enter__(self):
4567 gobject.timeout_add(1, self.run_test)
4568 gobject.timeout_add(15000, self.timeout)
4569 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4570 "DeviceFound")
4571 self.add_signal(self.goNegotiationSuccess,
4572 WPAS_DBUS_IFACE_P2PDEVICE,
4573 "GONegotiationSuccess",
4574 byte_arrays=True)
4575 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4576 "GroupStarted")
4577 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4578 "GroupFinished")
20fd8def
JM
4579 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4580 "PropertiesChanged")
2ec82e67
JM
4581 self.loop.run()
4582 return self
4583
4584 def deviceFound(self, path):
4585 logger.debug("deviceFound: path=%s" % path)
4586 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4587 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4588 'go_intent': 0 }
4589 p2p.Connect(args)
4590
4591 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4592 if ev is None:
4593 raise Exception("Timeout while waiting for GO Neg Request")
4594 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
bc6e3288 4595 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
2ec82e67
JM
4596 if ev is None:
4597 raise Exception("Group formation timed out")
cb346b49 4598 self.sta_group_ev = ev
2ec82e67
JM
4599
4600 def goNegotiationSuccess(self, properties):
4601 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4602
4603 def groupStarted(self, properties):
4604 logger.debug("groupStarted: " + str(properties))
cb346b49
JM
4605 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4606 properties['interface_object'])
4607 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4608 group_p2p.Disconnect()
2ec82e67 4609 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49 4610 dev1.group_form_result(self.sta_group_ev)
2ec82e67
JM
4611 dev1.remove_group()
4612
4613 def groupFinished(self, properties):
4614 logger.debug("groupFinished: " + str(properties))
4615 self.done = True
20fd8def
JM
4616
4617 def propertiesChanged(self, interface_name, changed_properties,
4618 invalidated_properties):
4619 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4620 if interface_name != WPAS_DBUS_P2P_PEER:
4621 return
4622 if "Groups" not in changed_properties:
4623 return
4624 if len(changed_properties["Groups"]) > 0:
4625 self.peer_group_added = True
4626 if len(changed_properties["Groups"]) == 0:
3301e925
JM
4627 if not self.peer_group_added:
4628 # This is likely a leftover event from an earlier test case,
4629 # ignore it to allow this test case to go through its steps.
4630 logger.info("Ignore propertiesChanged indicating group removal before group has been added")
4631 return
20fd8def
JM
4632 self.peer_group_removed = True
4633 self.loop.quit()
2ec82e67
JM
4634
4635 def run_test(self, *args):
4636 logger.debug("run_test")
4637 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4638 return False
4639
4640 def success(self):
20fd8def
JM
4641 return self.done and self.peer_group_added and self.peer_group_removed
4642
4643 with TestDbusP2p(bus) as t:
4644 if not t.success():
4645 raise Exception("Expected signals not seen")
4646
4647def test_dbus_p2p_group_termination_by_go(dev, apdev):
4648 """D-Bus P2P group removal on GO terminating the group"""
4649 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4650 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4651 addr0 = dev[0].p2p_dev_addr()
4652 dev[1].p2p_listen()
4653
4654 class TestDbusP2p(TestDbus):
4655 def __init__(self, bus):
4656 TestDbus.__init__(self, bus)
4657 self.done = False
4658 self.peer_group_added = False
4659 self.peer_group_removed = False
4660
4661 def __enter__(self):
4662 gobject.timeout_add(1, self.run_test)
4663 gobject.timeout_add(15000, self.timeout)
4664 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4665 "DeviceFound")
4666 self.add_signal(self.goNegotiationSuccess,
4667 WPAS_DBUS_IFACE_P2PDEVICE,
4668 "GONegotiationSuccess",
4669 byte_arrays=True)
4670 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4671 "GroupStarted")
4672 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4673 "GroupFinished")
4674 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4675 "PropertiesChanged")
4676 self.loop.run()
4677 return self
4678
4679 def deviceFound(self, path):
4680 logger.debug("deviceFound: path=%s" % path)
4681 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4682 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4683 'go_intent': 0 }
4684 p2p.Connect(args)
4685
4686 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4687 if ev is None:
4688 raise Exception("Timeout while waiting for GO Neg Request")
4689 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
bc6e3288 4690 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
20fd8def
JM
4691 if ev is None:
4692 raise Exception("Group formation timed out")
4693 self.sta_group_ev = ev
4694
4695 def goNegotiationSuccess(self, properties):
4696 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4697
4698 def groupStarted(self, properties):
4699 logger.debug("groupStarted: " + str(properties))
4700 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4701 properties['interface_object'])
4702 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4703 dev1.group_form_result(self.sta_group_ev)
4704 dev1.remove_group()
4705
4706 def groupFinished(self, properties):
4707 logger.debug("groupFinished: " + str(properties))
4708 self.done = True
4709
4710 def propertiesChanged(self, interface_name, changed_properties,
4711 invalidated_properties):
4712 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4713 if interface_name != WPAS_DBUS_P2P_PEER:
4714 return
4715 if "Groups" not in changed_properties:
4716 return
4717 if len(changed_properties["Groups"]) > 0:
4718 self.peer_group_added = True
6fad40df 4719 if len(changed_properties["Groups"]) == 0 and self.peer_group_added:
20fd8def
JM
4720 self.peer_group_removed = True
4721 self.loop.quit()
4722
4723 def run_test(self, *args):
4724 logger.debug("run_test")
4725 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4726 return False
4727
4728 def success(self):
4729 return self.done and self.peer_group_added and self.peer_group_removed
4730
4731 with TestDbusP2p(bus) as t:
4732 if not t.success():
4733 raise Exception("Expected signals not seen")
4734
4735def test_dbus_p2p_group_idle_timeout(dev, apdev):
4736 """D-Bus P2P group removal on idle timeout"""
4737 try:
4738 dev[0].global_request("SET p2p_group_idle 1")
4739 _test_dbus_p2p_group_idle_timeout(dev, apdev)
4740 finally:
4741 dev[0].global_request("SET p2p_group_idle 0")
4742
4743def _test_dbus_p2p_group_idle_timeout(dev, apdev):
4744 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4745 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4746 addr0 = dev[0].p2p_dev_addr()
4747 dev[1].p2p_listen()
4748
4749 class TestDbusP2p(TestDbus):
4750 def __init__(self, bus):
4751 TestDbus.__init__(self, bus)
4752 self.done = False
845d48c1 4753 self.group_started = False
20fd8def
JM
4754 self.peer_group_added = False
4755 self.peer_group_removed = False
4756
4757 def __enter__(self):
4758 gobject.timeout_add(1, self.run_test)
4759 gobject.timeout_add(15000, self.timeout)
4760 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4761 "DeviceFound")
4762 self.add_signal(self.goNegotiationSuccess,
4763 WPAS_DBUS_IFACE_P2PDEVICE,
4764 "GONegotiationSuccess",
4765 byte_arrays=True)
4766 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4767 "GroupStarted")
4768 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4769 "GroupFinished")
4770 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4771 "PropertiesChanged")
4772 self.loop.run()
4773 return self
4774
4775 def deviceFound(self, path):
4776 logger.debug("deviceFound: path=%s" % path)
4777 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4778 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4779 'go_intent': 0 }
4780 p2p.Connect(args)
4781
4782 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4783 if ev is None:
4784 raise Exception("Timeout while waiting for GO Neg Request")
4785 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
bc6e3288 4786 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
20fd8def
JM
4787 if ev is None:
4788 raise Exception("Group formation timed out")
4789 self.sta_group_ev = ev
4790
4791 def goNegotiationSuccess(self, properties):
4792 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4793
4794 def groupStarted(self, properties):
4795 logger.debug("groupStarted: " + str(properties))
845d48c1 4796 self.group_started = True
20fd8def
JM
4797 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4798 properties['interface_object'])
4799 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4800 dev1.group_form_result(self.sta_group_ev)
4801 ifaddr = dev1.group_request("STA-FIRST").splitlines()[0]
4802 # Force disassociation with different reason code so that the
4803 # P2P Client using D-Bus does not get normal group termination event
4804 # from the GO.
4805 dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
4806 dev1.remove_group()
4807
4808 def groupFinished(self, properties):
4809 logger.debug("groupFinished: " + str(properties))
4810 self.done = True
4811
4812 def propertiesChanged(self, interface_name, changed_properties,
4813 invalidated_properties):
4814 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4815 if interface_name != WPAS_DBUS_P2P_PEER:
4816 return
845d48c1
JM
4817 if not self.group_started:
4818 return
20fd8def
JM
4819 if "Groups" not in changed_properties:
4820 return
4821 if len(changed_properties["Groups"]) > 0:
4822 self.peer_group_added = True
4823 if len(changed_properties["Groups"]) == 0:
4824 self.peer_group_removed = True
4825 self.loop.quit()
4826
4827 def run_test(self, *args):
4828 logger.debug("run_test")
4829 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4830 return False
4831
4832 def success(self):
4833 return self.done and self.peer_group_added and self.peer_group_removed
2ec82e67
JM
4834
4835 with TestDbusP2p(bus) as t:
4836 if not t.success():
4837 raise Exception("Expected signals not seen")
4838
4839def test_dbus_p2p_wps_failure(dev, apdev):
4840 """D-Bus P2P WPS failure"""
910eb5b5 4841 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4842 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4843 addr0 = dev[0].p2p_dev_addr()
4844
4845 class TestDbusP2p(TestDbus):
4846 def __init__(self, bus):
4847 TestDbus.__init__(self, bus)
084780f1
JM
4848 self.wps_failed = False
4849 self.formation_failure = False
2ec82e67
JM
4850
4851 def __enter__(self):
4852 gobject.timeout_add(1, self.run_test)
4853 gobject.timeout_add(15000, self.timeout)
4854 self.add_signal(self.goNegotiationRequest,
4855 WPAS_DBUS_IFACE_P2PDEVICE,
4856 "GONegotiationRequest",
4857 byte_arrays=True)
4858 self.add_signal(self.goNegotiationSuccess,
4859 WPAS_DBUS_IFACE_P2PDEVICE,
4860 "GONegotiationSuccess",
4861 byte_arrays=True)
4862 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4863 "GroupStarted")
4864 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
4865 "WpsFailed")
084780f1
JM
4866 self.add_signal(self.groupFormationFailure,
4867 WPAS_DBUS_IFACE_P2PDEVICE,
4868 "GroupFormationFailure")
2ec82e67
JM
4869 self.loop.run()
4870 return self
4871
f5d5161d
JM
4872 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4873 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
2ec82e67
JM
4874 if dev_passwd_id != 1:
4875 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4876 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4877 'go_intent': 15 }
4878 p2p.Connect(args)
4879
4880 def goNegotiationSuccess(self, properties):
4881 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4882
4883 def groupStarted(self, properties):
4884 logger.debug("groupStarted: " + str(properties))
4885 raise Exception("Unexpected GroupStarted")
4886
4887 def wpsFailed(self, name, args):
4888 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
084780f1
JM
4889 self.wps_failed = True
4890 if self.formation_failure:
4891 self.loop.quit()
4892
4893 def groupFormationFailure(self, reason):
4894 logger.debug("groupFormationFailure - reason=%s" % reason)
4895 self.formation_failure = True
4896 if self.wps_failed:
4897 self.loop.quit()
2ec82e67
JM
4898
4899 def run_test(self, *args):
4900 logger.debug("run_test")
4901 p2p.Listen(10)
4902 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4903 if not dev1.discover_peer(addr0):
4904 raise Exception("Peer not found")
4905 dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
4906 return False
4907
4908 def success(self):
084780f1 4909 return self.wps_failed and self.formation_failure
2ec82e67
JM
4910
4911 with TestDbusP2p(bus) as t:
4912 if not t.success():
4913 raise Exception("Expected signals not seen")
4914
4915def test_dbus_p2p_two_groups(dev, apdev):
4916 """D-Bus P2P with two concurrent groups"""
910eb5b5 4917 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2ec82e67
JM
4918 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4919
4920 dev[0].request("SET p2p_no_group_iface 0")
4921 addr0 = dev[0].p2p_dev_addr()
4922 addr1 = dev[1].p2p_dev_addr()
4923 addr2 = dev[2].p2p_dev_addr()
4924 dev[1].p2p_start_go(freq=2412)
cb346b49 4925 dev1_group_ifname = dev[1].group_ifname
2ec82e67
JM
4926
4927 class TestDbusP2p(TestDbus):
4928 def __init__(self, bus):
4929 TestDbus.__init__(self, bus)
4930 self.done = False
4931 self.peer = None
4932 self.go = None
4933 self.group1 = None
4934 self.group2 = None
5a217649 4935 self.groups_removed = False
2ec82e67
JM
4936
4937 def __enter__(self):
4938 gobject.timeout_add(1, self.run_test)
4939 gobject.timeout_add(15000, self.timeout)
4940 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4941 "PropertiesChanged", byte_arrays=True)
4942 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4943 "DeviceFound")
4944 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4945 "GroupStarted")
4946 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4947 "GroupFinished")
4948 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4949 "PeerJoined")
4950 self.loop.run()
4951 return self
4952
4953 def propertiesChanged(self, interface_name, changed_properties,
4954 invalidated_properties):
4955 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4956
4957 def deviceFound(self, path):
4958 logger.debug("deviceFound: path=%s" % path)
4959 if addr2.replace(':','') in path:
4960 self.peer = path
4961 elif addr1.replace(':','') in path:
4962 self.go = path
4963 if self.go and not self.group1:
4964 logger.info("Join the group")
4965 p2p.StopFind()
4966 pin = '12345670'
4967 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
cb346b49
JM
4968 dev1.group_ifname = dev1_group_ifname
4969 dev1.group_request("WPS_PIN any " + pin)
2ec82e67
JM
4970 args = { 'peer': self.go,
4971 'join': True,
4972 'wps_method': 'pin',
4973 'pin': pin,
4974 'frequency': 2412 }
4975 p2p.Connect(args)
4976
4977 def groupStarted(self, properties):
4978 logger.debug("groupStarted: " + str(properties))
4979 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4980 dbus_interface=dbus.PROPERTIES_IFACE)
4981 logger.debug("p2pdevice properties: " + str(prop))
4982
4983 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4984 properties['group_object'])
4985 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4986 dbus_interface=dbus.PROPERTIES_IFACE,
4987 byte_arrays=True)
4988 logger.debug("Group properties: " + str(res))
4989
4990 if not self.group1:
4991 self.group1 = properties['group_object']
4992 self.group1iface = properties['interface_object']
4993 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4994 self.group1iface)
4995
4996 logger.info("Start autonomous GO")
4997 params = dbus.Dictionary({ 'frequency': 2412 })
4998 p2p.GroupAdd(params)
4999 elif not self.group2:
5000 self.group2 = properties['group_object']
5001 self.group2iface = properties['interface_object']
5002 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
5003 self.group2iface)
5004 self.g2_bssid = res['BSSID']
5005
5006 if self.group1 and self.group2:
5007 logger.info("Authorize peer to join the group")
5008 a2 = binascii.unhexlify(addr2.replace(':',''))
5009 params = { 'Role': 'enrollee',
5010 'P2PDeviceAddress': dbus.ByteArray(a2),
5011 'Bssid': dbus.ByteArray(a2),
5012 'Type': 'pin',
5013 'Pin': '12345670' }
5014 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
5015 g_wps.Start(params)
5016
5017 bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
5018 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
5019 dev2.scan_for_bss(bssid, freq=2412)
5020 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
bc6e3288 5021 ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
cb346b49
JM
5022 if ev is None:
5023 raise Exception("Group join timed out")
5024 self.dev2_group_ev = ev
2ec82e67
JM
5025
5026 def groupFinished(self, properties):
5027 logger.debug("groupFinished: " + str(properties))
5028
5029 if self.group1 == properties['group_object']:
5030 self.group1 = None
5031 elif self.group2 == properties['group_object']:
5032 self.group2 = None
5033
5034 if not self.group1 and not self.group2:
5035 self.done = True
5036 self.loop.quit()
5037
5038 def peerJoined(self, peer):
5039 logger.debug("peerJoined: " + peer)
5a217649
JM
5040 if self.groups_removed:
5041 return
2ec82e67
JM
5042 self.check_results()
5043
5044 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
cb346b49 5045 dev2.group_form_result(self.dev2_group_ev)
2ec82e67
JM
5046 dev2.remove_group()
5047
5048 logger.info("Disconnect group2")
5049 group_p2p = dbus.Interface(self.g2_if_obj,
5050 WPAS_DBUS_IFACE_P2PDEVICE)
5051 group_p2p.Disconnect()
5052
5053 logger.info("Disconnect group1")
5054 group_p2p = dbus.Interface(self.g1_if_obj,
5055 WPAS_DBUS_IFACE_P2PDEVICE)
5056 group_p2p.Disconnect()
5a217649 5057 self.groups_removed = True
2ec82e67
JM
5058
5059 def check_results(self):
5060 logger.info("Check results with two concurrent groups in operation")
5061
5062 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
5063 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
5064 dbus_interface=dbus.PROPERTIES_IFACE,
5065 byte_arrays=True)
5066
5067 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
5068 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
5069 dbus_interface=dbus.PROPERTIES_IFACE,
5070 byte_arrays=True)
5071
5072 logger.info("group1 = " + self.group1)
5073 logger.debug("Group properties: " + str(res1))
5074
5075 logger.info("group2 = " + self.group2)
5076 logger.debug("Group properties: " + str(res2))
5077
5078 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
5079 dbus_interface=dbus.PROPERTIES_IFACE)
5080 logger.debug("p2pdevice properties: " + str(prop))
5081
5082 if res1['Role'] != 'client':
5083 raise Exception("Group1 role reported incorrectly: " + res1['Role'])
5084 if res2['Role'] != 'GO':
5085 raise Exception("Group2 role reported incorrectly: " + res2['Role'])
5086 if prop['Role'] != 'device':
5087 raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])
5088
5089 if len(res2['Members']) != 1:
5090 raise Exception("Unexpected Members value for group 2")
5091
5092 def run_test(self, *args):
5093 logger.debug("run_test")
5094 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5095 return False
5096
5097 def success(self):
5098 return self.done
5099
5100 with TestDbusP2p(bus) as t:
5101 if not t.success():
5102 raise Exception("Expected signals not seen")
5103
5104 dev[1].remove_group()
0e126c6d 5105
f572ae80
JM
5106def test_dbus_p2p_cancel(dev, apdev):
5107 """D-Bus P2P Cancel"""
5108 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5109 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5110 try:
5111 p2p.Cancel()
5112 raise Exception("Unexpected p2p.Cancel() success")
5113 except dbus.exceptions.DBusException, e:
5114 pass
5115
5116 addr0 = dev[0].p2p_dev_addr()
5117 dev[1].p2p_listen()
5118
5119 class TestDbusP2p(TestDbus):
5120 def __init__(self, bus):
5121 TestDbus.__init__(self, bus)
5122 self.done = False
5123
5124 def __enter__(self):
5125 gobject.timeout_add(1, self.run_test)
5126 gobject.timeout_add(15000, self.timeout)
5127 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
5128 "DeviceFound")
5129 self.loop.run()
5130 return self
5131
5132 def deviceFound(self, path):
5133 logger.debug("deviceFound: path=%s" % path)
5134 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
5135 'go_intent': 0 }
5136 p2p.Connect(args)
5137 p2p.Cancel()
5138 self.done = True
5139 self.loop.quit()
5140
5141 def run_test(self, *args):
5142 logger.debug("run_test")
5143 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5144 return False
5145
5146 def success(self):
5147 return self.done
5148
5149 with TestDbusP2p(bus) as t:
5150 if not t.success():
5151 raise Exception("Expected signals not seen")
5152
afe28053
JM
5153def test_dbus_p2p_ip_addr(dev, apdev):
5154 """D-Bus P2P and IP address parameters"""
5155 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5156 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5157
5158 vals = [ ("IpAddrGo", "192.168.43.1"),
5159 ("IpAddrMask", "255.255.255.0"),
5160 ("IpAddrStart", "192.168.43.100"),
5161 ("IpAddrEnd", "192.168.43.199") ]
5162 for field, value in vals:
5163 if_obj.Set(WPAS_DBUS_IFACE, field, value,
5164 dbus_interface=dbus.PROPERTIES_IFACE)
5165 val = if_obj.Get(WPAS_DBUS_IFACE, field,
5166 dbus_interface=dbus.PROPERTIES_IFACE)
5167 if val != value:
5168 raise Exception("Unexpected %s value: %s" % (field, val))
5169
5170 set_ip_addr_info(dev[1])
5171
5172 dev[0].global_request("SET p2p_go_intent 0")
5173
5174 req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
5175 if "FAIL" in req:
5176 raise Exception("Failed to generate NFC connection handover request")
5177 sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
5178 if "FAIL" in sel:
5179 raise Exception("Failed to generate NFC connection handover select")
5180 dev[0].dump_monitor()
5181 dev[1].dump_monitor()
5182 res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel)
5183 if "FAIL" in res:
5184 raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
5185 res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel)
5186 if "FAIL" in res:
5187 raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")
5188
5189 class TestDbusP2p(TestDbus):
5190 def __init__(self, bus):
5191 TestDbus.__init__(self, bus)
5192 self.done = False
5193
5194 def __enter__(self):
5195 gobject.timeout_add(1, self.run_test)
5196 gobject.timeout_add(15000, self.timeout)
5197 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
5198 "GroupStarted")
5199 self.loop.run()
5200 return self
5201
5202 def groupStarted(self, properties):
5203 logger.debug("groupStarted: " + str(properties))
5204 self.loop.quit()
5205
5206 if 'IpAddrGo' not in properties:
5207 logger.info("IpAddrGo missing from GroupStarted")
5208 ip_addr_go = properties['IpAddrGo']
5209 addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3])
5210 if addr != "192.168.42.1":
5211 logger.info("Unexpected IpAddrGo value: " + addr)
5212 self.done = True
5213
5214 def run_test(self, *args):
5215 logger.debug("run_test")
5216 return False
5217
5218 def success(self):
5219 return self.done
5220
5221 with TestDbusP2p(bus) as t:
5222 if not t.success():
5223 raise Exception("Expected signals not seen")
5224
0e126c6d
JM
5225def test_dbus_introspect(dev, apdev):
5226 """D-Bus introspection"""
5227 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5228
5229 res = if_obj.Introspect(WPAS_DBUS_IFACE,
5230 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5231 logger.info("Initial Introspect: " + str(res))
5232 if res is None or "Introspectable" not in res or "GroupStarted" not in res:
5233 raise Exception("Unexpected initial Introspect response: " + str(res))
f0ef6889
DW
5234 if "FastReauth" not in res or "PassiveScan" not in res:
5235 raise Exception("Unexpected initial Introspect response: " + str(res))
0e126c6d
JM
5236
5237 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
5238 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5239 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5240 logger.info("Introspect: " + str(res2))
5241 if res2 is not None:
5242 raise Exception("Unexpected Introspect response")
5243
5244 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"):
5245 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5246 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5247 logger.info("Introspect: " + str(res2))
5248 if res2 is None:
5249 raise Exception("No Introspect response")
5250 if len(res2) >= len(res):
5251 raise Exception("Unexpected Introspect response")
5252
5253 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
5254 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5255 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5256 logger.info("Introspect: " + str(res2))
5257 if res2 is None:
5258 raise Exception("No Introspect response")
5259 if len(res2) >= len(res):
5260 raise Exception("Unexpected Introspect response")
5261
5262 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"):
5263 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5264 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5265 logger.info("Introspect: " + str(res2))
5266 if res2 is None:
5267 raise Exception("No Introspect response")
5268 if len(res2) >= len(res):
5269 raise Exception("Unexpected Introspect response")
9bbce257
JM
5270
5271def test_dbus_ap(dev, apdev):
5272 """D-Bus AddNetwork for AP mode"""
5273 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5274 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5275
5276 ssid = "test-wpa2-psk"
5277 passphrase = 'qwertyuiop'
5278
5279 class TestDbusConnect(TestDbus):
5280 def __init__(self, bus):
5281 TestDbus.__init__(self, bus)
5282 self.started = False
5283
5284 def __enter__(self):
5285 gobject.timeout_add(1, self.run_connect)
5286 gobject.timeout_add(15000, self.timeout)
5287 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
5288 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
5289 "NetworkSelected")
5290 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5291 "PropertiesChanged")
5292 self.loop.run()
5293 return self
5294
5295 def networkAdded(self, network, properties):
5296 logger.debug("networkAdded: %s" % str(network))
5297 logger.debug(str(properties))
5298
5299 def networkSelected(self, network):
5300 logger.debug("networkSelected: %s" % str(network))
5301 self.network_selected = True
5302
5303 def propertiesChanged(self, properties):
5304 logger.debug("propertiesChanged: %s" % str(properties))
5305 if 'State' in properties and properties['State'] == "completed":
5306 self.started = True
5307 self.loop.quit()
5308
5309 def run_connect(self, *args):
5310 logger.debug("run_connect")
5311 args = dbus.Dictionary({ 'ssid': ssid,
5312 'key_mgmt': 'WPA-PSK',
5313 'psk': passphrase,
5314 'mode': 2,
5315 'frequency': 2412 },
5316 signature='sv')
5317 self.netw = iface.AddNetwork(args)
5318 iface.SelectNetwork(self.netw)
5319 return False
5320
5321 def success(self):
5322 return self.started
5323
5324 with TestDbusConnect(bus) as t:
5325 if not t.success():
5326 raise Exception("Expected signals not seen")
5327 dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
d9e2a057
JM
5328
5329def test_dbus_connect_wpa_eap(dev, apdev):
5330 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
5331 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5332 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5333
5334 ssid = "test-wpa-eap"
5335 params = hostapd.wpa_eap_params(ssid=ssid)
5336 params["wpa"] = "3"
5337 params["rsn_pairwise"] = "CCMP"
8b8a1864 5338 hapd = hostapd.add_ap(apdev[0], params)
d9e2a057
JM
5339
5340 class TestDbusConnect(TestDbus):
5341 def __init__(self, bus):
5342 TestDbus.__init__(self, bus)
5343 self.done = False
5344
5345 def __enter__(self):
5346 gobject.timeout_add(1, self.run_connect)
5347 gobject.timeout_add(15000, self.timeout)
5348 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5349 "PropertiesChanged")
5350 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
5351 self.loop.run()
5352 return self
5353
5354 def propertiesChanged(self, properties):
5355 logger.debug("propertiesChanged: %s" % str(properties))
5356 if 'State' in properties and properties['State'] == "completed":
5357 self.done = True
5358 self.loop.quit()
5359
5360 def eap(self, status, parameter):
5361 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
5362
5363 def run_connect(self, *args):
5364 logger.debug("run_connect")
5365 args = dbus.Dictionary({ 'ssid': ssid,
5366 'key_mgmt': 'WPA-EAP',
5367 'eap': 'PEAP',
5368 'identity': 'user',
5369 'password': 'password',
5370 'ca_cert': 'auth_serv/ca.pem',
5371 'phase2': 'auth=MSCHAPV2',
5372 'scan_freq': 2412 },
5373 signature='sv')
5374 self.netw = iface.AddNetwork(args)
5375 iface.SelectNetwork(self.netw)
5376 return False
5377
5378 def success(self):
5379 return self.done
5380
5381 with TestDbusConnect(bus) as t:
5382 if not t.success():
5383 raise Exception("Expected signals not seen")
708ec753
JM
5384
5385def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
5386 """AP_SCAN 2 AP mode and D-Bus Scan()"""
5387 try:
5388 _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
5389 finally:
5390 dev[0].request("AP_SCAN 1")
5391
5392def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
5393 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5394 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5395
5396 if "OK" not in dev[0].request("AP_SCAN 2"):
5397 raise Exception("Failed to set AP_SCAN 2")
5398
5399 id = dev[0].add_network()
5400 dev[0].set_network(id, "mode", "2")
5401 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open")
5402 dev[0].set_network(id, "key_mgmt", "NONE")
5403 dev[0].set_network(id, "frequency", "2412")
5404 dev[0].set_network(id, "scan_freq", "2412")
5405 dev[0].set_network(id, "disabled", "0")
5406 dev[0].select_network(id)
5407 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5)
5408 if ev is None:
5409 raise Exception("AP failed to start")
5410
5411 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
5412 iface.Scan({'Type': 'active',
5413 'AllowRoam': True,
5414 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
5415 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5416 "AP-DISABLED"], timeout=5)
5417 if ev is None:
5418 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
5419 if "AP-DISABLED" in ev:
5420 raise Exception("Unexpected AP-DISABLED event")
5421 if "retry=1" in ev:
5422 # Wait for the retry to scan happen
5423 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5424 "AP-DISABLED"], timeout=5)
5425 if ev is None:
5426 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5427 if "AP-DISABLED" in ev:
5428 raise Exception("Unexpected AP-DISABLED event - retry")
5429
5430 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
5431 dev[1].request("DISCONNECT")
5432 dev[1].wait_disconnected()
5433 dev[0].request("DISCONNECT")
5434 dev[0].wait_disconnected()
d679ab74
JM
5435
5436def test_dbus_expectdisconnect(dev, apdev):
5437 """D-Bus ExpectDisconnect"""
5438 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5439 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
5440
5441 params = { "ssid": "test-open" }
8b8a1864 5442 hapd = hostapd.add_ap(apdev[0], params)
d679ab74
JM
5443 dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")
5444
5445 # This does not really verify the behavior other than by going through the
5446 # code path for additional coverage.
5447 wpas.ExpectDisconnect()
5448 dev[0].request("DISCONNECT")
5449 dev[0].wait_disconnected()
2299b543
JM
5450
5451def test_dbus_save_config(dev, apdev):
5452 """D-Bus SaveConfig"""
5453 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5454 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5455 try:
5456 iface.SaveConfig()
5457 raise Exception("SaveConfig() accepted unexpectedly")
5458 except dbus.exceptions.DBusException, e:
5459 if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
5460 raise Exception("Unexpected error message for SaveConfig(): " + str(e))
ce96e65c
JM
5461
5462def test_dbus_vendor_elem(dev, apdev):
5463 """D-Bus vendor element operations"""
5464 try:
5465 _test_dbus_vendor_elem(dev, apdev)
5466 finally:
5467 dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5468
5469def _test_dbus_vendor_elem(dev, apdev):
5470 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5471 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5472
5473 dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5474
5475 try:
5476 ie = dbus.ByteArray("\x00\x00")
5477 iface.VendorElemAdd(-1, ie)
5478 raise Exception("Invalid VendorElemAdd() accepted")
5479 except dbus.exceptions.DBusException, e:
5480 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5481 raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))
5482
5483 try:
5484 ie = dbus.ByteArray("")
5485 iface.VendorElemAdd(1, ie)
5486 raise Exception("Invalid VendorElemAdd() accepted")
5487 except dbus.exceptions.DBusException, e:
5488 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5489 raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))
5490
5491 try:
5492 ie = dbus.ByteArray("\x00\x01")
5493 iface.VendorElemAdd(1, ie)
5494 raise Exception("Invalid VendorElemAdd() accepted")
5495 except dbus.exceptions.DBusException, e:
5496 if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5497 raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))
5498
5499 try:
5500 iface.VendorElemGet(-1)
5501 raise Exception("Invalid VendorElemGet() accepted")
5502 except dbus.exceptions.DBusException, e:
5503 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5504 raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))
5505
5506 try:
5507 iface.VendorElemGet(1)
5508 raise Exception("Invalid VendorElemGet() accepted")
5509 except dbus.exceptions.DBusException, e:
5510 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5511 raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))
5512
5513 try:
5514 ie = dbus.ByteArray("\x00\x00")
5515 iface.VendorElemRem(-1, ie)
5516 raise Exception("Invalid VendorElemRemove() accepted")
5517 except dbus.exceptions.DBusException, e:
5518 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5519 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5520
5521 try:
5522 ie = dbus.ByteArray("")
5523 iface.VendorElemRem(1, ie)
5524 raise Exception("Invalid VendorElemRemove() accepted")
5525 except dbus.exceptions.DBusException, e:
5526 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5527 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5528
5529 iface.VendorElemRem(1, "*")
5530
5531 ie = dbus.ByteArray("\x00\x01\x00")
5532 iface.VendorElemAdd(1, ie)
5533
5534 val = iface.VendorElemGet(1)
5535 if len(val) != len(ie):
5536 raise Exception("Unexpected VendorElemGet length")
5537 for i in range(len(val)):
5538 if val[i] != dbus.Byte(ie[i]):
5539 raise Exception("Unexpected VendorElemGet data")
5540
5541 ie2 = dbus.ByteArray("\xe0\x00")
5542 iface.VendorElemAdd(1, ie2)
5543
5544 ies = ie + ie2
5545 val = iface.VendorElemGet(1)
5546 if len(val) != len(ies):
5547 raise Exception("Unexpected VendorElemGet length[2]")
5548 for i in range(len(val)):
5549 if val[i] != dbus.Byte(ies[i]):
5550 raise Exception("Unexpected VendorElemGet data[2]")
5551
5552 try:
5553 test_ie = dbus.ByteArray("\x01\x01")
5554 iface.VendorElemRem(1, test_ie)
5555 raise Exception("Invalid VendorElemRemove() accepted")
5556 except dbus.exceptions.DBusException, e:
5557 if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5558 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5559
5560 iface.VendorElemRem(1, ie)
5561 val = iface.VendorElemGet(1)
5562 if len(val) != len(ie2):
5563 raise Exception("Unexpected VendorElemGet length[3]")
5564
5565 iface.VendorElemRem(1, "*")
5566 try:
5567 iface.VendorElemGet(1)
5568 raise Exception("Invalid VendorElemGet() accepted after removal")
5569 except dbus.exceptions.DBusException, e:
5570 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5571 raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))
dbd183c7
JM
5572
5573def test_dbus_assoc_reject(dev, apdev):
5574 """D-Bus AssocStatusCode"""
5575 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5576 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5577
5578 ssid = "test-open"
5579 params = { "ssid": ssid,
5580 "max_listen_interval": "1" }
8b8a1864 5581 hapd = hostapd.add_ap(apdev[0], params)
dbd183c7
JM
5582
5583 class TestDbusConnect(TestDbus):
5584 def __init__(self, bus):
5585 TestDbus.__init__(self, bus)
5586 self.assoc_status_seen = False
5587 self.state = 0
5588
5589 def __enter__(self):
5590 gobject.timeout_add(1, self.run_connect)
5591 gobject.timeout_add(15000, self.timeout)
5592 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5593 "PropertiesChanged")
5594 self.loop.run()
5595 return self
5596
5597 def propertiesChanged(self, properties):
5598 logger.debug("propertiesChanged: %s" % str(properties))
5599 if 'AssocStatusCode' in properties:
5600 status = properties['AssocStatusCode']
5601 if status != 51:
5602 logger.info("Unexpected status code: " + str(status))
5603 else:
5604 self.assoc_status_seen = True
5605 iface.Disconnect()
5606 self.loop.quit()
5607
5608 def run_connect(self, *args):
5609 args = dbus.Dictionary({ 'ssid': ssid,
5610 'key_mgmt': 'NONE',
5611 'scan_freq': 2412 },
5612 signature='sv')
5613 self.netw = iface.AddNetwork(args)
5614 iface.SelectNetwork(self.netw)
5615 return False
5616
5617 def success(self):
5618 return self.assoc_status_seen
5619
5620 with TestDbusConnect(bus) as t:
5621 if not t.success():
5622 raise Exception("Expected signals not seen")
504c7ffd
JM
5623
5624def test_dbus_mesh(dev, apdev):
5625 """D-Bus mesh"""
5626 check_mesh_support(dev[0])
5627 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5628 mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH)
5629
5630 add_open_mesh_network(dev[1])
5631 addr1 = dev[1].own_addr()
5632
5633 class TestDbusMesh(TestDbus):
5634 def __init__(self, bus):
5635 TestDbus.__init__(self, bus)
5636 self.done = False
5637
5638 def __enter__(self):
5639 gobject.timeout_add(1, self.run_test)
5640 gobject.timeout_add(15000, self.timeout)
5641 self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH,
5642 "MeshGroupStarted")
5643 self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH,
5644 "MeshGroupRemoved")
5645 self.add_signal(self.meshPeerConnected, WPAS_DBUS_IFACE_MESH,
5646 "MeshPeerConnected")
5647 self.add_signal(self.meshPeerDisconnected, WPAS_DBUS_IFACE_MESH,
5648 "MeshPeerDisconnected")
5649 self.loop.run()
5650 return self
5651
5652 def meshGroupStarted(self, args):
5653 logger.debug("MeshGroupStarted: " + str(args))
5654
5655 def meshGroupRemoved(self, args):
5656 logger.debug("MeshGroupRemoved: " + str(args))
5657 self.done = True
5658 self.loop.quit()
5659
5660 def meshPeerConnected(self, args):
5661 logger.debug("MeshPeerConnected: " + str(args))
5662
5663 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers',
5664 dbus_interface=dbus.PROPERTIES_IFACE,
5665 byte_arrays=True)
5666 logger.debug("MeshPeers: " + str(res))
5667 if len(res) != 1:
5668 raise Exception("Unexpected number of MeshPeer values")
5669 if binascii.hexlify(res[0]) != addr1.replace(':', ''):
5670 raise Exception("Unexpected peer address")
5671
5672 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup',
5673 dbus_interface=dbus.PROPERTIES_IFACE,
5674 byte_arrays=True)
5675 logger.debug("MeshGroup: " + str(res))
5676 if res != "wpas-mesh-open":
5677 raise Exception("Unexpected MeshGroup")
5678 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
5679 dev1.mesh_group_remove()
5680
5681 def meshPeerDisconnected(self, args):
5682 logger.debug("MeshPeerDisconnected: " + str(args))
5683 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
5684 dev0.mesh_group_remove()
5685
5686 def run_test(self, *args):
5687 logger.debug("run_test")
5688 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
5689 add_open_mesh_network(dev0)
5690 return False
5691
5692 def success(self):
5693 return self.done
5694
5695 with TestDbusMesh(bus) as t:
5696 if not t.success():
5697 raise Exception("Expected signals not seen")