]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/test_dbus.py
tests: D-Bus Get/Set Pmf
[thirdparty/hostap.git] / tests / hwsim / test_dbus.py
1 # wpa_supplicant D-Bus interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import binascii
8 import logging
9 logger = logging.getLogger()
10 import subprocess
11 import time
12
13 try:
14 import gobject
15 import dbus
16 dbus_imported = True
17 except ImportError:
18 dbus_imported = False
19
20 import hostapd
21 from wpasupplicant import WpaSupplicant
22 from utils import HwsimSkip, alloc_fail, fail_test
23 from p2p_utils import *
24 from test_ap_tdls import connect_2sta_open
25 from test_ap_eap import check_altsubject_match_support
26 from test_nfc_p2p import set_ip_addr_info
27 from test_wpas_mesh import check_mesh_support, add_open_mesh_network
28
29 WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
30 WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
31 WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
32 WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
33 WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
34 WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
35 WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
36 WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
37 WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
38 WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
39 WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh"
40
41 def prepare_dbus(dev):
42 if not dbus_imported:
43 logger.info("No dbus module available")
44 raise HwsimSkip("No dbus module available")
45 try:
46 from dbus.mainloop.glib import DBusGMainLoop
47 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
48 bus = dbus.SystemBus()
49 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
50 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
51 path = wpas.GetInterface(dev.ifname)
52 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
53 return (bus,wpas_obj,path,if_obj)
54 except Exception, e:
55 raise HwsimSkip("Could not connect to D-Bus: %s" % e)
56
57 class TestDbus(object):
58 def __init__(self, bus):
59 self.loop = gobject.MainLoop()
60 self.signals = []
61 self.bus = bus
62
63 def __exit__(self, type, value, traceback):
64 for s in self.signals:
65 s.remove()
66
67 def add_signal(self, handler, interface, name, byte_arrays=False):
68 s = self.bus.add_signal_receiver(handler, dbus_interface=interface,
69 signal_name=name,
70 byte_arrays=byte_arrays)
71 self.signals.append(s)
72
73 def timeout(self, *args):
74 logger.debug("timeout")
75 self.loop.quit()
76 return False
77
78 class alloc_fail_dbus(object):
79 def __init__(self, dev, count, funcs, operation="Operation",
80 expected="NoMemory"):
81 self._dev = dev
82 self._count = count
83 self._funcs = funcs
84 self._operation = operation
85 self._expected = expected
86 def __enter__(self):
87 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
88 if "OK" not in self._dev.request(cmd):
89 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
90 def __exit__(self, type, value, traceback):
91 if type is None:
92 raise Exception("%s succeeded during out-of-memory" % self._operation)
93 if type == dbus.exceptions.DBusException and self._expected in str(value):
94 return True
95 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
96 raise Exception("%s did not trigger allocation failure" % self._operation)
97 return False
98
99 def start_ap(ap, ssid="test-wps",
100 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
101 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
102 "wpa_passphrase": "12345678", "wpa": "2",
103 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
104 "ap_pin": "12345670", "uuid": ap_uuid}
105 return hostapd.add_ap(ap, params)
106
107 def test_dbus_getall(dev, apdev):
108 """D-Bus GetAll"""
109 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
110
111 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
112 dbus_interface=dbus.PROPERTIES_IFACE)
113 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props))
114
115 props = if_obj.GetAll(WPAS_DBUS_IFACE,
116 dbus_interface=dbus.PROPERTIES_IFACE)
117 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))
118
119 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
120 dbus_interface=dbus.PROPERTIES_IFACE)
121 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))
122
123 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
124 dbus_interface=dbus.PROPERTIES_IFACE)
125 if len(res) != 0:
126 raise Exception("Unexpected BSSs entry: " + str(res))
127
128 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
129 dbus_interface=dbus.PROPERTIES_IFACE)
130 if len(res) != 0:
131 raise Exception("Unexpected Networks entry: " + str(res))
132
133 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
134 bssid = apdev[0]['bssid']
135 dev[0].scan_for_bss(bssid, freq=2412)
136 id = dev[0].add_network()
137 dev[0].set_network(id, "disabled", "0")
138 dev[0].set_network_quoted(id, "ssid", "test")
139
140 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
141 dbus_interface=dbus.PROPERTIES_IFACE)
142 if len(res) != 1:
143 raise Exception("Missing BSSs entry: " + str(res))
144 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
145 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
146 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
147 bssid_str = ''
148 for item in props['BSSID']:
149 if len(bssid_str) > 0:
150 bssid_str += ':'
151 bssid_str += '%02x' % item
152 if bssid_str != bssid:
153 raise Exception("Unexpected BSSID in BSSs entry")
154
155 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
156 dbus_interface=dbus.PROPERTIES_IFACE)
157 if len(res) != 1:
158 raise Exception("Missing Networks entry: " + str(res))
159 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
160 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
161 dbus_interface=dbus.PROPERTIES_IFACE)
162 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
163 ssid = props['Properties']['ssid']
164 if ssid != '"test"':
165 raise Exception("Unexpected SSID in network entry")
166
167 def test_dbus_getall_oom(dev, apdev):
168 """D-Bus GetAll wpa_config_get_all() OOM"""
169 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
170
171 id = dev[0].add_network()
172 dev[0].set_network(id, "disabled", "0")
173 dev[0].set_network_quoted(id, "ssid", "test")
174
175 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
176 dbus_interface=dbus.PROPERTIES_IFACE)
177 if len(res) != 1:
178 raise Exception("Missing Networks entry: " + str(res))
179 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
180 for i in range(1, 50):
181 with alloc_fail(dev[0], i, "wpa_config_get_all"):
182 try:
183 props = net_obj.GetAll(WPAS_DBUS_NETWORK,
184 dbus_interface=dbus.PROPERTIES_IFACE)
185 except dbus.exceptions.DBusException, e:
186 pass
187
188 def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
189 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
190 dbus_interface=dbus.PROPERTIES_IFACE,
191 byte_arrays=byte_arrays)
192 if expect is not None and val != expect:
193 raise Exception("Unexpected %s: %s (expected: %s)" %
194 (prop, str(val), str(expect)))
195 return val
196
197 def dbus_set(dbus, wpas_obj, prop, val):
198 wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
199 dbus_interface=dbus.PROPERTIES_IFACE)
200
201 def test_dbus_properties(dev, apdev):
202 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
203 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
204
205 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
206 dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
207 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
208 for (val,err) in [ (3, "Error.Failed: wrong property type"),
209 ("foo", "Error.Failed: wrong debug level value") ]:
210 try:
211 dbus_set(dbus, wpas_obj, "DebugLevel", val)
212 raise Exception("Invalid DebugLevel value accepted: " + str(val))
213 except dbus.exceptions.DBusException, e:
214 if err not in str(e):
215 raise Exception("Unexpected error message: " + str(e))
216 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
217 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
218
219 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
220 dbus_set(dbus, wpas_obj, "DebugTimestamp", False)
221 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False)
222 try:
223 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo")
224 raise Exception("Invalid DebugTimestamp value accepted")
225 except dbus.exceptions.DBusException, e:
226 if "Error.Failed: wrong property type" not in str(e):
227 raise Exception("Unexpected error message: " + str(e))
228 dbus_set(dbus, wpas_obj, "DebugTimestamp", True)
229 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True)
230
231 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
232 dbus_set(dbus, wpas_obj, "DebugShowKeys", False)
233 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False)
234 try:
235 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo")
236 raise Exception("Invalid DebugShowKeys value accepted")
237 except dbus.exceptions.DBusException, e:
238 if "Error.Failed: wrong property type" not in str(e):
239 raise Exception("Unexpected error message: " + str(e))
240 dbus_set(dbus, wpas_obj, "DebugShowKeys", True)
241 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True)
242
243 res = dbus_get(dbus, wpas_obj, "Interfaces")
244 if len(res) != 1:
245 raise Exception("Unexpected Interfaces value: " + str(res))
246
247 res = dbus_get(dbus, wpas_obj, "EapMethods")
248 if len(res) < 5 or "TTLS" not in res:
249 raise Exception("Unexpected EapMethods value: " + str(res))
250
251 res = dbus_get(dbus, wpas_obj, "Capabilities")
252 if len(res) < 2 or "p2p" not in res:
253 raise Exception("Unexpected Capabilities value: " + str(res))
254
255 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
256 val = binascii.unhexlify("010006020304050608")
257 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
258 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
259 if val != res:
260 raise Exception("WFDIEs value changed")
261 try:
262 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray('\x00'))
263 raise Exception("Invalid WFDIEs value accepted")
264 except dbus.exceptions.DBusException, e:
265 if "InvalidArgs" not in str(e):
266 raise Exception("Unexpected error message: " + str(e))
267 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
268 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val))
269 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(''))
270 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
271 if len(res) != 0:
272 raise Exception("WFDIEs not cleared properly")
273
274 res = dbus_get(dbus, wpas_obj, "EapMethods")
275 try:
276 dbus_set(dbus, wpas_obj, "EapMethods", res)
277 raise Exception("Invalid Set accepted")
278 except dbus.exceptions.DBusException, e:
279 if "InvalidArgs: Property is read-only" not in str(e):
280 raise Exception("Unexpected error message: " + str(e))
281
282 try:
283 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
284 dbus_interface=dbus.PROPERTIES_IFACE)
285 raise Exception("Unknown method accepted")
286 except dbus.exceptions.DBusException, e:
287 if "UnknownMethod" not in str(e):
288 raise Exception("Unexpected error message: " + str(e))
289
290 try:
291 wpas_obj.Get("foo", "DebugShowKeys",
292 dbus_interface=dbus.PROPERTIES_IFACE)
293 raise Exception("Invalid Get accepted")
294 except dbus.exceptions.DBusException, e:
295 if "InvalidArgs: No such property" not in str(e):
296 raise Exception("Unexpected error message: " + str(e))
297
298 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
299 introspect=False)
300 try:
301 test_obj.Get(123, "DebugShowKeys",
302 dbus_interface=dbus.PROPERTIES_IFACE)
303 raise Exception("Invalid Get accepted")
304 except dbus.exceptions.DBusException, e:
305 if "InvalidArgs: Invalid arguments" not in str(e):
306 raise Exception("Unexpected error message: " + str(e))
307 try:
308 test_obj.Get(WPAS_DBUS_SERVICE, 123,
309 dbus_interface=dbus.PROPERTIES_IFACE)
310 raise Exception("Invalid Get accepted")
311 except dbus.exceptions.DBusException, e:
312 if "InvalidArgs: Invalid arguments" not in str(e):
313 raise Exception("Unexpected error message: " + str(e))
314
315 try:
316 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
317 dbus.ByteArray('', variant_level=2),
318 dbus_interface=dbus.PROPERTIES_IFACE)
319 raise Exception("Invalid Set accepted")
320 except dbus.exceptions.DBusException, e:
321 if "InvalidArgs: invalid message format" not in str(e):
322 raise Exception("Unexpected error message: " + str(e))
323
324 def test_dbus_set_global_properties(dev, apdev):
325 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
326 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
327
328 dev[0].set("model_name", "")
329 props = [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]
330
331 for p in props:
332 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
333 dbus_interface=dbus.PROPERTIES_IFACE)
334 if res != p[1]:
335 raise Exception("Unexpected " + p[0] + " value: " + str(res))
336
337 if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2],
338 dbus_interface=dbus.PROPERTIES_IFACE)
339
340 res = if_obj.Get(WPAS_DBUS_IFACE, p[0],
341 dbus_interface=dbus.PROPERTIES_IFACE)
342 if res != p[2]:
343 raise Exception("Unexpected " + p[0] + " value after set: " + str(res))
344 dev[0].set("model_name", "")
345
346 def test_dbus_invalid_method(dev, apdev):
347 """D-Bus invalid method"""
348 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
349 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
350
351 try:
352 wps.Foo()
353 raise Exception("Unknown method accepted")
354 except dbus.exceptions.DBusException, e:
355 if "UnknownMethod" not in str(e):
356 raise Exception("Unexpected error message: " + str(e))
357
358 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
359 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS)
360 try:
361 test_wps.Start(123)
362 raise Exception("WPS.Start with incorrect signature accepted")
363 except dbus.exceptions.DBusException, e:
364 if "InvalidArgs: Invalid arg" not in str(e):
365 raise Exception("Unexpected error message: " + str(e))
366
367 def test_dbus_get_set_wps(dev, apdev):
368 """D-Bus Get/Set for WPS properties"""
369 try:
370 _test_dbus_get_set_wps(dev, apdev)
371 finally:
372 dev[0].request("SET wps_cred_processing 0")
373 dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
374 dev[0].set("device_name", "Device A")
375 dev[0].set("manufacturer", "")
376 dev[0].set("model_name", "")
377 dev[0].set("model_number", "")
378 dev[0].set("serial_number", "")
379 dev[0].set("device_type", "0-00000000-0")
380
381 def _test_dbus_get_set_wps(dev, apdev):
382 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
383
384 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
385 dbus_interface=dbus.PROPERTIES_IFACE)
386
387 val = "display keypad virtual_display nfc_interface"
388 dev[0].request("SET config_methods " + val)
389
390 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
391 dbus_interface=dbus.PROPERTIES_IFACE)
392 if config != val:
393 raise Exception("Unexpected Get(ConfigMethods) result: " + config)
394
395 val2 = "push_button display"
396 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
397 dbus_interface=dbus.PROPERTIES_IFACE)
398 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
399 dbus_interface=dbus.PROPERTIES_IFACE)
400 if config != val2:
401 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)
402
403 dev[0].request("SET config_methods " + val)
404
405 for i in range(3):
406 dev[0].request("SET wps_cred_processing " + str(i))
407 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
408 dbus_interface=dbus.PROPERTIES_IFACE)
409 expected_val = False if i == 1 else True
410 if val != expected_val:
411 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))
412
413 tests = [ ("device_name", "DeviceName"),
414 ("manufacturer", "Manufacturer"),
415 ("model_name", "ModelName"),
416 ("model_number", "ModelNumber"),
417 ("serial_number", "SerialNumber") ]
418
419 for f1,f2 in tests:
420 val2 = "test-value-test"
421 dev[0].set(f1, val2)
422 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
423 dbus_interface=dbus.PROPERTIES_IFACE)
424 if val != val2:
425 raise Exception("Get(%s) returned unexpected value" % f2)
426 val2 = "TEST-value"
427 if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2,
428 dbus_interface=dbus.PROPERTIES_IFACE)
429 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
430 dbus_interface=dbus.PROPERTIES_IFACE)
431 if val != val2:
432 raise Exception("Get(%s) returned unexpected value after Set" % f2)
433
434 dev[0].set("device_type", "5-0050F204-1")
435 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
436 dbus_interface=dbus.PROPERTIES_IFACE)
437 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
438 raise Exception("DeviceType mismatch")
439 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val,
440 dbus_interface=dbus.PROPERTIES_IFACE)
441 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
442 dbus_interface=dbus.PROPERTIES_IFACE)
443 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01:
444 raise Exception("DeviceType mismatch after Set")
445
446 val2 = '\x01\x02\x03\x04\x05\x06\x07\x08'
447 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2),
448 dbus_interface=dbus.PROPERTIES_IFACE)
449 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
450 dbus_interface=dbus.PROPERTIES_IFACE,
451 byte_arrays=True)
452 if val != val2:
453 raise Exception("DeviceType mismatch after Set (2)")
454
455 class TestDbusGetSet(TestDbus):
456 def __init__(self, bus):
457 TestDbus.__init__(self, bus)
458 self.signal_received = False
459 self.signal_received_deprecated = False
460 self.sets_done = False
461
462 def __enter__(self):
463 gobject.timeout_add(1, self.run_sets)
464 gobject.timeout_add(1000, self.timeout)
465 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
466 "PropertiesChanged")
467 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
468 "PropertiesChanged")
469 self.loop.run()
470 return self
471
472 def propertiesChanged(self, properties):
473 logger.debug("PropertiesChanged: " + str(properties))
474 if properties.has_key("ProcessCredentials"):
475 self.signal_received_deprecated = True
476 if self.sets_done and self.signal_received:
477 self.loop.quit()
478
479 def propertiesChanged2(self, interface_name, changed_properties,
480 invalidated_properties):
481 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
482 if interface_name != WPAS_DBUS_IFACE_WPS:
483 return
484 if changed_properties.has_key("ProcessCredentials"):
485 self.signal_received = True
486 if self.sets_done and self.signal_received_deprecated:
487 self.loop.quit()
488
489 def run_sets(self, *args):
490 logger.debug("run_sets")
491 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
492 dbus.Boolean(1),
493 dbus_interface=dbus.PROPERTIES_IFACE)
494 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
495 dbus_interface=dbus.PROPERTIES_IFACE) != True:
496 raise Exception("Unexpected Get(ProcessCredentials) result after Set")
497 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
498 dbus.Boolean(0),
499 dbus_interface=dbus.PROPERTIES_IFACE)
500 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
501 dbus_interface=dbus.PROPERTIES_IFACE) != False:
502 raise Exception("Unexpected Get(ProcessCredentials) result after Set")
503
504 self.dbus_sets_done = True
505 return False
506
507 def success(self):
508 return self.signal_received and self.signal_received_deprecated
509
510 with TestDbusGetSet(bus) as t:
511 if not t.success():
512 raise Exception("No signal received for ProcessCredentials change")
513
514 def test_dbus_wps_invalid(dev, apdev):
515 """D-Bus invaldi WPS operation"""
516 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
517 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
518
519 failures = [ {'Role': 'foo', 'Type': 'pbc'},
520 {'Role': 123, 'Type': 'pbc'},
521 {'Type': 'pbc'},
522 {'Role': 'enrollee'},
523 {'Role': 'registrar'},
524 {'Role': 'enrollee', 'Type': 123},
525 {'Role': 'enrollee', 'Type': 'foo'},
526 {'Role': 'enrollee', 'Type': 'pbc',
527 'Bssid': '02:33:44:55:66:77'},
528 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
529 {'Role': 'enrollee', 'Type': 'pbc',
530 'Bssid': dbus.ByteArray('12345')},
531 {'Role': 'enrollee', 'Type': 'pbc',
532 'P2PDeviceAddress': 12345},
533 {'Role': 'enrollee', 'Type': 'pbc',
534 'P2PDeviceAddress': dbus.ByteArray('12345')},
535 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
536 for args in failures:
537 try:
538 wps.Start(args)
539 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
540 except dbus.exceptions.DBusException, e:
541 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
542 raise Exception("Unexpected error message: " + str(e))
543
544 def test_dbus_wps_oom(dev, apdev):
545 """D-Bus WPS operation (OOM)"""
546 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
547 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
548
549 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"):
550 if_obj.Get(WPAS_DBUS_IFACE, "State",
551 dbus_interface=dbus.PROPERTIES_IFACE)
552
553 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
554 bssid = apdev[0]['bssid']
555 dev[0].scan_for_bss(bssid, freq=2412)
556
557 time.sleep(0.05)
558 for i in range(1, 3):
559 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"):
560 if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
561 dbus_interface=dbus.PROPERTIES_IFACE)
562
563 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
564 dbus_interface=dbus.PROPERTIES_IFACE)
565 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
566 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
567 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
568 dbus_interface=dbus.PROPERTIES_IFACE)
569 with alloc_fail(dev[0], 1,
570 "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
571 try:
572 bss_obj.Get(WPAS_DBUS_BSS, "Rates",
573 dbus_interface=dbus.PROPERTIES_IFACE)
574 except dbus.exceptions.DBusException, e:
575 pass
576
577 id = dev[0].add_network()
578 dev[0].set_network(id, "disabled", "0")
579 dev[0].set_network_quoted(id, "ssid", "test")
580
581 for i in range(1, 3):
582 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"):
583 if_obj.Get(WPAS_DBUS_IFACE, "Networks",
584 dbus_interface=dbus.PROPERTIES_IFACE)
585
586 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"):
587 dbus_get(dbus, wpas_obj, "Interfaces")
588
589 for i in range(1, 6):
590 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
591 dbus_get(dbus, wpas_obj, "EapMethods")
592
593 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set",
594 expected="Error.Failed: Failed to set property"):
595 val2 = "push_button display"
596 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
597 dbus_interface=dbus.PROPERTIES_IFACE)
598
599 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
600 "WPS.Start",
601 expected="UnknownError: WPS start failed"):
602 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
603
604 def test_dbus_wps_pbc(dev, apdev):
605 """D-Bus WPS/PBC operation and signals"""
606 try:
607 _test_dbus_wps_pbc(dev, apdev)
608 finally:
609 dev[0].request("SET wps_cred_processing 0")
610
611 def _test_dbus_wps_pbc(dev, apdev):
612 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
613 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
614
615 hapd = start_ap(apdev[0])
616 hapd.request("WPS_PBC")
617 bssid = apdev[0]['bssid']
618 dev[0].scan_for_bss(bssid, freq="2412")
619 dev[0].request("SET wps_cred_processing 2")
620
621 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
622 dbus_interface=dbus.PROPERTIES_IFACE)
623 if len(res) != 1:
624 raise Exception("Missing BSSs entry: " + str(res))
625 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
626 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
627 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
628 if 'WPS' not in props:
629 raise Exception("No WPS information in the BSS entry")
630 if 'Type' not in props['WPS']:
631 raise Exception("No Type field in the WPS dictionary")
632 if props['WPS']['Type'] != 'pbc':
633 raise Exception("Unexpected WPS Type: " + props['WPS']['Type'])
634
635 class TestDbusWps(TestDbus):
636 def __init__(self, bus, wps):
637 TestDbus.__init__(self, bus)
638 self.success_seen = False
639 self.credentials_received = False
640 self.wps = wps
641
642 def __enter__(self):
643 gobject.timeout_add(1, self.start_pbc)
644 gobject.timeout_add(15000, self.timeout)
645 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
646 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
647 "Credentials")
648 self.loop.run()
649 return self
650
651 def wpsEvent(self, name, args):
652 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
653 if name == "success":
654 self.success_seen = True
655 if self.credentials_received:
656 self.loop.quit()
657
658 def credentials(self, args):
659 logger.debug("credentials: " + str(args))
660 self.credentials_received = True
661 if self.success_seen:
662 self.loop.quit()
663
664 def start_pbc(self, *args):
665 logger.debug("start_pbc")
666 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
667 return False
668
669 def success(self):
670 return self.success_seen and self.credentials_received
671
672 with TestDbusWps(bus, wps) as t:
673 if not t.success():
674 raise Exception("Failure in D-Bus operations")
675
676 dev[0].wait_connected(timeout=10)
677 dev[0].request("DISCONNECT")
678 hapd.disable()
679 dev[0].flush_scan_cache()
680
681 def test_dbus_wps_pbc_overlap(dev, apdev):
682 """D-Bus WPS/PBC operation and signal for PBC overlap"""
683 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
684 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
685
686 hapd = start_ap(apdev[0])
687 hapd2 = start_ap(apdev[1], ssid="test-wps2",
688 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
689 hapd.request("WPS_PBC")
690 hapd2.request("WPS_PBC")
691 bssid = apdev[0]['bssid']
692 dev[0].scan_for_bss(bssid, freq="2412")
693 bssid2 = apdev[1]['bssid']
694 dev[0].scan_for_bss(bssid2, freq="2412")
695
696 class TestDbusWps(TestDbus):
697 def __init__(self, bus, wps):
698 TestDbus.__init__(self, bus)
699 self.overlap_seen = False
700 self.wps = wps
701
702 def __enter__(self):
703 gobject.timeout_add(1, self.start_pbc)
704 gobject.timeout_add(15000, self.timeout)
705 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
706 self.loop.run()
707 return self
708
709 def wpsEvent(self, name, args):
710 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
711 if name == "pbc-overlap":
712 self.overlap_seen = True
713 self.loop.quit()
714
715 def start_pbc(self, *args):
716 logger.debug("start_pbc")
717 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
718 return False
719
720 def success(self):
721 return self.overlap_seen
722
723 with TestDbusWps(bus, wps) as t:
724 if not t.success():
725 raise Exception("Failure in D-Bus operations")
726
727 dev[0].request("WPS_CANCEL")
728 dev[0].request("DISCONNECT")
729 hapd.disable()
730 dev[0].flush_scan_cache()
731
732 def test_dbus_wps_pin(dev, apdev):
733 """D-Bus WPS/PIN operation and signals"""
734 try:
735 _test_dbus_wps_pin(dev, apdev)
736 finally:
737 dev[0].request("SET wps_cred_processing 0")
738
739 def _test_dbus_wps_pin(dev, apdev):
740 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
741 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
742
743 hapd = start_ap(apdev[0])
744 hapd.request("WPS_PIN any 12345670")
745 bssid = apdev[0]['bssid']
746 dev[0].scan_for_bss(bssid, freq="2412")
747 dev[0].request("SET wps_cred_processing 2")
748
749 class TestDbusWps(TestDbus):
750 def __init__(self, bus):
751 TestDbus.__init__(self, bus)
752 self.success_seen = False
753 self.credentials_received = False
754
755 def __enter__(self):
756 gobject.timeout_add(1, self.start_pin)
757 gobject.timeout_add(15000, self.timeout)
758 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
759 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
760 "Credentials")
761 self.loop.run()
762 return self
763
764 def wpsEvent(self, name, args):
765 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
766 if name == "success":
767 self.success_seen = True
768 if self.credentials_received:
769 self.loop.quit()
770
771 def credentials(self, args):
772 logger.debug("credentials: " + str(args))
773 self.credentials_received = True
774 if self.success_seen:
775 self.loop.quit()
776
777 def start_pin(self, *args):
778 logger.debug("start_pin")
779 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
780 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
781 'Bssid': bssid_ay})
782 return False
783
784 def success(self):
785 return self.success_seen and self.credentials_received
786
787 with TestDbusWps(bus) as t:
788 if not t.success():
789 raise Exception("Failure in D-Bus operations")
790
791 dev[0].wait_connected(timeout=10)
792
793 def test_dbus_wps_pin2(dev, apdev):
794 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
795 try:
796 _test_dbus_wps_pin2(dev, apdev)
797 finally:
798 dev[0].request("SET wps_cred_processing 0")
799
800 def _test_dbus_wps_pin2(dev, apdev):
801 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
802 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
803
804 hapd = start_ap(apdev[0])
805 bssid = apdev[0]['bssid']
806 dev[0].scan_for_bss(bssid, freq="2412")
807 dev[0].request("SET wps_cred_processing 2")
808
809 class TestDbusWps(TestDbus):
810 def __init__(self, bus):
811 TestDbus.__init__(self, bus)
812 self.success_seen = False
813 self.failed = False
814
815 def __enter__(self):
816 gobject.timeout_add(1, self.start_pin)
817 gobject.timeout_add(15000, self.timeout)
818 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
819 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
820 "Credentials")
821 self.loop.run()
822 return self
823
824 def wpsEvent(self, name, args):
825 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
826 if name == "success":
827 self.success_seen = True
828 if self.credentials_received:
829 self.loop.quit()
830
831 def credentials(self, args):
832 logger.debug("credentials: " + str(args))
833 self.credentials_received = True
834 if self.success_seen:
835 self.loop.quit()
836
837 def start_pin(self, *args):
838 logger.debug("start_pin")
839 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
840 res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
841 'Bssid': bssid_ay})
842 pin = res['Pin']
843 h = hostapd.Hostapd(apdev[0]['ifname'])
844 h.request("WPS_PIN any " + pin)
845 return False
846
847 def success(self):
848 return self.success_seen and self.credentials_received
849
850 with TestDbusWps(bus) as t:
851 if not t.success():
852 raise Exception("Failure in D-Bus operations")
853
854 dev[0].wait_connected(timeout=10)
855
856 def test_dbus_wps_pin_m2d(dev, apdev):
857 """D-Bus WPS/PIN operation and signals with M2D"""
858 try:
859 _test_dbus_wps_pin_m2d(dev, apdev)
860 finally:
861 dev[0].request("SET wps_cred_processing 0")
862
863 def _test_dbus_wps_pin_m2d(dev, apdev):
864 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
865 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
866
867 hapd = start_ap(apdev[0])
868 bssid = apdev[0]['bssid']
869 dev[0].scan_for_bss(bssid, freq="2412")
870 dev[0].request("SET wps_cred_processing 2")
871
872 class TestDbusWps(TestDbus):
873 def __init__(self, bus):
874 TestDbus.__init__(self, bus)
875 self.success_seen = False
876 self.credentials_received = False
877
878 def __enter__(self):
879 gobject.timeout_add(1, self.start_pin)
880 gobject.timeout_add(15000, self.timeout)
881 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
882 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
883 "Credentials")
884 self.loop.run()
885 return self
886
887 def wpsEvent(self, name, args):
888 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
889 if name == "success":
890 self.success_seen = True
891 if self.credentials_received:
892 self.loop.quit()
893 elif name == "m2d":
894 h = hostapd.Hostapd(apdev[0]['ifname'])
895 h.request("WPS_PIN any 12345670")
896
897 def credentials(self, args):
898 logger.debug("credentials: " + str(args))
899 self.credentials_received = True
900 if self.success_seen:
901 self.loop.quit()
902
903 def start_pin(self, *args):
904 logger.debug("start_pin")
905 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
906 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
907 'Bssid': bssid_ay})
908 return False
909
910 def success(self):
911 return self.success_seen and self.credentials_received
912
913 with TestDbusWps(bus) as t:
914 if not t.success():
915 raise Exception("Failure in D-Bus operations")
916
917 dev[0].wait_connected(timeout=10)
918
919 def test_dbus_wps_reg(dev, apdev):
920 """D-Bus WPS/Registrar operation and signals"""
921 try:
922 _test_dbus_wps_reg(dev, apdev)
923 finally:
924 dev[0].request("SET wps_cred_processing 0")
925
926 def _test_dbus_wps_reg(dev, apdev):
927 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
928 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
929
930 hapd = start_ap(apdev[0])
931 hapd.request("WPS_PIN any 12345670")
932 bssid = apdev[0]['bssid']
933 dev[0].scan_for_bss(bssid, freq="2412")
934 dev[0].request("SET wps_cred_processing 2")
935
936 class TestDbusWps(TestDbus):
937 def __init__(self, bus):
938 TestDbus.__init__(self, bus)
939 self.credentials_received = False
940
941 def __enter__(self):
942 gobject.timeout_add(100, self.start_reg)
943 gobject.timeout_add(15000, self.timeout)
944 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
945 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
946 "Credentials")
947 self.loop.run()
948 return self
949
950 def wpsEvent(self, name, args):
951 logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
952
953 def credentials(self, args):
954 logger.debug("credentials: " + str(args))
955 self.credentials_received = True
956 self.loop.quit()
957
958 def start_reg(self, *args):
959 logger.debug("start_reg")
960 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
961 wps.Start({'Role': 'registrar', 'Type': 'pin',
962 'Pin': '12345670', 'Bssid': bssid_ay})
963 return False
964
965 def success(self):
966 return self.credentials_received
967
968 with TestDbusWps(bus) as t:
969 if not t.success():
970 raise Exception("Failure in D-Bus operations")
971
972 dev[0].wait_connected(timeout=10)
973
974 def test_dbus_wps_cancel(dev, apdev):
975 """D-Bus WPS Cancel operation"""
976 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
977 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
978
979 hapd = start_ap(apdev[0])
980 bssid = apdev[0]['bssid']
981
982 wps.Cancel()
983 dev[0].scan_for_bss(bssid, freq="2412")
984 bssid_ay = dbus.ByteArray(bssid.replace(':','').decode('hex'))
985 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
986 'Bssid': bssid_ay})
987 wps.Cancel()
988 dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
989
990 def test_dbus_scan_invalid(dev, apdev):
991 """D-Bus invalid scan method"""
992 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
993 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
994
995 tests = [ ({}, "InvalidArgs"),
996 ({'Type': 123}, "InvalidArgs"),
997 ({'Type': 'foo'}, "InvalidArgs"),
998 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
999 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
1000 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
1001 ({'Type': 'active',
1002 'SSIDs': [ dbus.ByteArray("1"), dbus.ByteArray("2"),
1003 dbus.ByteArray("3"), dbus.ByteArray("4"),
1004 dbus.ByteArray("5"), dbus.ByteArray("6"),
1005 dbus.ByteArray("7"), dbus.ByteArray("8"),
1006 dbus.ByteArray("9"), dbus.ByteArray("10"),
1007 dbus.ByteArray("11"), dbus.ByteArray("12"),
1008 dbus.ByteArray("13"), dbus.ByteArray("14"),
1009 dbus.ByteArray("15"), dbus.ByteArray("16"),
1010 dbus.ByteArray("17") ]},
1011 "InvalidArgs"),
1012 ({'Type': 'active',
1013 'SSIDs': [ dbus.ByteArray("1234567890abcdef1234567890abcdef1") ]},
1014 "InvalidArgs"),
1015 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
1016 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
1017 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
1018 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
1019 ({'Type': 'active',
1020 'Channels': [ (dbus.Int32(2412), dbus.UInt32(20)) ] },
1021 "InvalidArgs"),
1022 ({'Type': 'active',
1023 'Channels': [ (dbus.UInt32(2412), dbus.Int32(20)) ] },
1024 "InvalidArgs"),
1025 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
1026 ({'Type': 'passive', 'IEs': [ dbus.ByteArray("\xdd\x00") ]},
1027 "InvalidArgs"),
1028 ({'Type': 'passive', 'SSIDs': [ dbus.ByteArray("foo") ]},
1029 "InvalidArgs")]
1030 for (t,err) in tests:
1031 try:
1032 iface.Scan(t)
1033 raise Exception("Invalid Scan() arguments accepted: " + str(t))
1034 except dbus.exceptions.DBusException, e:
1035 if err not in str(e):
1036 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e)))
1037
1038 def test_dbus_scan_oom(dev, apdev):
1039 """D-Bus scan method and OOM"""
1040 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1041 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1042
1043 with alloc_fail_dbus(dev[0], 1,
1044 "wpa_scan_clone_params;wpas_dbus_handler_scan",
1045 "Scan", expected="ScanError: Scan request rejected"):
1046 iface.Scan({ 'Type': 'passive',
1047 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1048
1049 with alloc_fail_dbus(dev[0], 1,
1050 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
1051 "Scan"):
1052 iface.Scan({ 'Type': 'passive',
1053 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1054
1055 with alloc_fail_dbus(dev[0], 1,
1056 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
1057 "Scan"):
1058 iface.Scan({ 'Type': 'active',
1059 'IEs': [ dbus.ByteArray("\xdd\x00") ],
1060 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1061
1062 with alloc_fail_dbus(dev[0], 1,
1063 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
1064 "Scan"):
1065 iface.Scan({ 'Type': 'active',
1066 'SSIDs': [ dbus.ByteArray("open"),
1067 dbus.ByteArray() ],
1068 'Channels': [ (dbus.UInt32(2412), dbus.UInt32(20)) ] })
1069
1070 def test_dbus_scan(dev, apdev):
1071 """D-Bus scan and related signals"""
1072 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1073 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1074
1075 hapd = hostapd.add_ap(apdev[0], { "ssid": "open" })
1076
1077 class TestDbusScan(TestDbus):
1078 def __init__(self, bus):
1079 TestDbus.__init__(self, bus)
1080 self.scan_completed = 0
1081 self.bss_added = False
1082 self.fail_reason = None
1083
1084 def __enter__(self):
1085 gobject.timeout_add(1, self.run_scan)
1086 gobject.timeout_add(15000, self.timeout)
1087 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone")
1088 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded")
1089 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved")
1090 self.loop.run()
1091 return self
1092
1093 def scanDone(self, success):
1094 logger.debug("scanDone: success=%s" % success)
1095 self.scan_completed += 1
1096 if self.scan_completed == 1:
1097 iface.Scan({'Type': 'passive',
1098 'AllowRoam': True,
1099 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1100 elif self.scan_completed == 2:
1101 iface.Scan({'Type': 'passive',
1102 'AllowRoam': False})
1103 elif self.bss_added and self.scan_completed == 3:
1104 self.loop.quit()
1105
1106 def bssAdded(self, bss, properties):
1107 logger.debug("bssAdded: %s" % bss)
1108 logger.debug(str(properties))
1109 if 'WPS' in properties:
1110 if 'Type' in properties['WPS']:
1111 self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
1112 self.loop.quit()
1113 self.bss_added = True
1114 if self.scan_completed == 3:
1115 self.loop.quit()
1116
1117 def bssRemoved(self, bss):
1118 logger.debug("bssRemoved: %s" % bss)
1119
1120 def run_scan(self, *args):
1121 logger.debug("run_scan")
1122 iface.Scan({'Type': 'active',
1123 'SSIDs': [ dbus.ByteArray("open"),
1124 dbus.ByteArray() ],
1125 'IEs': [ dbus.ByteArray("\xdd\x00"),
1126 dbus.ByteArray() ],
1127 'AllowRoam': False,
1128 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
1129 return False
1130
1131 def success(self):
1132 return self.scan_completed == 3 and self.bss_added
1133
1134 with TestDbusScan(bus) as t:
1135 if t.fail_reason:
1136 raise Exception(t.fail_reason)
1137 if not t.success():
1138 raise Exception("Expected signals not seen")
1139
1140 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1141 dbus_interface=dbus.PROPERTIES_IFACE)
1142 if len(res) < 1:
1143 raise Exception("Scan result not in BSSs property")
1144 iface.FlushBSS(0)
1145 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
1146 dbus_interface=dbus.PROPERTIES_IFACE)
1147 if len(res) != 0:
1148 raise Exception("FlushBSS() did not remove scan results from BSSs property")
1149 iface.FlushBSS(1)
1150
1151 def test_dbus_scan_busy(dev, apdev):
1152 """D-Bus scan trigger rejection when busy with previous scan"""
1153 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1154 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1155
1156 if "OK" not in dev[0].request("SCAN freq=2412-2462"):
1157 raise Exception("Failed to start scan")
1158 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1159 if ev is None:
1160 raise Exception("Scan start timed out")
1161
1162 try:
1163 iface.Scan({'Type': 'active', 'AllowRoam': False})
1164 raise Exception("Scan() accepted when busy")
1165 except dbus.exceptions.DBusException, e:
1166 if "ScanError: Scan request reject" not in str(e):
1167 raise Exception("Unexpected error message: " + str(e))
1168
1169 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1170 if ev is None:
1171 raise Exception("Scan timed out")
1172
1173 def test_dbus_scan_abort(dev, apdev):
1174 """D-Bus scan trigger and abort"""
1175 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1176 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1177
1178 iface.Scan({'Type': 'active', 'AllowRoam': False})
1179 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1180 if ev is None:
1181 raise Exception("Scan start timed out")
1182
1183 iface.AbortScan()
1184 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1185 if ev is None:
1186 raise Exception("Scan abort result timed out")
1187 dev[0].dump_monitor()
1188 iface.Scan({'Type': 'active', 'AllowRoam': False})
1189 iface.AbortScan()
1190
1191 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1192 if ev is None:
1193 raise Exception("Scan timed out")
1194
1195 def test_dbus_connect(dev, apdev):
1196 """D-Bus AddNetwork and connect"""
1197 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1198 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1199
1200 ssid = "test-wpa2-psk"
1201 passphrase = 'qwertyuiop'
1202 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1203 hapd = hostapd.add_ap(apdev[0], params)
1204
1205 class TestDbusConnect(TestDbus):
1206 def __init__(self, bus):
1207 TestDbus.__init__(self, bus)
1208 self.network_added = False
1209 self.network_selected = False
1210 self.network_removed = False
1211 self.state = 0
1212
1213 def __enter__(self):
1214 gobject.timeout_add(1, self.run_connect)
1215 gobject.timeout_add(15000, self.timeout)
1216 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1217 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1218 "NetworkRemoved")
1219 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1220 "NetworkSelected")
1221 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1222 "PropertiesChanged")
1223 self.loop.run()
1224 return self
1225
1226 def networkAdded(self, network, properties):
1227 logger.debug("networkAdded: %s" % str(network))
1228 logger.debug(str(properties))
1229 self.network_added = True
1230
1231 def networkRemoved(self, network):
1232 logger.debug("networkRemoved: %s" % str(network))
1233 self.network_removed = True
1234
1235 def networkSelected(self, network):
1236 logger.debug("networkSelected: %s" % str(network))
1237 self.network_selected = True
1238
1239 def propertiesChanged(self, properties):
1240 logger.debug("propertiesChanged: %s" % str(properties))
1241 if 'State' in properties and properties['State'] == "completed":
1242 if self.state == 0:
1243 self.state = 1
1244 iface.Disconnect()
1245 elif self.state == 2:
1246 self.state = 3
1247 iface.Disconnect()
1248 elif self.state == 4:
1249 self.state = 5
1250 iface.Reattach()
1251 elif self.state == 5:
1252 self.state = 6
1253 iface.Disconnect()
1254 elif self.state == 7:
1255 self.state = 8
1256 res = iface.SignalPoll()
1257 logger.debug("SignalPoll: " + str(res))
1258 if 'frequency' not in res or res['frequency'] != 2412:
1259 self.state = -1
1260 logger.info("Unexpected SignalPoll result")
1261 iface.RemoveNetwork(self.netw)
1262 if 'State' in properties and properties['State'] == "disconnected":
1263 if self.state == 1:
1264 self.state = 2
1265 iface.SelectNetwork(self.netw)
1266 elif self.state == 3:
1267 self.state = 4
1268 iface.Reassociate()
1269 elif self.state == 6:
1270 self.state = 7
1271 iface.Reconnect()
1272 elif self.state == 8:
1273 self.state = 9
1274 self.loop.quit()
1275
1276 def run_connect(self, *args):
1277 logger.debug("run_connect")
1278 args = dbus.Dictionary({ 'ssid': ssid,
1279 'key_mgmt': 'WPA-PSK',
1280 'psk': passphrase,
1281 'scan_freq': 2412 },
1282 signature='sv')
1283 self.netw = iface.AddNetwork(args)
1284 iface.SelectNetwork(self.netw)
1285 return False
1286
1287 def success(self):
1288 if not self.network_added or \
1289 not self.network_removed or \
1290 not self.network_selected:
1291 return False
1292 return self.state == 9
1293
1294 with TestDbusConnect(bus) as t:
1295 if not t.success():
1296 raise Exception("Expected signals not seen")
1297
1298 def test_dbus_connect_psk_mem(dev, apdev):
1299 """D-Bus AddNetwork and connect with memory-only PSK"""
1300 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1301 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1302
1303 ssid = "test-wpa2-psk"
1304 passphrase = 'qwertyuiop'
1305 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1306 hapd = hostapd.add_ap(apdev[0], params)
1307
1308 class TestDbusConnect(TestDbus):
1309 def __init__(self, bus):
1310 TestDbus.__init__(self, bus)
1311 self.connected = False
1312
1313 def __enter__(self):
1314 gobject.timeout_add(1, self.run_connect)
1315 gobject.timeout_add(15000, self.timeout)
1316 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1317 "PropertiesChanged")
1318 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1319 "NetworkRequest")
1320 self.loop.run()
1321 return self
1322
1323 def propertiesChanged(self, properties):
1324 logger.debug("propertiesChanged: %s" % str(properties))
1325 if 'State' in properties and properties['State'] == "completed":
1326 self.connected = True
1327 self.loop.quit()
1328
1329 def networkRequest(self, path, field, txt):
1330 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1331 if field == "PSK_PASSPHRASE":
1332 iface.NetworkReply(path, field, '"' + passphrase + '"')
1333
1334 def run_connect(self, *args):
1335 logger.debug("run_connect")
1336 args = dbus.Dictionary({ 'ssid': ssid,
1337 'key_mgmt': 'WPA-PSK',
1338 'mem_only_psk': 1,
1339 'scan_freq': 2412 },
1340 signature='sv')
1341 self.netw = iface.AddNetwork(args)
1342 iface.SelectNetwork(self.netw)
1343 return False
1344
1345 def success(self):
1346 return self.connected
1347
1348 with TestDbusConnect(bus) as t:
1349 if not t.success():
1350 raise Exception("Expected signals not seen")
1351
1352 def test_dbus_connect_oom(dev, apdev):
1353 """D-Bus AddNetwork and connect when out-of-memory"""
1354 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1355 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1356
1357 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
1358 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1359
1360 ssid = "test-wpa2-psk"
1361 passphrase = 'qwertyuiop'
1362 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1363 hapd = hostapd.add_ap(apdev[0], params)
1364
1365 class TestDbusConnect(TestDbus):
1366 def __init__(self, bus):
1367 TestDbus.__init__(self, bus)
1368 self.network_added = False
1369 self.network_selected = False
1370 self.network_removed = False
1371 self.state = 0
1372
1373 def __enter__(self):
1374 gobject.timeout_add(1, self.run_connect)
1375 gobject.timeout_add(1500, self.timeout)
1376 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
1377 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
1378 "NetworkRemoved")
1379 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
1380 "NetworkSelected")
1381 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1382 "PropertiesChanged")
1383 self.loop.run()
1384 return self
1385
1386 def networkAdded(self, network, properties):
1387 logger.debug("networkAdded: %s" % str(network))
1388 logger.debug(str(properties))
1389 self.network_added = True
1390
1391 def networkRemoved(self, network):
1392 logger.debug("networkRemoved: %s" % str(network))
1393 self.network_removed = True
1394
1395 def networkSelected(self, network):
1396 logger.debug("networkSelected: %s" % str(network))
1397 self.network_selected = True
1398
1399 def propertiesChanged(self, properties):
1400 logger.debug("propertiesChanged: %s" % str(properties))
1401 if 'State' in properties and properties['State'] == "completed":
1402 if self.state == 0:
1403 self.state = 1
1404 iface.Disconnect()
1405 elif self.state == 2:
1406 self.state = 3
1407 iface.Disconnect()
1408 elif self.state == 4:
1409 self.state = 5
1410 iface.Reattach()
1411 elif self.state == 5:
1412 self.state = 6
1413 res = iface.SignalPoll()
1414 logger.debug("SignalPoll: " + str(res))
1415 if 'frequency' not in res or res['frequency'] != 2412:
1416 self.state = -1
1417 logger.info("Unexpected SignalPoll result")
1418 iface.RemoveNetwork(self.netw)
1419 if 'State' in properties and properties['State'] == "disconnected":
1420 if self.state == 1:
1421 self.state = 2
1422 iface.SelectNetwork(self.netw)
1423 elif self.state == 3:
1424 self.state = 4
1425 iface.Reassociate()
1426 elif self.state == 6:
1427 self.state = 7
1428 self.loop.quit()
1429
1430 def run_connect(self, *args):
1431 logger.debug("run_connect")
1432 args = dbus.Dictionary({ 'ssid': ssid,
1433 'key_mgmt': 'WPA-PSK',
1434 'psk': passphrase,
1435 'scan_freq': 2412 },
1436 signature='sv')
1437 try:
1438 self.netw = iface.AddNetwork(args)
1439 except Exception, e:
1440 logger.info("Exception on AddNetwork: " + str(e))
1441 self.loop.quit()
1442 return False
1443 try:
1444 iface.SelectNetwork(self.netw)
1445 except Exception, e:
1446 logger.info("Exception on SelectNetwork: " + str(e))
1447 self.loop.quit()
1448
1449 return False
1450
1451 def success(self):
1452 if not self.network_added or \
1453 not self.network_removed or \
1454 not self.network_selected:
1455 return False
1456 return self.state == 7
1457
1458 count = 0
1459 for i in range(1, 1000):
1460 for j in range(3):
1461 dev[j].dump_monitor()
1462 dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
1463 try:
1464 with TestDbusConnect(bus) as t:
1465 if not t.success():
1466 logger.info("Iteration %d - Expected signals not seen" % i)
1467 else:
1468 logger.info("Iteration %d - success" % i)
1469
1470 state = dev[0].request('GET_ALLOC_FAIL')
1471 logger.info("GET_ALLOC_FAIL: " + state)
1472 dev[0].dump_monitor()
1473 dev[0].request("TEST_ALLOC_FAIL 0:")
1474 if i < 3:
1475 raise Exception("Connection succeeded during out-of-memory")
1476 if not state.startswith('0:'):
1477 count += 1
1478 if count == 5:
1479 break
1480 except:
1481 pass
1482
1483 # Force regulatory update to re-fetch hw capabilities for the following
1484 # test cases.
1485 try:
1486 dev[0].dump_monitor()
1487 subprocess.call(['iw', 'reg', 'set', 'US'])
1488 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1489 finally:
1490 dev[0].dump_monitor()
1491 subprocess.call(['iw', 'reg', 'set', '00'])
1492 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1493
1494 def test_dbus_while_not_connected(dev, apdev):
1495 """D-Bus invalid operations while not connected"""
1496 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1497 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1498
1499 try:
1500 iface.Disconnect()
1501 raise Exception("Disconnect() accepted when not connected")
1502 except dbus.exceptions.DBusException, e:
1503 if "NotConnected" not in str(e):
1504 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
1505
1506 try:
1507 iface.Reattach()
1508 raise Exception("Reattach() accepted when not connected")
1509 except dbus.exceptions.DBusException, e:
1510 if "NotConnected" not in str(e):
1511 raise Exception("Unexpected error message for invalid Reattach: " + str(e))
1512
1513 def test_dbus_connect_eap(dev, apdev):
1514 """D-Bus AddNetwork and connect to EAP network"""
1515 check_altsubject_match_support(dev[0])
1516 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1517 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1518
1519 ssid = "ieee8021x-open"
1520 params = hostapd.radius_params()
1521 params["ssid"] = ssid
1522 params["ieee8021x"] = "1"
1523 hapd = hostapd.add_ap(apdev[0], params)
1524
1525 class TestDbusConnect(TestDbus):
1526 def __init__(self, bus):
1527 TestDbus.__init__(self, bus)
1528 self.certification_received = False
1529 self.eap_status = False
1530 self.state = 0
1531
1532 def __enter__(self):
1533 gobject.timeout_add(1, self.run_connect)
1534 gobject.timeout_add(15000, self.timeout)
1535 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
1536 "PropertiesChanged")
1537 self.add_signal(self.certification, WPAS_DBUS_IFACE,
1538 "Certification", byte_arrays=True)
1539 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
1540 "NetworkRequest")
1541 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
1542 self.loop.run()
1543 return self
1544
1545 def propertiesChanged(self, properties):
1546 logger.debug("propertiesChanged: %s" % str(properties))
1547 if 'State' in properties and properties['State'] == "completed":
1548 if self.state == 0:
1549 self.state = 1
1550 iface.EAPLogoff()
1551 logger.info("Set dNSName constraint")
1552 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1553 args = dbus.Dictionary({ 'altsubject_match':
1554 self.server_dnsname },
1555 signature='sv')
1556 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1557 dbus_interface=dbus.PROPERTIES_IFACE)
1558 elif self.state == 2:
1559 self.state = 3
1560 iface.Disconnect()
1561 logger.info("Set non-matching dNSName constraint")
1562 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
1563 args = dbus.Dictionary({ 'altsubject_match':
1564 self.server_dnsname + "FOO" },
1565 signature='sv')
1566 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1567 dbus_interface=dbus.PROPERTIES_IFACE)
1568 if 'State' in properties and properties['State'] == "disconnected":
1569 if self.state == 1:
1570 self.state = 2
1571 iface.EAPLogon()
1572 iface.SelectNetwork(self.netw)
1573 if self.state == 3:
1574 self.state = 4
1575 iface.SelectNetwork(self.netw)
1576
1577 def certification(self, args):
1578 logger.debug("certification: %s" % str(args))
1579 self.certification_received = True
1580 if args['depth'] == 0:
1581 # The test server certificate is supposed to have dNSName
1582 if len(args['altsubject']) < 1:
1583 raise Exception("Missing dNSName")
1584 dnsname = args['altsubject'][0]
1585 if not dnsname.startswith("DNS:"):
1586 raise Exception("Expected dNSName not found: " + dnsname)
1587 logger.info("altsubject: " + dnsname)
1588 self.server_dnsname = dnsname
1589
1590 def eap(self, status, parameter):
1591 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
1592 if status == 'completion' and parameter == 'success':
1593 self.eap_status = True
1594 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch':
1595 self.state = 5
1596 self.loop.quit()
1597
1598 def networkRequest(self, path, field, txt):
1599 logger.debug("networkRequest: %s %s %s" % (path, field, txt))
1600 if field == "PASSWORD":
1601 iface.NetworkReply(path, field, "password")
1602
1603 def run_connect(self, *args):
1604 logger.debug("run_connect")
1605 args = dbus.Dictionary({ 'ssid': ssid,
1606 'key_mgmt': 'IEEE8021X',
1607 'eapol_flags': 0,
1608 'eap': 'TTLS',
1609 'anonymous_identity': 'ttls',
1610 'identity': 'pap user',
1611 'ca_cert': 'auth_serv/ca.pem',
1612 'phase2': 'auth=PAP',
1613 'scan_freq': 2412 },
1614 signature='sv')
1615 self.netw = iface.AddNetwork(args)
1616 iface.SelectNetwork(self.netw)
1617 return False
1618
1619 def success(self):
1620 if not self.eap_status or not self.certification_received:
1621 return False
1622 return self.state == 5
1623
1624 with TestDbusConnect(bus) as t:
1625 if not t.success():
1626 raise Exception("Expected signals not seen")
1627
1628 def test_dbus_network(dev, apdev):
1629 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
1630 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1631 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1632
1633 args = dbus.Dictionary({ 'ssid': "foo",
1634 'key_mgmt': 'WPA-PSK',
1635 'psk': "12345678",
1636 'identity': dbus.ByteArray([ 1, 2 ]),
1637 'priority': dbus.Int32(0),
1638 'scan_freq': dbus.UInt32(2412) },
1639 signature='sv')
1640 netw = iface.AddNetwork(args)
1641 id = int(dev[0].list_networks()[0]['id'])
1642 val = dev[0].get_network(id, "scan_freq")
1643 if val != "2412":
1644 raise Exception("Invalid scan_freq value: " + str(val))
1645 iface.RemoveNetwork(netw)
1646
1647 args = dbus.Dictionary({ 'ssid': "foo",
1648 'key_mgmt': 'NONE',
1649 'scan_freq': "2412 2432",
1650 'freq_list': "2412 2417 2432" },
1651 signature='sv')
1652 netw = iface.AddNetwork(args)
1653 id = int(dev[0].list_networks()[0]['id'])
1654 val = dev[0].get_network(id, "scan_freq")
1655 if val != "2412 2432":
1656 raise Exception("Invalid scan_freq value (2): " + str(val))
1657 val = dev[0].get_network(id, "freq_list")
1658 if val != "2412 2417 2432":
1659 raise Exception("Invalid freq_list value: " + str(val))
1660 iface.RemoveNetwork(netw)
1661 try:
1662 iface.RemoveNetwork(netw)
1663 raise Exception("Invalid RemoveNetwork() accepted")
1664 except dbus.exceptions.DBusException, e:
1665 if "NetworkUnknown" not in str(e):
1666 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1667 try:
1668 iface.SelectNetwork(netw)
1669 raise Exception("Invalid SelectNetwork() accepted")
1670 except dbus.exceptions.DBusException, e:
1671 if "NetworkUnknown" not in str(e):
1672 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
1673
1674 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1675 'identity': "testuser", 'scan_freq': '2412' },
1676 signature='sv')
1677 netw1 = iface.AddNetwork(args)
1678 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1679 signature='sv')
1680 netw2 = iface.AddNetwork(args)
1681 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1682 dbus_interface=dbus.PROPERTIES_IFACE)
1683 if len(res) != 2:
1684 raise Exception("Unexpected number of networks")
1685
1686 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1687 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1688 dbus_interface=dbus.PROPERTIES_IFACE)
1689 if res != False:
1690 raise Exception("Added network was unexpectedly enabled by default")
1691 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
1692 dbus_interface=dbus.PROPERTIES_IFACE)
1693 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1694 dbus_interface=dbus.PROPERTIES_IFACE)
1695 if res != True:
1696 raise Exception("Set(Enabled,True) did not seem to change property value")
1697 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
1698 dbus_interface=dbus.PROPERTIES_IFACE)
1699 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
1700 dbus_interface=dbus.PROPERTIES_IFACE)
1701 if res != False:
1702 raise Exception("Set(Enabled,False) did not seem to change property value")
1703 try:
1704 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
1705 dbus_interface=dbus.PROPERTIES_IFACE)
1706 raise Exception("Invalid Set(Enabled,1) accepted")
1707 except dbus.exceptions.DBusException, e:
1708 if "Error.Failed: wrong property type" not in str(e):
1709 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
1710
1711 args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
1712 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
1713 dbus_interface=dbus.PROPERTIES_IFACE)
1714 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1715 dbus_interface=dbus.PROPERTIES_IFACE)
1716 if res['ssid'] != '"foo1new"':
1717 raise Exception("Set(Properties) failed to update ssid")
1718 if res['identity'] != '"testuser"':
1719 raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1720
1721 iface.RemoveAllNetworks()
1722 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
1723 dbus_interface=dbus.PROPERTIES_IFACE)
1724 if len(res) != 0:
1725 raise Exception("Unexpected number of networks")
1726 iface.RemoveAllNetworks()
1727
1728 tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
1729 dbus.Dictionary({ 'identity': dbus.ByteArray() },
1730 signature='sv'),
1731 dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
1732 dbus.Dictionary({ 'identity': "" }, signature='sv') ]
1733 for args in tests:
1734 try:
1735 iface.AddNetwork(args)
1736 raise Exception("Invalid AddNetwork args accepted: " + str(args))
1737 except dbus.exceptions.DBusException, e:
1738 if "InvalidArgs" not in str(e):
1739 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1740
1741 def test_dbus_network_oom(dev, apdev):
1742 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1743 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1744 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1745
1746 args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1747 'identity': "testuser", 'scan_freq': '2412' },
1748 signature='sv')
1749 netw1 = iface.AddNetwork(args)
1750 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
1751
1752 with alloc_fail_dbus(dev[0], 1,
1753 "wpa_config_get_all;wpas_dbus_getter_network_properties",
1754 "Get"):
1755 net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
1756 dbus_interface=dbus.PROPERTIES_IFACE)
1757
1758 iface.RemoveAllNetworks()
1759
1760 with alloc_fail_dbus(dev[0], 1,
1761 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1762 "RemoveNetwork", "InvalidArgs"):
1763 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1764
1765 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1766 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1767 signature='sv')
1768 try:
1769 netw = iface.AddNetwork(args)
1770 # Currently, AddNetwork() succeeds even if os_strdup() for path
1771 # fails, so remove the network if that occurs.
1772 iface.RemoveNetwork(netw)
1773 except dbus.exceptions.DBusException, e:
1774 pass
1775
1776 for i in range(1, 3):
1777 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
1778 try:
1779 netw = iface.AddNetwork(args)
1780 # Currently, AddNetwork() succeeds even if network registration
1781 # fails, so remove the network if that occurs.
1782 iface.RemoveNetwork(netw)
1783 except dbus.exceptions.DBusException, e:
1784 pass
1785
1786 with alloc_fail_dbus(dev[0], 1,
1787 "=wpa_config_add_network;wpas_dbus_handler_add_network",
1788 "AddNetwork",
1789 "UnknownError: wpa_supplicant could not add a network"):
1790 args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1791 signature='sv')
1792 netw = iface.AddNetwork(args)
1793
1794 tests = [ (1,
1795 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1796 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1797 signature='sv')),
1798 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1799 dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
1800 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1801 dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
1802 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1803 dbus.Dictionary({ 'priority': dbus.UInt32(1) },
1804 signature='sv')),
1805 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1806 dbus.Dictionary({ 'priority': dbus.Int32(1) },
1807 signature='sv')),
1808 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1809 dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
1810 signature='sv')) ]
1811 for (count,funcs,args) in tests:
1812 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
1813 netw = iface.AddNetwork(args)
1814
1815 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
1816 dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
1817 raise Exception("Unexpected network block added")
1818 if len(dev[0].list_networks()) > 0:
1819 raise Exception("Unexpected network block visible")
1820
1821 def test_dbus_interface(dev, apdev):
1822 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
1823 try:
1824 _test_dbus_interface(dev, apdev)
1825 finally:
1826 # Need to force P2P channel list update since the 'lo' interface
1827 # with driver=none ends up configuring default dualband channels.
1828 dev[0].request("SET country US")
1829 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1830 if ev is None:
1831 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1832 timeout=1)
1833 dev[0].request("SET country 00")
1834 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1835 if ev is None:
1836 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
1837 timeout=1)
1838 subprocess.call(['iw', 'reg', 'set', '00'])
1839
1840 def _test_dbus_interface(dev, apdev):
1841 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1842 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1843
1844 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1845 signature='sv')
1846 path = wpas.CreateInterface(params)
1847 logger.debug("New interface path: " + str(path))
1848 path2 = wpas.GetInterface("lo")
1849 if path != path2:
1850 raise Exception("Interface object mismatch")
1851
1852 params = dbus.Dictionary({ 'Ifname': 'lo',
1853 'Driver': 'none',
1854 'ConfigFile': 'foo',
1855 'BridgeIfname': 'foo', },
1856 signature='sv')
1857 try:
1858 wpas.CreateInterface(params)
1859 raise Exception("Invalid CreateInterface() accepted")
1860 except dbus.exceptions.DBusException, e:
1861 if "InterfaceExists" not in str(e):
1862 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1863
1864 wpas.RemoveInterface(path)
1865 try:
1866 wpas.RemoveInterface(path)
1867 raise Exception("Invalid RemoveInterface() accepted")
1868 except dbus.exceptions.DBusException, e:
1869 if "InterfaceUnknown" not in str(e):
1870 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1871
1872 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
1873 'Foo': 123 },
1874 signature='sv')
1875 try:
1876 wpas.CreateInterface(params)
1877 raise Exception("Invalid CreateInterface() accepted")
1878 except dbus.exceptions.DBusException, e:
1879 if "InvalidArgs" not in str(e):
1880 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1881
1882 params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
1883 try:
1884 wpas.CreateInterface(params)
1885 raise Exception("Invalid CreateInterface() accepted")
1886 except dbus.exceptions.DBusException, e:
1887 if "InvalidArgs" not in str(e):
1888 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
1889
1890 try:
1891 wpas.GetInterface("lo")
1892 raise Exception("Invalid GetInterface() accepted")
1893 except dbus.exceptions.DBusException, e:
1894 if "InterfaceUnknown" not in str(e):
1895 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
1896
1897 def test_dbus_interface_oom(dev, apdev):
1898 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
1899 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1900 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
1901
1902 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
1903 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1904 signature='sv')
1905 wpas.CreateInterface(params)
1906
1907 for i in range(1, 1000):
1908 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
1909 params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1910 signature='sv')
1911 try:
1912 npath = wpas.CreateInterface(params)
1913 wpas.RemoveInterface(npath)
1914 logger.info("CreateInterface succeeds after %d allocation failures" % i)
1915 state = dev[0].request('GET_ALLOC_FAIL')
1916 logger.info("GET_ALLOC_FAIL: " + state)
1917 dev[0].dump_monitor()
1918 dev[0].request("TEST_ALLOC_FAIL 0:")
1919 if i < 5:
1920 raise Exception("CreateInterface succeeded during out-of-memory")
1921 if not state.startswith('0:'):
1922 break
1923 except dbus.exceptions.DBusException, e:
1924 pass
1925
1926 for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
1927 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
1928 "CreateInterface"):
1929 params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
1930 wpas.CreateInterface(params)
1931
1932 def test_dbus_blob(dev, apdev):
1933 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
1934 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
1935 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
1936
1937 blob = dbus.ByteArray("\x01\x02\x03")
1938 iface.AddBlob('blob1', blob)
1939 try:
1940 iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
1941 raise Exception("Invalid AddBlob() accepted")
1942 except dbus.exceptions.DBusException, e:
1943 if "BlobExists" not in str(e):
1944 raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
1945 res = iface.GetBlob('blob1')
1946 if len(res) != len(blob):
1947 raise Exception("Unexpected blob data length")
1948 for i in range(len(res)):
1949 if res[i] != dbus.Byte(blob[i]):
1950 raise Exception("Unexpected blob data")
1951 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
1952 dbus_interface=dbus.PROPERTIES_IFACE)
1953 if 'blob1' not in res:
1954 raise Exception("Added blob missing from Blobs property")
1955 iface.RemoveBlob('blob1')
1956 try:
1957 iface.RemoveBlob('blob1')
1958 raise Exception("Invalid RemoveBlob() accepted")
1959 except dbus.exceptions.DBusException, e:
1960 if "BlobUnknown" not in str(e):
1961 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
1962 try:
1963 iface.GetBlob('blob1')
1964 raise Exception("Invalid GetBlob() accepted")
1965 except dbus.exceptions.DBusException, e:
1966 if "BlobUnknown" not in str(e):
1967 raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
1968
1969 class TestDbusBlob(TestDbus):
1970 def __init__(self, bus):
1971 TestDbus.__init__(self, bus)
1972 self.blob_added = False
1973 self.blob_removed = False
1974
1975 def __enter__(self):
1976 gobject.timeout_add(1, self.run_blob)
1977 gobject.timeout_add(15000, self.timeout)
1978 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
1979 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
1980 self.loop.run()
1981 return self
1982
1983 def blobAdded(self, blobName):
1984 logger.debug("blobAdded: %s" % blobName)
1985 if blobName == 'blob2':
1986 self.blob_added = True
1987
1988 def blobRemoved(self, blobName):
1989 logger.debug("blobRemoved: %s" % blobName)
1990 if blobName == 'blob2':
1991 self.blob_removed = True
1992 self.loop.quit()
1993
1994 def run_blob(self, *args):
1995 logger.debug("run_blob")
1996 iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
1997 iface.RemoveBlob('blob2')
1998 return False
1999
2000 def success(self):
2001 return self.blob_added and self.blob_removed
2002
2003 with TestDbusBlob(bus) as t:
2004 if not t.success():
2005 raise Exception("Expected signals not seen")
2006
2007 def test_dbus_blob_oom(dev, apdev):
2008 """D-Bus AddNetwork/RemoveNetwork OOM error cases"""
2009 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2010 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2011
2012 for i in range(1, 4):
2013 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
2014 "AddBlob"):
2015 iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
2016
2017 def test_dbus_autoscan(dev, apdev):
2018 """D-Bus Autoscan()"""
2019 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2020 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2021
2022 iface.AutoScan("foo")
2023 iface.AutoScan("periodic:1")
2024 iface.AutoScan("")
2025 dev[0].request("AUTOSCAN ")
2026
2027 def test_dbus_autoscan_oom(dev, apdev):
2028 """D-Bus Autoscan() OOM"""
2029 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2030 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2031
2032 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
2033 iface.AutoScan("foo")
2034 dev[0].request("AUTOSCAN ")
2035
2036 def test_dbus_tdls_invalid(dev, apdev):
2037 """D-Bus invalid TDLS operations"""
2038 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2039 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2040
2041 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2042 connect_2sta_open(dev, hapd)
2043 addr1 = dev[1].p2p_interface_addr()
2044
2045 try:
2046 iface.TDLSDiscover("foo")
2047 raise Exception("Invalid TDLSDiscover() accepted")
2048 except dbus.exceptions.DBusException, e:
2049 if "InvalidArgs" not in str(e):
2050 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e))
2051
2052 try:
2053 iface.TDLSStatus("foo")
2054 raise Exception("Invalid TDLSStatus() accepted")
2055 except dbus.exceptions.DBusException, e:
2056 if "InvalidArgs" not in str(e):
2057 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e))
2058
2059 res = iface.TDLSStatus(addr1)
2060 if res != "peer does not exist":
2061 raise Exception("Unexpected TDLSStatus response")
2062
2063 try:
2064 iface.TDLSSetup("foo")
2065 raise Exception("Invalid TDLSSetup() accepted")
2066 except dbus.exceptions.DBusException, e:
2067 if "InvalidArgs" not in str(e):
2068 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e))
2069
2070 try:
2071 iface.TDLSTeardown("foo")
2072 raise Exception("Invalid TDLSTeardown() accepted")
2073 except dbus.exceptions.DBusException, e:
2074 if "InvalidArgs" not in str(e):
2075 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e))
2076
2077 try:
2078 iface.TDLSTeardown("00:11:22:33:44:55")
2079 raise Exception("TDLSTeardown accepted for unknown peer")
2080 except dbus.exceptions.DBusException, e:
2081 if "UnknownError: error performing TDLS teardown" not in str(e):
2082 raise Exception("Unexpected error message: " + str(e))
2083
2084 try:
2085 iface.TDLSChannelSwitch({})
2086 raise Exception("Invalid TDLSChannelSwitch() accepted")
2087 except dbus.exceptions.DBusException, e:
2088 if "InvalidArgs" not in str(e):
2089 raise Exception("Unexpected error message for invalid TDLSChannelSwitch: " + str(e))
2090
2091 try:
2092 iface.TDLSCancelChannelSwitch("foo")
2093 raise Exception("Invalid TDLSCancelChannelSwitch() accepted")
2094 except dbus.exceptions.DBusException, e:
2095 if "InvalidArgs" not in str(e):
2096 raise Exception("Unexpected error message for invalid TDLSCancelChannelSwitch: " + str(e))
2097
2098 def test_dbus_tdls_oom(dev, apdev):
2099 """D-Bus TDLS operations during OOM"""
2100 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2101 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2102
2103 with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
2104 "UnknownError: error performing TDLS setup"):
2105 iface.TDLSSetup("00:11:22:33:44:55")
2106
2107 def test_dbus_tdls(dev, apdev):
2108 """D-Bus TDLS"""
2109 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2110 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2111
2112 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2113 connect_2sta_open(dev, hapd)
2114
2115 addr1 = dev[1].p2p_interface_addr()
2116
2117 class TestDbusTdls(TestDbus):
2118 def __init__(self, bus):
2119 TestDbus.__init__(self, bus)
2120 self.tdls_setup = False
2121 self.tdls_teardown = False
2122
2123 def __enter__(self):
2124 gobject.timeout_add(1, self.run_tdls)
2125 gobject.timeout_add(15000, self.timeout)
2126 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2127 "PropertiesChanged")
2128 self.loop.run()
2129 return self
2130
2131 def propertiesChanged(self, properties):
2132 logger.debug("propertiesChanged: %s" % str(properties))
2133
2134 def run_tdls(self, *args):
2135 logger.debug("run_tdls")
2136 iface.TDLSDiscover(addr1)
2137 gobject.timeout_add(100, self.run_tdls2)
2138 return False
2139
2140 def run_tdls2(self, *args):
2141 logger.debug("run_tdls2")
2142 iface.TDLSSetup(addr1)
2143 gobject.timeout_add(500, self.run_tdls3)
2144 return False
2145
2146 def run_tdls3(self, *args):
2147 logger.debug("run_tdls3")
2148 res = iface.TDLSStatus(addr1)
2149 if res == "connected":
2150 self.tdls_setup = True
2151 else:
2152 logger.info("Unexpected TDLSStatus: " + res)
2153 iface.TDLSTeardown(addr1)
2154 gobject.timeout_add(200, self.run_tdls4)
2155 return False
2156
2157 def run_tdls4(self, *args):
2158 logger.debug("run_tdls4")
2159 res = iface.TDLSStatus(addr1)
2160 if res == "peer does not exist":
2161 self.tdls_teardown = True
2162 else:
2163 logger.info("Unexpected TDLSStatus: " + res)
2164 self.loop.quit()
2165 return False
2166
2167 def success(self):
2168 return self.tdls_setup and self.tdls_teardown
2169
2170 with TestDbusTdls(bus) as t:
2171 if not t.success():
2172 raise Exception("Expected signals not seen")
2173
2174 def test_dbus_tdls_channel_switch(dev, apdev):
2175 """D-Bus TDLS channel switch configuration"""
2176 flags = int(dev[0].get_driver_status_field('capa.flags'), 16)
2177 if flags & 0x800000000 == 0:
2178 raise HwsimSkip("Driver does not support TDLS channel switching")
2179
2180 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2181 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2182
2183 hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
2184 connect_2sta_open(dev, hapd)
2185
2186 addr1 = dev[1].p2p_interface_addr()
2187
2188 class TestDbusTdls(TestDbus):
2189 def __init__(self, bus):
2190 TestDbus.__init__(self, bus)
2191 self.tdls_setup = False
2192 self.tdls_done = False
2193
2194 def __enter__(self):
2195 gobject.timeout_add(1, self.run_tdls)
2196 gobject.timeout_add(15000, self.timeout)
2197 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
2198 "PropertiesChanged")
2199 self.loop.run()
2200 return self
2201
2202 def propertiesChanged(self, properties):
2203 logger.debug("propertiesChanged: %s" % str(properties))
2204
2205 def run_tdls(self, *args):
2206 logger.debug("run_tdls")
2207 iface.TDLSDiscover(addr1)
2208 gobject.timeout_add(100, self.run_tdls2)
2209 return False
2210
2211 def run_tdls2(self, *args):
2212 logger.debug("run_tdls2")
2213 iface.TDLSSetup(addr1)
2214 gobject.timeout_add(500, self.run_tdls3)
2215 return False
2216
2217 def run_tdls3(self, *args):
2218 logger.debug("run_tdls3")
2219 res = iface.TDLSStatus(addr1)
2220 if res == "connected":
2221 self.tdls_setup = True
2222 else:
2223 logger.info("Unexpected TDLSStatus: " + res)
2224
2225 # Unknown dict entry
2226 args = dbus.Dictionary({ 'Foobar': dbus.Byte(1) },
2227 signature='sv')
2228 try:
2229 iface.TDLSChannelSwitch(args)
2230 except Exception, e:
2231 if "InvalidArgs" not in str(e):
2232 raise Exception("Unexpected exception")
2233
2234 # Missing OperClass
2235 args = dbus.Dictionary({}, signature='sv')
2236 try:
2237 iface.TDLSChannelSwitch(args)
2238 except Exception, e:
2239 if "InvalidArgs" not in str(e):
2240 raise Exception("Unexpected exception")
2241
2242 # Missing Frequency
2243 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1) },
2244 signature='sv')
2245 try:
2246 iface.TDLSChannelSwitch(args)
2247 except Exception, e:
2248 if "InvalidArgs" not in str(e):
2249 raise Exception("Unexpected exception")
2250
2251 # Missing PeerAddress
2252 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1),
2253 'Frequency': dbus.UInt32(2417) },
2254 signature='sv')
2255 try:
2256 iface.TDLSChannelSwitch(args)
2257 except Exception, e:
2258 if "InvalidArgs" not in str(e):
2259 raise Exception("Unexpected exception")
2260
2261 # Valid parameters
2262 args = dbus.Dictionary({ 'OperClass': dbus.Byte(1),
2263 'Frequency': dbus.UInt32(2417),
2264 'PeerAddress': addr1,
2265 'SecChannelOffset': dbus.UInt32(0),
2266 'CenterFrequency1': dbus.UInt32(0),
2267 'CenterFrequency2': dbus.UInt32(0),
2268 'Bandwidth': dbus.UInt32(20),
2269 'HT': dbus.Boolean(False),
2270 'VHT': dbus.Boolean(False) },
2271 signature='sv')
2272 iface.TDLSChannelSwitch(args)
2273
2274 gobject.timeout_add(200, self.run_tdls4)
2275 return False
2276
2277 def run_tdls4(self, *args):
2278 logger.debug("run_tdls4")
2279 iface.TDLSCancelChannelSwitch(addr1)
2280 self.tdls_done = True
2281 self.loop.quit()
2282 return False
2283
2284 def success(self):
2285 return self.tdls_setup and self.tdls_done
2286
2287 with TestDbusTdls(bus) as t:
2288 if not t.success():
2289 raise Exception("Expected signals not seen")
2290
2291 def test_dbus_pkcs11(dev, apdev):
2292 """D-Bus SetPKCS11EngineAndModulePath()"""
2293 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2294 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2295
2296 try:
2297 iface.SetPKCS11EngineAndModulePath("foo", "bar")
2298 except dbus.exceptions.DBusException, e:
2299 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2300 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2301
2302 try:
2303 iface.SetPKCS11EngineAndModulePath("foo", "")
2304 except dbus.exceptions.DBusException, e:
2305 if "Error.Failed: Reinit of the EAPOL" not in str(e):
2306 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))
2307
2308 iface.SetPKCS11EngineAndModulePath("", "bar")
2309 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2310 dbus_interface=dbus.PROPERTIES_IFACE)
2311 if res != "":
2312 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2313 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2314 dbus_interface=dbus.PROPERTIES_IFACE)
2315 if res != "bar":
2316 raise Exception("Unexpected PKCS11ModulePath value: " + res)
2317
2318 iface.SetPKCS11EngineAndModulePath("", "")
2319 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath",
2320 dbus_interface=dbus.PROPERTIES_IFACE)
2321 if res != "":
2322 raise Exception("Unexpected PKCS11EnginePath value: " + res)
2323 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath",
2324 dbus_interface=dbus.PROPERTIES_IFACE)
2325 if res != "":
2326 raise Exception("Unexpected PKCS11ModulePath value: " + res)
2327
2328 def test_dbus_apscan(dev, apdev):
2329 """D-Bus Get/Set ApScan"""
2330 try:
2331 _test_dbus_apscan(dev, apdev)
2332 finally:
2333 dev[0].request("AP_SCAN 1")
2334
2335 def _test_dbus_apscan(dev, apdev):
2336 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2337
2338 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2339 dbus_interface=dbus.PROPERTIES_IFACE)
2340 if res != 1:
2341 raise Exception("Unexpected initial ApScan value: %d" % res)
2342
2343 for i in range(3):
2344 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i),
2345 dbus_interface=dbus.PROPERTIES_IFACE)
2346 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
2347 dbus_interface=dbus.PROPERTIES_IFACE)
2348 if res != i:
2349 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i))
2350
2351 try:
2352 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1),
2353 dbus_interface=dbus.PROPERTIES_IFACE)
2354 raise Exception("Invalid Set(ApScan,-1) accepted")
2355 except dbus.exceptions.DBusException, e:
2356 if "Error.Failed: wrong property type" not in str(e):
2357 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2358
2359 try:
2360 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123),
2361 dbus_interface=dbus.PROPERTIES_IFACE)
2362 raise Exception("Invalid Set(ApScan,123) accepted")
2363 except dbus.exceptions.DBusException, e:
2364 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
2365 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
2366
2367 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1),
2368 dbus_interface=dbus.PROPERTIES_IFACE)
2369
2370 def test_dbus_pmf(dev, apdev):
2371 """D-Bus Get/Set Pmf"""
2372 try:
2373 _test_dbus_pmf(dev, apdev)
2374 finally:
2375 dev[0].request("SET pmf 0")
2376
2377 def _test_dbus_pmf(dev, apdev):
2378 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2379
2380 dev[0].set("pmf", "0")
2381 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
2382 dbus_interface=dbus.PROPERTIES_IFACE)
2383 if res != "0":
2384 raise Exception("Unexpected initial Pmf value: %s" % res)
2385
2386 for i in range(3):
2387 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", str(i),
2388 dbus_interface=dbus.PROPERTIES_IFACE)
2389 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
2390 dbus_interface=dbus.PROPERTIES_IFACE)
2391 if res != str(i):
2392 raise Exception("Unexpected Pmf value %s (expected %d)" % (res, i))
2393
2394 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", "1",
2395 dbus_interface=dbus.PROPERTIES_IFACE)
2396
2397 def test_dbus_fastreauth(dev, apdev):
2398 """D-Bus Get/Set FastReauth"""
2399 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2400
2401 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2402 dbus_interface=dbus.PROPERTIES_IFACE)
2403 if res != True:
2404 raise Exception("Unexpected initial FastReauth value: " + str(res))
2405
2406 for i in [ False, True ]:
2407 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
2408 dbus_interface=dbus.PROPERTIES_IFACE)
2409 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
2410 dbus_interface=dbus.PROPERTIES_IFACE)
2411 if res != i:
2412 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))
2413
2414 try:
2415 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
2416 dbus_interface=dbus.PROPERTIES_IFACE)
2417 raise Exception("Invalid Set(FastReauth,-1) accepted")
2418 except dbus.exceptions.DBusException, e:
2419 if "Error.Failed: wrong property type" not in str(e):
2420 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
2421
2422 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
2423 dbus_interface=dbus.PROPERTIES_IFACE)
2424
2425 def test_dbus_bss_expire(dev, apdev):
2426 """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
2427 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2428
2429 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179),
2430 dbus_interface=dbus.PROPERTIES_IFACE)
2431 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge",
2432 dbus_interface=dbus.PROPERTIES_IFACE)
2433 if res != 179:
2434 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i))
2435
2436 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3),
2437 dbus_interface=dbus.PROPERTIES_IFACE)
2438 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount",
2439 dbus_interface=dbus.PROPERTIES_IFACE)
2440 if res != 3:
2441 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i))
2442
2443 try:
2444 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1),
2445 dbus_interface=dbus.PROPERTIES_IFACE)
2446 raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
2447 except dbus.exceptions.DBusException, e:
2448 if "Error.Failed: wrong property type" not in str(e):
2449 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e))
2450
2451 try:
2452 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9),
2453 dbus_interface=dbus.PROPERTIES_IFACE)
2454 raise Exception("Invalid Set(BSSExpireAge,9) accepted")
2455 except dbus.exceptions.DBusException, e:
2456 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e):
2457 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e))
2458
2459 try:
2460 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1),
2461 dbus_interface=dbus.PROPERTIES_IFACE)
2462 raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
2463 except dbus.exceptions.DBusException, e:
2464 if "Error.Failed: wrong property type" not in str(e):
2465 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e))
2466
2467 try:
2468 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0),
2469 dbus_interface=dbus.PROPERTIES_IFACE)
2470 raise Exception("Invalid Set(BSSExpireCount,0) accepted")
2471 except dbus.exceptions.DBusException, e:
2472 if "Error.Failed: BSSExpireCount must be > 0" not in str(e):
2473 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e))
2474
2475 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
2476 dbus_interface=dbus.PROPERTIES_IFACE)
2477 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
2478 dbus_interface=dbus.PROPERTIES_IFACE)
2479
2480 def test_dbus_country(dev, apdev):
2481 """D-Bus Get/Set Country"""
2482 try:
2483 _test_dbus_country(dev, apdev)
2484 finally:
2485 dev[0].request("SET country 00")
2486 subprocess.call(['iw', 'reg', 'set', '00'])
2487
2488 def _test_dbus_country(dev, apdev):
2489 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2490
2491 # work around issues with possible pending regdom event from the end of
2492 # the previous test case
2493 time.sleep(0.2)
2494 dev[0].dump_monitor()
2495
2496 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI",
2497 dbus_interface=dbus.PROPERTIES_IFACE)
2498 res = if_obj.Get(WPAS_DBUS_IFACE, "Country",
2499 dbus_interface=dbus.PROPERTIES_IFACE)
2500 if res != "FI":
2501 raise Exception("Unexpected Country value %s (expected FI)" % res)
2502
2503 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2504 if ev is None:
2505 # For now, work around separate P2P Device interface event delivery
2506 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2507 if ev is None:
2508 raise Exception("regdom change event not seen")
2509 if "init=USER type=COUNTRY alpha2=FI" not in ev:
2510 raise Exception("Unexpected event contents: " + ev)
2511
2512 try:
2513 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1),
2514 dbus_interface=dbus.PROPERTIES_IFACE)
2515 raise Exception("Invalid Set(Country,-1) accepted")
2516 except dbus.exceptions.DBusException, e:
2517 if "Error.Failed: wrong property type" not in str(e):
2518 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
2519
2520 try:
2521 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F",
2522 dbus_interface=dbus.PROPERTIES_IFACE)
2523 raise Exception("Invalid Set(Country,F) accepted")
2524 except dbus.exceptions.DBusException, e:
2525 if "Error.Failed: invalid country code" not in str(e):
2526 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
2527
2528 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00",
2529 dbus_interface=dbus.PROPERTIES_IFACE)
2530
2531 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2532 if ev is None:
2533 # For now, work around separate P2P Device interface event delivery
2534 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
2535 if ev is None:
2536 raise Exception("regdom change event not seen")
2537 # init=CORE was previously used due to invalid db.txt data for 00. For
2538 # now, allow both it and the new init=USER after fixed db.txt.
2539 if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev:
2540 raise Exception("Unexpected event contents: " + ev)
2541
2542 def test_dbus_scan_interval(dev, apdev):
2543 """D-Bus Get/Set ScanInterval"""
2544 try:
2545 _test_dbus_scan_interval(dev, apdev)
2546 finally:
2547 dev[0].request("SCAN_INTERVAL 5")
2548
2549 def _test_dbus_scan_interval(dev, apdev):
2550 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2551
2552 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3),
2553 dbus_interface=dbus.PROPERTIES_IFACE)
2554 res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
2555 dbus_interface=dbus.PROPERTIES_IFACE)
2556 if res != 3:
2557 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i))
2558
2559 try:
2560 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100),
2561 dbus_interface=dbus.PROPERTIES_IFACE)
2562 raise Exception("Invalid Set(ScanInterval,100) accepted")
2563 except dbus.exceptions.DBusException, e:
2564 if "Error.Failed: wrong property type" not in str(e):
2565 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
2566
2567 try:
2568 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1),
2569 dbus_interface=dbus.PROPERTIES_IFACE)
2570 raise Exception("Invalid Set(ScanInterval,-1) accepted")
2571 except dbus.exceptions.DBusException, e:
2572 if "Error.Failed: scan_interval must be >= 0" not in str(e):
2573 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
2574
2575 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
2576 dbus_interface=dbus.PROPERTIES_IFACE)
2577
2578 def test_dbus_probe_req_reporting(dev, apdev):
2579 """D-Bus Probe Request reporting"""
2580 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2581
2582 dev[1].p2p_find(social=True)
2583
2584 class TestDbusProbe(TestDbus):
2585 def __init__(self, bus):
2586 TestDbus.__init__(self, bus)
2587 self.reported = False
2588
2589 def __enter__(self):
2590 gobject.timeout_add(1, self.run_test)
2591 gobject.timeout_add(15000, self.timeout)
2592 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
2593 "GroupStarted")
2594 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
2595 byte_arrays=True)
2596 self.loop.run()
2597 return self
2598
2599 def groupStarted(self, properties):
2600 logger.debug("groupStarted: " + str(properties))
2601 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
2602 properties['interface_object'])
2603 self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
2604 self.iface.SubscribeProbeReq()
2605 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2606
2607 def probeRequest(self, args):
2608 logger.debug("probeRequest: args=%s" % str(args))
2609 self.reported = True
2610 self.loop.quit()
2611
2612 def run_test(self, *args):
2613 logger.debug("run_test")
2614 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2615 params = dbus.Dictionary({ 'frequency': 2412 })
2616 p2p.GroupAdd(params)
2617 return False
2618
2619 def success(self):
2620 return self.reported
2621
2622 with TestDbusProbe(bus) as t:
2623 if not t.success():
2624 raise Exception("Expected signals not seen")
2625 t.iface.UnsubscribeProbeReq()
2626 try:
2627 t.iface.UnsubscribeProbeReq()
2628 raise Exception("Invalid UnsubscribeProbeReq() accepted")
2629 except dbus.exceptions.DBusException, e:
2630 if "NoSubscription" not in str(e):
2631 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
2632 t.group_p2p.Disconnect()
2633
2634 with TestDbusProbe(bus) as t:
2635 if not t.success():
2636 raise Exception("Expected signals not seen")
2637 # On purpose, leave ProbeReq subscription in place to test automatic
2638 # cleanup.
2639
2640 dev[1].p2p_stop_find()
2641
2642 def test_dbus_probe_req_reporting_oom(dev, apdev):
2643 """D-Bus Probe Request reporting (OOM)"""
2644 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2645 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
2646
2647 # Need to make sure this process has not already subscribed to avoid false
2648 # failures due to the operation succeeding due to os_strdup() not even
2649 # getting called.
2650 try:
2651 iface.UnsubscribeProbeReq()
2652 was_subscribed = True
2653 except dbus.exceptions.DBusException, e:
2654 was_subscribed = False
2655 pass
2656
2657 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq",
2658 "SubscribeProbeReq"):
2659 iface.SubscribeProbeReq()
2660
2661 if was_subscribed:
2662 # On purpose, leave ProbeReq subscription in place to test automatic
2663 # cleanup.
2664 iface.SubscribeProbeReq()
2665
2666 def test_dbus_p2p_invalid(dev, apdev):
2667 """D-Bus invalid P2P operations"""
2668 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2669 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2670
2671 try:
2672 p2p.RejectPeer(path + "/Peers/00112233445566")
2673 raise Exception("Invalid RejectPeer accepted")
2674 except dbus.exceptions.DBusException, e:
2675 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
2676 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2677
2678 try:
2679 p2p.RejectPeer("/foo")
2680 raise Exception("Invalid RejectPeer accepted")
2681 except dbus.exceptions.DBusException, e:
2682 if "InvalidArgs" not in str(e):
2683 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
2684
2685 tests = [ { },
2686 { 'peer': 'foo' },
2687 { 'foo': "bar" },
2688 { 'iface': "abc" },
2689 { 'iface': 123 } ]
2690 for t in tests:
2691 try:
2692 p2p.RemoveClient(t)
2693 raise Exception("Invalid RemoveClient accepted")
2694 except dbus.exceptions.DBusException, e:
2695 if "InvalidArgs" not in str(e):
2696 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))
2697
2698 tests = [ {'DiscoveryType': 'foo'},
2699 {'RequestedDeviceTypes': 'foo'},
2700 {'RequestedDeviceTypes': ['foo']},
2701 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2702 '10','11','12','13','14','15','16',
2703 '17']},
2704 {'RequestedDeviceTypes': dbus.Array([], signature="s")},
2705 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
2706 {'RequestedDeviceTypes': dbus.Array([], signature="i")},
2707 {'RequestedDeviceTypes': [dbus.ByteArray('12345678'),
2708 dbus.ByteArray('1234567')]},
2709 {'Foo': dbus.Int16(1)},
2710 {'Foo': dbus.UInt16(1)},
2711 {'Foo': dbus.Int64(1)},
2712 {'Foo': dbus.UInt64(1)},
2713 {'Foo': dbus.Double(1.23)},
2714 {'Foo': dbus.Signature('s')},
2715 {'Foo': 'bar'}]
2716 for t in tests:
2717 try:
2718 p2p.Find(dbus.Dictionary(t))
2719 raise Exception("Invalid Find accepted")
2720 except dbus.exceptions.DBusException, e:
2721 if "InvalidArgs" not in str(e):
2722 raise Exception("Unexpected error message for invalid Find(): " + str(e))
2723
2724 for p in [ "/foo",
2725 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2726 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2727 try:
2728 p2p.RemovePersistentGroup(dbus.ObjectPath(p))
2729 raise Exception("Invalid RemovePersistentGroup accepted")
2730 except dbus.exceptions.DBusException, e:
2731 if "InvalidArgs" not in str(e):
2732 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
2733
2734 try:
2735 dev[0].request("P2P_SET disabled 1")
2736 p2p.Listen(5)
2737 raise Exception("Invalid Listen accepted")
2738 except dbus.exceptions.DBusException, e:
2739 if "UnknownError: Could not start P2P listen" not in str(e):
2740 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2741 finally:
2742 dev[0].request("P2P_SET disabled 0")
2743
2744 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
2745 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2746 try:
2747 test_p2p.Listen("foo")
2748 raise Exception("Invalid Listen accepted")
2749 except dbus.exceptions.DBusException, e:
2750 if "InvalidArgs" not in str(e):
2751 raise Exception("Unexpected error message for invalid Listen: " + str(e))
2752
2753 try:
2754 dev[0].request("P2P_SET disabled 1")
2755 p2p.ExtendedListen(dbus.Dictionary({}))
2756 raise Exception("Invalid ExtendedListen accepted")
2757 except dbus.exceptions.DBusException, e:
2758 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
2759 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
2760 finally:
2761 dev[0].request("P2P_SET disabled 0")
2762
2763 try:
2764 dev[0].request("P2P_SET disabled 1")
2765 args = { 'duration1': 30000, 'interval1': 102400,
2766 'duration2': 20000, 'interval2': 102400 }
2767 p2p.PresenceRequest(args)
2768 raise Exception("Invalid PresenceRequest accepted")
2769 except dbus.exceptions.DBusException, e:
2770 if "UnknownError: Failed to invoke presence request" not in str(e):
2771 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
2772 finally:
2773 dev[0].request("P2P_SET disabled 0")
2774
2775 try:
2776 params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
2777 p2p.GroupAdd(params)
2778 raise Exception("Invalid GroupAdd accepted")
2779 except dbus.exceptions.DBusException, e:
2780 if "InvalidArgs" not in str(e):
2781 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2782
2783 try:
2784 params = dbus.Dictionary({'persistent_group_object':
2785 dbus.ObjectPath(path),
2786 'frequency': 2412})
2787 p2p.GroupAdd(params)
2788 raise Exception("Invalid GroupAdd accepted")
2789 except dbus.exceptions.DBusException, e:
2790 if "InvalidArgs" not in str(e):
2791 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))
2792
2793 try:
2794 p2p.Disconnect()
2795 raise Exception("Invalid Disconnect accepted")
2796 except dbus.exceptions.DBusException, e:
2797 if "UnknownError: failed to disconnect" not in str(e):
2798 raise Exception("Unexpected error message for invalid Disconnect: " + str(e))
2799
2800 try:
2801 dev[0].request("P2P_SET disabled 1")
2802 p2p.Flush()
2803 raise Exception("Invalid Flush accepted")
2804 except dbus.exceptions.DBusException, e:
2805 if "Error.Failed: P2P is not available for this interface" not in str(e):
2806 raise Exception("Unexpected error message for invalid Flush: " + str(e))
2807 finally:
2808 dev[0].request("P2P_SET disabled 0")
2809
2810 try:
2811 dev[0].request("P2P_SET disabled 1")
2812 args = { 'peer': path,
2813 'join': True,
2814 'wps_method': 'pbc',
2815 'frequency': 2412 }
2816 pin = p2p.Connect(args)
2817 raise Exception("Invalid Connect accepted")
2818 except dbus.exceptions.DBusException, e:
2819 if "Error.Failed: P2P is not available for this interface" not in str(e):
2820 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2821 finally:
2822 dev[0].request("P2P_SET disabled 0")
2823
2824 tests = [ { 'frequency': dbus.Int32(-1) },
2825 { 'wps_method': 'pbc' },
2826 { 'wps_method': 'foo' } ]
2827 for args in tests:
2828 try:
2829 pin = p2p.Connect(args)
2830 raise Exception("Invalid Connect accepted")
2831 except dbus.exceptions.DBusException, e:
2832 if "InvalidArgs" not in str(e):
2833 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2834
2835 try:
2836 dev[0].request("P2P_SET disabled 1")
2837 args = { 'peer': path }
2838 pin = p2p.Invite(args)
2839 raise Exception("Invalid Invite accepted")
2840 except dbus.exceptions.DBusException, e:
2841 if "Error.Failed: P2P is not available for this interface" not in str(e):
2842 raise Exception("Unexpected error message for invalid Invite: " + str(e))
2843 finally:
2844 dev[0].request("P2P_SET disabled 0")
2845
2846 try:
2847 args = { 'foo': 'bar' }
2848 pin = p2p.Invite(args)
2849 raise Exception("Invalid Invite accepted")
2850 except dbus.exceptions.DBusException, e:
2851 if "InvalidArgs" not in str(e):
2852 raise Exception("Unexpected error message for invalid Connect: " + str(e))
2853
2854 tests = [ (path, 'display', "InvalidArgs"),
2855 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2856 'display',
2857 "UnknownError: Failed to send provision discovery request"),
2858 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2859 'keypad',
2860 "UnknownError: Failed to send provision discovery request"),
2861 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2862 'pbc',
2863 "UnknownError: Failed to send provision discovery request"),
2864 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2865 'pushbutton',
2866 "UnknownError: Failed to send provision discovery request"),
2867 (dbus.ObjectPath(path + "/Peers/00112233445566"),
2868 'foo', "InvalidArgs") ]
2869 for (p,method,err) in tests:
2870 try:
2871 p2p.ProvisionDiscoveryRequest(p, method)
2872 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2873 except dbus.exceptions.DBusException, e:
2874 if err not in str(e):
2875 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))
2876
2877 try:
2878 dev[0].request("P2P_SET disabled 1")
2879 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
2880 dbus_interface=dbus.PROPERTIES_IFACE)
2881 raise Exception("Invalid Get(Peers) accepted")
2882 except dbus.exceptions.DBusException, e:
2883 if "Error.Failed: P2P is not available for this interface" not in str(e):
2884 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
2885 finally:
2886 dev[0].request("P2P_SET disabled 0")
2887
2888 def test_dbus_p2p_oom(dev, apdev):
2889 """D-Bus P2P operations and OOM"""
2890 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2891 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2892
2893 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array",
2894 "Find", "InvalidArgs"):
2895 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2896
2897 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array",
2898 "Find", "InvalidArgs"):
2899 p2p.Find(dbus.Dictionary({ 'Foo': [ 'bar' ] }))
2900
2901 with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array",
2902 "Find", "InvalidArgs"):
2903 p2p.Find(dbus.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }))
2904
2905 with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
2906 "Find", "InvalidArgs"):
2907 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2908
2909 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
2910 "Find", "InvalidArgs"):
2911 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2912
2913 with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
2914 "Find", "InvalidArgs"):
2915 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123'),
2916 dbus.ByteArray('123'),
2917 dbus.ByteArray('123'),
2918 dbus.ByteArray('123'),
2919 dbus.ByteArray('123'),
2920 dbus.ByteArray('123'),
2921 dbus.ByteArray('123'),
2922 dbus.ByteArray('123'),
2923 dbus.ByteArray('123'),
2924 dbus.ByteArray('123'),
2925 dbus.ByteArray('123') ] }))
2926
2927 with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
2928 "Find", "InvalidArgs"):
2929 p2p.Find(dbus.Dictionary({ 'Foo': [ dbus.ByteArray('123') ] }))
2930
2931 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
2932 "Find", "InvalidArgs"):
2933 p2p.Find(dbus.Dictionary({ 'Foo': path }))
2934
2935 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
2936 "AddService", "InvalidArgs"):
2937 args = { 'service_type': 'bonjour',
2938 'response': dbus.ByteArray(500*'b') }
2939 p2p.AddService(args)
2940
2941 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
2942 "AddService", "InvalidArgs"):
2943 p2p.AddService(args)
2944
2945 def test_dbus_p2p_discovery(dev, apdev):
2946 """D-Bus P2P discovery"""
2947 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
2948 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
2949
2950 addr0 = dev[0].p2p_dev_addr()
2951
2952 dev[1].request("SET sec_device_type 1-0050F204-2")
2953 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2954 dev[1].p2p_listen()
2955 addr1 = dev[1].p2p_dev_addr()
2956 a1 = binascii.unhexlify(addr1.replace(':',''))
2957
2958 wfd_devinfo = "00001c440028"
2959 dev[2].request("SET wifi_display 1")
2960 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
2961 wfd = binascii.unhexlify('000006' + wfd_devinfo)
2962 dev[2].p2p_listen()
2963 addr2 = dev[2].p2p_dev_addr()
2964 a2 = binascii.unhexlify(addr2.replace(':',''))
2965
2966 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
2967 dbus_interface=dbus.PROPERTIES_IFACE)
2968 if 'Peers' not in res:
2969 raise Exception("GetAll result missing Peers")
2970 if len(res['Peers']) != 0:
2971 raise Exception("Unexpected peer(s) in the list")
2972
2973 args = {'DiscoveryType': 'social',
2974 'RequestedDeviceTypes': [dbus.ByteArray('12345678')],
2975 'Timeout': dbus.Int32(1) }
2976 p2p.Find(dbus.Dictionary(args))
2977 p2p.StopFind()
2978
2979 class TestDbusP2p(TestDbus):
2980 def __init__(self, bus):
2981 TestDbus.__init__(self, bus)
2982 self.found = False
2983 self.found2 = False
2984 self.found_prop = False
2985 self.lost = False
2986 self.find_stopped = False
2987
2988 def __enter__(self):
2989 gobject.timeout_add(1, self.run_test)
2990 gobject.timeout_add(15000, self.timeout)
2991 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
2992 "DeviceFound")
2993 self.add_signal(self.deviceFoundProperties,
2994 WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
2995 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
2996 "DeviceLost")
2997 self.add_signal(self.provisionDiscoveryResponseEnterPin,
2998 WPAS_DBUS_IFACE_P2PDEVICE,
2999 "ProvisionDiscoveryResponseEnterPin")
3000 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
3001 "FindStopped")
3002 self.loop.run()
3003 return self
3004
3005 def deviceFound(self, path):
3006 logger.debug("deviceFound: path=%s" % path)
3007 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
3008 dbus_interface=dbus.PROPERTIES_IFACE)
3009 if len(res) < 1:
3010 raise Exception("Unexpected number of peers")
3011 if path not in res:
3012 raise Exception("Mismatch in peer object path")
3013 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3014 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3015 dbus_interface=dbus.PROPERTIES_IFACE,
3016 byte_arrays=True)
3017 logger.debug("peer properties: " + str(res))
3018
3019 if res['DeviceAddress'] == a1:
3020 if 'SecondaryDeviceTypes' not in res:
3021 raise Exception("Missing SecondaryDeviceTypes")
3022 sec = res['SecondaryDeviceTypes']
3023 if len(sec) < 1:
3024 raise Exception("Secondary device type missing")
3025 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
3026 raise Exception("Secondary device type mismatch")
3027
3028 if 'VendorExtension' not in res:
3029 raise Exception("Missing VendorExtension")
3030 vendor = res['VendorExtension']
3031 if len(vendor) < 1:
3032 raise Exception("Vendor extension missing")
3033 if "\x11\x22\x33\x44" not in vendor:
3034 raise Exception("Secondary device type mismatch")
3035
3036 self.found = True
3037 elif res['DeviceAddress'] == a2:
3038 if 'IEs' not in res:
3039 raise Exception("IEs missing")
3040 if res['IEs'] != wfd:
3041 raise Exception("IEs mismatch")
3042 self.found2 = True
3043 else:
3044 raise Exception("Unexpected peer device address")
3045
3046 if self.found and self.found2:
3047 p2p.StopFind()
3048 p2p.RejectPeer(path)
3049 p2p.ProvisionDiscoveryRequest(path, 'display')
3050
3051 def deviceLost(self, path):
3052 logger.debug("deviceLost: path=%s" % path)
3053 if not self.found or not self.found2:
3054 # This may happen if a previous test case ended up scheduling
3055 # deviceLost event and that event did not get delivered before
3056 # starting the next test execution.
3057 logger.debug("Ignore deviceLost before the deviceFound events")
3058 return
3059 self.lost = True
3060 try:
3061 p2p.RejectPeer(path)
3062 raise Exception("Invalid RejectPeer accepted")
3063 except dbus.exceptions.DBusException, e:
3064 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
3065 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
3066 self.loop.quit()
3067
3068 def deviceFoundProperties(self, path, properties):
3069 logger.debug("deviceFoundProperties: path=%s" % path)
3070 logger.debug("peer properties: " + str(properties))
3071 if properties['DeviceAddress'] == a1:
3072 self.found_prop = True
3073
3074 def provisionDiscoveryResponseEnterPin(self, peer_object):
3075 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
3076 p2p.Flush()
3077
3078 def findStopped(self):
3079 logger.debug("findStopped")
3080 self.find_stopped = True
3081
3082 def run_test(self, *args):
3083 logger.debug("run_test")
3084 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3085 'Timeout': dbus.Int32(10)}))
3086 return False
3087
3088 def success(self):
3089 return self.found and self.lost and self.found2 and self.find_stopped
3090
3091 with TestDbusP2p(bus) as t:
3092 if not t.success():
3093 raise Exception("Expected signals not seen")
3094
3095 dev[1].request("VENDOR_ELEM_REMOVE 1 *")
3096 dev[1].p2p_stop_find()
3097
3098 p2p.Listen(1)
3099 dev[2].p2p_stop_find()
3100 dev[2].request("P2P_FLUSH")
3101 if not dev[2].discover_peer(addr0):
3102 raise Exception("Peer not found")
3103 p2p.StopFind()
3104 dev[2].p2p_stop_find()
3105
3106 try:
3107 p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
3108 raise Exception("Invalid ExtendedListen accepted")
3109 except dbus.exceptions.DBusException, e:
3110 if "InvalidArgs" not in str(e):
3111 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))
3112
3113 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
3114 p2p.ExtendedListen(dbus.Dictionary({}))
3115 dev[0].global_request("P2P_EXT_LISTEN")
3116
3117 def test_dbus_p2p_discovery_freq(dev, apdev):
3118 """D-Bus P2P discovery on a specific non-social channel"""
3119 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3120 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3121
3122 addr1 = dev[1].p2p_dev_addr()
3123 autogo(dev[1], freq=2422)
3124
3125 class TestDbusP2p(TestDbus):
3126 def __init__(self, bus):
3127 TestDbus.__init__(self, bus)
3128 self.found = False
3129
3130 def __enter__(self):
3131 gobject.timeout_add(1, self.run_test)
3132 gobject.timeout_add(5000, self.timeout)
3133 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3134 "DeviceFound")
3135 self.loop.run()
3136 return self
3137
3138 def deviceFound(self, path):
3139 logger.debug("deviceFound: path=%s" % path)
3140 self.found = True
3141 self.loop.quit()
3142
3143 def run_test(self, *args):
3144 logger.debug("run_test")
3145 p2p.Find(dbus.Dictionary({'freq': 2422}))
3146 return False
3147
3148 def success(self):
3149 return self.found
3150
3151 with TestDbusP2p(bus) as t:
3152 if not t.success():
3153 raise Exception("Expected signals not seen")
3154
3155 dev[1].remove_group()
3156 p2p.StopFind()
3157
3158 def test_dbus_p2p_service_discovery(dev, apdev):
3159 """D-Bus P2P service discovery"""
3160 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3161 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3162
3163 addr0 = dev[0].p2p_dev_addr()
3164 addr1 = dev[1].p2p_dev_addr()
3165
3166 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
3167 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
3168
3169 args = { 'service_type': 'bonjour',
3170 'query': bonjour_query,
3171 'response': bonjour_response }
3172 p2p.AddService(args)
3173 p2p.FlushService()
3174 p2p.AddService(args)
3175
3176 try:
3177 p2p.DeleteService(args)
3178 raise Exception("Invalid DeleteService() accepted")
3179 except dbus.exceptions.DBusException, e:
3180 if "InvalidArgs" not in str(e):
3181 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3182
3183 args = { 'service_type': 'bonjour',
3184 'query': bonjour_query }
3185 p2p.DeleteService(args)
3186 try:
3187 p2p.DeleteService(args)
3188 raise Exception("Invalid DeleteService() accepted")
3189 except dbus.exceptions.DBusException, e:
3190 if "InvalidArgs" not in str(e):
3191 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3192
3193 args = { 'service_type': 'upnp',
3194 'version': 0x10,
3195 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
3196 p2p.AddService(args)
3197 p2p.DeleteService(args)
3198 try:
3199 p2p.DeleteService(args)
3200 raise Exception("Invalid DeleteService() accepted")
3201 except dbus.exceptions.DBusException, e:
3202 if "InvalidArgs" not in str(e):
3203 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3204
3205 tests = [ { 'service_type': 'foo' },
3206 { 'service_type': 'foo', 'query': bonjour_query },
3207 { 'service_type': 'upnp' },
3208 { 'service_type': 'upnp', 'version': 0x10 },
3209 { 'service_type': 'upnp',
3210 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3211 { 'version': 0x10,
3212 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3213 { 'service_type': 'upnp', 'foo': 'bar' },
3214 { 'service_type': 'bonjour' },
3215 { 'service_type': 'bonjour', 'query': 'foo' },
3216 { 'service_type': 'bonjour', 'foo': 'bar' } ]
3217 for args in tests:
3218 try:
3219 p2p.DeleteService(args)
3220 raise Exception("Invalid DeleteService() accepted")
3221 except dbus.exceptions.DBusException, e:
3222 if "InvalidArgs" not in str(e):
3223 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))
3224
3225 tests = [ { 'service_type': 'foo' },
3226 { 'service_type': 'upnp' },
3227 { 'service_type': 'upnp', 'version': 0x10 },
3228 { 'service_type': 'upnp',
3229 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3230 { 'version': 0x10,
3231 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
3232 { 'service_type': 'upnp', 'foo': 'bar' },
3233 { 'service_type': 'bonjour' },
3234 { 'service_type': 'bonjour', 'query': 'foo' },
3235 { 'service_type': 'bonjour', 'response': 'foo' },
3236 { 'service_type': 'bonjour', 'query': bonjour_query },
3237 { 'service_type': 'bonjour', 'response': bonjour_response },
3238 { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
3239 { 'service_type': 'bonjour', 'foo': 'bar' } ]
3240 for args in tests:
3241 try:
3242 p2p.AddService(args)
3243 raise Exception("Invalid AddService() accepted")
3244 except dbus.exceptions.DBusException, e:
3245 if "InvalidArgs" not in str(e):
3246 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3247
3248 args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
3249 ref = p2p.ServiceDiscoveryRequest(args)
3250 p2p.ServiceDiscoveryCancelRequest(ref)
3251 try:
3252 p2p.ServiceDiscoveryCancelRequest(ref)
3253 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
3254 except dbus.exceptions.DBusException, e:
3255 if "InvalidArgs" not in str(e):
3256 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3257 try:
3258 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
3259 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
3260 except dbus.exceptions.DBusException, e:
3261 if "InvalidArgs" not in str(e):
3262 raise Exception("Unexpected error message for invalid AddService(): " + str(e))
3263
3264 args = { 'service_type': 'upnp',
3265 'version': 0x10,
3266 'service': 'ssdp:foo' }
3267 ref = p2p.ServiceDiscoveryRequest(args)
3268 p2p.ServiceDiscoveryCancelRequest(ref)
3269
3270 tests = [ { 'service_type': 'foo' },
3271 { 'foo': 'bar' },
3272 { 'tlv': 'foo' },
3273 { },
3274 { 'version': 0 },
3275 { 'service_type': 'upnp',
3276 'service': 'ssdp:foo' },
3277 { 'service_type': 'upnp',
3278 'version': 0x10 },
3279 { 'service_type': 'upnp',
3280 'version': 0x10,
3281 'service': 'ssdp:foo',
3282 'peer_object': dbus.ObjectPath(path + "/Peers") },
3283 { 'service_type': 'upnp',
3284 'version': 0x10,
3285 'service': 'ssdp:foo',
3286 'peer_object': path + "/Peers" },
3287 { 'service_type': 'upnp',
3288 'version': 0x10,
3289 'service': 'ssdp:foo',
3290 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
3291 for args in tests:
3292 try:
3293 p2p.ServiceDiscoveryRequest(args)
3294 raise Exception("Invalid ServiceDiscoveryRequest accepted")
3295 except dbus.exceptions.DBusException, e:
3296 if "InvalidArgs" not in str(e):
3297 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))
3298
3299 args = { 'foo': 'bar' }
3300 try:
3301 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3302 raise Exception("Invalid ServiceDiscoveryResponse accepted")
3303 except dbus.exceptions.DBusException, e:
3304 if "InvalidArgs" not in str(e):
3305 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
3306
3307 def test_dbus_p2p_service_discovery_query(dev, apdev):
3308 """D-Bus P2P service discovery query"""
3309 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3310 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3311
3312 addr0 = dev[0].p2p_dev_addr()
3313 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
3314 dev[1].p2p_listen()
3315 addr1 = dev[1].p2p_dev_addr()
3316
3317 class TestDbusP2p(TestDbus):
3318 def __init__(self, bus):
3319 TestDbus.__init__(self, bus)
3320 self.done = False
3321
3322 def __enter__(self):
3323 gobject.timeout_add(1, self.run_test)
3324 gobject.timeout_add(15000, self.timeout)
3325 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3326 "DeviceFound")
3327 self.add_signal(self.serviceDiscoveryResponse,
3328 WPAS_DBUS_IFACE_P2PDEVICE,
3329 "ServiceDiscoveryResponse", byte_arrays=True)
3330 self.loop.run()
3331 return self
3332
3333 def deviceFound(self, path):
3334 logger.debug("deviceFound: path=%s" % path)
3335 args = { 'peer_object': path,
3336 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
3337 p2p.ServiceDiscoveryRequest(args)
3338
3339 def serviceDiscoveryResponse(self, sd_request):
3340 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
3341 self.done = True
3342 self.loop.quit()
3343
3344 def run_test(self, *args):
3345 logger.debug("run_test")
3346 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
3347 'Timeout': dbus.Int32(10)}))
3348 return False
3349
3350 def success(self):
3351 return self.done
3352
3353 with TestDbusP2p(bus) as t:
3354 if not t.success():
3355 raise Exception("Expected signals not seen")
3356
3357 dev[1].p2p_stop_find()
3358
3359 def test_dbus_p2p_service_discovery_external(dev, apdev):
3360 """D-Bus P2P service discovery with external response"""
3361 try:
3362 _test_dbus_p2p_service_discovery_external(dev, apdev)
3363 finally:
3364 dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3365
3366 def _test_dbus_p2p_service_discovery_external(dev, apdev):
3367 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3368 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3369
3370 addr0 = dev[0].p2p_dev_addr()
3371 addr1 = dev[1].p2p_dev_addr()
3372 resp = "0300000101"
3373
3374 dev[1].request("P2P_FLUSH")
3375 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
3376 dev[1].p2p_find(social=True)
3377
3378 class TestDbusP2p(TestDbus):
3379 def __init__(self, bus):
3380 TestDbus.__init__(self, bus)
3381 self.sd = False
3382
3383 def __enter__(self):
3384 gobject.timeout_add(1, self.run_test)
3385 gobject.timeout_add(15000, self.timeout)
3386 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3387 "DeviceFound")
3388 self.add_signal(self.serviceDiscoveryRequest,
3389 WPAS_DBUS_IFACE_P2PDEVICE,
3390 "ServiceDiscoveryRequest")
3391 self.loop.run()
3392 return self
3393
3394 def deviceFound(self, path):
3395 logger.debug("deviceFound: path=%s" % path)
3396
3397 def serviceDiscoveryRequest(self, sd_request):
3398 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
3399 self.sd = True
3400 args = { 'peer_object': sd_request['peer_object'],
3401 'frequency': sd_request['frequency'],
3402 'dialog_token': sd_request['dialog_token'],
3403 'tlvs': dbus.ByteArray(binascii.unhexlify(resp)) }
3404 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
3405 self.loop.quit()
3406
3407 def run_test(self, *args):
3408 logger.debug("run_test")
3409 p2p.ServiceDiscoveryExternal(1)
3410 p2p.ServiceUpdate()
3411 p2p.Listen(15)
3412 return False
3413
3414 def success(self):
3415 return self.sd
3416
3417 with TestDbusP2p(bus) as t:
3418 if not t.success():
3419 raise Exception("Expected signals not seen")
3420
3421 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
3422 if ev is None:
3423 raise Exception("Service discovery timed out")
3424 if addr0 not in ev:
3425 raise Exception("Unexpected address in SD Response: " + ev)
3426 if ev.split(' ')[4] != resp:
3427 raise Exception("Unexpected response data SD Response: " + ev)
3428 dev[1].p2p_stop_find()
3429
3430 p2p.StopFind()
3431 p2p.ServiceDiscoveryExternal(0)
3432
3433 def test_dbus_p2p_autogo(dev, apdev):
3434 """D-Bus P2P autonomous GO"""
3435 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3436 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3437
3438 addr0 = dev[0].p2p_dev_addr()
3439
3440 class TestDbusP2p(TestDbus):
3441 def __init__(self, bus):
3442 TestDbus.__init__(self, bus)
3443 self.first = True
3444 self.waiting_end = False
3445 self.exceptions = False
3446 self.deauthorized = False
3447 self.done = False
3448
3449 def __enter__(self):
3450 gobject.timeout_add(1, self.run_test)
3451 gobject.timeout_add(15000, self.timeout)
3452 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3453 "DeviceFound")
3454 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3455 "GroupStarted")
3456 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3457 "GroupFinished")
3458 self.add_signal(self.persistentGroupAdded,
3459 WPAS_DBUS_IFACE_P2PDEVICE,
3460 "PersistentGroupAdded")
3461 self.add_signal(self.persistentGroupRemoved,
3462 WPAS_DBUS_IFACE_P2PDEVICE,
3463 "PersistentGroupRemoved")
3464 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
3465 WPAS_DBUS_IFACE_P2PDEVICE,
3466 "ProvisionDiscoveryRequestDisplayPin")
3467 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3468 "StaAuthorized")
3469 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
3470 "StaDeauthorized")
3471 self.loop.run()
3472 return self
3473
3474 def groupStarted(self, properties):
3475 logger.debug("groupStarted: " + str(properties))
3476 self.group = properties['group_object']
3477 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3478 properties['interface_object'])
3479 role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3480 dbus_interface=dbus.PROPERTIES_IFACE)
3481 if role != "GO":
3482 self.exceptions = True
3483 raise Exception("Unexpected role reported: " + role)
3484 group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3485 dbus_interface=dbus.PROPERTIES_IFACE)
3486 if group != properties['group_object']:
3487 self.exceptions = True
3488 raise Exception("Unexpected Group reported: " + str(group))
3489 go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3490 dbus_interface=dbus.PROPERTIES_IFACE)
3491 if go != '/':
3492 self.exceptions = True
3493 raise Exception("Unexpected PeerGO value: " + str(go))
3494 if self.first:
3495 self.first = False
3496 logger.info("Remove persistent group instance")
3497 group_p2p = dbus.Interface(self.g_if_obj,
3498 WPAS_DBUS_IFACE_P2PDEVICE)
3499 group_p2p.Disconnect()
3500 else:
3501 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3502 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
3503
3504 def groupFinished(self, properties):
3505 logger.debug("groupFinished: " + str(properties))
3506 if self.waiting_end:
3507 logger.info("Remove persistent group")
3508 p2p.RemovePersistentGroup(self.persistent)
3509 else:
3510 logger.info("Re-start persistent group")
3511 params = dbus.Dictionary({'persistent_group_object':
3512 self.persistent,
3513 'frequency': 2412})
3514 p2p.GroupAdd(params)
3515
3516 def persistentGroupAdded(self, path, properties):
3517 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
3518 self.persistent = path
3519
3520 def persistentGroupRemoved(self, path):
3521 logger.debug("persistentGroupRemoved: %s" % path)
3522 self.done = True
3523 self.loop.quit()
3524
3525 def deviceFound(self, path):
3526 logger.debug("deviceFound: path=%s" % path)
3527 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3528 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3529 dbus_interface=dbus.PROPERTIES_IFACE,
3530 byte_arrays=True)
3531 logger.debug('peer properties: ' + str(self.peer))
3532
3533 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
3534 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
3535 self.peer_path = peer_object
3536 peer = binascii.unhexlify(peer_object.split('/')[-1])
3537 addr = ""
3538 for p in peer:
3539 if len(addr) > 0:
3540 addr += ':'
3541 addr += '%02x' % ord(p)
3542
3543 params = { 'Role': 'registrar',
3544 'P2PDeviceAddress': self.peer['DeviceAddress'],
3545 'Bssid': self.peer['DeviceAddress'],
3546 'Type': 'pin' }
3547 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
3548 try:
3549 wps.Start(params)
3550 self.exceptions = True
3551 raise Exception("Invalid WPS.Start() accepted")
3552 except dbus.exceptions.DBusException, e:
3553 if "InvalidArgs" not in str(e):
3554 self.exceptions = True
3555 raise Exception("Unexpected error message: " + str(e))
3556 params = { 'Role': 'registrar',
3557 'P2PDeviceAddress': self.peer['DeviceAddress'],
3558 'Type': 'pin',
3559 'Pin': '12345670' }
3560 logger.info("Authorize peer to connect to the group")
3561 wps.Start(params)
3562
3563 def staAuthorized(self, name):
3564 logger.debug("staAuthorized: " + name)
3565 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
3566 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3567 dbus_interface=dbus.PROPERTIES_IFACE,
3568 byte_arrays=True)
3569 logger.debug("Peer properties: " + str(res))
3570 if 'Groups' not in res or len(res['Groups']) != 1:
3571 self.exceptions = True
3572 raise Exception("Unexpected number of peer Groups entries")
3573 if res['Groups'][0] != self.group:
3574 self.exceptions = True
3575 raise Exception("Unexpected peer Groups[0] value")
3576
3577 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
3578 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3579 dbus_interface=dbus.PROPERTIES_IFACE,
3580 byte_arrays=True)
3581 logger.debug("Group properties: " + str(res))
3582 if 'Members' not in res or len(res['Members']) != 1:
3583 self.exceptions = True
3584 raise Exception("Unexpected number of group members")
3585
3586 ext = dbus.ByteArray("\x11\x22\x33\x44")
3587 # Earlier implementation of this interface was a bit strange. The
3588 # property is defined to have aay signature and that is what the
3589 # getter returned. However, the setter expected there to be a
3590 # dictionary with 'WPSVendorExtensions' as the key surrounding these
3591 # values.. The current implementations maintains support for that
3592 # for backwards compability reasons. Verify that encoding first.
3593 vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
3594 signature='sv')
3595 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3596 dbus_interface=dbus.PROPERTIES_IFACE)
3597 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3598 dbus_interface=dbus.PROPERTIES_IFACE,
3599 byte_arrays=True)
3600 if len(res) != 1:
3601 self.exceptions = True
3602 raise Exception("Unexpected number of vendor extensions")
3603 if res[0] != ext:
3604 self.exceptions = True
3605 raise Exception("Vendor extension value changed")
3606
3607 # And now verify that the more appropriate encoding is accepted as
3608 # well.
3609 res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
3610 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3611 dbus_interface=dbus.PROPERTIES_IFACE)
3612 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
3613 dbus_interface=dbus.PROPERTIES_IFACE,
3614 byte_arrays=True)
3615 if len(res) != 2:
3616 self.exceptions = True
3617 raise Exception("Unexpected number of vendor extensions")
3618 if res[0] != res2[0] or res[1] != res2[1]:
3619 self.exceptions = True
3620 raise Exception("Vendor extension value changed")
3621
3622 for i in range(10):
3623 res.append(dbus.ByteArray('\xaa\xbb'))
3624 try:
3625 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3626 dbus_interface=dbus.PROPERTIES_IFACE)
3627 self.exceptions = True
3628 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3629 except dbus.exceptions.DBusException, e:
3630 if "Error.Failed" not in str(e):
3631 self.exceptions = True
3632 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3633
3634 vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
3635 try:
3636 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3637 dbus_interface=dbus.PROPERTIES_IFACE)
3638 self.exceptions = True
3639 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3640 except dbus.exceptions.DBusException, e:
3641 if "InvalidArgs" not in str(e):
3642 self.exceptions = True
3643 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3644
3645 vals = [ "foo" ]
3646 try:
3647 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3648 dbus_interface=dbus.PROPERTIES_IFACE)
3649 self.exceptions = True
3650 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3651 except dbus.exceptions.DBusException, e:
3652 if "Error.Failed" not in str(e):
3653 self.exceptions = True
3654 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3655
3656 vals = [ [ "foo" ] ]
3657 try:
3658 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
3659 dbus_interface=dbus.PROPERTIES_IFACE)
3660 self.exceptions = True
3661 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3662 except dbus.exceptions.DBusException, e:
3663 if "Error.Failed" not in str(e):
3664 self.exceptions = True
3665 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3666
3667 p2p.RemoveClient({ 'peer': self.peer_path })
3668
3669 self.waiting_end = True
3670 group_p2p = dbus.Interface(self.g_if_obj,
3671 WPAS_DBUS_IFACE_P2PDEVICE)
3672 group_p2p.Disconnect()
3673
3674 def staDeauthorized(self, name):
3675 logger.debug("staDeauthorized: " + name)
3676 self.deauthorized = True
3677
3678 def run_test(self, *args):
3679 logger.debug("run_test")
3680 params = dbus.Dictionary({'persistent': True,
3681 'frequency': 2412})
3682 logger.info("Add a persistent group")
3683 p2p.GroupAdd(params)
3684 return False
3685
3686 def success(self):
3687 return self.done and self.deauthorized and not self.exceptions
3688
3689 with TestDbusP2p(bus) as t:
3690 if not t.success():
3691 raise Exception("Expected signals not seen")
3692
3693 dev[1].wait_go_ending_session()
3694
3695 def test_dbus_p2p_autogo_pbc(dev, apdev):
3696 """D-Bus P2P autonomous GO and PBC"""
3697 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3698 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3699
3700 addr0 = dev[0].p2p_dev_addr()
3701
3702 class TestDbusP2p(TestDbus):
3703 def __init__(self, bus):
3704 TestDbus.__init__(self, bus)
3705 self.first = True
3706 self.waiting_end = False
3707 self.done = False
3708
3709 def __enter__(self):
3710 gobject.timeout_add(1, self.run_test)
3711 gobject.timeout_add(15000, self.timeout)
3712 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3713 "DeviceFound")
3714 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3715 "GroupStarted")
3716 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3717 "GroupFinished")
3718 self.add_signal(self.provisionDiscoveryPBCRequest,
3719 WPAS_DBUS_IFACE_P2PDEVICE,
3720 "ProvisionDiscoveryPBCRequest")
3721 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3722 "StaAuthorized")
3723 self.loop.run()
3724 return self
3725
3726 def groupStarted(self, properties):
3727 logger.debug("groupStarted: " + str(properties))
3728 self.group = properties['group_object']
3729 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3730 properties['interface_object'])
3731 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3732 dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
3733
3734 def groupFinished(self, properties):
3735 logger.debug("groupFinished: " + str(properties))
3736 self.done = True
3737 self.loop.quit()
3738
3739 def deviceFound(self, path):
3740 logger.debug("deviceFound: path=%s" % path)
3741 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3742 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3743 dbus_interface=dbus.PROPERTIES_IFACE,
3744 byte_arrays=True)
3745 logger.debug('peer properties: ' + str(self.peer))
3746
3747 def provisionDiscoveryPBCRequest(self, peer_object):
3748 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
3749 self.peer_path = peer_object
3750 peer = binascii.unhexlify(peer_object.split('/')[-1])
3751 addr = ""
3752 for p in peer:
3753 if len(addr) > 0:
3754 addr += ':'
3755 addr += '%02x' % ord(p)
3756 params = { 'Role': 'registrar',
3757 'P2PDeviceAddress': self.peer['DeviceAddress'],
3758 'Type': 'pbc' }
3759 logger.info("Authorize peer to connect to the group")
3760 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
3761 wps.Start(params)
3762
3763 def staAuthorized(self, name):
3764 logger.debug("staAuthorized: " + name)
3765 group_p2p = dbus.Interface(self.g_if_obj,
3766 WPAS_DBUS_IFACE_P2PDEVICE)
3767 group_p2p.Disconnect()
3768
3769 def run_test(self, *args):
3770 logger.debug("run_test")
3771 params = dbus.Dictionary({'frequency': 2412})
3772 p2p.GroupAdd(params)
3773 return False
3774
3775 def success(self):
3776 return self.done
3777
3778 with TestDbusP2p(bus) as t:
3779 if not t.success():
3780 raise Exception("Expected signals not seen")
3781
3782 dev[1].wait_go_ending_session()
3783 dev[1].flush_scan_cache()
3784
3785 def test_dbus_p2p_autogo_legacy(dev, apdev):
3786 """D-Bus P2P autonomous GO and legacy STA"""
3787 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3788 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3789
3790 addr0 = dev[0].p2p_dev_addr()
3791
3792 class TestDbusP2p(TestDbus):
3793 def __init__(self, bus):
3794 TestDbus.__init__(self, bus)
3795 self.done = False
3796
3797 def __enter__(self):
3798 gobject.timeout_add(1, self.run_test)
3799 gobject.timeout_add(15000, self.timeout)
3800 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3801 "GroupStarted")
3802 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3803 "GroupFinished")
3804 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
3805 "StaAuthorized")
3806 self.loop.run()
3807 return self
3808
3809 def groupStarted(self, properties):
3810 logger.debug("groupStarted: " + str(properties))
3811 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3812 properties['group_object'])
3813 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3814 dbus_interface=dbus.PROPERTIES_IFACE,
3815 byte_arrays=True)
3816 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
3817
3818 pin = '12345670'
3819 params = { 'Role': 'enrollee',
3820 'Type': 'pin',
3821 'Pin': pin }
3822 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3823 properties['interface_object'])
3824 wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
3825 wps.Start(params)
3826 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3827 dev1.scan_for_bss(bssid, freq=2412)
3828 dev1.request("WPS_PIN " + bssid + " " + pin)
3829 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3830
3831 def groupFinished(self, properties):
3832 logger.debug("groupFinished: " + str(properties))
3833 self.done = True
3834 self.loop.quit()
3835
3836 def staAuthorized(self, name):
3837 logger.debug("staAuthorized: " + name)
3838 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3839 dev1.request("DISCONNECT")
3840 self.group_p2p.Disconnect()
3841
3842 def run_test(self, *args):
3843 logger.debug("run_test")
3844 params = dbus.Dictionary({'frequency': 2412})
3845 p2p.GroupAdd(params)
3846 return False
3847
3848 def success(self):
3849 return self.done
3850
3851 with TestDbusP2p(bus) as t:
3852 if not t.success():
3853 raise Exception("Expected signals not seen")
3854
3855 def test_dbus_p2p_join(dev, apdev):
3856 """D-Bus P2P join an autonomous GO"""
3857 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3858 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3859
3860 addr1 = dev[1].p2p_dev_addr()
3861 addr2 = dev[2].p2p_dev_addr()
3862 dev[1].p2p_start_go(freq=2412)
3863 dev1_group_ifname = dev[1].group_ifname
3864 dev[2].p2p_listen()
3865
3866 class TestDbusP2p(TestDbus):
3867 def __init__(self, bus):
3868 TestDbus.__init__(self, bus)
3869 self.done = False
3870 self.peer = None
3871 self.go = None
3872
3873 def __enter__(self):
3874 gobject.timeout_add(1, self.run_test)
3875 gobject.timeout_add(15000, self.timeout)
3876 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
3877 "DeviceFound")
3878 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
3879 "GroupStarted")
3880 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
3881 "GroupFinished")
3882 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
3883 "InvitationResult")
3884 self.loop.run()
3885 return self
3886
3887 def deviceFound(self, path):
3888 logger.debug("deviceFound: path=%s" % path)
3889 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
3890 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
3891 dbus_interface=dbus.PROPERTIES_IFACE,
3892 byte_arrays=True)
3893 logger.debug('peer properties: ' + str(res))
3894 if addr2.replace(':','') in path:
3895 self.peer = path
3896 elif addr1.replace(':','') in path:
3897 self.go = path
3898 if self.peer and self.go:
3899 logger.info("Join the group")
3900 p2p.StopFind()
3901 args = { 'peer': self.go,
3902 'join': True,
3903 'wps_method': 'pin',
3904 'frequency': 2412 }
3905 pin = p2p.Connect(args)
3906
3907 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3908 dev1.group_ifname = dev1_group_ifname
3909 dev1.group_request("WPS_PIN any " + pin)
3910
3911 def groupStarted(self, properties):
3912 logger.debug("groupStarted: " + str(properties))
3913 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
3914 properties['interface_object'])
3915 role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
3916 dbus_interface=dbus.PROPERTIES_IFACE)
3917 if role != "client":
3918 raise Exception("Unexpected role reported: " + role)
3919 group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
3920 dbus_interface=dbus.PROPERTIES_IFACE)
3921 if group != properties['group_object']:
3922 raise Exception("Unexpected Group reported: " + str(group))
3923 go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
3924 dbus_interface=dbus.PROPERTIES_IFACE)
3925 if go != self.go:
3926 raise Exception("Unexpected PeerGO value: " + str(go))
3927
3928 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
3929 properties['group_object'])
3930 res = g_obj.GetAll(WPAS_DBUS_GROUP,
3931 dbus_interface=dbus.PROPERTIES_IFACE,
3932 byte_arrays=True)
3933 logger.debug("Group properties: " + str(res))
3934
3935 ext = dbus.ByteArray("\x11\x22\x33\x44")
3936 try:
3937 # Set(WPSVendorExtensions) not allowed for P2P Client
3938 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
3939 dbus_interface=dbus.PROPERTIES_IFACE)
3940 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3941 except dbus.exceptions.DBusException, e:
3942 if "Error.Failed: Failed to set property" not in str(e):
3943 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
3944
3945 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3946 args = { 'duration1': 30000, 'interval1': 102400,
3947 'duration2': 20000, 'interval2': 102400 }
3948 group_p2p.PresenceRequest(args)
3949
3950 args = { 'peer': self.peer }
3951 group_p2p.Invite(args)
3952
3953 def groupFinished(self, properties):
3954 logger.debug("groupFinished: " + str(properties))
3955 self.done = True
3956 self.loop.quit()
3957
3958 def invitationResult(self, result):
3959 logger.debug("invitationResult: " + str(result))
3960 if result['status'] != 1:
3961 raise Exception("Unexpected invitation result: " + str(result))
3962 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3963 dev1.group_ifname = dev1_group_ifname
3964 dev1.remove_group()
3965
3966 def run_test(self, *args):
3967 logger.debug("run_test")
3968 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
3969 return False
3970
3971 def success(self):
3972 return self.done
3973
3974 with TestDbusP2p(bus) as t:
3975 if not t.success():
3976 raise Exception("Expected signals not seen")
3977
3978 dev[2].p2p_stop_find()
3979
3980 def test_dbus_p2p_invitation_received(dev, apdev):
3981 """D-Bus P2P and InvitationReceived"""
3982 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
3983 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
3984
3985 form(dev[0], dev[1])
3986 addr0 = dev[0].p2p_dev_addr()
3987 dev[0].p2p_listen()
3988 dev[0].global_request("SET persistent_reconnect 0")
3989
3990 if not dev[1].discover_peer(addr0, social=True):
3991 raise Exception("Peer " + addr0 + " not found")
3992 peer = dev[1].get_peer(addr0)
3993
3994 class TestDbusP2p(TestDbus):
3995 def __init__(self, bus):
3996 TestDbus.__init__(self, bus)
3997 self.done = False
3998
3999 def __enter__(self):
4000 gobject.timeout_add(1, self.run_test)
4001 gobject.timeout_add(15000, self.timeout)
4002 self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
4003 "InvitationReceived")
4004 self.loop.run()
4005 return self
4006
4007 def invitationReceived(self, result):
4008 logger.debug("invitationReceived: " + str(result))
4009 self.done = True
4010 self.loop.quit()
4011
4012 def run_test(self, *args):
4013 logger.debug("run_test")
4014 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4015 cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0
4016 dev1.global_request(cmd)
4017 return False
4018
4019 def success(self):
4020 return self.done
4021
4022 with TestDbusP2p(bus) as t:
4023 if not t.success():
4024 raise Exception("Expected signals not seen")
4025
4026 dev[0].p2p_stop_find()
4027 dev[1].p2p_stop_find()
4028
4029 def test_dbus_p2p_config(dev, apdev):
4030 """D-Bus Get/Set P2PDeviceConfig"""
4031 try:
4032 _test_dbus_p2p_config(dev, apdev)
4033 finally:
4034 dev[0].request("P2P_SET ssid_postfix ")
4035
4036 def _test_dbus_p2p_config(dev, apdev):
4037 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4038 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4039
4040 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4041 dbus_interface=dbus.PROPERTIES_IFACE,
4042 byte_arrays=True)
4043 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res,
4044 dbus_interface=dbus.PROPERTIES_IFACE)
4045 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4046 dbus_interface=dbus.PROPERTIES_IFACE,
4047 byte_arrays=True)
4048
4049 if len(res) != len(res2):
4050 raise Exception("Different number of parameters")
4051 for k in res:
4052 if res[k] != res2[k]:
4053 raise Exception("Parameter %s value changes" % k)
4054
4055 changes = { 'SsidPostfix': 'foo',
4056 'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
4057 'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
4058 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4059 dbus.Dictionary(changes, signature='sv'),
4060 dbus_interface=dbus.PROPERTIES_IFACE)
4061
4062 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4063 dbus_interface=dbus.PROPERTIES_IFACE,
4064 byte_arrays=True)
4065 logger.debug("P2PDeviceConfig: " + str(res2))
4066 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
4067 raise Exception("VendorExtension does not match")
4068 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
4069 raise Exception("SecondaryDeviceType does not match")
4070
4071 changes = { 'SsidPostfix': '',
4072 'VendorExtension': dbus.Array([], signature="ay"),
4073 'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
4074 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4075 dbus.Dictionary(changes, signature='sv'),
4076 dbus_interface=dbus.PROPERTIES_IFACE)
4077
4078 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4079 dbus_interface=dbus.PROPERTIES_IFACE,
4080 byte_arrays=True)
4081 logger.debug("P2PDeviceConfig: " + str(res3))
4082 if 'VendorExtension' in res3:
4083 raise Exception("VendorExtension not removed")
4084 if 'SecondaryDeviceTypes' in res3:
4085 raise Exception("SecondaryDeviceType not removed")
4086
4087 try:
4088 dev[0].request("P2P_SET disabled 1")
4089 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4090 dbus_interface=dbus.PROPERTIES_IFACE,
4091 byte_arrays=True)
4092 raise Exception("Invalid Get(P2PDeviceConfig) accepted")
4093 except dbus.exceptions.DBusException, e:
4094 if "Error.Failed: P2P is not available for this interface" not in str(e):
4095 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4096 finally:
4097 dev[0].request("P2P_SET disabled 0")
4098
4099 try:
4100 dev[0].request("P2P_SET disabled 1")
4101 changes = { 'SsidPostfix': 'foo' }
4102 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4103 dbus.Dictionary(changes, signature='sv'),
4104 dbus_interface=dbus.PROPERTIES_IFACE)
4105 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
4106 except dbus.exceptions.DBusException, e:
4107 if "Error.Failed: P2P is not available for this interface" not in str(e):
4108 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4109 finally:
4110 dev[0].request("P2P_SET disabled 0")
4111
4112 tests = [ { 'DeviceName': 123 },
4113 { 'SsidPostfix': 123 },
4114 { 'Foo': 'Bar' } ]
4115 for changes in tests:
4116 try:
4117 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
4118 dbus.Dictionary(changes, signature='sv'),
4119 dbus_interface=dbus.PROPERTIES_IFACE)
4120 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
4121 except dbus.exceptions.DBusException, e:
4122 if "InvalidArgs" not in str(e):
4123 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4124
4125 def test_dbus_p2p_persistent(dev, apdev):
4126 """D-Bus P2P persistent group"""
4127 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4128 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4129
4130 class TestDbusP2p(TestDbus):
4131 def __init__(self, bus):
4132 TestDbus.__init__(self, bus)
4133
4134 def __enter__(self):
4135 gobject.timeout_add(1, self.run_test)
4136 gobject.timeout_add(15000, self.timeout)
4137 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4138 "GroupStarted")
4139 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4140 "GroupFinished")
4141 self.add_signal(self.persistentGroupAdded,
4142 WPAS_DBUS_IFACE_P2PDEVICE,
4143 "PersistentGroupAdded")
4144 self.loop.run()
4145 return self
4146
4147 def groupStarted(self, properties):
4148 logger.debug("groupStarted: " + str(properties))
4149 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4150 properties['interface_object'])
4151 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4152 group_p2p.Disconnect()
4153
4154 def groupFinished(self, properties):
4155 logger.debug("groupFinished: " + str(properties))
4156 self.loop.quit()
4157
4158 def persistentGroupAdded(self, path, properties):
4159 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
4160 self.persistent = path
4161
4162 def run_test(self, *args):
4163 logger.debug("run_test")
4164 params = dbus.Dictionary({'persistent': True,
4165 'frequency': 2412})
4166 logger.info("Add a persistent group")
4167 p2p.GroupAdd(params)
4168 return False
4169
4170 def success(self):
4171 return True
4172
4173 with TestDbusP2p(bus) as t:
4174 if not t.success():
4175 raise Exception("Expected signals not seen")
4176 persistent = t.persistent
4177
4178 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
4179 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
4180 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
4181 logger.info("Persistent group Properties: " + str(res))
4182 vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
4183 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
4184 dbus_interface=dbus.PROPERTIES_IFACE)
4185 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
4186 dbus_interface=dbus.PROPERTIES_IFACE)
4187 if len(res) != len(res2):
4188 raise Exception("Different number of parameters")
4189 for k in res:
4190 if k != 'ssid' and res[k] != res2[k]:
4191 raise Exception("Parameter %s value changes" % k)
4192 if res2['ssid'] != '"DIRECT-foo"':
4193 raise Exception("Unexpected ssid")
4194
4195 args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
4196 'psk': '1234567890' }, signature='sv')
4197 group = p2p.AddPersistentGroup(args)
4198
4199 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
4200 dbus_interface=dbus.PROPERTIES_IFACE)
4201 if len(groups) != 2:
4202 raise Exception("Unexpected number of persistent groups: " + str(groups))
4203
4204 p2p.RemoveAllPersistentGroups()
4205
4206 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
4207 dbus_interface=dbus.PROPERTIES_IFACE)
4208 if len(groups) != 0:
4209 raise Exception("Unexpected number of persistent groups: " + str(groups))
4210
4211 try:
4212 p2p.RemovePersistentGroup(persistent)
4213 raise Exception("Invalid RemovePersistentGroup accepted")
4214 except dbus.exceptions.DBusException, e:
4215 if "NetworkUnknown: There is no such persistent group" not in str(e):
4216 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
4217
4218 def test_dbus_p2p_reinvoke_persistent(dev, apdev):
4219 """D-Bus P2P reinvoke persistent group"""
4220 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4221 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4222
4223 addr0 = dev[0].p2p_dev_addr()
4224
4225 class TestDbusP2p(TestDbus):
4226 def __init__(self, bus):
4227 TestDbus.__init__(self, bus)
4228 self.first = True
4229 self.waiting_end = False
4230 self.done = False
4231 self.invited = False
4232
4233 def __enter__(self):
4234 gobject.timeout_add(1, self.run_test)
4235 gobject.timeout_add(15000, self.timeout)
4236 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4237 "DeviceFound")
4238 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4239 "GroupStarted")
4240 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4241 "GroupFinished")
4242 self.add_signal(self.persistentGroupAdded,
4243 WPAS_DBUS_IFACE_P2PDEVICE,
4244 "PersistentGroupAdded")
4245 self.add_signal(self.provisionDiscoveryRequestDisplayPin,
4246 WPAS_DBUS_IFACE_P2PDEVICE,
4247 "ProvisionDiscoveryRequestDisplayPin")
4248 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
4249 "StaAuthorized")
4250 self.loop.run()
4251 return self
4252
4253 def groupStarted(self, properties):
4254 logger.debug("groupStarted: " + str(properties))
4255 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4256 properties['interface_object'])
4257 if not self.invited:
4258 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4259 properties['group_object'])
4260 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4261 dbus_interface=dbus.PROPERTIES_IFACE,
4262 byte_arrays=True)
4263 bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
4264 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4265 dev1.scan_for_bss(bssid, freq=2412)
4266 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
4267
4268 def groupFinished(self, properties):
4269 logger.debug("groupFinished: " + str(properties))
4270 if self.invited:
4271 self.done = True
4272 self.loop.quit()
4273 else:
4274 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4275 dev1.global_request("SET persistent_reconnect 1")
4276 dev1.p2p_listen()
4277
4278 args = { 'persistent_group_object': dbus.ObjectPath(path),
4279 'peer': self.peer_path }
4280 try:
4281 pin = p2p.Invite(args)
4282 raise Exception("Invalid Invite accepted")
4283 except dbus.exceptions.DBusException, e:
4284 if "InvalidArgs" not in str(e):
4285 raise Exception("Unexpected error message for invalid Invite: " + str(e))
4286
4287 args = { 'persistent_group_object': self.persistent,
4288 'peer': self.peer_path }
4289 pin = p2p.Invite(args)
4290 self.invited = True
4291
4292 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
4293 timeout=15)
4294 if self.sta_group_ev is None:
4295 raise Exception("P2P-GROUP-STARTED event not seen")
4296
4297 def persistentGroupAdded(self, path, properties):
4298 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
4299 self.persistent = path
4300
4301 def deviceFound(self, path):
4302 logger.debug("deviceFound: path=%s" % path)
4303 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
4304 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
4305 dbus_interface=dbus.PROPERTIES_IFACE,
4306 byte_arrays=True)
4307
4308 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
4309 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
4310 self.peer_path = peer_object
4311 peer = binascii.unhexlify(peer_object.split('/')[-1])
4312 addr = ""
4313 for p in peer:
4314 if len(addr) > 0:
4315 addr += ':'
4316 addr += '%02x' % ord(p)
4317 params = { 'Role': 'registrar',
4318 'P2PDeviceAddress': self.peer['DeviceAddress'],
4319 'Bssid': self.peer['DeviceAddress'],
4320 'Type': 'pin',
4321 'Pin': '12345670' }
4322 logger.info("Authorize peer to connect to the group")
4323 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4324 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
4325 wps.Start(params)
4326 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
4327 timeout=15)
4328 if self.sta_group_ev is None:
4329 raise Exception("P2P-GROUP-STARTED event not seen")
4330
4331 def staAuthorized(self, name):
4332 logger.debug("staAuthorized: " + name)
4333 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4334 dev1.group_form_result(self.sta_group_ev)
4335 dev1.remove_group()
4336 ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
4337 if ev is None:
4338 raise Exception("Group removal timed out")
4339 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4340 group_p2p.Disconnect()
4341
4342 def run_test(self, *args):
4343 logger.debug("run_test")
4344 params = dbus.Dictionary({'persistent': True,
4345 'frequency': 2412})
4346 logger.info("Add a persistent group")
4347 p2p.GroupAdd(params)
4348 return False
4349
4350 def success(self):
4351 return self.done
4352
4353 with TestDbusP2p(bus) as t:
4354 if not t.success():
4355 raise Exception("Expected signals not seen")
4356
4357 def test_dbus_p2p_go_neg_rx(dev, apdev):
4358 """D-Bus P2P GO Negotiation receive"""
4359 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4360 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4361 addr0 = dev[0].p2p_dev_addr()
4362
4363 class TestDbusP2p(TestDbus):
4364 def __init__(self, bus):
4365 TestDbus.__init__(self, bus)
4366 self.done = False
4367
4368 def __enter__(self):
4369 gobject.timeout_add(1, self.run_test)
4370 gobject.timeout_add(15000, self.timeout)
4371 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4372 "DeviceFound")
4373 self.add_signal(self.goNegotiationRequest,
4374 WPAS_DBUS_IFACE_P2PDEVICE,
4375 "GONegotiationRequest",
4376 byte_arrays=True)
4377 self.add_signal(self.goNegotiationSuccess,
4378 WPAS_DBUS_IFACE_P2PDEVICE,
4379 "GONegotiationSuccess",
4380 byte_arrays=True)
4381 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4382 "GroupStarted")
4383 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4384 "GroupFinished")
4385 self.loop.run()
4386 return self
4387
4388 def deviceFound(self, path):
4389 logger.debug("deviceFound: path=%s" % path)
4390
4391 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4392 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
4393 if dev_passwd_id != 1:
4394 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4395 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4396 'go_intent': 15, 'persistent': False, 'frequency': 5175 }
4397 try:
4398 p2p.Connect(args)
4399 raise Exception("Invalid Connect accepted")
4400 except dbus.exceptions.DBusException, e:
4401 if "ConnectChannelUnsupported" not in str(e):
4402 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4403
4404 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4405 'go_intent': 15, 'persistent': False }
4406 p2p.Connect(args)
4407
4408 def goNegotiationSuccess(self, properties):
4409 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4410
4411 def groupStarted(self, properties):
4412 logger.debug("groupStarted: " + str(properties))
4413 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4414 properties['interface_object'])
4415 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4416 group_p2p.Disconnect()
4417
4418 def groupFinished(self, properties):
4419 logger.debug("groupFinished: " + str(properties))
4420 self.done = True
4421 self.loop.quit()
4422
4423 def run_test(self, *args):
4424 logger.debug("run_test")
4425 p2p.Listen(10)
4426 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4427 if not dev1.discover_peer(addr0):
4428 raise Exception("Peer not found")
4429 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
4430 return False
4431
4432 def success(self):
4433 return self.done
4434
4435 with TestDbusP2p(bus) as t:
4436 if not t.success():
4437 raise Exception("Expected signals not seen")
4438
4439 def test_dbus_p2p_go_neg_auth(dev, apdev):
4440 """D-Bus P2P GO Negotiation authorized"""
4441 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4442 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4443 addr0 = dev[0].p2p_dev_addr()
4444 dev[1].p2p_listen()
4445
4446 class TestDbusP2p(TestDbus):
4447 def __init__(self, bus):
4448 TestDbus.__init__(self, bus)
4449 self.done = False
4450 self.peer_joined = False
4451 self.peer_disconnected = False
4452
4453 def __enter__(self):
4454 gobject.timeout_add(1, self.run_test)
4455 gobject.timeout_add(15000, self.timeout)
4456 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4457 "DeviceFound")
4458 self.add_signal(self.goNegotiationSuccess,
4459 WPAS_DBUS_IFACE_P2PDEVICE,
4460 "GONegotiationSuccess",
4461 byte_arrays=True)
4462 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4463 "GroupStarted")
4464 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4465 "GroupFinished")
4466 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
4467 "StaDeauthorized")
4468 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4469 "PeerJoined")
4470 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
4471 "PeerDisconnected")
4472 self.loop.run()
4473 return self
4474
4475 def deviceFound(self, path):
4476 logger.debug("deviceFound: path=%s" % path)
4477 args = { 'peer': path, 'wps_method': 'keypad',
4478 'go_intent': 15, 'authorize_only': True }
4479 try:
4480 p2p.Connect(args)
4481 raise Exception("Invalid Connect accepted")
4482 except dbus.exceptions.DBusException, e:
4483 if "InvalidArgs" not in str(e):
4484 raise Exception("Unexpected error message for invalid Connect: " + str(e))
4485
4486 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4487 'go_intent': 15, 'authorize_only': True }
4488 p2p.Connect(args)
4489 p2p.Listen(10)
4490 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4491 if not dev1.discover_peer(addr0):
4492 raise Exception("Peer not found")
4493 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
4494 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
4495 if ev is None:
4496 raise Exception("Group formation timed out")
4497 self.sta_group_ev = ev
4498
4499 def goNegotiationSuccess(self, properties):
4500 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4501
4502 def groupStarted(self, properties):
4503 logger.debug("groupStarted: " + str(properties))
4504 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4505 properties['interface_object'])
4506 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4507 dev1.group_form_result(self.sta_group_ev)
4508 dev1.remove_group()
4509
4510 def staDeauthorized(self, name):
4511 logger.debug("staDeuthorized: " + name)
4512 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4513 group_p2p.Disconnect()
4514
4515 def peerJoined(self, peer):
4516 logger.debug("peerJoined: " + peer)
4517 self.peer_joined = True
4518
4519 def peerDisconnected(self, peer):
4520 logger.debug("peerDisconnected: " + peer)
4521 self.peer_disconnected = True
4522
4523 def groupFinished(self, properties):
4524 logger.debug("groupFinished: " + str(properties))
4525 self.done = True
4526 self.loop.quit()
4527
4528 def run_test(self, *args):
4529 logger.debug("run_test")
4530 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4531 return False
4532
4533 def success(self):
4534 return self.done and self.peer_joined and self.peer_disconnected
4535
4536 with TestDbusP2p(bus) as t:
4537 if not t.success():
4538 raise Exception("Expected signals not seen")
4539
4540 def test_dbus_p2p_go_neg_init(dev, apdev):
4541 """D-Bus P2P GO Negotiation initiation"""
4542 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4543 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4544 addr0 = dev[0].p2p_dev_addr()
4545 dev[1].p2p_listen()
4546
4547 class TestDbusP2p(TestDbus):
4548 def __init__(self, bus):
4549 TestDbus.__init__(self, bus)
4550 self.done = False
4551 self.peer_group_added = False
4552 self.peer_group_removed = False
4553
4554 def __enter__(self):
4555 gobject.timeout_add(1, self.run_test)
4556 gobject.timeout_add(15000, self.timeout)
4557 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4558 "DeviceFound")
4559 self.add_signal(self.goNegotiationSuccess,
4560 WPAS_DBUS_IFACE_P2PDEVICE,
4561 "GONegotiationSuccess",
4562 byte_arrays=True)
4563 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4564 "GroupStarted")
4565 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4566 "GroupFinished")
4567 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4568 "PropertiesChanged")
4569 self.loop.run()
4570 return self
4571
4572 def deviceFound(self, path):
4573 logger.debug("deviceFound: path=%s" % path)
4574 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4575 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4576 'go_intent': 0 }
4577 p2p.Connect(args)
4578
4579 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4580 if ev is None:
4581 raise Exception("Timeout while waiting for GO Neg Request")
4582 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4583 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
4584 if ev is None:
4585 raise Exception("Group formation timed out")
4586 self.sta_group_ev = ev
4587
4588 def goNegotiationSuccess(self, properties):
4589 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4590
4591 def groupStarted(self, properties):
4592 logger.debug("groupStarted: " + str(properties))
4593 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4594 properties['interface_object'])
4595 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4596 group_p2p.Disconnect()
4597 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4598 dev1.group_form_result(self.sta_group_ev)
4599 dev1.remove_group()
4600
4601 def groupFinished(self, properties):
4602 logger.debug("groupFinished: " + str(properties))
4603 self.done = True
4604
4605 def propertiesChanged(self, interface_name, changed_properties,
4606 invalidated_properties):
4607 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4608 if interface_name != WPAS_DBUS_P2P_PEER:
4609 return
4610 if "Groups" not in changed_properties:
4611 return
4612 if len(changed_properties["Groups"]) > 0:
4613 self.peer_group_added = True
4614 if len(changed_properties["Groups"]) == 0:
4615 if not self.peer_group_added:
4616 # This is likely a leftover event from an earlier test case,
4617 # ignore it to allow this test case to go through its steps.
4618 logger.info("Ignore propertiesChanged indicating group removal before group has been added")
4619 return
4620 self.peer_group_removed = True
4621 self.loop.quit()
4622
4623 def run_test(self, *args):
4624 logger.debug("run_test")
4625 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4626 return False
4627
4628 def success(self):
4629 return self.done and self.peer_group_added and self.peer_group_removed
4630
4631 with TestDbusP2p(bus) as t:
4632 if not t.success():
4633 raise Exception("Expected signals not seen")
4634
4635 def test_dbus_p2p_group_termination_by_go(dev, apdev):
4636 """D-Bus P2P group removal on GO terminating the group"""
4637 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4638 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4639 addr0 = dev[0].p2p_dev_addr()
4640 dev[1].p2p_listen()
4641
4642 class TestDbusP2p(TestDbus):
4643 def __init__(self, bus):
4644 TestDbus.__init__(self, bus)
4645 self.done = False
4646 self.peer_group_added = False
4647 self.peer_group_removed = False
4648
4649 def __enter__(self):
4650 gobject.timeout_add(1, self.run_test)
4651 gobject.timeout_add(15000, self.timeout)
4652 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4653 "DeviceFound")
4654 self.add_signal(self.goNegotiationSuccess,
4655 WPAS_DBUS_IFACE_P2PDEVICE,
4656 "GONegotiationSuccess",
4657 byte_arrays=True)
4658 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4659 "GroupStarted")
4660 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4661 "GroupFinished")
4662 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4663 "PropertiesChanged")
4664 self.loop.run()
4665 return self
4666
4667 def deviceFound(self, path):
4668 logger.debug("deviceFound: path=%s" % path)
4669 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4670 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4671 'go_intent': 0 }
4672 p2p.Connect(args)
4673
4674 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4675 if ev is None:
4676 raise Exception("Timeout while waiting for GO Neg Request")
4677 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4678 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
4679 if ev is None:
4680 raise Exception("Group formation timed out")
4681 self.sta_group_ev = ev
4682
4683 def goNegotiationSuccess(self, properties):
4684 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4685
4686 def groupStarted(self, properties):
4687 logger.debug("groupStarted: " + str(properties))
4688 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4689 properties['interface_object'])
4690 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4691 dev1.group_form_result(self.sta_group_ev)
4692 dev1.remove_group()
4693
4694 def groupFinished(self, properties):
4695 logger.debug("groupFinished: " + str(properties))
4696 self.done = True
4697
4698 def propertiesChanged(self, interface_name, changed_properties,
4699 invalidated_properties):
4700 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4701 if interface_name != WPAS_DBUS_P2P_PEER:
4702 return
4703 if "Groups" not in changed_properties:
4704 return
4705 if len(changed_properties["Groups"]) > 0:
4706 self.peer_group_added = True
4707 if len(changed_properties["Groups"]) == 0 and self.peer_group_added:
4708 self.peer_group_removed = True
4709 self.loop.quit()
4710
4711 def run_test(self, *args):
4712 logger.debug("run_test")
4713 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4714 return False
4715
4716 def success(self):
4717 return self.done and self.peer_group_added and self.peer_group_removed
4718
4719 with TestDbusP2p(bus) as t:
4720 if not t.success():
4721 raise Exception("Expected signals not seen")
4722
4723 def test_dbus_p2p_group_idle_timeout(dev, apdev):
4724 """D-Bus P2P group removal on idle timeout"""
4725 try:
4726 dev[0].global_request("SET p2p_group_idle 1")
4727 _test_dbus_p2p_group_idle_timeout(dev, apdev)
4728 finally:
4729 dev[0].global_request("SET p2p_group_idle 0")
4730
4731 def _test_dbus_p2p_group_idle_timeout(dev, apdev):
4732 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4733 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4734 addr0 = dev[0].p2p_dev_addr()
4735 dev[1].p2p_listen()
4736
4737 class TestDbusP2p(TestDbus):
4738 def __init__(self, bus):
4739 TestDbus.__init__(self, bus)
4740 self.done = False
4741 self.group_started = False
4742 self.peer_group_added = False
4743 self.peer_group_removed = False
4744
4745 def __enter__(self):
4746 gobject.timeout_add(1, self.run_test)
4747 gobject.timeout_add(15000, self.timeout)
4748 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4749 "DeviceFound")
4750 self.add_signal(self.goNegotiationSuccess,
4751 WPAS_DBUS_IFACE_P2PDEVICE,
4752 "GONegotiationSuccess",
4753 byte_arrays=True)
4754 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4755 "GroupStarted")
4756 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4757 "GroupFinished")
4758 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4759 "PropertiesChanged")
4760 self.loop.run()
4761 return self
4762
4763 def deviceFound(self, path):
4764 logger.debug("deviceFound: path=%s" % path)
4765 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4766 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
4767 'go_intent': 0 }
4768 p2p.Connect(args)
4769
4770 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
4771 if ev is None:
4772 raise Exception("Timeout while waiting for GO Neg Request")
4773 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
4774 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
4775 if ev is None:
4776 raise Exception("Group formation timed out")
4777 self.sta_group_ev = ev
4778
4779 def goNegotiationSuccess(self, properties):
4780 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4781
4782 def groupStarted(self, properties):
4783 logger.debug("groupStarted: " + str(properties))
4784 self.group_started = True
4785 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4786 properties['interface_object'])
4787 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4788 dev1.group_form_result(self.sta_group_ev)
4789 ifaddr = dev1.group_request("STA-FIRST").splitlines()[0]
4790 # Force disassociation with different reason code so that the
4791 # P2P Client using D-Bus does not get normal group termination event
4792 # from the GO.
4793 dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
4794 dev1.remove_group()
4795
4796 def groupFinished(self, properties):
4797 logger.debug("groupFinished: " + str(properties))
4798 self.done = True
4799
4800 def propertiesChanged(self, interface_name, changed_properties,
4801 invalidated_properties):
4802 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4803 if interface_name != WPAS_DBUS_P2P_PEER:
4804 return
4805 if not self.group_started:
4806 return
4807 if "Groups" not in changed_properties:
4808 return
4809 if len(changed_properties["Groups"]) > 0:
4810 self.peer_group_added = True
4811 if len(changed_properties["Groups"]) == 0:
4812 self.peer_group_removed = True
4813 self.loop.quit()
4814
4815 def run_test(self, *args):
4816 logger.debug("run_test")
4817 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
4818 return False
4819
4820 def success(self):
4821 return self.done and self.peer_group_added and self.peer_group_removed
4822
4823 with TestDbusP2p(bus) as t:
4824 if not t.success():
4825 raise Exception("Expected signals not seen")
4826
4827 def test_dbus_p2p_wps_failure(dev, apdev):
4828 """D-Bus P2P WPS failure"""
4829 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4830 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4831 addr0 = dev[0].p2p_dev_addr()
4832
4833 class TestDbusP2p(TestDbus):
4834 def __init__(self, bus):
4835 TestDbus.__init__(self, bus)
4836 self.wps_failed = False
4837 self.formation_failure = False
4838
4839 def __enter__(self):
4840 gobject.timeout_add(1, self.run_test)
4841 gobject.timeout_add(15000, self.timeout)
4842 self.add_signal(self.goNegotiationRequest,
4843 WPAS_DBUS_IFACE_P2PDEVICE,
4844 "GONegotiationRequest",
4845 byte_arrays=True)
4846 self.add_signal(self.goNegotiationSuccess,
4847 WPAS_DBUS_IFACE_P2PDEVICE,
4848 "GONegotiationSuccess",
4849 byte_arrays=True)
4850 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4851 "GroupStarted")
4852 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE,
4853 "WpsFailed")
4854 self.add_signal(self.groupFormationFailure,
4855 WPAS_DBUS_IFACE_P2PDEVICE,
4856 "GroupFormationFailure")
4857 self.loop.run()
4858 return self
4859
4860 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
4861 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
4862 if dev_passwd_id != 1:
4863 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
4864 args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
4865 'go_intent': 15 }
4866 p2p.Connect(args)
4867
4868 def goNegotiationSuccess(self, properties):
4869 logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
4870
4871 def groupStarted(self, properties):
4872 logger.debug("groupStarted: " + str(properties))
4873 raise Exception("Unexpected GroupStarted")
4874
4875 def wpsFailed(self, name, args):
4876 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
4877 self.wps_failed = True
4878 if self.formation_failure:
4879 self.loop.quit()
4880
4881 def groupFormationFailure(self, reason):
4882 logger.debug("groupFormationFailure - reason=%s" % reason)
4883 self.formation_failure = True
4884 if self.wps_failed:
4885 self.loop.quit()
4886
4887 def run_test(self, *args):
4888 logger.debug("run_test")
4889 p2p.Listen(10)
4890 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4891 if not dev1.discover_peer(addr0):
4892 raise Exception("Peer not found")
4893 dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
4894 return False
4895
4896 def success(self):
4897 return self.wps_failed and self.formation_failure
4898
4899 with TestDbusP2p(bus) as t:
4900 if not t.success():
4901 raise Exception("Expected signals not seen")
4902
4903 def test_dbus_p2p_two_groups(dev, apdev):
4904 """D-Bus P2P with two concurrent groups"""
4905 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
4906 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
4907
4908 dev[0].request("SET p2p_no_group_iface 0")
4909 addr0 = dev[0].p2p_dev_addr()
4910 addr1 = dev[1].p2p_dev_addr()
4911 addr2 = dev[2].p2p_dev_addr()
4912 dev[1].p2p_start_go(freq=2412)
4913 dev1_group_ifname = dev[1].group_ifname
4914
4915 class TestDbusP2p(TestDbus):
4916 def __init__(self, bus):
4917 TestDbus.__init__(self, bus)
4918 self.done = False
4919 self.peer = None
4920 self.go = None
4921 self.group1 = None
4922 self.group2 = None
4923 self.groups_removed = False
4924
4925 def __enter__(self):
4926 gobject.timeout_add(1, self.run_test)
4927 gobject.timeout_add(15000, self.timeout)
4928 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
4929 "PropertiesChanged", byte_arrays=True)
4930 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
4931 "DeviceFound")
4932 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
4933 "GroupStarted")
4934 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
4935 "GroupFinished")
4936 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
4937 "PeerJoined")
4938 self.loop.run()
4939 return self
4940
4941 def propertiesChanged(self, interface_name, changed_properties,
4942 invalidated_properties):
4943 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
4944
4945 def deviceFound(self, path):
4946 logger.debug("deviceFound: path=%s" % path)
4947 if addr2.replace(':','') in path:
4948 self.peer = path
4949 elif addr1.replace(':','') in path:
4950 self.go = path
4951 if self.go and not self.group1:
4952 logger.info("Join the group")
4953 p2p.StopFind()
4954 pin = '12345670'
4955 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4956 dev1.group_ifname = dev1_group_ifname
4957 dev1.group_request("WPS_PIN any " + pin)
4958 args = { 'peer': self.go,
4959 'join': True,
4960 'wps_method': 'pin',
4961 'pin': pin,
4962 'frequency': 2412 }
4963 p2p.Connect(args)
4964
4965 def groupStarted(self, properties):
4966 logger.debug("groupStarted: " + str(properties))
4967 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
4968 dbus_interface=dbus.PROPERTIES_IFACE)
4969 logger.debug("p2pdevice properties: " + str(prop))
4970
4971 g_obj = bus.get_object(WPAS_DBUS_SERVICE,
4972 properties['group_object'])
4973 res = g_obj.GetAll(WPAS_DBUS_GROUP,
4974 dbus_interface=dbus.PROPERTIES_IFACE,
4975 byte_arrays=True)
4976 logger.debug("Group properties: " + str(res))
4977
4978 if not self.group1:
4979 self.group1 = properties['group_object']
4980 self.group1iface = properties['interface_object']
4981 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4982 self.group1iface)
4983
4984 logger.info("Start autonomous GO")
4985 params = dbus.Dictionary({ 'frequency': 2412 })
4986 p2p.GroupAdd(params)
4987 elif not self.group2:
4988 self.group2 = properties['group_object']
4989 self.group2iface = properties['interface_object']
4990 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
4991 self.group2iface)
4992 self.g2_bssid = res['BSSID']
4993
4994 if self.group1 and self.group2:
4995 logger.info("Authorize peer to join the group")
4996 a2 = binascii.unhexlify(addr2.replace(':',''))
4997 params = { 'Role': 'enrollee',
4998 'P2PDeviceAddress': dbus.ByteArray(a2),
4999 'Bssid': dbus.ByteArray(a2),
5000 'Type': 'pin',
5001 'Pin': '12345670' }
5002 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
5003 g_wps.Start(params)
5004
5005 bssid = ':'.join([binascii.hexlify(l) for l in self.g2_bssid])
5006 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
5007 dev2.scan_for_bss(bssid, freq=2412)
5008 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
5009 ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
5010 if ev is None:
5011 raise Exception("Group join timed out")
5012 self.dev2_group_ev = ev
5013
5014 def groupFinished(self, properties):
5015 logger.debug("groupFinished: " + str(properties))
5016
5017 if self.group1 == properties['group_object']:
5018 self.group1 = None
5019 elif self.group2 == properties['group_object']:
5020 self.group2 = None
5021
5022 if not self.group1 and not self.group2:
5023 self.done = True
5024 self.loop.quit()
5025
5026 def peerJoined(self, peer):
5027 logger.debug("peerJoined: " + peer)
5028 if self.groups_removed:
5029 return
5030 self.check_results()
5031
5032 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
5033 dev2.group_form_result(self.dev2_group_ev)
5034 dev2.remove_group()
5035
5036 logger.info("Disconnect group2")
5037 group_p2p = dbus.Interface(self.g2_if_obj,
5038 WPAS_DBUS_IFACE_P2PDEVICE)
5039 group_p2p.Disconnect()
5040
5041 logger.info("Disconnect group1")
5042 group_p2p = dbus.Interface(self.g1_if_obj,
5043 WPAS_DBUS_IFACE_P2PDEVICE)
5044 group_p2p.Disconnect()
5045 self.groups_removed = True
5046
5047 def check_results(self):
5048 logger.info("Check results with two concurrent groups in operation")
5049
5050 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
5051 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
5052 dbus_interface=dbus.PROPERTIES_IFACE,
5053 byte_arrays=True)
5054
5055 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
5056 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
5057 dbus_interface=dbus.PROPERTIES_IFACE,
5058 byte_arrays=True)
5059
5060 logger.info("group1 = " + self.group1)
5061 logger.debug("Group properties: " + str(res1))
5062
5063 logger.info("group2 = " + self.group2)
5064 logger.debug("Group properties: " + str(res2))
5065
5066 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
5067 dbus_interface=dbus.PROPERTIES_IFACE)
5068 logger.debug("p2pdevice properties: " + str(prop))
5069
5070 if res1['Role'] != 'client':
5071 raise Exception("Group1 role reported incorrectly: " + res1['Role'])
5072 if res2['Role'] != 'GO':
5073 raise Exception("Group2 role reported incorrectly: " + res2['Role'])
5074 if prop['Role'] != 'device':
5075 raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])
5076
5077 if len(res2['Members']) != 1:
5078 raise Exception("Unexpected Members value for group 2")
5079
5080 def run_test(self, *args):
5081 logger.debug("run_test")
5082 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5083 return False
5084
5085 def success(self):
5086 return self.done
5087
5088 with TestDbusP2p(bus) as t:
5089 if not t.success():
5090 raise Exception("Expected signals not seen")
5091
5092 dev[1].remove_group()
5093
5094 def test_dbus_p2p_cancel(dev, apdev):
5095 """D-Bus P2P Cancel"""
5096 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5097 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5098 try:
5099 p2p.Cancel()
5100 raise Exception("Unexpected p2p.Cancel() success")
5101 except dbus.exceptions.DBusException, e:
5102 pass
5103
5104 addr0 = dev[0].p2p_dev_addr()
5105 dev[1].p2p_listen()
5106
5107 class TestDbusP2p(TestDbus):
5108 def __init__(self, bus):
5109 TestDbus.__init__(self, bus)
5110 self.done = False
5111
5112 def __enter__(self):
5113 gobject.timeout_add(1, self.run_test)
5114 gobject.timeout_add(15000, self.timeout)
5115 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
5116 "DeviceFound")
5117 self.loop.run()
5118 return self
5119
5120 def deviceFound(self, path):
5121 logger.debug("deviceFound: path=%s" % path)
5122 args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
5123 'go_intent': 0 }
5124 p2p.Connect(args)
5125 p2p.Cancel()
5126 self.done = True
5127 self.loop.quit()
5128
5129 def run_test(self, *args):
5130 logger.debug("run_test")
5131 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
5132 return False
5133
5134 def success(self):
5135 return self.done
5136
5137 with TestDbusP2p(bus) as t:
5138 if not t.success():
5139 raise Exception("Expected signals not seen")
5140
5141 def test_dbus_p2p_ip_addr(dev, apdev):
5142 """D-Bus P2P and IP address parameters"""
5143 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5144 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
5145
5146 vals = [ ("IpAddrGo", "192.168.43.1"),
5147 ("IpAddrMask", "255.255.255.0"),
5148 ("IpAddrStart", "192.168.43.100"),
5149 ("IpAddrEnd", "192.168.43.199") ]
5150 for field, value in vals:
5151 if_obj.Set(WPAS_DBUS_IFACE, field, value,
5152 dbus_interface=dbus.PROPERTIES_IFACE)
5153 val = if_obj.Get(WPAS_DBUS_IFACE, field,
5154 dbus_interface=dbus.PROPERTIES_IFACE)
5155 if val != value:
5156 raise Exception("Unexpected %s value: %s" % (field, val))
5157
5158 set_ip_addr_info(dev[1])
5159
5160 dev[0].global_request("SET p2p_go_intent 0")
5161
5162 req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
5163 if "FAIL" in req:
5164 raise Exception("Failed to generate NFC connection handover request")
5165 sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
5166 if "FAIL" in sel:
5167 raise Exception("Failed to generate NFC connection handover select")
5168 dev[0].dump_monitor()
5169 dev[1].dump_monitor()
5170 res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel)
5171 if "FAIL" in res:
5172 raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
5173 res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel)
5174 if "FAIL" in res:
5175 raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")
5176
5177 class TestDbusP2p(TestDbus):
5178 def __init__(self, bus):
5179 TestDbus.__init__(self, bus)
5180 self.done = False
5181
5182 def __enter__(self):
5183 gobject.timeout_add(1, self.run_test)
5184 gobject.timeout_add(15000, self.timeout)
5185 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
5186 "GroupStarted")
5187 self.loop.run()
5188 return self
5189
5190 def groupStarted(self, properties):
5191 logger.debug("groupStarted: " + str(properties))
5192 self.loop.quit()
5193
5194 if 'IpAddrGo' not in properties:
5195 logger.info("IpAddrGo missing from GroupStarted")
5196 ip_addr_go = properties['IpAddrGo']
5197 addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3])
5198 if addr != "192.168.42.1":
5199 logger.info("Unexpected IpAddrGo value: " + addr)
5200 self.done = True
5201
5202 def run_test(self, *args):
5203 logger.debug("run_test")
5204 return False
5205
5206 def success(self):
5207 return self.done
5208
5209 with TestDbusP2p(bus) as t:
5210 if not t.success():
5211 raise Exception("Expected signals not seen")
5212
5213 def test_dbus_introspect(dev, apdev):
5214 """D-Bus introspection"""
5215 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5216
5217 res = if_obj.Introspect(WPAS_DBUS_IFACE,
5218 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5219 logger.info("Initial Introspect: " + str(res))
5220 if res is None or "Introspectable" not in res or "GroupStarted" not in res:
5221 raise Exception("Unexpected initial Introspect response: " + str(res))
5222 if "FastReauth" not in res or "PassiveScan" not in res:
5223 raise Exception("Unexpected initial Introspect response: " + str(res))
5224
5225 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
5226 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5227 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5228 logger.info("Introspect: " + str(res2))
5229 if res2 is not None:
5230 raise Exception("Unexpected Introspect response")
5231
5232 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"):
5233 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5234 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5235 logger.info("Introspect: " + str(res2))
5236 if res2 is None:
5237 raise Exception("No Introspect response")
5238 if len(res2) >= len(res):
5239 raise Exception("Unexpected Introspect response")
5240
5241 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
5242 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5243 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5244 logger.info("Introspect: " + str(res2))
5245 if res2 is None:
5246 raise Exception("No Introspect response")
5247 if len(res2) >= len(res):
5248 raise Exception("Unexpected Introspect response")
5249
5250 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"):
5251 res2 = if_obj.Introspect(WPAS_DBUS_IFACE,
5252 dbus_interface=dbus.INTROSPECTABLE_IFACE)
5253 logger.info("Introspect: " + str(res2))
5254 if res2 is None:
5255 raise Exception("No Introspect response")
5256 if len(res2) >= len(res):
5257 raise Exception("Unexpected Introspect response")
5258
5259 def test_dbus_ap(dev, apdev):
5260 """D-Bus AddNetwork for AP mode"""
5261 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5262 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5263
5264 ssid = "test-wpa2-psk"
5265 passphrase = 'qwertyuiop'
5266
5267 class TestDbusConnect(TestDbus):
5268 def __init__(self, bus):
5269 TestDbus.__init__(self, bus)
5270 self.started = False
5271
5272 def __enter__(self):
5273 gobject.timeout_add(1, self.run_connect)
5274 gobject.timeout_add(15000, self.timeout)
5275 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
5276 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
5277 "NetworkSelected")
5278 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5279 "PropertiesChanged")
5280 self.loop.run()
5281 return self
5282
5283 def networkAdded(self, network, properties):
5284 logger.debug("networkAdded: %s" % str(network))
5285 logger.debug(str(properties))
5286
5287 def networkSelected(self, network):
5288 logger.debug("networkSelected: %s" % str(network))
5289 self.network_selected = True
5290
5291 def propertiesChanged(self, properties):
5292 logger.debug("propertiesChanged: %s" % str(properties))
5293 if 'State' in properties and properties['State'] == "completed":
5294 self.started = True
5295 self.loop.quit()
5296
5297 def run_connect(self, *args):
5298 logger.debug("run_connect")
5299 args = dbus.Dictionary({ 'ssid': ssid,
5300 'key_mgmt': 'WPA-PSK',
5301 'psk': passphrase,
5302 'mode': 2,
5303 'frequency': 2412 },
5304 signature='sv')
5305 self.netw = iface.AddNetwork(args)
5306 iface.SelectNetwork(self.netw)
5307 return False
5308
5309 def success(self):
5310 return self.started
5311
5312 with TestDbusConnect(bus) as t:
5313 if not t.success():
5314 raise Exception("Expected signals not seen")
5315 dev[1].connect(ssid, psk=passphrase, scan_freq="2412")
5316
5317 def test_dbus_connect_wpa_eap(dev, apdev):
5318 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
5319 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5320 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5321
5322 ssid = "test-wpa-eap"
5323 params = hostapd.wpa_eap_params(ssid=ssid)
5324 params["wpa"] = "3"
5325 params["rsn_pairwise"] = "CCMP"
5326 hapd = hostapd.add_ap(apdev[0], params)
5327
5328 class TestDbusConnect(TestDbus):
5329 def __init__(self, bus):
5330 TestDbus.__init__(self, bus)
5331 self.done = False
5332
5333 def __enter__(self):
5334 gobject.timeout_add(1, self.run_connect)
5335 gobject.timeout_add(15000, self.timeout)
5336 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5337 "PropertiesChanged")
5338 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
5339 self.loop.run()
5340 return self
5341
5342 def propertiesChanged(self, properties):
5343 logger.debug("propertiesChanged: %s" % str(properties))
5344 if 'State' in properties and properties['State'] == "completed":
5345 self.done = True
5346 self.loop.quit()
5347
5348 def eap(self, status, parameter):
5349 logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
5350
5351 def run_connect(self, *args):
5352 logger.debug("run_connect")
5353 args = dbus.Dictionary({ 'ssid': ssid,
5354 'key_mgmt': 'WPA-EAP',
5355 'eap': 'PEAP',
5356 'identity': 'user',
5357 'password': 'password',
5358 'ca_cert': 'auth_serv/ca.pem',
5359 'phase2': 'auth=MSCHAPV2',
5360 'scan_freq': 2412 },
5361 signature='sv')
5362 self.netw = iface.AddNetwork(args)
5363 iface.SelectNetwork(self.netw)
5364 return False
5365
5366 def success(self):
5367 return self.done
5368
5369 with TestDbusConnect(bus) as t:
5370 if not t.success():
5371 raise Exception("Expected signals not seen")
5372
5373 def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
5374 """AP_SCAN 2 AP mode and D-Bus Scan()"""
5375 try:
5376 _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
5377 finally:
5378 dev[0].request("AP_SCAN 1")
5379
5380 def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
5381 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5382 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5383
5384 if "OK" not in dev[0].request("AP_SCAN 2"):
5385 raise Exception("Failed to set AP_SCAN 2")
5386
5387 id = dev[0].add_network()
5388 dev[0].set_network(id, "mode", "2")
5389 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open")
5390 dev[0].set_network(id, "key_mgmt", "NONE")
5391 dev[0].set_network(id, "frequency", "2412")
5392 dev[0].set_network(id, "scan_freq", "2412")
5393 dev[0].set_network(id, "disabled", "0")
5394 dev[0].select_network(id)
5395 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5)
5396 if ev is None:
5397 raise Exception("AP failed to start")
5398
5399 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
5400 iface.Scan({'Type': 'active',
5401 'AllowRoam': True,
5402 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]})
5403 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5404 "AP-DISABLED"], timeout=5)
5405 if ev is None:
5406 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
5407 if "AP-DISABLED" in ev:
5408 raise Exception("Unexpected AP-DISABLED event")
5409 if "retry=1" in ev:
5410 # Wait for the retry to scan happen
5411 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5412 "AP-DISABLED"], timeout=5)
5413 if ev is None:
5414 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5415 if "AP-DISABLED" in ev:
5416 raise Exception("Unexpected AP-DISABLED event - retry")
5417
5418 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
5419 dev[1].request("DISCONNECT")
5420 dev[1].wait_disconnected()
5421 dev[0].request("DISCONNECT")
5422 dev[0].wait_disconnected()
5423
5424 def test_dbus_expectdisconnect(dev, apdev):
5425 """D-Bus ExpectDisconnect"""
5426 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5427 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
5428
5429 params = { "ssid": "test-open" }
5430 hapd = hostapd.add_ap(apdev[0], params)
5431 dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412")
5432
5433 # This does not really verify the behavior other than by going through the
5434 # code path for additional coverage.
5435 wpas.ExpectDisconnect()
5436 dev[0].request("DISCONNECT")
5437 dev[0].wait_disconnected()
5438
5439 def test_dbus_save_config(dev, apdev):
5440 """D-Bus SaveConfig"""
5441 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5442 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5443 try:
5444 iface.SaveConfig()
5445 raise Exception("SaveConfig() accepted unexpectedly")
5446 except dbus.exceptions.DBusException, e:
5447 if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
5448 raise Exception("Unexpected error message for SaveConfig(): " + str(e))
5449
5450 def test_dbus_vendor_elem(dev, apdev):
5451 """D-Bus vendor element operations"""
5452 try:
5453 _test_dbus_vendor_elem(dev, apdev)
5454 finally:
5455 dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5456
5457 def _test_dbus_vendor_elem(dev, apdev):
5458 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5459 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5460
5461 dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5462
5463 try:
5464 ie = dbus.ByteArray("\x00\x00")
5465 iface.VendorElemAdd(-1, ie)
5466 raise Exception("Invalid VendorElemAdd() accepted")
5467 except dbus.exceptions.DBusException, e:
5468 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5469 raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e))
5470
5471 try:
5472 ie = dbus.ByteArray("")
5473 iface.VendorElemAdd(1, ie)
5474 raise Exception("Invalid VendorElemAdd() accepted")
5475 except dbus.exceptions.DBusException, e:
5476 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5477 raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e))
5478
5479 try:
5480 ie = dbus.ByteArray("\x00\x01")
5481 iface.VendorElemAdd(1, ie)
5482 raise Exception("Invalid VendorElemAdd() accepted")
5483 except dbus.exceptions.DBusException, e:
5484 if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5485 raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e))
5486
5487 try:
5488 iface.VendorElemGet(-1)
5489 raise Exception("Invalid VendorElemGet() accepted")
5490 except dbus.exceptions.DBusException, e:
5491 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5492 raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e))
5493
5494 try:
5495 iface.VendorElemGet(1)
5496 raise Exception("Invalid VendorElemGet() accepted")
5497 except dbus.exceptions.DBusException, e:
5498 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5499 raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e))
5500
5501 try:
5502 ie = dbus.ByteArray("\x00\x00")
5503 iface.VendorElemRem(-1, ie)
5504 raise Exception("Invalid VendorElemRemove() accepted")
5505 except dbus.exceptions.DBusException, e:
5506 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e):
5507 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5508
5509 try:
5510 ie = dbus.ByteArray("")
5511 iface.VendorElemRem(1, ie)
5512 raise Exception("Invalid VendorElemRemove() accepted")
5513 except dbus.exceptions.DBusException, e:
5514 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e):
5515 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5516
5517 iface.VendorElemRem(1, "*")
5518
5519 ie = dbus.ByteArray("\x00\x01\x00")
5520 iface.VendorElemAdd(1, ie)
5521
5522 val = iface.VendorElemGet(1)
5523 if len(val) != len(ie):
5524 raise Exception("Unexpected VendorElemGet length")
5525 for i in range(len(val)):
5526 if val[i] != dbus.Byte(ie[i]):
5527 raise Exception("Unexpected VendorElemGet data")
5528
5529 ie2 = dbus.ByteArray("\xe0\x00")
5530 iface.VendorElemAdd(1, ie2)
5531
5532 ies = ie + ie2
5533 val = iface.VendorElemGet(1)
5534 if len(val) != len(ies):
5535 raise Exception("Unexpected VendorElemGet length[2]")
5536 for i in range(len(val)):
5537 if val[i] != dbus.Byte(ies[i]):
5538 raise Exception("Unexpected VendorElemGet data[2]")
5539
5540 try:
5541 test_ie = dbus.ByteArray("\x01\x01")
5542 iface.VendorElemRem(1, test_ie)
5543 raise Exception("Invalid VendorElemRemove() accepted")
5544 except dbus.exceptions.DBusException, e:
5545 if "InvalidArgs" not in str(e) or "Parse error" not in str(e):
5546 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e))
5547
5548 iface.VendorElemRem(1, ie)
5549 val = iface.VendorElemGet(1)
5550 if len(val) != len(ie2):
5551 raise Exception("Unexpected VendorElemGet length[3]")
5552
5553 iface.VendorElemRem(1, "*")
5554 try:
5555 iface.VendorElemGet(1)
5556 raise Exception("Invalid VendorElemGet() accepted after removal")
5557 except dbus.exceptions.DBusException, e:
5558 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e):
5559 raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e))
5560
5561 def test_dbus_assoc_reject(dev, apdev):
5562 """D-Bus AssocStatusCode"""
5563 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5564 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
5565
5566 ssid = "test-open"
5567 params = { "ssid": ssid,
5568 "max_listen_interval": "1" }
5569 hapd = hostapd.add_ap(apdev[0], params)
5570
5571 class TestDbusConnect(TestDbus):
5572 def __init__(self, bus):
5573 TestDbus.__init__(self, bus)
5574 self.assoc_status_seen = False
5575 self.state = 0
5576
5577 def __enter__(self):
5578 gobject.timeout_add(1, self.run_connect)
5579 gobject.timeout_add(15000, self.timeout)
5580 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
5581 "PropertiesChanged")
5582 self.loop.run()
5583 return self
5584
5585 def propertiesChanged(self, properties):
5586 logger.debug("propertiesChanged: %s" % str(properties))
5587 if 'AssocStatusCode' in properties:
5588 status = properties['AssocStatusCode']
5589 if status != 51:
5590 logger.info("Unexpected status code: " + str(status))
5591 else:
5592 self.assoc_status_seen = True
5593 iface.Disconnect()
5594 self.loop.quit()
5595
5596 def run_connect(self, *args):
5597 args = dbus.Dictionary({ 'ssid': ssid,
5598 'key_mgmt': 'NONE',
5599 'scan_freq': 2412 },
5600 signature='sv')
5601 self.netw = iface.AddNetwork(args)
5602 iface.SelectNetwork(self.netw)
5603 return False
5604
5605 def success(self):
5606 return self.assoc_status_seen
5607
5608 with TestDbusConnect(bus) as t:
5609 if not t.success():
5610 raise Exception("Expected signals not seen")
5611
5612 def test_dbus_mesh(dev, apdev):
5613 """D-Bus mesh"""
5614 check_mesh_support(dev[0])
5615 (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
5616 mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH)
5617
5618 add_open_mesh_network(dev[1])
5619 addr1 = dev[1].own_addr()
5620
5621 class TestDbusMesh(TestDbus):
5622 def __init__(self, bus):
5623 TestDbus.__init__(self, bus)
5624 self.done = False
5625
5626 def __enter__(self):
5627 gobject.timeout_add(1, self.run_test)
5628 gobject.timeout_add(15000, self.timeout)
5629 self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH,
5630 "MeshGroupStarted")
5631 self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH,
5632 "MeshGroupRemoved")
5633 self.add_signal(self.meshPeerConnected, WPAS_DBUS_IFACE_MESH,
5634 "MeshPeerConnected")
5635 self.add_signal(self.meshPeerDisconnected, WPAS_DBUS_IFACE_MESH,
5636 "MeshPeerDisconnected")
5637 self.loop.run()
5638 return self
5639
5640 def meshGroupStarted(self, args):
5641 logger.debug("MeshGroupStarted: " + str(args))
5642
5643 def meshGroupRemoved(self, args):
5644 logger.debug("MeshGroupRemoved: " + str(args))
5645 self.done = True
5646 self.loop.quit()
5647
5648 def meshPeerConnected(self, args):
5649 logger.debug("MeshPeerConnected: " + str(args))
5650
5651 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers',
5652 dbus_interface=dbus.PROPERTIES_IFACE,
5653 byte_arrays=True)
5654 logger.debug("MeshPeers: " + str(res))
5655 if len(res) != 1:
5656 raise Exception("Unexpected number of MeshPeer values")
5657 if binascii.hexlify(res[0]) != addr1.replace(':', ''):
5658 raise Exception("Unexpected peer address")
5659
5660 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup',
5661 dbus_interface=dbus.PROPERTIES_IFACE,
5662 byte_arrays=True)
5663 logger.debug("MeshGroup: " + str(res))
5664 if res != "wpas-mesh-open":
5665 raise Exception("Unexpected MeshGroup")
5666 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
5667 dev1.mesh_group_remove()
5668
5669 def meshPeerDisconnected(self, args):
5670 logger.debug("MeshPeerDisconnected: " + str(args))
5671 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
5672 dev0.mesh_group_remove()
5673
5674 def run_test(self, *args):
5675 logger.debug("run_test")
5676 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0')
5677 add_open_mesh_network(dev0)
5678 return False
5679
5680 def success(self):
5681 return self.done
5682
5683 with TestDbusMesh(bus) as t:
5684 if not t.success():
5685 raise Exception("Expected signals not seen")