1 # wpa_supplicant D-Bus interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
9 logger
= logging
.getLogger()
21 from wpasupplicant
import WpaSupplicant
22 from utils
import HwsimSkip
, alloc_fail
, fail_test
23 from test_ap_tdls
import connect_2sta_open
25 WPAS_DBUS_SERVICE
= "fi.w1.wpa_supplicant1"
26 WPAS_DBUS_PATH
= "/fi/w1/wpa_supplicant1"
27 WPAS_DBUS_IFACE
= "fi.w1.wpa_supplicant1.Interface"
28 WPAS_DBUS_IFACE_WPS
= WPAS_DBUS_IFACE
+ ".WPS"
29 WPAS_DBUS_NETWORK
= "fi.w1.wpa_supplicant1.Network"
30 WPAS_DBUS_BSS
= "fi.w1.wpa_supplicant1.BSS"
31 WPAS_DBUS_IFACE_P2PDEVICE
= WPAS_DBUS_IFACE
+ ".P2PDevice"
32 WPAS_DBUS_P2P_PEER
= "fi.w1.wpa_supplicant1.Peer"
33 WPAS_DBUS_GROUP
= "fi.w1.wpa_supplicant1.Group"
34 WPAS_DBUS_PERSISTENT_GROUP
= "fi.w1.wpa_supplicant1.PersistentGroup"
36 def prepare_dbus(dev
):
38 logger
.info("No dbus module available")
39 raise HwsimSkip("No dbus module available")
41 from dbus
.mainloop
.glib
import DBusGMainLoop
42 dbus
.mainloop
.glib
.DBusGMainLoop(set_as_default
=True)
43 bus
= dbus
.SystemBus()
44 wpas_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, WPAS_DBUS_PATH
)
45 wpas
= dbus
.Interface(wpas_obj
, WPAS_DBUS_SERVICE
)
46 path
= wpas
.GetInterface(dev
.ifname
)
47 if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
48 return (bus
,wpas_obj
,path
,if_obj
)
50 raise HwsimSkip("Could not connect to D-Bus: %s" % e
)
52 class TestDbus(object):
53 def __init__(self
, bus
):
54 self
.loop
= gobject
.MainLoop()
58 def __exit__(self
, type, value
, traceback
):
59 for s
in self
.signals
:
62 def add_signal(self
, handler
, interface
, name
, byte_arrays
=False):
63 s
= self
.bus
.add_signal_receiver(handler
, dbus_interface
=interface
,
65 byte_arrays
=byte_arrays
)
66 self
.signals
.append(s
)
68 def timeout(self
, *args
):
69 logger
.debug("timeout")
73 class alloc_fail_dbus(object):
74 def __init__(self
, dev
, count
, funcs
, operation
="Operation",
79 self
._operation
= operation
80 self
._expected
= expected
82 cmd
= "TEST_ALLOC_FAIL %d:%s" % (self
._count
, self
._funcs
)
83 if "OK" not in self
._dev
.request(cmd
):
84 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
85 def __exit__(self
, type, value
, traceback
):
87 raise Exception("%s succeeded during out-of-memory" % self
._operation
)
88 if type == dbus
.exceptions
.DBusException
and self
._expected
in str(value
):
90 if self
._dev
.request("GET_ALLOC_FAIL") != "0:%s" % self
._funcs
:
91 raise Exception("%s did not trigger allocation failure" % self
._operation
)
94 def start_ap(ap
, ssid
="test-wps",
95 ap_uuid
="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
96 params
= { "ssid": ssid
, "eap_server": "1", "wps_state": "2",
97 "wpa_passphrase": "12345678", "wpa": "2",
98 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
99 "ap_pin": "12345670", "uuid": ap_uuid
}
100 return hostapd
.add_ap(ap
['ifname'], params
)
102 def test_dbus_getall(dev
, apdev
):
104 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
106 props
= wpas_obj
.GetAll(WPAS_DBUS_SERVICE
,
107 dbus_interface
=dbus
.PROPERTIES_IFACE
)
108 logger
.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props
))
110 props
= if_obj
.GetAll(WPAS_DBUS_IFACE
,
111 dbus_interface
=dbus
.PROPERTIES_IFACE
)
112 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE
, path
, str(props
)))
114 props
= if_obj
.GetAll(WPAS_DBUS_IFACE_WPS
,
115 dbus_interface
=dbus
.PROPERTIES_IFACE
)
116 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS
, path
, str(props
)))
118 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
119 dbus_interface
=dbus
.PROPERTIES_IFACE
)
121 raise Exception("Unexpected BSSs entry: " + str(res
))
123 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
124 dbus_interface
=dbus
.PROPERTIES_IFACE
)
126 raise Exception("Unexpected Networks entry: " + str(res
))
128 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], { "ssid": "open" })
129 bssid
= apdev
[0]['bssid']
130 dev
[0].scan_for_bss(bssid
, freq
=2412)
131 id = dev
[0].add_network()
132 dev
[0].set_network(id, "disabled", "0")
133 dev
[0].set_network_quoted(id, "ssid", "test")
135 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
136 dbus_interface
=dbus
.PROPERTIES_IFACE
)
138 raise Exception("Missing BSSs entry: " + str(res
))
139 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
140 props
= bss_obj
.GetAll(WPAS_DBUS_BSS
, dbus_interface
=dbus
.PROPERTIES_IFACE
)
141 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS
, res
[0], str(props
)))
143 for item
in props
['BSSID']:
144 if len(bssid_str
) > 0:
146 bssid_str
+= '%02x' % item
147 if bssid_str
!= bssid
:
148 raise Exception("Unexpected BSSID in BSSs entry")
150 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
151 dbus_interface
=dbus
.PROPERTIES_IFACE
)
153 raise Exception("Missing Networks entry: " + str(res
))
154 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
155 props
= net_obj
.GetAll(WPAS_DBUS_NETWORK
,
156 dbus_interface
=dbus
.PROPERTIES_IFACE
)
157 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK
, res
[0], str(props
)))
158 ssid
= props
['Properties']['ssid']
160 raise Exception("Unexpected SSID in network entry")
162 def dbus_get(dbus
, wpas_obj
, prop
, expect
=None, byte_arrays
=False):
163 val
= wpas_obj
.Get(WPAS_DBUS_SERVICE
, prop
,
164 dbus_interface
=dbus
.PROPERTIES_IFACE
,
165 byte_arrays
=byte_arrays
)
166 if expect
is not None and val
!= expect
:
167 raise Exception("Unexpected %s: %s (expected: %s)" %
168 (prop
, str(val
), str(expect
)))
171 def dbus_set(dbus
, wpas_obj
, prop
, val
):
172 wpas_obj
.Set(WPAS_DBUS_SERVICE
, prop
, val
,
173 dbus_interface
=dbus
.PROPERTIES_IFACE
)
175 def test_dbus_properties(dev
, apdev
):
176 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
177 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
179 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="msgdump")
180 dbus_set(dbus
, wpas_obj
, "DebugLevel", "debug")
181 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="debug")
182 for (val
,err
) in [ (3, "Error.Failed: wrong property type"),
183 ("foo", "Error.Failed: wrong debug level value") ]:
185 dbus_set(dbus
, wpas_obj
, "DebugLevel", val
)
186 raise Exception("Invalid DebugLevel value accepted: " + str(val
))
187 except dbus
.exceptions
.DBusException
, e
:
188 if err
not in str(e
):
189 raise Exception("Unexpected error message: " + str(e
))
190 dbus_set(dbus
, wpas_obj
, "DebugLevel", "msgdump")
191 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="msgdump")
193 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=True)
194 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", False)
195 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=False)
197 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", "foo")
198 raise Exception("Invalid DebugTimestamp value accepted")
199 except dbus
.exceptions
.DBusException
, e
:
200 if "Error.Failed: wrong property type" not in str(e
):
201 raise Exception("Unexpected error message: " + str(e
))
202 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", True)
203 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=True)
205 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=True)
206 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", False)
207 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=False)
209 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", "foo")
210 raise Exception("Invalid DebugShowKeys value accepted")
211 except dbus
.exceptions
.DBusException
, e
:
212 if "Error.Failed: wrong property type" not in str(e
):
213 raise Exception("Unexpected error message: " + str(e
))
214 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", True)
215 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=True)
217 res
= dbus_get(dbus
, wpas_obj
, "Interfaces")
219 raise Exception("Unexpected Interfaces value: " + str(res
))
221 res
= dbus_get(dbus
, wpas_obj
, "EapMethods")
222 if len(res
) < 5 or "TTLS" not in res
:
223 raise Exception("Unexpected EapMethods value: " + str(res
))
225 res
= dbus_get(dbus
, wpas_obj
, "Capabilities")
226 if len(res
) < 2 or "p2p" not in res
:
227 raise Exception("Unexpected Capabilities value: " + str(res
))
229 dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
230 val
= binascii
.unhexlify("010006020304050608")
231 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(val
))
232 res
= dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
234 raise Exception("WFDIEs value changed")
236 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray('\x00'))
237 raise Exception("Invalid WFDIEs value accepted")
238 except dbus
.exceptions
.DBusException
, e
:
239 if "InvalidArgs" not in str(e
):
240 raise Exception("Unexpected error message: " + str(e
))
241 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(''))
242 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(val
))
243 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(''))
244 res
= dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
246 raise Exception("WFDIEs not cleared properly")
248 res
= dbus_get(dbus
, wpas_obj
, "EapMethods")
250 dbus_set(dbus
, wpas_obj
, "EapMethods", res
)
251 raise Exception("Invalid Set accepted")
252 except dbus
.exceptions
.DBusException
, e
:
253 if "InvalidArgs: Property is read-only" not in str(e
):
254 raise Exception("Unexpected error message: " + str(e
))
257 wpas_obj
.SetFoo(WPAS_DBUS_SERVICE
, "DebugShowKeys", True,
258 dbus_interface
=dbus
.PROPERTIES_IFACE
)
259 raise Exception("Unknown method accepted")
260 except dbus
.exceptions
.DBusException
, e
:
261 if "UnknownMethod" not in str(e
):
262 raise Exception("Unexpected error message: " + str(e
))
265 wpas_obj
.Get("foo", "DebugShowKeys",
266 dbus_interface
=dbus
.PROPERTIES_IFACE
)
267 raise Exception("Invalid Get accepted")
268 except dbus
.exceptions
.DBusException
, e
:
269 if "InvalidArgs: No such property" not in str(e
):
270 raise Exception("Unexpected error message: " + str(e
))
272 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, WPAS_DBUS_PATH
,
275 test_obj
.Get(123, "DebugShowKeys",
276 dbus_interface
=dbus
.PROPERTIES_IFACE
)
277 raise Exception("Invalid Get accepted")
278 except dbus
.exceptions
.DBusException
, e
:
279 if "InvalidArgs: Invalid arguments" not in str(e
):
280 raise Exception("Unexpected error message: " + str(e
))
282 test_obj
.Get(WPAS_DBUS_SERVICE
, 123,
283 dbus_interface
=dbus
.PROPERTIES_IFACE
)
284 raise Exception("Invalid Get accepted")
285 except dbus
.exceptions
.DBusException
, e
:
286 if "InvalidArgs: Invalid arguments" not in str(e
):
287 raise Exception("Unexpected error message: " + str(e
))
290 wpas_obj
.Set(WPAS_DBUS_SERVICE
, "WFDIEs",
291 dbus
.ByteArray('', variant_level
=2),
292 dbus_interface
=dbus
.PROPERTIES_IFACE
)
293 raise Exception("Invalid Set accepted")
294 except dbus
.exceptions
.DBusException
, e
:
295 if "InvalidArgs: invalid message format" not in str(e
):
296 raise Exception("Unexpected error message: " + str(e
))
298 def test_dbus_invalid_method(dev
, apdev
):
299 """D-Bus invalid method"""
300 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
301 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
305 raise Exception("Unknown method accepted")
306 except dbus
.exceptions
.DBusException
, e
:
307 if "UnknownMethod" not in str(e
):
308 raise Exception("Unexpected error message: " + str(e
))
310 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
, introspect
=False)
311 test_wps
= dbus
.Interface(test_obj
, WPAS_DBUS_IFACE_WPS
)
314 raise Exception("WPS.Start with incorrect signature accepted")
315 except dbus
.exceptions
.DBusException
, e
:
316 if "InvalidArgs: Invalid arg" not in str(e
):
317 raise Exception("Unexpected error message: " + str(e
))
319 def test_dbus_get_set_wps(dev
, apdev
):
320 """D-Bus Get/Set for WPS properties"""
322 _test_dbus_get_set_wps(dev
, apdev
)
324 dev
[0].request("SET wps_cred_processing 0")
325 dev
[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
327 def _test_dbus_get_set_wps(dev
, apdev
):
328 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
330 if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
331 dbus_interface
=dbus
.PROPERTIES_IFACE
)
333 val
= "display keypad virtual_display nfc_interface"
334 dev
[0].request("SET config_methods " + val
)
336 config
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
337 dbus_interface
=dbus
.PROPERTIES_IFACE
)
339 raise Exception("Unexpected Get(ConfigMethods) result: " + config
)
341 val2
= "push_button display"
342 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ConfigMethods", val2
,
343 dbus_interface
=dbus
.PROPERTIES_IFACE
)
344 config
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
345 dbus_interface
=dbus
.PROPERTIES_IFACE
)
347 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config
)
349 dev
[0].request("SET config_methods " + val
)
352 dev
[0].request("SET wps_cred_processing " + str(i
))
353 val
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
354 dbus_interface
=dbus
.PROPERTIES_IFACE
)
355 expected_val
= False if i
== 1 else True
356 if val
!= expected_val
:
357 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i
, val
))
359 class TestDbusGetSet(TestDbus
):
360 def __init__(self
, bus
):
361 TestDbus
.__init
__(self
, bus
)
362 self
.signal_received
= False
363 self
.signal_received_deprecated
= False
364 self
.sets_done
= False
367 gobject
.timeout_add(1, self
.run_sets
)
368 gobject
.timeout_add(1000, self
.timeout
)
369 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE_WPS
,
371 self
.add_signal(self
.propertiesChanged2
, dbus
.PROPERTIES_IFACE
,
376 def propertiesChanged(self
, properties
):
377 logger
.debug("PropertiesChanged: " + str(properties
))
378 if properties
.has_key("ProcessCredentials"):
379 self
.signal_received_deprecated
= True
380 if self
.sets_done
and self
.signal_received
:
383 def propertiesChanged2(self
, interface_name
, changed_properties
,
384 invalidated_properties
):
385 logger
.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
386 if interface_name
!= WPAS_DBUS_IFACE_WPS
:
388 if changed_properties
.has_key("ProcessCredentials"):
389 self
.signal_received
= True
390 if self
.sets_done
and self
.signal_received_deprecated
:
393 def run_sets(self
, *args
):
394 logger
.debug("run_sets")
395 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
397 dbus_interface
=dbus
.PROPERTIES_IFACE
)
398 if if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
399 dbus_interface
=dbus
.PROPERTIES_IFACE
) != True:
400 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
401 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
403 dbus_interface
=dbus
.PROPERTIES_IFACE
)
404 if if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
405 dbus_interface
=dbus
.PROPERTIES_IFACE
) != False:
406 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
408 self
.dbus_sets_done
= True
412 return self
.signal_received
and self
.signal_received_deprecated
414 with
TestDbusGetSet(bus
) as t
:
416 raise Exception("No signal received for ProcessCredentials change")
418 def test_dbus_wps_invalid(dev
, apdev
):
419 """D-Bus invaldi WPS operation"""
420 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
421 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
423 failures
= [ {'Role': 'foo', 'Type': 'pbc'},
424 {'Role': 123, 'Type': 'pbc'},
426 {'Role': 'enrollee'},
427 {'Role': 'registrar'},
428 {'Role': 'enrollee', 'Type': 123},
429 {'Role': 'enrollee', 'Type': 'foo'},
430 {'Role': 'enrollee', 'Type': 'pbc',
431 'Bssid': '02:33:44:55:66:77'},
432 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
433 {'Role': 'enrollee', 'Type': 'pbc',
434 'Bssid': dbus
.ByteArray('12345')},
435 {'Role': 'enrollee', 'Type': 'pbc',
436 'P2PDeviceAddress': 12345},
437 {'Role': 'enrollee', 'Type': 'pbc',
438 'P2PDeviceAddress': dbus
.ByteArray('12345')},
439 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
440 for args
in failures
:
443 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args
))
444 except dbus
.exceptions
.DBusException
, e
:
445 if not str(e
).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
446 raise Exception("Unexpected error message: " + str(e
))
448 def test_dbus_wps_oom(dev
, apdev
):
449 """D-Bus WPS operation (OOM)"""
450 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
451 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
453 with
alloc_fail_dbus(dev
[0], 1, "=wpas_dbus_getter_state", "Get"):
454 if_obj
.Get(WPAS_DBUS_IFACE
, "State",
455 dbus_interface
=dbus
.PROPERTIES_IFACE
)
457 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], { "ssid": "open" })
458 bssid
= apdev
[0]['bssid']
459 dev
[0].scan_for_bss(bssid
, freq
=2412)
461 for i
in range(1, 3):
462 with
alloc_fail_dbus(dev
[0], i
, "=wpas_dbus_getter_bsss", "Get"):
463 if_obj
.Get(WPAS_DBUS_IFACE
, "BSSs",
464 dbus_interface
=dbus
.PROPERTIES_IFACE
)
466 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
467 dbus_interface
=dbus
.PROPERTIES_IFACE
)
468 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
469 with
alloc_fail_dbus(dev
[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
470 bss_obj
.Get(WPAS_DBUS_BSS
, "Rates",
471 dbus_interface
=dbus
.PROPERTIES_IFACE
)
473 id = dev
[0].add_network()
474 dev
[0].set_network(id, "disabled", "0")
475 dev
[0].set_network_quoted(id, "ssid", "test")
477 for i
in range(1, 3):
478 with
alloc_fail_dbus(dev
[0], i
, "=wpas_dbus_getter_networks", "Get"):
479 if_obj
.Get(WPAS_DBUS_IFACE
, "Networks",
480 dbus_interface
=dbus
.PROPERTIES_IFACE
)
482 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_getter_interfaces", "Get"):
483 dbus_get(dbus
, wpas_obj
, "Interfaces")
485 for i
in range(1, 6):
486 with
alloc_fail_dbus(dev
[0], i
, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
487 dbus_get(dbus
, wpas_obj
, "EapMethods")
489 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_setter_config_methods", "Set",
490 expected
="Error.Failed: Failed to set property"):
491 val2
= "push_button display"
492 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ConfigMethods", val2
,
493 dbus_interface
=dbus
.PROPERTIES_IFACE
)
495 with
alloc_fail_dbus(dev
[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
497 expected
="UnknownError: WPS start failed"):
498 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
500 def test_dbus_wps_pbc(dev
, apdev
):
501 """D-Bus WPS/PBC operation and signals"""
503 _test_dbus_wps_pbc(dev
, apdev
)
505 dev
[0].request("SET wps_cred_processing 0")
507 def _test_dbus_wps_pbc(dev
, apdev
):
508 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
509 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
511 hapd
= start_ap(apdev
[0])
512 hapd
.request("WPS_PBC")
513 bssid
= apdev
[0]['bssid']
514 dev
[0].scan_for_bss(bssid
, freq
="2412")
515 dev
[0].request("SET wps_cred_processing 2")
517 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
518 dbus_interface
=dbus
.PROPERTIES_IFACE
)
520 raise Exception("Missing BSSs entry: " + str(res
))
521 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
522 props
= bss_obj
.GetAll(WPAS_DBUS_BSS
, dbus_interface
=dbus
.PROPERTIES_IFACE
)
523 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS
, res
[0], str(props
)))
524 if 'WPS' not in props
:
525 raise Exception("No WPS information in the BSS entry")
526 if 'Type' not in props
['WPS']:
527 raise Exception("No Type field in the WPS dictionary")
528 if props
['WPS']['Type'] != 'pbc':
529 raise Exception("Unexpected WPS Type: " + props
['WPS']['Type'])
531 class TestDbusWps(TestDbus
):
532 def __init__(self
, bus
, wps
):
533 TestDbus
.__init
__(self
, bus
)
534 self
.success_seen
= False
535 self
.credentials_received
= False
539 gobject
.timeout_add(1, self
.start_pbc
)
540 gobject
.timeout_add(15000, self
.timeout
)
541 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
542 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
547 def wpsEvent(self
, name
, args
):
548 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
549 if name
== "success":
550 self
.success_seen
= True
551 if self
.credentials_received
:
554 def credentials(self
, args
):
555 logger
.debug("credentials: " + str(args
))
556 self
.credentials_received
= True
557 if self
.success_seen
:
560 def start_pbc(self
, *args
):
561 logger
.debug("start_pbc")
562 self
.wps
.Start({'Role': 'enrollee', 'Type': 'pbc'})
566 return self
.success_seen
and self
.credentials_received
568 with
TestDbusWps(bus
, wps
) as t
:
570 raise Exception("Failure in D-Bus operations")
572 dev
[0].wait_connected(timeout
=10)
573 dev
[0].request("DISCONNECT")
575 dev
[0].flush_scan_cache()
577 def test_dbus_wps_pbc_overlap(dev
, apdev
):
578 """D-Bus WPS/PBC operation and signal for PBC overlap"""
579 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
580 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
582 hapd
= start_ap(apdev
[0])
583 hapd2
= start_ap(apdev
[1], ssid
="test-wps2",
584 ap_uuid
="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
585 hapd
.request("WPS_PBC")
586 hapd2
.request("WPS_PBC")
587 bssid
= apdev
[0]['bssid']
588 dev
[0].scan_for_bss(bssid
, freq
="2412")
589 bssid2
= apdev
[1]['bssid']
590 dev
[0].scan_for_bss(bssid2
, freq
="2412")
592 class TestDbusWps(TestDbus
):
593 def __init__(self
, bus
, wps
):
594 TestDbus
.__init
__(self
, bus
)
595 self
.overlap_seen
= False
599 gobject
.timeout_add(1, self
.start_pbc
)
600 gobject
.timeout_add(15000, self
.timeout
)
601 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
605 def wpsEvent(self
, name
, args
):
606 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
607 if name
== "pbc-overlap":
608 self
.overlap_seen
= True
611 def start_pbc(self
, *args
):
612 logger
.debug("start_pbc")
613 self
.wps
.Start({'Role': 'enrollee', 'Type': 'pbc'})
617 return self
.overlap_seen
619 with
TestDbusWps(bus
, wps
) as t
:
621 raise Exception("Failure in D-Bus operations")
623 dev
[0].request("WPS_CANCEL")
624 dev
[0].request("DISCONNECT")
626 dev
[0].flush_scan_cache()
628 def test_dbus_wps_pin(dev
, apdev
):
629 """D-Bus WPS/PIN operation and signals"""
631 _test_dbus_wps_pin(dev
, apdev
)
633 dev
[0].request("SET wps_cred_processing 0")
635 def _test_dbus_wps_pin(dev
, apdev
):
636 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
637 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
639 hapd
= start_ap(apdev
[0])
640 hapd
.request("WPS_PIN any 12345670")
641 bssid
= apdev
[0]['bssid']
642 dev
[0].scan_for_bss(bssid
, freq
="2412")
643 dev
[0].request("SET wps_cred_processing 2")
645 class TestDbusWps(TestDbus
):
646 def __init__(self
, bus
):
647 TestDbus
.__init
__(self
, bus
)
648 self
.success_seen
= False
649 self
.credentials_received
= False
652 gobject
.timeout_add(1, self
.start_pin
)
653 gobject
.timeout_add(15000, self
.timeout
)
654 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
655 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
660 def wpsEvent(self
, name
, args
):
661 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
662 if name
== "success":
663 self
.success_seen
= True
664 if self
.credentials_received
:
667 def credentials(self
, args
):
668 logger
.debug("credentials: " + str(args
))
669 self
.credentials_received
= True
670 if self
.success_seen
:
673 def start_pin(self
, *args
):
674 logger
.debug("start_pin")
675 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
676 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
681 return self
.success_seen
and self
.credentials_received
683 with
TestDbusWps(bus
) as t
:
685 raise Exception("Failure in D-Bus operations")
687 dev
[0].wait_connected(timeout
=10)
689 def test_dbus_wps_pin2(dev
, apdev
):
690 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
692 _test_dbus_wps_pin2(dev
, apdev
)
694 dev
[0].request("SET wps_cred_processing 0")
696 def _test_dbus_wps_pin2(dev
, apdev
):
697 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
698 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
700 hapd
= start_ap(apdev
[0])
701 bssid
= apdev
[0]['bssid']
702 dev
[0].scan_for_bss(bssid
, freq
="2412")
703 dev
[0].request("SET wps_cred_processing 2")
705 class TestDbusWps(TestDbus
):
706 def __init__(self
, bus
):
707 TestDbus
.__init
__(self
, bus
)
708 self
.success_seen
= False
712 gobject
.timeout_add(1, self
.start_pin
)
713 gobject
.timeout_add(15000, self
.timeout
)
714 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
715 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
720 def wpsEvent(self
, name
, args
):
721 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
722 if name
== "success":
723 self
.success_seen
= True
724 if self
.credentials_received
:
727 def credentials(self
, args
):
728 logger
.debug("credentials: " + str(args
))
729 self
.credentials_received
= True
730 if self
.success_seen
:
733 def start_pin(self
, *args
):
734 logger
.debug("start_pin")
735 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
736 res
= wps
.Start({'Role': 'enrollee', 'Type': 'pin',
739 h
= hostapd
.Hostapd(apdev
[0]['ifname'])
740 h
.request("WPS_PIN any " + pin
)
744 return self
.success_seen
and self
.credentials_received
746 with
TestDbusWps(bus
) as t
:
748 raise Exception("Failure in D-Bus operations")
750 dev
[0].wait_connected(timeout
=10)
752 def test_dbus_wps_pin_m2d(dev
, apdev
):
753 """D-Bus WPS/PIN operation and signals with M2D"""
755 _test_dbus_wps_pin_m2d(dev
, apdev
)
757 dev
[0].request("SET wps_cred_processing 0")
759 def _test_dbus_wps_pin_m2d(dev
, apdev
):
760 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
761 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
763 hapd
= start_ap(apdev
[0])
764 bssid
= apdev
[0]['bssid']
765 dev
[0].scan_for_bss(bssid
, freq
="2412")
766 dev
[0].request("SET wps_cred_processing 2")
768 class TestDbusWps(TestDbus
):
769 def __init__(self
, bus
):
770 TestDbus
.__init
__(self
, bus
)
771 self
.success_seen
= False
772 self
.credentials_received
= False
775 gobject
.timeout_add(1, self
.start_pin
)
776 gobject
.timeout_add(15000, self
.timeout
)
777 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
778 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
783 def wpsEvent(self
, name
, args
):
784 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
785 if name
== "success":
786 self
.success_seen
= True
787 if self
.credentials_received
:
790 h
= hostapd
.Hostapd(apdev
[0]['ifname'])
791 h
.request("WPS_PIN any 12345670")
793 def credentials(self
, args
):
794 logger
.debug("credentials: " + str(args
))
795 self
.credentials_received
= True
796 if self
.success_seen
:
799 def start_pin(self
, *args
):
800 logger
.debug("start_pin")
801 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
802 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
807 return self
.success_seen
and self
.credentials_received
809 with
TestDbusWps(bus
) as t
:
811 raise Exception("Failure in D-Bus operations")
813 dev
[0].wait_connected(timeout
=10)
815 def test_dbus_wps_reg(dev
, apdev
):
816 """D-Bus WPS/Registrar operation and signals"""
818 _test_dbus_wps_reg(dev
, apdev
)
820 dev
[0].request("SET wps_cred_processing 0")
822 def _test_dbus_wps_reg(dev
, apdev
):
823 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
824 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
826 hapd
= start_ap(apdev
[0])
827 hapd
.request("WPS_PIN any 12345670")
828 bssid
= apdev
[0]['bssid']
829 dev
[0].scan_for_bss(bssid
, freq
="2412")
830 dev
[0].request("SET wps_cred_processing 2")
832 class TestDbusWps(TestDbus
):
833 def __init__(self
, bus
):
834 TestDbus
.__init
__(self
, bus
)
835 self
.credentials_received
= False
838 gobject
.timeout_add(100, self
.start_reg
)
839 gobject
.timeout_add(15000, self
.timeout
)
840 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
841 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
846 def wpsEvent(self
, name
, args
):
847 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
849 def credentials(self
, args
):
850 logger
.debug("credentials: " + str(args
))
851 self
.credentials_received
= True
854 def start_reg(self
, *args
):
855 logger
.debug("start_reg")
856 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
857 wps
.Start({'Role': 'registrar', 'Type': 'pin',
858 'Pin': '12345670', 'Bssid': bssid_ay
})
862 return self
.credentials_received
864 with
TestDbusWps(bus
) as t
:
866 raise Exception("Failure in D-Bus operations")
868 dev
[0].wait_connected(timeout
=10)
870 def test_dbus_wps_cancel(dev
, apdev
):
871 """D-Bus WPS Cancel operation"""
872 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
873 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
875 hapd
= start_ap(apdev
[0])
876 bssid
= apdev
[0]['bssid']
879 dev
[0].scan_for_bss(bssid
, freq
="2412")
880 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
881 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
884 dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
886 def test_dbus_scan_invalid(dev
, apdev
):
887 """D-Bus invalid scan method"""
888 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
889 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
891 tests
= [ ({}, "InvalidArgs"),
892 ({'Type': 123}, "InvalidArgs"),
893 ({'Type': 'foo'}, "InvalidArgs"),
894 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
895 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
896 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
898 'SSIDs': [ dbus
.ByteArray("1"), dbus
.ByteArray("2"),
899 dbus
.ByteArray("3"), dbus
.ByteArray("4"),
900 dbus
.ByteArray("5"), dbus
.ByteArray("6"),
901 dbus
.ByteArray("7"), dbus
.ByteArray("8"),
902 dbus
.ByteArray("9"), dbus
.ByteArray("10"),
903 dbus
.ByteArray("11"), dbus
.ByteArray("12"),
904 dbus
.ByteArray("13"), dbus
.ByteArray("14"),
905 dbus
.ByteArray("15"), dbus
.ByteArray("16"),
906 dbus
.ByteArray("17") ]},
909 'SSIDs': [ dbus
.ByteArray("1234567890abcdef1234567890abcdef1") ]},
911 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
912 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
913 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
914 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
916 'Channels': [ (dbus
.Int32(2412), dbus
.UInt32(20)) ] },
919 'Channels': [ (dbus
.UInt32(2412), dbus
.Int32(20)) ] },
921 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
922 ({'Type': 'passive', 'IEs': [ dbus
.ByteArray("\xdd\x00") ]},
924 ({'Type': 'passive', 'SSIDs': [ dbus
.ByteArray("foo") ]},
926 for (t
,err
) in tests
:
929 raise Exception("Invalid Scan() arguments accepted: " + str(t
))
930 except dbus
.exceptions
.DBusException
, e
:
931 if err
not in str(e
):
932 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t
), str(e
)))
934 def test_dbus_scan_oom(dev
, apdev
):
935 """D-Bus scan method and OOM"""
936 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
937 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
939 with
alloc_fail_dbus(dev
[0], 1,
940 "wpa_scan_clone_params;wpas_dbus_handler_scan",
941 "Scan", expected
="ScanError: Scan request rejected"):
942 iface
.Scan({ 'Type': 'passive',
943 'Channels': [ (dbus
.UInt32(2412), dbus
.UInt32(20)) ] })
945 with
alloc_fail_dbus(dev
[0], 1,
946 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
948 iface
.Scan({ 'Type': 'passive',
949 'Channels': [ (dbus
.UInt32(2412), dbus
.UInt32(20)) ] })
951 with
alloc_fail_dbus(dev
[0], 1,
952 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
954 iface
.Scan({ 'Type': 'active',
955 'IEs': [ dbus
.ByteArray("\xdd\x00") ],
956 'Channels': [ (dbus
.UInt32(2412), dbus
.UInt32(20)) ] })
958 with
alloc_fail_dbus(dev
[0], 1,
959 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
961 iface
.Scan({ 'Type': 'active',
962 'SSIDs': [ dbus
.ByteArray("open"),
964 'Channels': [ (dbus
.UInt32(2412), dbus
.UInt32(20)) ] })
966 def test_dbus_scan(dev
, apdev
):
967 """D-Bus scan and related signals"""
968 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
969 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
971 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], { "ssid": "open" })
973 class TestDbusScan(TestDbus
):
974 def __init__(self
, bus
):
975 TestDbus
.__init
__(self
, bus
)
976 self
.scan_completed
= 0
977 self
.bss_added
= False
978 self
.fail_reason
= None
981 gobject
.timeout_add(1, self
.run_scan
)
982 gobject
.timeout_add(15000, self
.timeout
)
983 self
.add_signal(self
.scanDone
, WPAS_DBUS_IFACE
, "ScanDone")
984 self
.add_signal(self
.bssAdded
, WPAS_DBUS_IFACE
, "BSSAdded")
985 self
.add_signal(self
.bssRemoved
, WPAS_DBUS_IFACE
, "BSSRemoved")
989 def scanDone(self
, success
):
990 logger
.debug("scanDone: success=%s" % success
)
991 self
.scan_completed
+= 1
992 if self
.scan_completed
== 1:
993 iface
.Scan({'Type': 'passive',
995 'Channels': [(dbus
.UInt32(2412), dbus
.UInt32(20))]})
996 elif self
.scan_completed
== 2:
997 iface
.Scan({'Type': 'passive',
999 elif self
.bss_added
and self
.scan_completed
== 3:
1002 def bssAdded(self
, bss
, properties
):
1003 logger
.debug("bssAdded: %s" % bss
)
1004 logger
.debug(str(properties
))
1005 if 'WPS' in properties
:
1006 if 'Type' in properties
['WPS']:
1007 self
.fail_reason
= "Unexpected WPS dictionary entry in non-WPS BSS"
1009 self
.bss_added
= True
1010 if self
.scan_completed
== 3:
1013 def bssRemoved(self
, bss
):
1014 logger
.debug("bssRemoved: %s" % bss
)
1016 def run_scan(self
, *args
):
1017 logger
.debug("run_scan")
1018 iface
.Scan({'Type': 'active',
1019 'SSIDs': [ dbus
.ByteArray("open"),
1021 'IEs': [ dbus
.ByteArray("\xdd\x00"),
1024 'Channels': [(dbus
.UInt32(2412), dbus
.UInt32(20))]})
1028 return self
.scan_completed
== 3 and self
.bss_added
1030 with
TestDbusScan(bus
) as t
:
1032 raise Exception(t
.fail_reason
)
1034 raise Exception("Expected signals not seen")
1036 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "BSSs",
1037 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1039 raise Exception("Scan result not in BSSs property")
1041 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "BSSs",
1042 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1044 raise Exception("FlushBSS() did not remove scan results from BSSs property")
1047 def test_dbus_scan_busy(dev
, apdev
):
1048 """D-Bus scan trigger rejection when busy with previous scan"""
1049 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1050 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1052 if "OK" not in dev
[0].request("SCAN freq=2412-2462"):
1053 raise Exception("Failed to start scan")
1054 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
1056 raise Exception("Scan start timed out")
1059 iface
.Scan({'Type': 'active', 'AllowRoam': False})
1060 raise Exception("Scan() accepted when busy")
1061 except dbus
.exceptions
.DBusException
, e
:
1062 if "ScanError: Scan request reject" not in str(e
):
1063 raise Exception("Unexpected error message: " + str(e
))
1065 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
1067 raise Exception("Scan timed out")
1069 def test_dbus_connect(dev
, apdev
):
1070 """D-Bus AddNetwork and connect"""
1071 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1072 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1074 ssid
= "test-wpa2-psk"
1075 passphrase
= 'qwertyuiop'
1076 params
= hostapd
.wpa2_params(ssid
=ssid
, passphrase
=passphrase
)
1077 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
1079 class TestDbusConnect(TestDbus
):
1080 def __init__(self
, bus
):
1081 TestDbus
.__init
__(self
, bus
)
1082 self
.network_added
= False
1083 self
.network_selected
= False
1084 self
.network_removed
= False
1087 def __enter__(self
):
1088 gobject
.timeout_add(1, self
.run_connect
)
1089 gobject
.timeout_add(15000, self
.timeout
)
1090 self
.add_signal(self
.networkAdded
, WPAS_DBUS_IFACE
, "NetworkAdded")
1091 self
.add_signal(self
.networkRemoved
, WPAS_DBUS_IFACE
,
1093 self
.add_signal(self
.networkSelected
, WPAS_DBUS_IFACE
,
1095 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1096 "PropertiesChanged")
1100 def networkAdded(self
, network
, properties
):
1101 logger
.debug("networkAdded: %s" % str(network
))
1102 logger
.debug(str(properties
))
1103 self
.network_added
= True
1105 def networkRemoved(self
, network
):
1106 logger
.debug("networkRemoved: %s" % str(network
))
1107 self
.network_removed
= True
1109 def networkSelected(self
, network
):
1110 logger
.debug("networkSelected: %s" % str(network
))
1111 self
.network_selected
= True
1113 def propertiesChanged(self
, properties
):
1114 logger
.debug("propertiesChanged: %s" % str(properties
))
1115 if 'State' in properties
and properties
['State'] == "completed":
1119 elif self
.state
== 2:
1122 elif self
.state
== 4:
1125 elif self
.state
== 5:
1128 elif self
.state
== 7:
1130 res
= iface
.SignalPoll()
1131 logger
.debug("SignalPoll: " + str(res
))
1132 if 'frequency' not in res
or res
['frequency'] != 2412:
1134 logger
.info("Unexpected SignalPoll result")
1135 iface
.RemoveNetwork(self
.netw
)
1136 if 'State' in properties
and properties
['State'] == "disconnected":
1139 iface
.SelectNetwork(self
.netw
)
1140 elif self
.state
== 3:
1143 elif self
.state
== 6:
1146 elif self
.state
== 8:
1150 def run_connect(self
, *args
):
1151 logger
.debug("run_connect")
1152 args
= dbus
.Dictionary({ 'ssid': ssid
,
1153 'key_mgmt': 'WPA-PSK',
1155 'scan_freq': 2412 },
1157 self
.netw
= iface
.AddNetwork(args
)
1158 iface
.SelectNetwork(self
.netw
)
1162 if not self
.network_added
or \
1163 not self
.network_removed
or \
1164 not self
.network_selected
:
1166 return self
.state
== 9
1168 with
TestDbusConnect(bus
) as t
:
1170 raise Exception("Expected signals not seen")
1172 def test_dbus_connect_psk_mem(dev
, apdev
):
1173 """D-Bus AddNetwork and connect with memory-only PSK"""
1174 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1175 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1177 ssid
= "test-wpa2-psk"
1178 passphrase
= 'qwertyuiop'
1179 params
= hostapd
.wpa2_params(ssid
=ssid
, passphrase
=passphrase
)
1180 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
1182 class TestDbusConnect(TestDbus
):
1183 def __init__(self
, bus
):
1184 TestDbus
.__init
__(self
, bus
)
1185 self
.connected
= False
1187 def __enter__(self
):
1188 gobject
.timeout_add(1, self
.run_connect
)
1189 gobject
.timeout_add(15000, self
.timeout
)
1190 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1191 "PropertiesChanged")
1192 self
.add_signal(self
.networkRequest
, WPAS_DBUS_IFACE
,
1197 def propertiesChanged(self
, properties
):
1198 logger
.debug("propertiesChanged: %s" % str(properties
))
1199 if 'State' in properties
and properties
['State'] == "completed":
1200 self
.connected
= True
1203 def networkRequest(self
, path
, field
, txt
):
1204 logger
.debug("networkRequest: %s %s %s" % (path
, field
, txt
))
1205 if field
== "PSK_PASSPHRASE":
1206 iface
.NetworkReply(path
, field
, '"' + passphrase
+ '"')
1208 def run_connect(self
, *args
):
1209 logger
.debug("run_connect")
1210 args
= dbus
.Dictionary({ 'ssid': ssid
,
1211 'key_mgmt': 'WPA-PSK',
1213 'scan_freq': 2412 },
1215 self
.netw
= iface
.AddNetwork(args
)
1216 iface
.SelectNetwork(self
.netw
)
1220 return self
.connected
1222 with
TestDbusConnect(bus
) as t
:
1224 raise Exception("Expected signals not seen")
1226 def test_dbus_connect_oom(dev
, apdev
):
1227 """D-Bus AddNetwork and connect when out-of-memory"""
1228 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1229 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1231 if "OK" not in dev
[0].request("TEST_ALLOC_FAIL 0:"):
1232 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")
1234 ssid
= "test-wpa2-psk"
1235 passphrase
= 'qwertyuiop'
1236 params
= hostapd
.wpa2_params(ssid
=ssid
, passphrase
=passphrase
)
1237 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
1239 class TestDbusConnect(TestDbus
):
1240 def __init__(self
, bus
):
1241 TestDbus
.__init
__(self
, bus
)
1242 self
.network_added
= False
1243 self
.network_selected
= False
1244 self
.network_removed
= False
1247 def __enter__(self
):
1248 gobject
.timeout_add(1, self
.run_connect
)
1249 gobject
.timeout_add(1500, self
.timeout
)
1250 self
.add_signal(self
.networkAdded
, WPAS_DBUS_IFACE
, "NetworkAdded")
1251 self
.add_signal(self
.networkRemoved
, WPAS_DBUS_IFACE
,
1253 self
.add_signal(self
.networkSelected
, WPAS_DBUS_IFACE
,
1255 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1256 "PropertiesChanged")
1260 def networkAdded(self
, network
, properties
):
1261 logger
.debug("networkAdded: %s" % str(network
))
1262 logger
.debug(str(properties
))
1263 self
.network_added
= True
1265 def networkRemoved(self
, network
):
1266 logger
.debug("networkRemoved: %s" % str(network
))
1267 self
.network_removed
= True
1269 def networkSelected(self
, network
):
1270 logger
.debug("networkSelected: %s" % str(network
))
1271 self
.network_selected
= True
1273 def propertiesChanged(self
, properties
):
1274 logger
.debug("propertiesChanged: %s" % str(properties
))
1275 if 'State' in properties
and properties
['State'] == "completed":
1279 elif self
.state
== 2:
1282 elif self
.state
== 4:
1285 elif self
.state
== 5:
1287 res
= iface
.SignalPoll()
1288 logger
.debug("SignalPoll: " + str(res
))
1289 if 'frequency' not in res
or res
['frequency'] != 2412:
1291 logger
.info("Unexpected SignalPoll result")
1292 iface
.RemoveNetwork(self
.netw
)
1293 if 'State' in properties
and properties
['State'] == "disconnected":
1296 iface
.SelectNetwork(self
.netw
)
1297 elif self
.state
== 3:
1300 elif self
.state
== 6:
1304 def run_connect(self
, *args
):
1305 logger
.debug("run_connect")
1306 args
= dbus
.Dictionary({ 'ssid': ssid
,
1307 'key_mgmt': 'WPA-PSK',
1309 'scan_freq': 2412 },
1312 self
.netw
= iface
.AddNetwork(args
)
1313 except Exception, e
:
1314 logger
.info("Exception on AddNetwork: " + str(e
))
1318 iface
.SelectNetwork(self
.netw
)
1319 except Exception, e
:
1320 logger
.info("Exception on SelectNetwork: " + str(e
))
1326 if not self
.network_added
or \
1327 not self
.network_removed
or \
1328 not self
.network_selected
:
1330 return self
.state
== 7
1333 for i
in range(1, 1000):
1335 dev
[j
].dump_monitor()
1336 dev
[0].request("TEST_ALLOC_FAIL %d:main" % i
)
1338 with
TestDbusConnect(bus
) as t
:
1340 logger
.info("Iteration %d - Expected signals not seen" % i
)
1342 logger
.info("Iteration %d - success" % i
)
1344 state
= dev
[0].request('GET_ALLOC_FAIL')
1345 logger
.info("GET_ALLOC_FAIL: " + state
)
1346 dev
[0].dump_monitor()
1347 dev
[0].request("TEST_ALLOC_FAIL 0:")
1349 raise Exception("Connection succeeded during out-of-memory")
1350 if not state
.startswith('0:'):
1357 def test_dbus_while_not_connected(dev
, apdev
):
1358 """D-Bus invalid operations while not connected"""
1359 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1360 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1364 raise Exception("Disconnect() accepted when not connected")
1365 except dbus
.exceptions
.DBusException
, e
:
1366 if "NotConnected" not in str(e
):
1367 raise Exception("Unexpected error message for invalid Disconnect: " + str(e
))
1371 raise Exception("Reattach() accepted when not connected")
1372 except dbus
.exceptions
.DBusException
, e
:
1373 if "NotConnected" not in str(e
):
1374 raise Exception("Unexpected error message for invalid Reattach: " + str(e
))
1376 def test_dbus_connect_eap(dev
, apdev
):
1377 """D-Bus AddNetwork and connect to EAP network"""
1378 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1379 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1381 ssid
= "ieee8021x-open"
1382 params
= hostapd
.radius_params()
1383 params
["ssid"] = ssid
1384 params
["ieee8021x"] = "1"
1385 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
1387 class TestDbusConnect(TestDbus
):
1388 def __init__(self
, bus
):
1389 TestDbus
.__init
__(self
, bus
)
1390 self
.certification_received
= False
1391 self
.eap_status
= False
1394 def __enter__(self
):
1395 gobject
.timeout_add(1, self
.run_connect
)
1396 gobject
.timeout_add(15000, self
.timeout
)
1397 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1398 "PropertiesChanged")
1399 self
.add_signal(self
.certification
, WPAS_DBUS_IFACE
,
1400 "Certification", byte_arrays
=True)
1401 self
.add_signal(self
.networkRequest
, WPAS_DBUS_IFACE
,
1403 self
.add_signal(self
.eap
, WPAS_DBUS_IFACE
, "EAP")
1407 def propertiesChanged(self
, properties
):
1408 logger
.debug("propertiesChanged: %s" % str(properties
))
1409 if 'State' in properties
and properties
['State'] == "completed":
1413 logger
.info("Set dNSName constraint")
1414 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.netw
)
1415 args
= dbus
.Dictionary({ 'altsubject_match':
1416 self
.server_dnsname
},
1418 net_obj
.Set(WPAS_DBUS_NETWORK
, "Properties", args
,
1419 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1420 elif self
.state
== 2:
1423 logger
.info("Set non-matching dNSName constraint")
1424 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.netw
)
1425 args
= dbus
.Dictionary({ 'altsubject_match':
1426 self
.server_dnsname
+ "FOO" },
1428 net_obj
.Set(WPAS_DBUS_NETWORK
, "Properties", args
,
1429 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1430 if 'State' in properties
and properties
['State'] == "disconnected":
1434 iface
.SelectNetwork(self
.netw
)
1437 iface
.SelectNetwork(self
.netw
)
1439 def certification(self
, args
):
1440 logger
.debug("certification: %s" % str(args
))
1441 self
.certification_received
= True
1442 if args
['depth'] == 0:
1443 # The test server certificate is supposed to have dNSName
1444 if len(args
['altsubject']) < 1:
1445 raise Exception("Missing dNSName")
1446 dnsname
= args
['altsubject'][0]
1447 if not dnsname
.startswith("DNS:"):
1448 raise Exception("Expected dNSName not found: " + dnsname
)
1449 logger
.info("altsubject: " + dnsname
)
1450 self
.server_dnsname
= dnsname
1452 def eap(self
, status
, parameter
):
1453 logger
.debug("EAP: status=%s parameter=%s" % (status
, parameter
))
1454 if status
== 'completion' and parameter
== 'success':
1455 self
.eap_status
= True
1456 if self
.state
== 4 and status
== 'remote certificate verification' and parameter
== 'AltSubject mismatch':
1460 def networkRequest(self
, path
, field
, txt
):
1461 logger
.debug("networkRequest: %s %s %s" % (path
, field
, txt
))
1462 if field
== "PASSWORD":
1463 iface
.NetworkReply(path
, field
, "password")
1465 def run_connect(self
, *args
):
1466 logger
.debug("run_connect")
1467 args
= dbus
.Dictionary({ 'ssid': ssid
,
1468 'key_mgmt': 'IEEE8021X',
1471 'anonymous_identity': 'ttls',
1472 'identity': 'pap user',
1473 'ca_cert': 'auth_serv/ca.pem',
1474 'phase2': 'auth=PAP',
1475 'scan_freq': 2412 },
1477 self
.netw
= iface
.AddNetwork(args
)
1478 iface
.SelectNetwork(self
.netw
)
1482 if not self
.eap_status
or not self
.certification_received
:
1484 return self
.state
== 5
1486 with
TestDbusConnect(bus
) as t
:
1488 raise Exception("Expected signals not seen")
1490 def test_dbus_network(dev
, apdev
):
1491 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
1492 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1493 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1495 args
= dbus
.Dictionary({ 'ssid': "foo",
1496 'key_mgmt': 'WPA-PSK',
1498 'identity': dbus
.ByteArray([ 1, 2 ]),
1499 'priority': dbus
.Int32(0),
1500 'scan_freq': dbus
.UInt32(2412) },
1502 netw
= iface
.AddNetwork(args
)
1503 id = int(dev
[0].list_networks()[0]['id'])
1504 val
= dev
[0].get_network(id, "scan_freq")
1506 raise Exception("Invalid scan_freq value: " + str(val
))
1507 iface
.RemoveNetwork(netw
)
1509 args
= dbus
.Dictionary({ 'ssid': "foo",
1511 'scan_freq': "2412 2432",
1512 'freq_list': "2412 2417 2432" },
1514 netw
= iface
.AddNetwork(args
)
1515 id = int(dev
[0].list_networks()[0]['id'])
1516 val
= dev
[0].get_network(id, "scan_freq")
1517 if val
!= "2412 2432":
1518 raise Exception("Invalid scan_freq value (2): " + str(val
))
1519 val
= dev
[0].get_network(id, "freq_list")
1520 if val
!= "2412 2417 2432":
1521 raise Exception("Invalid freq_list value: " + str(val
))
1522 iface
.RemoveNetwork(netw
)
1524 iface
.RemoveNetwork(netw
)
1525 raise Exception("Invalid RemoveNetwork() accepted")
1526 except dbus
.exceptions
.DBusException
, e
:
1527 if "NetworkUnknown" not in str(e
):
1528 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e
))
1530 iface
.SelectNetwork(netw
)
1531 raise Exception("Invalid SelectNetwork() accepted")
1532 except dbus
.exceptions
.DBusException
, e
:
1533 if "NetworkUnknown" not in str(e
):
1534 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e
))
1536 args
= dbus
.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1537 'identity': "testuser", 'scan_freq': '2412' },
1539 netw1
= iface
.AddNetwork(args
)
1540 args
= dbus
.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1542 netw2
= iface
.AddNetwork(args
)
1543 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Networks",
1544 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1546 raise Exception("Unexpected number of networks")
1548 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, netw1
)
1549 res
= net_obj
.Get(WPAS_DBUS_NETWORK
, "Enabled",
1550 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1552 raise Exception("Added network was unexpectedly enabled by default")
1553 net_obj
.Set(WPAS_DBUS_NETWORK
, "Enabled", dbus
.Boolean(True),
1554 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1555 res
= net_obj
.Get(WPAS_DBUS_NETWORK
, "Enabled",
1556 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1558 raise Exception("Set(Enabled,True) did not seem to change property value")
1559 net_obj
.Set(WPAS_DBUS_NETWORK
, "Enabled", dbus
.Boolean(False),
1560 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1561 res
= net_obj
.Get(WPAS_DBUS_NETWORK
, "Enabled",
1562 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1564 raise Exception("Set(Enabled,False) did not seem to change property value")
1566 net_obj
.Set(WPAS_DBUS_NETWORK
, "Enabled", dbus
.UInt32(1),
1567 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1568 raise Exception("Invalid Set(Enabled,1) accepted")
1569 except dbus
.exceptions
.DBusException
, e
:
1570 if "Error.Failed: wrong property type" not in str(e
):
1571 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e
))
1573 args
= dbus
.Dictionary({ 'ssid': "foo1new" }, signature
='sv')
1574 net_obj
.Set(WPAS_DBUS_NETWORK
, "Properties", args
,
1575 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1576 res
= net_obj
.Get(WPAS_DBUS_NETWORK
, "Properties",
1577 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1578 if res
['ssid'] != '"foo1new"':
1579 raise Exception("Set(Properties) failed to update ssid")
1580 if res
['identity'] != '"testuser"':
1581 raise Exception("Set(Properties) unexpectedly changed unrelated parameter")
1583 iface
.RemoveAllNetworks()
1584 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Networks",
1585 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1587 raise Exception("Unexpected number of networks")
1588 iface
.RemoveAllNetworks()
1590 tests
= [ dbus
.Dictionary({ 'psk': "1234567" }, signature
='sv'),
1591 dbus
.Dictionary({ 'identity': dbus
.ByteArray() },
1593 dbus
.Dictionary({ 'identity': dbus
.Byte(1) }, signature
='sv'),
1594 dbus
.Dictionary({ 'identity': "" }, signature
='sv') ]
1597 iface
.AddNetwork(args
)
1598 raise Exception("Invalid AddNetwork args accepted: " + str(args
))
1599 except dbus
.exceptions
.DBusException
, e
:
1600 if "InvalidArgs" not in str(e
):
1601 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e
))
1603 def test_dbus_network_oom(dev
, apdev
):
1604 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
1605 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1606 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1608 args
= dbus
.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
1609 'identity': "testuser", 'scan_freq': '2412' },
1611 netw1
= iface
.AddNetwork(args
)
1612 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, netw1
)
1614 with
alloc_fail_dbus(dev
[0], 1,
1615 "wpa_config_get_all;wpas_dbus_getter_network_properties",
1617 net_obj
.Get(WPAS_DBUS_NETWORK
, "Properties",
1618 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1620 iface
.RemoveAllNetworks()
1622 with
alloc_fail_dbus(dev
[0], 1,
1623 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
1624 "RemoveNetwork", "InvalidArgs"):
1625 iface
.RemoveNetwork(dbus
.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))
1627 with
alloc_fail(dev
[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
1628 args
= dbus
.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1631 netw
= iface
.AddNetwork(args
)
1632 # Currently, AddNetwork() succeeds even if os_strdup() for path
1633 # fails, so remove the network if that occurs.
1634 iface
.RemoveNetwork(netw
)
1635 except dbus
.exceptions
.DBusException
, e
:
1638 for i
in range(1, 3):
1639 with
alloc_fail(dev
[0], i
, "=wpas_dbus_register_network"):
1641 netw
= iface
.AddNetwork(args
)
1642 # Currently, AddNetwork() succeeds even if network registration
1643 # fails, so remove the network if that occurs.
1644 iface
.RemoveNetwork(netw
)
1645 except dbus
.exceptions
.DBusException
, e
:
1648 with
alloc_fail_dbus(dev
[0], 1,
1649 "=wpa_config_add_network;wpas_dbus_handler_add_network",
1651 "UnknownError: wpa_supplicant could not add a network"):
1652 args
= dbus
.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
1654 netw
= iface
.AddNetwork(args
)
1657 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
1658 dbus
.Dictionary({ 'ssid': dbus
.ByteArray(' ') },
1660 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1661 dbus
.Dictionary({ 'ssid': 'foo' }, signature
='sv')),
1662 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1663 dbus
.Dictionary({ 'eap': 'foo' }, signature
='sv')),
1664 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1665 dbus
.Dictionary({ 'priority': dbus
.UInt32(1) },
1667 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1668 dbus
.Dictionary({ 'priority': dbus
.Int32(1) },
1670 (1, '=set_network_properties;wpas_dbus_handler_add_network',
1671 dbus
.Dictionary({ 'ssid': dbus
.ByteArray(' ') },
1673 for (count
,funcs
,args
) in tests
:
1674 with
alloc_fail_dbus(dev
[0], count
, funcs
, "AddNetwork", "InvalidArgs"):
1675 netw
= iface
.AddNetwork(args
)
1677 if len(if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
1678 dbus_interface
=dbus
.PROPERTIES_IFACE
)) > 0:
1679 raise Exception("Unexpected network block added")
1680 if len(dev
[0].list_networks()) > 0:
1681 raise Exception("Unexpected network block visible")
1683 def test_dbus_interface(dev
, apdev
):
1684 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
1685 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1686 wpas
= dbus
.Interface(wpas_obj
, WPAS_DBUS_SERVICE
)
1688 params
= dbus
.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1690 path
= wpas
.CreateInterface(params
)
1691 logger
.debug("New interface path: " + str(path
))
1692 path2
= wpas
.GetInterface("lo")
1694 raise Exception("Interface object mismatch")
1696 params
= dbus
.Dictionary({ 'Ifname': 'lo',
1698 'ConfigFile': 'foo',
1699 'BridgeIfname': 'foo', },
1702 wpas
.CreateInterface(params
)
1703 raise Exception("Invalid CreateInterface() accepted")
1704 except dbus
.exceptions
.DBusException
, e
:
1705 if "InterfaceExists" not in str(e
):
1706 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e
))
1708 wpas
.RemoveInterface(path
)
1710 wpas
.RemoveInterface(path
)
1711 raise Exception("Invalid RemoveInterface() accepted")
1712 except dbus
.exceptions
.DBusException
, e
:
1713 if "InterfaceUnknown" not in str(e
):
1714 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e
))
1716 params
= dbus
.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
1720 wpas
.CreateInterface(params
)
1721 raise Exception("Invalid CreateInterface() accepted")
1722 except dbus
.exceptions
.DBusException
, e
:
1723 if "InvalidArgs" not in str(e
):
1724 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e
))
1726 params
= dbus
.Dictionary({ 'Driver': 'none' }, signature
='sv')
1728 wpas
.CreateInterface(params
)
1729 raise Exception("Invalid CreateInterface() accepted")
1730 except dbus
.exceptions
.DBusException
, e
:
1731 if "InvalidArgs" not in str(e
):
1732 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e
))
1735 wpas
.GetInterface("lo")
1736 raise Exception("Invalid GetInterface() accepted")
1737 except dbus
.exceptions
.DBusException
, e
:
1738 if "InterfaceUnknown" not in str(e
):
1739 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e
))
1741 def test_dbus_interface_oom(dev
, apdev
):
1742 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
1743 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1744 wpas
= dbus
.Interface(wpas_obj
, WPAS_DBUS_SERVICE
)
1746 with
alloc_fail_dbus(dev
[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
1747 params
= dbus
.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1749 wpas
.CreateInterface(params
)
1751 for i
in range(1, 1000):
1752 dev
[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i
)
1753 params
= dbus
.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
1756 npath
= wpas
.CreateInterface(params
)
1757 wpas
.RemoveInterface(npath
)
1758 logger
.info("CreateInterface succeeds after %d allocation failures" % i
)
1759 state
= dev
[0].request('GET_ALLOC_FAIL')
1760 logger
.info("GET_ALLOC_FAIL: " + state
)
1761 dev
[0].dump_monitor()
1762 dev
[0].request("TEST_ALLOC_FAIL 0:")
1764 raise Exception("CreateInterface succeeded during out-of-memory")
1765 if not state
.startswith('0:'):
1767 except dbus
.exceptions
.DBusException
, e
:
1770 for arg
in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
1771 with
alloc_fail_dbus(dev
[0], 1, "=wpas_dbus_handler_create_interface",
1773 params
= dbus
.Dictionary({ arg
: 'foo' }, signature
='sv')
1774 wpas
.CreateInterface(params
)
1776 def test_dbus_blob(dev
, apdev
):
1777 """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
1778 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1779 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1781 blob
= dbus
.ByteArray("\x01\x02\x03")
1782 iface
.AddBlob('blob1', blob
)
1784 iface
.AddBlob('blob1', dbus
.ByteArray("\x01\x02\x04"))
1785 raise Exception("Invalid AddBlob() accepted")
1786 except dbus
.exceptions
.DBusException
, e
:
1787 if "BlobExists" not in str(e
):
1788 raise Exception("Unexpected error message for invalid AddBlob: " + str(e
))
1789 res
= iface
.GetBlob('blob1')
1790 if len(res
) != len(blob
):
1791 raise Exception("Unexpected blob data length")
1792 for i
in range(len(res
)):
1793 if res
[i
] != dbus
.Byte(blob
[i
]):
1794 raise Exception("Unexpected blob data")
1795 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Blobs",
1796 dbus_interface
=dbus
.PROPERTIES_IFACE
)
1797 if 'blob1' not in res
:
1798 raise Exception("Added blob missing from Blobs property")
1799 iface
.RemoveBlob('blob1')
1801 iface
.RemoveBlob('blob1')
1802 raise Exception("Invalid RemoveBlob() accepted")
1803 except dbus
.exceptions
.DBusException
, e
:
1804 if "BlobUnknown" not in str(e
):
1805 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e
))
1807 iface
.GetBlob('blob1')
1808 raise Exception("Invalid GetBlob() accepted")
1809 except dbus
.exceptions
.DBusException
, e
:
1810 if "BlobUnknown" not in str(e
):
1811 raise Exception("Unexpected error message for invalid GetBlob: " + str(e
))
1813 class TestDbusBlob(TestDbus
):
1814 def __init__(self
, bus
):
1815 TestDbus
.__init
__(self
, bus
)
1816 self
.blob_added
= False
1817 self
.blob_removed
= False
1819 def __enter__(self
):
1820 gobject
.timeout_add(1, self
.run_blob
)
1821 gobject
.timeout_add(15000, self
.timeout
)
1822 self
.add_signal(self
.blobAdded
, WPAS_DBUS_IFACE
, "BlobAdded")
1823 self
.add_signal(self
.blobRemoved
, WPAS_DBUS_IFACE
, "BlobRemoved")
1827 def blobAdded(self
, blobName
):
1828 logger
.debug("blobAdded: %s" % blobName
)
1829 if blobName
== 'blob2':
1830 self
.blob_added
= True
1832 def blobRemoved(self
, blobName
):
1833 logger
.debug("blobRemoved: %s" % blobName
)
1834 if blobName
== 'blob2':
1835 self
.blob_removed
= True
1838 def run_blob(self
, *args
):
1839 logger
.debug("run_blob")
1840 iface
.AddBlob('blob2', dbus
.ByteArray("\x01\x02\x04"))
1841 iface
.RemoveBlob('blob2')
1845 return self
.blob_added
and self
.blob_removed
1847 with
TestDbusBlob(bus
) as t
:
1849 raise Exception("Expected signals not seen")
1851 def test_dbus_blob_oom(dev
, apdev
):
1852 """D-Bus AddNetwork/RemoveNetwork OOM error cases"""
1853 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1854 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1856 for i
in range(1, 4):
1857 with
alloc_fail_dbus(dev
[0], i
, "wpas_dbus_handler_add_blob",
1859 iface
.AddBlob('blob_no_mem', dbus
.ByteArray("\x01\x02\x03\x04"))
1861 def test_dbus_autoscan(dev
, apdev
):
1862 """D-Bus Autoscan()"""
1863 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1864 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1866 iface
.AutoScan("foo")
1867 iface
.AutoScan("periodic:1")
1869 dev
[0].request("AUTOSCAN ")
1871 def test_dbus_autoscan_oom(dev
, apdev
):
1872 """D-Bus Autoscan() OOM"""
1873 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1874 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1876 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"):
1877 iface
.AutoScan("foo")
1878 dev
[0].request("AUTOSCAN ")
1880 def test_dbus_tdls_invalid(dev
, apdev
):
1881 """D-Bus invalid TDLS operations"""
1882 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1883 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1885 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], { "ssid": "test-open" })
1886 connect_2sta_open(dev
, hapd
)
1887 addr1
= dev
[1].p2p_interface_addr()
1890 iface
.TDLSDiscover("foo")
1891 raise Exception("Invalid TDLSDiscover() accepted")
1892 except dbus
.exceptions
.DBusException
, e
:
1893 if "InvalidArgs" not in str(e
):
1894 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e
))
1897 iface
.TDLSStatus("foo")
1898 raise Exception("Invalid TDLSStatus() accepted")
1899 except dbus
.exceptions
.DBusException
, e
:
1900 if "InvalidArgs" not in str(e
):
1901 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e
))
1903 res
= iface
.TDLSStatus(addr1
)
1904 if res
!= "peer does not exist":
1905 raise Exception("Unexpected TDLSStatus response")
1908 iface
.TDLSSetup("foo")
1909 raise Exception("Invalid TDLSSetup() accepted")
1910 except dbus
.exceptions
.DBusException
, e
:
1911 if "InvalidArgs" not in str(e
):
1912 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e
))
1915 iface
.TDLSTeardown("foo")
1916 raise Exception("Invalid TDLSTeardown() accepted")
1917 except dbus
.exceptions
.DBusException
, e
:
1918 if "InvalidArgs" not in str(e
):
1919 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e
))
1922 iface
.TDLSTeardown("00:11:22:33:44:55")
1923 raise Exception("TDLSTeardown accepted for unknown peer")
1924 except dbus
.exceptions
.DBusException
, e
:
1925 if "UnknownError: error performing TDLS teardown" not in str(e
):
1926 raise Exception("Unexpected error message: " + str(e
))
1928 def test_dbus_tdls_oom(dev
, apdev
):
1929 """D-Bus TDLS operations during OOM"""
1930 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1931 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1933 with
alloc_fail_dbus(dev
[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
1934 "UnknownError: error performing TDLS setup"):
1935 iface
.TDLSSetup("00:11:22:33:44:55")
1937 def test_dbus_tdls(dev
, apdev
):
1939 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1940 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1942 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], { "ssid": "test-open" })
1943 connect_2sta_open(dev
, hapd
)
1945 addr1
= dev
[1].p2p_interface_addr()
1947 class TestDbusTdls(TestDbus
):
1948 def __init__(self
, bus
):
1949 TestDbus
.__init
__(self
, bus
)
1950 self
.tdls_setup
= False
1951 self
.tdls_teardown
= False
1953 def __enter__(self
):
1954 gobject
.timeout_add(1, self
.run_tdls
)
1955 gobject
.timeout_add(15000, self
.timeout
)
1956 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
1957 "PropertiesChanged")
1961 def propertiesChanged(self
, properties
):
1962 logger
.debug("propertiesChanged: %s" % str(properties
))
1964 def run_tdls(self
, *args
):
1965 logger
.debug("run_tdls")
1966 iface
.TDLSDiscover(addr1
)
1967 gobject
.timeout_add(100, self
.run_tdls2
)
1970 def run_tdls2(self
, *args
):
1971 logger
.debug("run_tdls2")
1972 iface
.TDLSSetup(addr1
)
1973 gobject
.timeout_add(500, self
.run_tdls3
)
1976 def run_tdls3(self
, *args
):
1977 logger
.debug("run_tdls3")
1978 res
= iface
.TDLSStatus(addr1
)
1979 if res
== "connected":
1980 self
.tdls_setup
= True
1982 logger
.info("Unexpected TDLSStatus: " + res
)
1983 iface
.TDLSTeardown(addr1
)
1984 gobject
.timeout_add(200, self
.run_tdls4
)
1987 def run_tdls4(self
, *args
):
1988 logger
.debug("run_tdls4")
1989 res
= iface
.TDLSStatus(addr1
)
1990 if res
== "peer does not exist":
1991 self
.tdls_teardown
= True
1993 logger
.info("Unexpected TDLSStatus: " + res
)
1998 return self
.tdls_setup
and self
.tdls_teardown
2000 with
TestDbusTdls(bus
) as t
:
2002 raise Exception("Expected signals not seen")
2004 def test_dbus_pkcs11(dev
, apdev
):
2005 """D-Bus SetPKCS11EngineAndModulePath()"""
2006 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2007 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2010 iface
.SetPKCS11EngineAndModulePath("foo", "bar")
2011 except dbus
.exceptions
.DBusException
, e
:
2012 if "Error.Failed: Reinit of the EAPOL" not in str(e
):
2013 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e
))
2016 iface
.SetPKCS11EngineAndModulePath("foo", "")
2017 except dbus
.exceptions
.DBusException
, e
:
2018 if "Error.Failed: Reinit of the EAPOL" not in str(e
):
2019 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e
))
2021 iface
.SetPKCS11EngineAndModulePath("", "bar")
2022 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11EnginePath",
2023 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2025 raise Exception("Unexpected PKCS11EnginePath value: " + res
)
2026 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11ModulePath",
2027 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2029 raise Exception("Unexpected PKCS11ModulePath value: " + res
)
2031 iface
.SetPKCS11EngineAndModulePath("", "")
2032 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11EnginePath",
2033 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2035 raise Exception("Unexpected PKCS11EnginePath value: " + res
)
2036 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11ModulePath",
2037 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2039 raise Exception("Unexpected PKCS11ModulePath value: " + res
)
2041 def test_dbus_apscan(dev
, apdev
):
2042 """D-Bus Get/Set ApScan"""
2044 _test_dbus_apscan(dev
, apdev
)
2046 dev
[0].request("AP_SCAN 1")
2048 def _test_dbus_apscan(dev
, apdev
):
2049 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2051 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "ApScan",
2052 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2054 raise Exception("Unexpected initial ApScan value: %d" % res
)
2057 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(i
),
2058 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2059 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "ApScan",
2060 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2062 raise Exception("Unexpected ApScan value %d (expected %d)" % (res
, i
))
2065 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.Int16(-1),
2066 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2067 raise Exception("Invalid Set(ApScan,-1) accepted")
2068 except dbus
.exceptions
.DBusException
, e
:
2069 if "Error.Failed: wrong property type" not in str(e
):
2070 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e
))
2073 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(123),
2074 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2075 raise Exception("Invalid Set(ApScan,123) accepted")
2076 except dbus
.exceptions
.DBusException
, e
:
2077 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e
):
2078 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e
))
2080 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(1),
2081 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2083 def test_dbus_fastreauth(dev
, apdev
):
2084 """D-Bus Get/Set FastReauth"""
2085 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2087 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "FastReauth",
2088 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2090 raise Exception("Unexpected initial FastReauth value: " + str(res
))
2092 for i
in [ False, True ]:
2093 if_obj
.Set(WPAS_DBUS_IFACE
, "FastReauth", dbus
.Boolean(i
),
2094 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2095 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "FastReauth",
2096 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2098 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res
, i
))
2101 if_obj
.Set(WPAS_DBUS_IFACE
, "FastReauth", dbus
.Int16(-1),
2102 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2103 raise Exception("Invalid Set(FastReauth,-1) accepted")
2104 except dbus
.exceptions
.DBusException
, e
:
2105 if "Error.Failed: wrong property type" not in str(e
):
2106 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e
))
2108 if_obj
.Set(WPAS_DBUS_IFACE
, "FastReauth", dbus
.Boolean(True),
2109 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2111 def test_dbus_bss_expire(dev
, apdev
):
2112 """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
2113 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2115 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireAge", dbus
.UInt32(179),
2116 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2117 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "BSSExpireAge",
2118 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2120 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res
, i
))
2122 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireCount", dbus
.UInt32(3),
2123 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2124 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "BSSExpireCount",
2125 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2127 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res
, i
))
2130 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireAge", dbus
.Int16(-1),
2131 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2132 raise Exception("Invalid Set(BSSExpireAge,-1) accepted")
2133 except dbus
.exceptions
.DBusException
, e
:
2134 if "Error.Failed: wrong property type" not in str(e
):
2135 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e
))
2138 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireAge", dbus
.UInt32(9),
2139 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2140 raise Exception("Invalid Set(BSSExpireAge,9) accepted")
2141 except dbus
.exceptions
.DBusException
, e
:
2142 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e
):
2143 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e
))
2146 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireCount", dbus
.Int16(-1),
2147 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2148 raise Exception("Invalid Set(BSSExpireCount,-1) accepted")
2149 except dbus
.exceptions
.DBusException
, e
:
2150 if "Error.Failed: wrong property type" not in str(e
):
2151 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e
))
2154 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireCount", dbus
.UInt32(0),
2155 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2156 raise Exception("Invalid Set(BSSExpireCount,0) accepted")
2157 except dbus
.exceptions
.DBusException
, e
:
2158 if "Error.Failed: BSSExpireCount must be > 0" not in str(e
):
2159 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e
))
2161 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireAge", dbus
.UInt32(180),
2162 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2163 if_obj
.Set(WPAS_DBUS_IFACE
, "BSSExpireCount", dbus
.UInt32(2),
2164 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2166 def test_dbus_country(dev
, apdev
):
2167 """D-Bus Get/Set Country"""
2169 _test_dbus_country(dev
, apdev
)
2171 dev
[0].request("SET country 00")
2172 subprocess
.call(['iw', 'reg', 'set', '00'])
2174 def _test_dbus_country(dev
, apdev
):
2175 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2177 # work around issues with possible pending regdom event from the end of
2178 # the previous test case
2180 dev
[0].dump_monitor()
2182 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "FI",
2183 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2184 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Country",
2185 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2187 raise Exception("Unexpected Country value %s (expected FI)" % res
)
2189 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2191 # For now, work around separate P2P Device interface event delivery
2192 ev
= dev
[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
2194 raise Exception("regdom change event not seen")
2195 if "init=USER type=COUNTRY alpha2=FI" not in ev
:
2196 raise Exception("Unexpected event contents: " + ev
)
2199 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", dbus
.Int16(-1),
2200 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2201 raise Exception("Invalid Set(Country,-1) accepted")
2202 except dbus
.exceptions
.DBusException
, e
:
2203 if "Error.Failed: wrong property type" not in str(e
):
2204 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e
))
2207 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "F",
2208 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2209 raise Exception("Invalid Set(Country,F) accepted")
2210 except dbus
.exceptions
.DBusException
, e
:
2211 if "Error.Failed: invalid country code" not in str(e
):
2212 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e
))
2214 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "00",
2215 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2217 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2219 # For now, work around separate P2P Device interface event delivery
2220 ev
= dev
[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
2222 raise Exception("regdom change event not seen")
2223 if "init=CORE type=WORLD" not in ev
:
2224 raise Exception("Unexpected event contents: " + ev
)
2226 def test_dbus_scan_interval(dev
, apdev
):
2227 """D-Bus Get/Set ScanInterval"""
2229 _test_dbus_scan_interval(dev
, apdev
)
2231 dev
[0].request("SCAN_INTERVAL 5")
2233 def _test_dbus_scan_interval(dev
, apdev
):
2234 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2236 if_obj
.Set(WPAS_DBUS_IFACE
, "ScanInterval", dbus
.Int32(3),
2237 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2238 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "ScanInterval",
2239 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2241 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res
, i
))
2244 if_obj
.Set(WPAS_DBUS_IFACE
, "ScanInterval", dbus
.UInt16(100),
2245 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2246 raise Exception("Invalid Set(ScanInterval,100) accepted")
2247 except dbus
.exceptions
.DBusException
, e
:
2248 if "Error.Failed: wrong property type" not in str(e
):
2249 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e
))
2252 if_obj
.Set(WPAS_DBUS_IFACE
, "ScanInterval", dbus
.Int32(-1),
2253 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2254 raise Exception("Invalid Set(ScanInterval,-1) accepted")
2255 except dbus
.exceptions
.DBusException
, e
:
2256 if "Error.Failed: scan_interval must be >= 0" not in str(e
):
2257 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e
))
2259 if_obj
.Set(WPAS_DBUS_IFACE
, "ScanInterval", dbus
.Int32(5),
2260 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2262 def test_dbus_probe_req_reporting(dev
, apdev
):
2263 """D-Bus Probe Request reporting"""
2264 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2266 dev
[1].p2p_find(social
=True)
2268 class TestDbusProbe(TestDbus
):
2269 def __init__(self
, bus
):
2270 TestDbus
.__init
__(self
, bus
)
2271 self
.reported
= False
2273 def __enter__(self
):
2274 gobject
.timeout_add(1, self
.run_test
)
2275 gobject
.timeout_add(15000, self
.timeout
)
2276 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
2278 self
.add_signal(self
.probeRequest
, WPAS_DBUS_IFACE
, "ProbeRequest",
2283 def groupStarted(self
, properties
):
2284 logger
.debug("groupStarted: " + str(properties
))
2285 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
2286 properties
['interface_object'])
2287 self
.iface
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE
)
2288 self
.iface
.SubscribeProbeReq()
2289 self
.group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2291 def probeRequest(self
, args
):
2292 logger
.debug("probeRequest: args=%s" % str(args
))
2293 self
.reported
= True
2296 def run_test(self
, *args
):
2297 logger
.debug("run_test")
2298 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2299 params
= dbus
.Dictionary({ 'frequency': 2412 })
2300 p2p
.GroupAdd(params
)
2304 return self
.reported
2306 with
TestDbusProbe(bus
) as t
:
2308 raise Exception("Expected signals not seen")
2309 t
.iface
.UnsubscribeProbeReq()
2311 t
.iface
.UnsubscribeProbeReq()
2312 raise Exception("Invalid UnsubscribeProbeReq() accepted")
2313 except dbus
.exceptions
.DBusException
, e
:
2314 if "NoSubscription" not in str(e
):
2315 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e
))
2316 t
.group_p2p
.Disconnect()
2318 with
TestDbusProbe(bus
) as t
:
2320 raise Exception("Expected signals not seen")
2321 # On purpose, leave ProbeReq subscription in place to test automatic
2324 dev
[1].p2p_stop_find()
2326 def test_dbus_probe_req_reporting_oom(dev
, apdev
):
2327 """D-Bus Probe Request reporting (OOM)"""
2328 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2329 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2331 # Need to make sure this process has not already subscribed to avoid false
2332 # failures due to the operation succeeding due to os_strdup() not even
2335 iface
.UnsubscribeProbeReq()
2336 was_subscribed
= True
2337 except dbus
.exceptions
.DBusException
, e
:
2338 was_subscribed
= False
2341 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_handler_subscribe_preq",
2342 "SubscribeProbeReq"):
2343 iface
.SubscribeProbeReq()
2346 # On purpose, leave ProbeReq subscription in place to test automatic
2348 iface
.SubscribeProbeReq()
2350 def test_dbus_p2p_invalid(dev
, apdev
):
2351 """D-Bus invalid P2P operations"""
2352 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2353 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2356 p2p
.RejectPeer(path
+ "/Peers/00112233445566")
2357 raise Exception("Invalid RejectPeer accepted")
2358 except dbus
.exceptions
.DBusException
, e
:
2359 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e
):
2360 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
2363 p2p
.RejectPeer("/foo")
2364 raise Exception("Invalid RejectPeer accepted")
2365 except dbus
.exceptions
.DBusException
, e
:
2366 if "InvalidArgs" not in str(e
):
2367 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
2377 raise Exception("Invalid RemoveClient accepted")
2378 except dbus
.exceptions
.DBusException
, e
:
2379 if "InvalidArgs" not in str(e
):
2380 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e
))
2382 tests
= [ {'DiscoveryType': 'foo'},
2383 {'RequestedDeviceTypes': 'foo'},
2384 {'RequestedDeviceTypes': ['foo']},
2385 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2386 '10','11','12','13','14','15','16',
2388 {'RequestedDeviceTypes': dbus
.Array([], signature
="s")},
2389 {'RequestedDeviceTypes': dbus
.Array([['foo']], signature
="as")},
2390 {'RequestedDeviceTypes': dbus
.Array([], signature
="i")},
2391 {'RequestedDeviceTypes': [dbus
.ByteArray('12345678'),
2392 dbus
.ByteArray('1234567')]},
2393 {'Foo': dbus
.Int16(1)},
2394 {'Foo': dbus
.UInt16(1)},
2395 {'Foo': dbus
.Int64(1)},
2396 {'Foo': dbus
.UInt64(1)},
2397 {'Foo': dbus
.Double(1.23)},
2398 {'Foo': dbus
.Signature('s')},
2402 p2p
.Find(dbus
.Dictionary(t
))
2403 raise Exception("Invalid Find accepted")
2404 except dbus
.exceptions
.DBusException
, e
:
2405 if "InvalidArgs" not in str(e
):
2406 raise Exception("Unexpected error message for invalid Find(): " + str(e
))
2409 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2410 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2412 p2p
.RemovePersistentGroup(dbus
.ObjectPath(p
))
2413 raise Exception("Invalid RemovePersistentGroup accepted")
2414 except dbus
.exceptions
.DBusException
, e
:
2415 if "InvalidArgs" not in str(e
):
2416 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e
))
2419 dev
[0].request("P2P_SET disabled 1")
2421 raise Exception("Invalid Listen accepted")
2422 except dbus
.exceptions
.DBusException
, e
:
2423 if "UnknownError: Could not start P2P listen" not in str(e
):
2424 raise Exception("Unexpected error message for invalid Listen: " + str(e
))
2426 dev
[0].request("P2P_SET disabled 0")
2428 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
, introspect
=False)
2429 test_p2p
= dbus
.Interface(test_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2431 test_p2p
.Listen("foo")
2432 raise Exception("Invalid Listen accepted")
2433 except dbus
.exceptions
.DBusException
, e
:
2434 if "InvalidArgs" not in str(e
):
2435 raise Exception("Unexpected error message for invalid Listen: " + str(e
))
2438 dev
[0].request("P2P_SET disabled 1")
2439 p2p
.ExtendedListen(dbus
.Dictionary({}))
2440 raise Exception("Invalid ExtendedListen accepted")
2441 except dbus
.exceptions
.DBusException
, e
:
2442 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e
):
2443 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e
))
2445 dev
[0].request("P2P_SET disabled 0")
2448 dev
[0].request("P2P_SET disabled 1")
2449 args
= { 'duration1': 30000, 'interval1': 102400,
2450 'duration2': 20000, 'interval2': 102400 }
2451 p2p
.PresenceRequest(args
)
2452 raise Exception("Invalid PresenceRequest accepted")
2453 except dbus
.exceptions
.DBusException
, e
:
2454 if "UnknownError: Failed to invoke presence request" not in str(e
):
2455 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e
))
2457 dev
[0].request("P2P_SET disabled 0")
2460 params
= dbus
.Dictionary({'frequency': dbus
.Int32(-1)})
2461 p2p
.GroupAdd(params
)
2462 raise Exception("Invalid GroupAdd accepted")
2463 except dbus
.exceptions
.DBusException
, e
:
2464 if "InvalidArgs" not in str(e
):
2465 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e
))
2468 params
= dbus
.Dictionary({'persistent_group_object':
2469 dbus
.ObjectPath(path
),
2471 p2p
.GroupAdd(params
)
2472 raise Exception("Invalid GroupAdd accepted")
2473 except dbus
.exceptions
.DBusException
, e
:
2474 if "InvalidArgs" not in str(e
):
2475 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e
))
2479 raise Exception("Invalid Disconnect accepted")
2480 except dbus
.exceptions
.DBusException
, e
:
2481 if "UnknownError: failed to disconnect" not in str(e
):
2482 raise Exception("Unexpected error message for invalid Disconnect: " + str(e
))
2485 dev
[0].request("P2P_SET disabled 1")
2487 raise Exception("Invalid Flush accepted")
2488 except dbus
.exceptions
.DBusException
, e
:
2489 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2490 raise Exception("Unexpected error message for invalid Flush: " + str(e
))
2492 dev
[0].request("P2P_SET disabled 0")
2495 dev
[0].request("P2P_SET disabled 1")
2496 args
= { 'peer': path
,
2498 'wps_method': 'pbc',
2500 pin
= p2p
.Connect(args
)
2501 raise Exception("Invalid Connect accepted")
2502 except dbus
.exceptions
.DBusException
, e
:
2503 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2504 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2506 dev
[0].request("P2P_SET disabled 0")
2508 tests
= [ { 'frequency': dbus
.Int32(-1) },
2509 { 'wps_method': 'pbc' },
2510 { 'wps_method': 'foo' } ]
2513 pin
= p2p
.Connect(args
)
2514 raise Exception("Invalid Connect accepted")
2515 except dbus
.exceptions
.DBusException
, e
:
2516 if "InvalidArgs" not in str(e
):
2517 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2520 dev
[0].request("P2P_SET disabled 1")
2521 args
= { 'peer': path
}
2522 pin
= p2p
.Invite(args
)
2523 raise Exception("Invalid Invite accepted")
2524 except dbus
.exceptions
.DBusException
, e
:
2525 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2526 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
2528 dev
[0].request("P2P_SET disabled 0")
2531 args
= { 'foo': 'bar' }
2532 pin
= p2p
.Invite(args
)
2533 raise Exception("Invalid Invite accepted")
2534 except dbus
.exceptions
.DBusException
, e
:
2535 if "InvalidArgs" not in str(e
):
2536 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2538 tests
= [ (path
, 'display', "InvalidArgs"),
2539 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2541 "UnknownError: Failed to send provision discovery request"),
2542 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2544 "UnknownError: Failed to send provision discovery request"),
2545 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2547 "UnknownError: Failed to send provision discovery request"),
2548 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2550 "UnknownError: Failed to send provision discovery request"),
2551 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2552 'foo', "InvalidArgs") ]
2553 for (p
,method
,err
) in tests
:
2555 p2p
.ProvisionDiscoveryRequest(p
, method
)
2556 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2557 except dbus
.exceptions
.DBusException
, e
:
2558 if err
not in str(e
):
2559 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e
))
2562 dev
[0].request("P2P_SET disabled 1")
2563 if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Peers",
2564 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2565 raise Exception("Invalid Get(Peers) accepted")
2566 except dbus
.exceptions
.DBusException
, e
:
2567 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2568 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e
))
2570 dev
[0].request("P2P_SET disabled 0")
2572 def test_dbus_p2p_oom(dev
, apdev
):
2573 """D-Bus P2P operations and OOM"""
2574 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2575 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2577 with
alloc_fail_dbus(dev
[0], 1, "_wpa_dbus_dict_entry_get_string_array",
2578 "Find", "InvalidArgs"):
2579 p2p
.Find(dbus
.Dictionary({ 'Foo': [ 'bar' ] }))
2581 with
alloc_fail_dbus(dev
[0], 2, "_wpa_dbus_dict_entry_get_string_array",
2582 "Find", "InvalidArgs"):
2583 p2p
.Find(dbus
.Dictionary({ 'Foo': [ 'bar' ] }))
2585 with
alloc_fail_dbus(dev
[0], 10, "_wpa_dbus_dict_entry_get_string_array",
2586 "Find", "InvalidArgs"):
2587 p2p
.Find(dbus
.Dictionary({ 'Foo': [ '1','2','3','4','5','6','7','8','9' ] }))
2589 with
alloc_fail_dbus(dev
[0], 1, ":=_wpa_dbus_dict_entry_get_binarray",
2590 "Find", "InvalidArgs"):
2591 p2p
.Find(dbus
.Dictionary({ 'Foo': [ dbus
.ByteArray('123') ] }))
2593 with
alloc_fail_dbus(dev
[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
2594 "Find", "InvalidArgs"):
2595 p2p
.Find(dbus
.Dictionary({ 'Foo': [ dbus
.ByteArray('123') ] }))
2597 with
alloc_fail_dbus(dev
[0], 2, "=_wpa_dbus_dict_entry_get_binarray",
2598 "Find", "InvalidArgs"):
2599 p2p
.Find(dbus
.Dictionary({ 'Foo': [ dbus
.ByteArray('123'),
2600 dbus
.ByteArray('123'),
2601 dbus
.ByteArray('123'),
2602 dbus
.ByteArray('123'),
2603 dbus
.ByteArray('123'),
2604 dbus
.ByteArray('123'),
2605 dbus
.ByteArray('123'),
2606 dbus
.ByteArray('123'),
2607 dbus
.ByteArray('123'),
2608 dbus
.ByteArray('123'),
2609 dbus
.ByteArray('123') ] }))
2611 with
alloc_fail_dbus(dev
[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
2612 "Find", "InvalidArgs"):
2613 p2p
.Find(dbus
.Dictionary({ 'Foo': [ dbus
.ByteArray('123') ] }))
2615 with
alloc_fail_dbus(dev
[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
2616 "Find", "InvalidArgs"):
2617 p2p
.Find(dbus
.Dictionary({ 'Foo': path
}))
2619 with
alloc_fail_dbus(dev
[0], 1, "_wpa_dbus_dict_entry_get_byte_array",
2620 "AddService", "InvalidArgs"):
2621 args
= { 'service_type': 'bonjour',
2622 'response': dbus
.ByteArray(500*'b') }
2623 p2p
.AddService(args
)
2625 with
alloc_fail_dbus(dev
[0], 2, "_wpa_dbus_dict_entry_get_byte_array",
2626 "AddService", "InvalidArgs"):
2627 p2p
.AddService(args
)
2629 def test_dbus_p2p_discovery(dev
, apdev
):
2630 """D-Bus P2P discovery"""
2631 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2632 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2634 addr0
= dev
[0].p2p_dev_addr()
2636 dev
[1].request("SET sec_device_type 1-0050F204-2")
2637 dev
[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2639 addr1
= dev
[1].p2p_dev_addr()
2640 a1
= binascii
.unhexlify(addr1
.replace(':',''))
2642 wfd_devinfo
= "00001c440028"
2643 dev
[2].request("SET wifi_display 1")
2644 dev
[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo
)
2645 wfd
= binascii
.unhexlify('000006' + wfd_devinfo
)
2647 addr2
= dev
[2].p2p_dev_addr()
2648 a2
= binascii
.unhexlify(addr2
.replace(':',''))
2650 res
= if_obj
.GetAll(WPAS_DBUS_IFACE_P2PDEVICE
,
2651 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2652 if 'Peers' not in res
:
2653 raise Exception("GetAll result missing Peers")
2654 if len(res
['Peers']) != 0:
2655 raise Exception("Unexpected peer(s) in the list")
2657 args
= {'DiscoveryType': 'social',
2658 'RequestedDeviceTypes': [dbus
.ByteArray('12345678')],
2659 'Timeout': dbus
.Int32(1) }
2660 p2p
.Find(dbus
.Dictionary(args
))
2663 class TestDbusP2p(TestDbus
):
2664 def __init__(self
, bus
):
2665 TestDbus
.__init
__(self
, bus
)
2669 self
.find_stopped
= False
2671 def __enter__(self
):
2672 gobject
.timeout_add(1, self
.run_test
)
2673 gobject
.timeout_add(15000, self
.timeout
)
2674 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
2676 self
.add_signal(self
.deviceLost
, WPAS_DBUS_IFACE_P2PDEVICE
,
2678 self
.add_signal(self
.provisionDiscoveryResponseEnterPin
,
2679 WPAS_DBUS_IFACE_P2PDEVICE
,
2680 "ProvisionDiscoveryResponseEnterPin")
2681 self
.add_signal(self
.findStopped
, WPAS_DBUS_IFACE_P2PDEVICE
,
2686 def deviceFound(self
, path
):
2687 logger
.debug("deviceFound: path=%s" % path
)
2688 res
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Peers",
2689 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2691 raise Exception("Unexpected number of peers")
2693 raise Exception("Mismatch in peer object path")
2694 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
2695 res
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
2696 dbus_interface
=dbus
.PROPERTIES_IFACE
,
2698 logger
.debug("peer properties: " + str(res
))
2700 if res
['DeviceAddress'] == a1
:
2701 if 'SecondaryDeviceTypes' not in res
:
2702 raise Exception("Missing SecondaryDeviceTypes")
2703 sec
= res
['SecondaryDeviceTypes']
2705 raise Exception("Secondary device type missing")
2706 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec
:
2707 raise Exception("Secondary device type mismatch")
2709 if 'VendorExtension' not in res
:
2710 raise Exception("Missing VendorExtension")
2711 vendor
= res
['VendorExtension']
2713 raise Exception("Vendor extension missing")
2714 if "\x11\x22\x33\x44" not in vendor
:
2715 raise Exception("Secondary device type mismatch")
2718 elif res
['DeviceAddress'] == a2
:
2719 if 'IEs' not in res
:
2720 raise Exception("IEs missing")
2721 if res
['IEs'] != wfd
:
2722 raise Exception("IEs mismatch")
2725 raise Exception("Unexpected peer device address")
2727 if self
.found
and self
.found2
:
2729 p2p
.RejectPeer(path
)
2730 p2p
.ProvisionDiscoveryRequest(path
, 'display')
2732 def deviceLost(self
, path
):
2733 logger
.debug("deviceLost: path=%s" % path
)
2736 p2p
.RejectPeer(path
)
2737 raise Exception("Invalid RejectPeer accepted")
2738 except dbus
.exceptions
.DBusException
, e
:
2739 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e
):
2740 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
2743 def provisionDiscoveryResponseEnterPin(self
, peer_object
):
2744 logger
.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object
)
2747 def findStopped(self
):
2748 logger
.debug("findStopped")
2749 self
.find_stopped
= True
2751 def run_test(self
, *args
):
2752 logger
.debug("run_test")
2753 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social',
2754 'Timeout': dbus
.Int32(10)}))
2758 return self
.found
and self
.lost
and self
.found2
and self
.find_stopped
2760 with
TestDbusP2p(bus
) as t
:
2762 raise Exception("Expected signals not seen")
2764 dev
[1].request("VENDOR_ELEM_REMOVE 1 *")
2765 dev
[1].p2p_stop_find()
2768 dev
[2].p2p_stop_find()
2769 dev
[2].request("P2P_FLUSH")
2770 if not dev
[2].discover_peer(addr0
):
2771 raise Exception("Peer not found")
2773 dev
[2].p2p_stop_find()
2776 p2p
.ExtendedListen(dbus
.Dictionary({'foo': 100}))
2777 raise Exception("Invalid ExtendedListen accepted")
2778 except dbus
.exceptions
.DBusException
, e
:
2779 if "InvalidArgs" not in str(e
):
2780 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e
))
2782 p2p
.ExtendedListen(dbus
.Dictionary({'period': 100, 'interval': 1000}))
2783 p2p
.ExtendedListen(dbus
.Dictionary({}))
2784 dev
[0].global_request("P2P_EXT_LISTEN")
2786 def test_dbus_p2p_service_discovery(dev
, apdev
):
2787 """D-Bus P2P service discovery"""
2788 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2789 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2791 addr0
= dev
[0].p2p_dev_addr()
2792 addr1
= dev
[1].p2p_dev_addr()
2794 bonjour_query
= dbus
.ByteArray(binascii
.unhexlify('0b5f6166706f766572746370c00c000c01'))
2795 bonjour_response
= dbus
.ByteArray(binascii
.unhexlify('074578616d706c65c027'))
2797 args
= { 'service_type': 'bonjour',
2798 'query': bonjour_query
,
2799 'response': bonjour_response
}
2800 p2p
.AddService(args
)
2802 p2p
.AddService(args
)
2805 p2p
.DeleteService(args
)
2806 raise Exception("Invalid DeleteService() accepted")
2807 except dbus
.exceptions
.DBusException
, e
:
2808 if "InvalidArgs" not in str(e
):
2809 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e
))
2811 args
= { 'service_type': 'bonjour',
2812 'query': bonjour_query
}
2813 p2p
.DeleteService(args
)
2815 p2p
.DeleteService(args
)
2816 raise Exception("Invalid DeleteService() accepted")
2817 except dbus
.exceptions
.DBusException
, e
:
2818 if "InvalidArgs" not in str(e
):
2819 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e
))
2821 args
= { 'service_type': 'upnp',
2823 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' }
2824 p2p
.AddService(args
)
2825 p2p
.DeleteService(args
)
2827 p2p
.DeleteService(args
)
2828 raise Exception("Invalid DeleteService() accepted")
2829 except dbus
.exceptions
.DBusException
, e
:
2830 if "InvalidArgs" not in str(e
):
2831 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e
))
2833 tests
= [ { 'service_type': 'foo' },
2834 { 'service_type': 'foo', 'query': bonjour_query
},
2835 { 'service_type': 'upnp' },
2836 { 'service_type': 'upnp', 'version': 0x10 },
2837 { 'service_type': 'upnp',
2838 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2840 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2841 { 'service_type': 'upnp', 'foo': 'bar' },
2842 { 'service_type': 'bonjour' },
2843 { 'service_type': 'bonjour', 'query': 'foo' },
2844 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2847 p2p
.DeleteService(args
)
2848 raise Exception("Invalid DeleteService() accepted")
2849 except dbus
.exceptions
.DBusException
, e
:
2850 if "InvalidArgs" not in str(e
):
2851 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e
))
2853 tests
= [ { 'service_type': 'foo' },
2854 { 'service_type': 'upnp' },
2855 { 'service_type': 'upnp', 'version': 0x10 },
2856 { 'service_type': 'upnp',
2857 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2859 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice' },
2860 { 'service_type': 'upnp', 'foo': 'bar' },
2861 { 'service_type': 'bonjour' },
2862 { 'service_type': 'bonjour', 'query': 'foo' },
2863 { 'service_type': 'bonjour', 'response': 'foo' },
2864 { 'service_type': 'bonjour', 'query': bonjour_query
},
2865 { 'service_type': 'bonjour', 'response': bonjour_response
},
2866 { 'service_type': 'bonjour', 'query': dbus
.ByteArray(500*'a') },
2867 { 'service_type': 'bonjour', 'foo': 'bar' } ]
2870 p2p
.AddService(args
)
2871 raise Exception("Invalid AddService() accepted")
2872 except dbus
.exceptions
.DBusException
, e
:
2873 if "InvalidArgs" not in str(e
):
2874 raise Exception("Unexpected error message for invalid AddService(): " + str(e
))
2876 args
= { 'tlv': dbus
.ByteArray("\x02\x00\x00\x01") }
2877 ref
= p2p
.ServiceDiscoveryRequest(args
)
2878 p2p
.ServiceDiscoveryCancelRequest(ref
)
2880 p2p
.ServiceDiscoveryCancelRequest(ref
)
2881 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2882 except dbus
.exceptions
.DBusException
, e
:
2883 if "InvalidArgs" not in str(e
):
2884 raise Exception("Unexpected error message for invalid AddService(): " + str(e
))
2886 p2p
.ServiceDiscoveryCancelRequest(dbus
.UInt64(0))
2887 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
2888 except dbus
.exceptions
.DBusException
, e
:
2889 if "InvalidArgs" not in str(e
):
2890 raise Exception("Unexpected error message for invalid AddService(): " + str(e
))
2892 args
= { 'service_type': 'upnp',
2894 'service': 'ssdp:foo' }
2895 ref
= p2p
.ServiceDiscoveryRequest(args
)
2896 p2p
.ServiceDiscoveryCancelRequest(ref
)
2898 tests
= [ { 'service_type': 'foo' },
2903 { 'service_type': 'upnp',
2904 'service': 'ssdp:foo' },
2905 { 'service_type': 'upnp',
2907 { 'service_type': 'upnp',
2909 'service': 'ssdp:foo',
2910 'peer_object': dbus
.ObjectPath(path
+ "/Peers") },
2911 { 'service_type': 'upnp',
2913 'service': 'ssdp:foo',
2914 'peer_object': path
+ "/Peers" },
2915 { 'service_type': 'upnp',
2917 'service': 'ssdp:foo',
2918 'peer_object': dbus
.ObjectPath(path
+ "/Peers/00112233445566") } ]
2921 p2p
.ServiceDiscoveryRequest(args
)
2922 raise Exception("Invalid ServiceDiscoveryRequest accepted")
2923 except dbus
.exceptions
.DBusException
, e
:
2924 if "InvalidArgs" not in str(e
):
2925 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e
))
2927 args
= { 'foo': 'bar' }
2929 p2p
.ServiceDiscoveryResponse(dbus
.Dictionary(args
, signature
='sv'))
2930 raise Exception("Invalid ServiceDiscoveryResponse accepted")
2931 except dbus
.exceptions
.DBusException
, e
:
2932 if "InvalidArgs" not in str(e
):
2933 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e
))
2935 def test_dbus_p2p_service_discovery_query(dev
, apdev
):
2936 """D-Bus P2P service discovery query"""
2937 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2938 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2940 addr0
= dev
[0].p2p_dev_addr()
2941 dev
[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
2943 addr1
= dev
[1].p2p_dev_addr()
2945 class TestDbusP2p(TestDbus
):
2946 def __init__(self
, bus
):
2947 TestDbus
.__init
__(self
, bus
)
2950 def __enter__(self
):
2951 gobject
.timeout_add(1, self
.run_test
)
2952 gobject
.timeout_add(15000, self
.timeout
)
2953 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
2955 self
.add_signal(self
.serviceDiscoveryResponse
,
2956 WPAS_DBUS_IFACE_P2PDEVICE
,
2957 "ServiceDiscoveryResponse", byte_arrays
=True)
2961 def deviceFound(self
, path
):
2962 logger
.debug("deviceFound: path=%s" % path
)
2963 args
= { 'peer_object': path
,
2964 'tlv': dbus
.ByteArray("\x02\x00\x00\x01") }
2965 p2p
.ServiceDiscoveryRequest(args
)
2967 def serviceDiscoveryResponse(self
, sd_request
):
2968 logger
.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request
))
2972 def run_test(self
, *args
):
2973 logger
.debug("run_test")
2974 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social',
2975 'Timeout': dbus
.Int32(10)}))
2981 with
TestDbusP2p(bus
) as t
:
2983 raise Exception("Expected signals not seen")
2985 dev
[1].p2p_stop_find()
2987 def test_dbus_p2p_service_discovery_external(dev
, apdev
):
2988 """D-Bus P2P service discovery with external response"""
2990 _test_dbus_p2p_service_discovery_external(dev
, apdev
)
2992 dev
[0].request("P2P_SERV_DISC_EXTERNAL 0")
2994 def _test_dbus_p2p_service_discovery_external(dev
, apdev
):
2995 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2996 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2998 addr0
= dev
[0].p2p_dev_addr()
2999 addr1
= dev
[1].p2p_dev_addr()
3002 dev
[1].request("P2P_FLUSH")
3003 dev
[1].request("P2P_SERV_DISC_REQ " + addr0
+ " 02000001")
3004 dev
[1].p2p_find(social
=True)
3006 class TestDbusP2p(TestDbus
):
3007 def __init__(self
, bus
):
3008 TestDbus
.__init
__(self
, bus
)
3011 def __enter__(self
):
3012 gobject
.timeout_add(1, self
.run_test
)
3013 gobject
.timeout_add(15000, self
.timeout
)
3014 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3016 self
.add_signal(self
.serviceDiscoveryRequest
,
3017 WPAS_DBUS_IFACE_P2PDEVICE
,
3018 "ServiceDiscoveryRequest")
3022 def deviceFound(self
, path
):
3023 logger
.debug("deviceFound: path=%s" % path
)
3025 def serviceDiscoveryRequest(self
, sd_request
):
3026 logger
.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request
))
3028 args
= { 'peer_object': sd_request
['peer_object'],
3029 'frequency': sd_request
['frequency'],
3030 'dialog_token': sd_request
['dialog_token'],
3031 'tlvs': dbus
.ByteArray(binascii
.unhexlify(resp
)) }
3032 p2p
.ServiceDiscoveryResponse(dbus
.Dictionary(args
, signature
='sv'))
3035 def run_test(self
, *args
):
3036 logger
.debug("run_test")
3037 p2p
.ServiceDiscoveryExternal(1)
3045 with
TestDbusP2p(bus
) as t
:
3047 raise Exception("Expected signals not seen")
3049 ev
= dev
[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout
=5)
3051 raise Exception("Service discovery timed out")
3053 raise Exception("Unexpected address in SD Response: " + ev
)
3054 if ev
.split(' ')[4] != resp
:
3055 raise Exception("Unexpected response data SD Response: " + ev
)
3056 dev
[1].p2p_stop_find()
3059 p2p
.ServiceDiscoveryExternal(0)
3061 def test_dbus_p2p_autogo(dev
, apdev
):
3062 """D-Bus P2P autonomous GO"""
3063 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3064 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3066 addr0
= dev
[0].p2p_dev_addr()
3068 class TestDbusP2p(TestDbus
):
3069 def __init__(self
, bus
):
3070 TestDbus
.__init
__(self
, bus
)
3072 self
.waiting_end
= False
3073 self
.exceptions
= False
3074 self
.deauthorized
= False
3077 def __enter__(self
):
3078 gobject
.timeout_add(1, self
.run_test
)
3079 gobject
.timeout_add(15000, self
.timeout
)
3080 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3082 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3084 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3086 self
.add_signal(self
.persistentGroupAdded
,
3087 WPAS_DBUS_IFACE_P2PDEVICE
,
3088 "PersistentGroupAdded")
3089 self
.add_signal(self
.persistentGroupRemoved
,
3090 WPAS_DBUS_IFACE_P2PDEVICE
,
3091 "PersistentGroupRemoved")
3092 self
.add_signal(self
.provisionDiscoveryRequestDisplayPin
,
3093 WPAS_DBUS_IFACE_P2PDEVICE
,
3094 "ProvisionDiscoveryRequestDisplayPin")
3095 self
.add_signal(self
.staAuthorized
, WPAS_DBUS_IFACE
,
3097 self
.add_signal(self
.staDeauthorized
, WPAS_DBUS_IFACE
,
3102 def groupStarted(self
, properties
):
3103 logger
.debug("groupStarted: " + str(properties
))
3104 self
.group
= properties
['group_object']
3105 self
.g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3106 properties
['interface_object'])
3107 role
= self
.g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Role",
3108 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3110 self
.exceptions
= True
3111 raise Exception("Unexpected role reported: " + role
)
3112 group
= self
.g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Group",
3113 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3114 if group
!= properties
['group_object']:
3115 self
.exceptions
= True
3116 raise Exception("Unexpected Group reported: " + str(group
))
3117 go
= self
.g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "PeerGO",
3118 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3120 self
.exceptions
= True
3121 raise Exception("Unexpected PeerGO value: " + str(go
))
3124 logger
.info("Remove persistent group instance")
3125 group_p2p
= dbus
.Interface(self
.g_if_obj
,
3126 WPAS_DBUS_IFACE_P2PDEVICE
)
3127 group_p2p
.Disconnect()
3129 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3130 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 join")
3132 def groupFinished(self
, properties
):
3133 logger
.debug("groupFinished: " + str(properties
))
3134 if self
.waiting_end
:
3135 logger
.info("Remove persistent group")
3136 p2p
.RemovePersistentGroup(self
.persistent
)
3138 logger
.info("Re-start persistent group")
3139 params
= dbus
.Dictionary({'persistent_group_object':
3142 p2p
.GroupAdd(params
)
3144 def persistentGroupAdded(self
, path
, properties
):
3145 logger
.debug("persistentGroupAdded: %s %s" % (path
, str(properties
)))
3146 self
.persistent
= path
3148 def persistentGroupRemoved(self
, path
):
3149 logger
.debug("persistentGroupRemoved: %s" % path
)
3153 def deviceFound(self
, path
):
3154 logger
.debug("deviceFound: path=%s" % path
)
3155 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
3156 self
.peer
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3157 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3159 logger
.debug('peer properties: ' + str(self
.peer
))
3161 def provisionDiscoveryRequestDisplayPin(self
, peer_object
, pin
):
3162 logger
.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object
, pin
))
3163 self
.peer_path
= peer_object
3164 peer
= binascii
.unhexlify(peer_object
.split('/')[-1])
3169 addr
+= '%02x' % ord(p
)
3171 params
= { 'Role': 'registrar',
3172 'P2PDeviceAddress': self
.peer
['DeviceAddress'],
3173 'Bssid': self
.peer
['DeviceAddress'],
3175 wps
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_WPS
)
3178 self
.exceptions
= True
3179 raise Exception("Invalid WPS.Start() accepted")
3180 except dbus
.exceptions
.DBusException
, e
:
3181 if "InvalidArgs" not in str(e
):
3182 self
.exceptions
= True
3183 raise Exception("Unexpected error message: " + str(e
))
3184 params
= { 'Role': 'registrar',
3185 'P2PDeviceAddress': self
.peer
['DeviceAddress'],
3188 logger
.info("Authorize peer to connect to the group")
3191 def staAuthorized(self
, name
):
3192 logger
.debug("staAuthorized: " + name
)
3193 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.peer_path
)
3194 res
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3195 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3197 logger
.debug("Peer properties: " + str(res
))
3198 if 'Groups' not in res
or len(res
['Groups']) != 1:
3199 self
.exceptions
= True
3200 raise Exception("Unexpected number of peer Groups entries")
3201 if res
['Groups'][0] != self
.group
:
3202 self
.exceptions
= True
3203 raise Exception("Unexpected peer Groups[0] value")
3205 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.group
)
3206 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
3207 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3209 logger
.debug("Group properties: " + str(res
))
3210 if 'Members' not in res
or len(res
['Members']) != 1:
3211 self
.exceptions
= True
3212 raise Exception("Unexpected number of group members")
3214 ext
= dbus
.ByteArray("\x11\x22\x33\x44")
3215 # Earlier implementation of this interface was a bit strange. The
3216 # property is defined to have aay signature and that is what the
3217 # getter returned. However, the setter expected there to be a
3218 # dictionary with 'WPSVendorExtensions' as the key surrounding these
3219 # values.. The current implementations maintains support for that
3220 # for backwards compability reasons. Verify that encoding first.
3221 vals
= dbus
.Dictionary({ 'WPSVendorExtensions': [ ext
]},
3223 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', vals
,
3224 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3225 res
= g_obj
.Get(WPAS_DBUS_GROUP
, 'WPSVendorExtensions',
3226 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3229 self
.exceptions
= True
3230 raise Exception("Unexpected number of vendor extensions")
3232 self
.exceptions
= True
3233 raise Exception("Vendor extension value changed")
3235 # And now verify that the more appropriate encoding is accepted as
3237 res
.append(dbus
.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
3238 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', res
,
3239 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3240 res2
= g_obj
.Get(WPAS_DBUS_GROUP
, 'WPSVendorExtensions',
3241 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3244 self
.exceptions
= True
3245 raise Exception("Unexpected number of vendor extensions")
3246 if res
[0] != res2
[0] or res
[1] != res2
[1]:
3247 self
.exceptions
= True
3248 raise Exception("Vendor extension value changed")
3251 res
.append(dbus
.ByteArray('\xaa\xbb'))
3253 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', res
,
3254 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3255 self
.exceptions
= True
3256 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3257 except dbus
.exceptions
.DBusException
, e
:
3258 if "Error.Failed" not in str(e
):
3259 self
.exceptions
= True
3260 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3262 vals
= dbus
.Dictionary({ 'Foo': [ ext
]}, signature
='sv')
3264 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', vals
,
3265 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3266 self
.exceptions
= True
3267 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3268 except dbus
.exceptions
.DBusException
, e
:
3269 if "InvalidArgs" not in str(e
):
3270 self
.exceptions
= True
3271 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3275 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', vals
,
3276 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3277 self
.exceptions
= True
3278 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3279 except dbus
.exceptions
.DBusException
, e
:
3280 if "Error.Failed" not in str(e
):
3281 self
.exceptions
= True
3282 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3284 vals
= [ [ "foo" ] ]
3286 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', vals
,
3287 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3288 self
.exceptions
= True
3289 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3290 except dbus
.exceptions
.DBusException
, e
:
3291 if "Error.Failed" not in str(e
):
3292 self
.exceptions
= True
3293 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3295 p2p
.RemoveClient({ 'peer': self
.peer_path
})
3297 self
.waiting_end
= True
3298 group_p2p
= dbus
.Interface(self
.g_if_obj
,
3299 WPAS_DBUS_IFACE_P2PDEVICE
)
3300 group_p2p
.Disconnect()
3302 def staDeauthorized(self
, name
):
3303 logger
.debug("staDeauthorized: " + name
)
3304 self
.deauthorized
= True
3306 def run_test(self
, *args
):
3307 logger
.debug("run_test")
3308 params
= dbus
.Dictionary({'persistent': True,
3310 logger
.info("Add a persistent group")
3311 p2p
.GroupAdd(params
)
3315 return self
.done
and self
.deauthorized
and not self
.exceptions
3317 with
TestDbusP2p(bus
) as t
:
3319 raise Exception("Expected signals not seen")
3321 dev
[1].wait_go_ending_session()
3323 def test_dbus_p2p_autogo_pbc(dev
, apdev
):
3324 """D-Bus P2P autonomous GO and PBC"""
3325 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3326 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3328 addr0
= dev
[0].p2p_dev_addr()
3330 class TestDbusP2p(TestDbus
):
3331 def __init__(self
, bus
):
3332 TestDbus
.__init
__(self
, bus
)
3334 self
.waiting_end
= False
3337 def __enter__(self
):
3338 gobject
.timeout_add(1, self
.run_test
)
3339 gobject
.timeout_add(15000, self
.timeout
)
3340 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3342 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3344 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3346 self
.add_signal(self
.provisionDiscoveryPBCRequest
,
3347 WPAS_DBUS_IFACE_P2PDEVICE
,
3348 "ProvisionDiscoveryPBCRequest")
3349 self
.add_signal(self
.staAuthorized
, WPAS_DBUS_IFACE
,
3354 def groupStarted(self
, properties
):
3355 logger
.debug("groupStarted: " + str(properties
))
3356 self
.group
= properties
['group_object']
3357 self
.g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3358 properties
['interface_object'])
3359 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3360 dev1
.global_request("P2P_CONNECT " + addr0
+ " pbc join")
3362 def groupFinished(self
, properties
):
3363 logger
.debug("groupFinished: " + str(properties
))
3367 def deviceFound(self
, path
):
3368 logger
.debug("deviceFound: path=%s" % path
)
3369 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
3370 self
.peer
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3371 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3373 logger
.debug('peer properties: ' + str(self
.peer
))
3375 def provisionDiscoveryPBCRequest(self
, peer_object
):
3376 logger
.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object
)
3377 self
.peer_path
= peer_object
3378 peer
= binascii
.unhexlify(peer_object
.split('/')[-1])
3383 addr
+= '%02x' % ord(p
)
3384 params
= { 'Role': 'registrar',
3385 'P2PDeviceAddress': self
.peer
['DeviceAddress'],
3387 logger
.info("Authorize peer to connect to the group")
3388 wps
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_WPS
)
3391 def staAuthorized(self
, name
):
3392 logger
.debug("staAuthorized: " + name
)
3393 group_p2p
= dbus
.Interface(self
.g_if_obj
,
3394 WPAS_DBUS_IFACE_P2PDEVICE
)
3395 group_p2p
.Disconnect()
3397 def run_test(self
, *args
):
3398 logger
.debug("run_test")
3399 params
= dbus
.Dictionary({'frequency': 2412})
3400 p2p
.GroupAdd(params
)
3406 with
TestDbusP2p(bus
) as t
:
3408 raise Exception("Expected signals not seen")
3410 dev
[1].wait_go_ending_session()
3411 dev
[1].flush_scan_cache()
3413 def test_dbus_p2p_autogo_legacy(dev
, apdev
):
3414 """D-Bus P2P autonomous GO and legacy STA"""
3415 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3416 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3418 addr0
= dev
[0].p2p_dev_addr()
3420 class TestDbusP2p(TestDbus
):
3421 def __init__(self
, bus
):
3422 TestDbus
.__init
__(self
, bus
)
3425 def __enter__(self
):
3426 gobject
.timeout_add(1, self
.run_test
)
3427 gobject
.timeout_add(15000, self
.timeout
)
3428 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3430 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3432 self
.add_signal(self
.staAuthorized
, WPAS_DBUS_IFACE
,
3437 def groupStarted(self
, properties
):
3438 logger
.debug("groupStarted: " + str(properties
))
3439 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3440 properties
['group_object'])
3441 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
3442 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3444 bssid
= ':'.join([binascii
.hexlify(l
) for l
in res
['BSSID']])
3447 params
= { 'Role': 'enrollee',
3450 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3451 properties
['interface_object'])
3452 wps
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_WPS
)
3454 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3455 dev1
.scan_for_bss(bssid
, freq
=2412)
3456 dev1
.request("WPS_PIN " + bssid
+ " " + pin
)
3457 self
.group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3459 def groupFinished(self
, properties
):
3460 logger
.debug("groupFinished: " + str(properties
))
3464 def staAuthorized(self
, name
):
3465 logger
.debug("staAuthorized: " + name
)
3466 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3467 dev1
.request("DISCONNECT")
3468 self
.group_p2p
.Disconnect()
3470 def run_test(self
, *args
):
3471 logger
.debug("run_test")
3472 params
= dbus
.Dictionary({'frequency': 2412})
3473 p2p
.GroupAdd(params
)
3479 with
TestDbusP2p(bus
) as t
:
3481 raise Exception("Expected signals not seen")
3483 def test_dbus_p2p_join(dev
, apdev
):
3484 """D-Bus P2P join an autonomous GO"""
3485 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3486 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3488 addr1
= dev
[1].p2p_dev_addr()
3489 addr2
= dev
[2].p2p_dev_addr()
3490 dev
[1].p2p_start_go(freq
=2412)
3491 dev1_group_ifname
= dev
[1].group_ifname
3494 class TestDbusP2p(TestDbus
):
3495 def __init__(self
, bus
):
3496 TestDbus
.__init
__(self
, bus
)
3501 def __enter__(self
):
3502 gobject
.timeout_add(1, self
.run_test
)
3503 gobject
.timeout_add(15000, self
.timeout
)
3504 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3506 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3508 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3510 self
.add_signal(self
.invitationResult
, WPAS_DBUS_IFACE_P2PDEVICE
,
3515 def deviceFound(self
, path
):
3516 logger
.debug("deviceFound: path=%s" % path
)
3517 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
3518 res
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3519 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3521 logger
.debug('peer properties: ' + str(res
))
3522 if addr2
.replace(':','') in path
:
3524 elif addr1
.replace(':','') in path
:
3526 if self
.peer
and self
.go
:
3527 logger
.info("Join the group")
3529 args
= { 'peer': self
.go
,
3531 'wps_method': 'pin',
3533 pin
= p2p
.Connect(args
)
3535 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3536 dev1
.group_ifname
= dev1_group_ifname
3537 dev1
.group_request("WPS_PIN any " + pin
)
3539 def groupStarted(self
, properties
):
3540 logger
.debug("groupStarted: " + str(properties
))
3541 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3542 properties
['interface_object'])
3543 role
= g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Role",
3544 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3545 if role
!= "client":
3546 raise Exception("Unexpected role reported: " + role
)
3547 group
= g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Group",
3548 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3549 if group
!= properties
['group_object']:
3550 raise Exception("Unexpected Group reported: " + str(group
))
3551 go
= g_if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "PeerGO",
3552 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3554 raise Exception("Unexpected PeerGO value: " + str(go
))
3556 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3557 properties
['group_object'])
3558 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
3559 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3561 logger
.debug("Group properties: " + str(res
))
3563 ext
= dbus
.ByteArray("\x11\x22\x33\x44")
3565 # Set(WPSVendorExtensions) not allowed for P2P Client
3566 g_obj
.Set(WPAS_DBUS_GROUP
, 'WPSVendorExtensions', res
,
3567 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3568 raise Exception("Invalid Set(WPSVendorExtensions) accepted")
3569 except dbus
.exceptions
.DBusException
, e
:
3570 if "Error.Failed: Failed to set property" not in str(e
):
3571 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e
))
3573 group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3574 args
= { 'duration1': 30000, 'interval1': 102400,
3575 'duration2': 20000, 'interval2': 102400 }
3576 group_p2p
.PresenceRequest(args
)
3578 args
= { 'peer': self
.peer
}
3579 group_p2p
.Invite(args
)
3581 def groupFinished(self
, properties
):
3582 logger
.debug("groupFinished: " + str(properties
))
3586 def invitationResult(self
, result
):
3587 logger
.debug("invitationResult: " + str(result
))
3588 if result
['status'] != 1:
3589 raise Exception("Unexpected invitation result: " + str(result
))
3590 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3591 dev1
.group_ifname
= dev1_group_ifname
3594 def run_test(self
, *args
):
3595 logger
.debug("run_test")
3596 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
3602 with
TestDbusP2p(bus
) as t
:
3604 raise Exception("Expected signals not seen")
3606 dev
[2].p2p_stop_find()
3608 def test_dbus_p2p_config(dev
, apdev
):
3609 """D-Bus Get/Set P2PDeviceConfig"""
3611 _test_dbus_p2p_config(dev
, apdev
)
3613 dev
[0].request("P2P_SET ssid_postfix ")
3615 def _test_dbus_p2p_config(dev
, apdev
):
3616 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3617 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3619 res
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3620 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3622 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig", res
,
3623 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3624 res2
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3625 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3628 if len(res
) != len(res2
):
3629 raise Exception("Different number of parameters")
3631 if res
[k
] != res2
[k
]:
3632 raise Exception("Parameter %s value changes" % k
)
3634 changes
= { 'SsidPostfix': 'foo',
3635 'VendorExtension': [ dbus
.ByteArray('\x11\x22\x33\x44') ],
3636 'SecondaryDeviceTypes': [ dbus
.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
3637 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3638 dbus
.Dictionary(changes
, signature
='sv'),
3639 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3641 res2
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3642 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3644 logger
.debug("P2PDeviceConfig: " + str(res2
))
3645 if 'VendorExtension' not in res2
or len(res2
['VendorExtension']) != 1:
3646 raise Exception("VendorExtension does not match")
3647 if 'SecondaryDeviceTypes' not in res2
or len(res2
['SecondaryDeviceTypes']) != 1:
3648 raise Exception("SecondaryDeviceType does not match")
3650 changes
= { 'SsidPostfix': '',
3651 'VendorExtension': dbus
.Array([], signature
="ay"),
3652 'SecondaryDeviceTypes': dbus
.Array([], signature
="ay") }
3653 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3654 dbus
.Dictionary(changes
, signature
='sv'),
3655 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3657 res3
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3658 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3660 logger
.debug("P2PDeviceConfig: " + str(res3
))
3661 if 'VendorExtension' in res3
:
3662 raise Exception("VendorExtension not removed")
3663 if 'SecondaryDeviceTypes' in res3
:
3664 raise Exception("SecondaryDeviceType not removed")
3667 dev
[0].request("P2P_SET disabled 1")
3668 if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3669 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3671 raise Exception("Invalid Get(P2PDeviceConfig) accepted")
3672 except dbus
.exceptions
.DBusException
, e
:
3673 if "Error.Failed: P2P is not available for this interface" not in str(e
):
3674 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
3676 dev
[0].request("P2P_SET disabled 0")
3679 dev
[0].request("P2P_SET disabled 1")
3680 changes
= { 'SsidPostfix': 'foo' }
3681 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3682 dbus
.Dictionary(changes
, signature
='sv'),
3683 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3684 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
3685 except dbus
.exceptions
.DBusException
, e
:
3686 if "Error.Failed: P2P is not available for this interface" not in str(e
):
3687 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
3689 dev
[0].request("P2P_SET disabled 0")
3691 tests
= [ { 'DeviceName': 123 },
3692 { 'SsidPostfix': 123 },
3694 for changes
in tests
:
3696 if_obj
.Set(WPAS_DBUS_IFACE_P2PDEVICE
, "P2PDeviceConfig",
3697 dbus
.Dictionary(changes
, signature
='sv'),
3698 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3699 raise Exception("Invalid Set(P2PDeviceConfig) accepted")
3700 except dbus
.exceptions
.DBusException
, e
:
3701 if "InvalidArgs" not in str(e
):
3702 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
3704 def test_dbus_p2p_persistent(dev
, apdev
):
3705 """D-Bus P2P persistent group"""
3706 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3707 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3709 class TestDbusP2p(TestDbus
):
3710 def __init__(self
, bus
):
3711 TestDbus
.__init
__(self
, bus
)
3713 def __enter__(self
):
3714 gobject
.timeout_add(1, self
.run_test
)
3715 gobject
.timeout_add(15000, self
.timeout
)
3716 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3718 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3720 self
.add_signal(self
.persistentGroupAdded
,
3721 WPAS_DBUS_IFACE_P2PDEVICE
,
3722 "PersistentGroupAdded")
3726 def groupStarted(self
, properties
):
3727 logger
.debug("groupStarted: " + str(properties
))
3728 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3729 properties
['interface_object'])
3730 group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3731 group_p2p
.Disconnect()
3733 def groupFinished(self
, properties
):
3734 logger
.debug("groupFinished: " + str(properties
))
3737 def persistentGroupAdded(self
, path
, properties
):
3738 logger
.debug("persistentGroupAdded: %s %s" % (path
, str(properties
)))
3739 self
.persistent
= path
3741 def run_test(self
, *args
):
3742 logger
.debug("run_test")
3743 params
= dbus
.Dictionary({'persistent': True,
3745 logger
.info("Add a persistent group")
3746 p2p
.GroupAdd(params
)
3752 with
TestDbusP2p(bus
) as t
:
3754 raise Exception("Expected signals not seen")
3755 persistent
= t
.persistent
3757 p_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, persistent
)
3758 res
= p_obj
.Get(WPAS_DBUS_PERSISTENT_GROUP
, "Properties",
3759 dbus_interface
=dbus
.PROPERTIES_IFACE
, byte_arrays
=True)
3760 logger
.info("Persistent group Properties: " + str(res
))
3761 vals
= dbus
.Dictionary({ 'ssid': 'DIRECT-foo' }, signature
='sv')
3762 p_obj
.Set(WPAS_DBUS_PERSISTENT_GROUP
, "Properties", vals
,
3763 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3764 res2
= p_obj
.Get(WPAS_DBUS_PERSISTENT_GROUP
, "Properties",
3765 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3766 if len(res
) != len(res2
):
3767 raise Exception("Different number of parameters")
3769 if k
!= 'ssid' and res
[k
] != res2
[k
]:
3770 raise Exception("Parameter %s value changes" % k
)
3771 if res2
['ssid'] != '"DIRECT-foo"':
3772 raise Exception("Unexpected ssid")
3774 args
= dbus
.Dictionary({ 'ssid': 'DIRECT-testing',
3775 'psk': '1234567890' }, signature
='sv')
3776 group
= p2p
.AddPersistentGroup(args
)
3778 groups
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "PersistentGroups",
3779 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3780 if len(groups
) != 2:
3781 raise Exception("Unexpected number of persistent groups: " + str(groups
))
3783 p2p
.RemoveAllPersistentGroups()
3785 groups
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "PersistentGroups",
3786 dbus_interface
=dbus
.PROPERTIES_IFACE
)
3787 if len(groups
) != 0:
3788 raise Exception("Unexpected number of persistent groups: " + str(groups
))
3791 p2p
.RemovePersistentGroup(persistent
)
3792 raise Exception("Invalid RemovePersistentGroup accepted")
3793 except dbus
.exceptions
.DBusException
, e
:
3794 if "NetworkUnknown: There is no such persistent group" not in str(e
):
3795 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e
))
3797 def test_dbus_p2p_reinvoke_persistent(dev
, apdev
):
3798 """D-Bus P2P reinvoke persistent group"""
3799 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3800 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3802 addr0
= dev
[0].p2p_dev_addr()
3804 class TestDbusP2p(TestDbus
):
3805 def __init__(self
, bus
):
3806 TestDbus
.__init
__(self
, bus
)
3808 self
.waiting_end
= False
3810 self
.invited
= False
3812 def __enter__(self
):
3813 gobject
.timeout_add(1, self
.run_test
)
3814 gobject
.timeout_add(15000, self
.timeout
)
3815 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3817 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3819 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3821 self
.add_signal(self
.persistentGroupAdded
,
3822 WPAS_DBUS_IFACE_P2PDEVICE
,
3823 "PersistentGroupAdded")
3824 self
.add_signal(self
.provisionDiscoveryRequestDisplayPin
,
3825 WPAS_DBUS_IFACE_P2PDEVICE
,
3826 "ProvisionDiscoveryRequestDisplayPin")
3827 self
.add_signal(self
.staAuthorized
, WPAS_DBUS_IFACE
,
3832 def groupStarted(self
, properties
):
3833 logger
.debug("groupStarted: " + str(properties
))
3834 self
.g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3835 properties
['interface_object'])
3836 if not self
.invited
:
3837 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3838 properties
['group_object'])
3839 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
3840 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3842 bssid
= ':'.join([binascii
.hexlify(l
) for l
in res
['BSSID']])
3843 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3844 dev1
.scan_for_bss(bssid
, freq
=2412)
3845 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 join")
3847 def groupFinished(self
, properties
):
3848 logger
.debug("groupFinished: " + str(properties
))
3853 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3854 dev1
.global_request("SET persistent_reconnect 1")
3857 args
= { 'persistent_group_object': dbus
.ObjectPath(path
),
3858 'peer': self
.peer_path
}
3860 pin
= p2p
.Invite(args
)
3861 raise Exception("Invalid Invite accepted")
3862 except dbus
.exceptions
.DBusException
, e
:
3863 if "InvalidArgs" not in str(e
):
3864 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
3866 args
= { 'persistent_group_object': self
.persistent
,
3867 'peer': self
.peer_path
}
3868 pin
= p2p
.Invite(args
)
3871 self
.sta_group_ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"],
3873 if self
.sta_group_ev
is None:
3874 raise Exception("P2P-GROUP-STARTED event not seen")
3876 def persistentGroupAdded(self
, path
, properties
):
3877 logger
.debug("persistentGroupAdded: %s %s" % (path
, str(properties
)))
3878 self
.persistent
= path
3880 def deviceFound(self
, path
):
3881 logger
.debug("deviceFound: path=%s" % path
)
3882 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
3883 self
.peer
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
3884 dbus_interface
=dbus
.PROPERTIES_IFACE
,
3887 def provisionDiscoveryRequestDisplayPin(self
, peer_object
, pin
):
3888 logger
.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object
, pin
))
3889 self
.peer_path
= peer_object
3890 peer
= binascii
.unhexlify(peer_object
.split('/')[-1])
3895 addr
+= '%02x' % ord(p
)
3896 params
= { 'Role': 'registrar',
3897 'P2PDeviceAddress': self
.peer
['DeviceAddress'],
3898 'Bssid': self
.peer
['DeviceAddress'],
3901 logger
.info("Authorize peer to connect to the group")
3902 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3903 wps
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_WPS
)
3905 self
.sta_group_ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"],
3907 if self
.sta_group_ev
is None:
3908 raise Exception("P2P-GROUP-STARTED event not seen")
3910 def staAuthorized(self
, name
):
3911 logger
.debug("staAuthorized: " + name
)
3912 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
3913 dev1
.group_form_result(self
.sta_group_ev
)
3915 ev
= dev1
.wait_global_event(["P2P-GROUP-REMOVED"], timeout
=10)
3917 raise Exception("Group removal timed out")
3918 group_p2p
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3919 group_p2p
.Disconnect()
3921 def run_test(self
, *args
):
3922 logger
.debug("run_test")
3923 params
= dbus
.Dictionary({'persistent': True,
3925 logger
.info("Add a persistent group")
3926 p2p
.GroupAdd(params
)
3932 with
TestDbusP2p(bus
) as t
:
3934 raise Exception("Expected signals not seen")
3936 def test_dbus_p2p_go_neg_rx(dev
, apdev
):
3937 """D-Bus P2P GO Negotiation receive"""
3938 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
3939 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3940 addr0
= dev
[0].p2p_dev_addr()
3942 class TestDbusP2p(TestDbus
):
3943 def __init__(self
, bus
):
3944 TestDbus
.__init
__(self
, bus
)
3947 def __enter__(self
):
3948 gobject
.timeout_add(1, self
.run_test
)
3949 gobject
.timeout_add(15000, self
.timeout
)
3950 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
3952 self
.add_signal(self
.goNegotiationRequest
,
3953 WPAS_DBUS_IFACE_P2PDEVICE
,
3954 "GONegotiationRequest",
3956 self
.add_signal(self
.goNegotiationSuccess
,
3957 WPAS_DBUS_IFACE_P2PDEVICE
,
3958 "GONegotiationSuccess",
3960 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
3962 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
3967 def deviceFound(self
, path
):
3968 logger
.debug("deviceFound: path=%s" % path
)
3970 def goNegotiationRequest(self
, path
, dev_passwd_id
, go_intent
=0):
3971 logger
.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path
, dev_passwd_id
, go_intent
))
3972 if dev_passwd_id
!= 1:
3973 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id
)
3974 args
= { 'peer': path
, 'wps_method': 'display', 'pin': '12345670',
3975 'go_intent': 15, 'persistent': False, 'frequency': 5175 }
3978 raise Exception("Invalid Connect accepted")
3979 except dbus
.exceptions
.DBusException
, e
:
3980 if "ConnectChannelUnsupported" not in str(e
):
3981 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
3983 args
= { 'peer': path
, 'wps_method': 'display', 'pin': '12345670',
3984 'go_intent': 15, 'persistent': False }
3987 def goNegotiationSuccess(self
, properties
):
3988 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
3990 def groupStarted(self
, properties
):
3991 logger
.debug("groupStarted: " + str(properties
))
3992 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
3993 properties
['interface_object'])
3994 group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
3995 group_p2p
.Disconnect()
3997 def groupFinished(self
, properties
):
3998 logger
.debug("groupFinished: " + str(properties
))
4002 def run_test(self
, *args
):
4003 logger
.debug("run_test")
4005 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4006 if not dev1
.discover_peer(addr0
):
4007 raise Exception("Peer not found")
4008 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 enter")
4014 with
TestDbusP2p(bus
) as t
:
4016 raise Exception("Expected signals not seen")
4018 def test_dbus_p2p_go_neg_auth(dev
, apdev
):
4019 """D-Bus P2P GO Negotiation authorized"""
4020 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4021 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4022 addr0
= dev
[0].p2p_dev_addr()
4025 class TestDbusP2p(TestDbus
):
4026 def __init__(self
, bus
):
4027 TestDbus
.__init
__(self
, bus
)
4029 self
.peer_joined
= False
4030 self
.peer_disconnected
= False
4032 def __enter__(self
):
4033 gobject
.timeout_add(1, self
.run_test
)
4034 gobject
.timeout_add(15000, self
.timeout
)
4035 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4037 self
.add_signal(self
.goNegotiationSuccess
,
4038 WPAS_DBUS_IFACE_P2PDEVICE
,
4039 "GONegotiationSuccess",
4041 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4043 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4045 self
.add_signal(self
.staDeauthorized
, WPAS_DBUS_IFACE
,
4047 self
.add_signal(self
.peerJoined
, WPAS_DBUS_GROUP
,
4049 self
.add_signal(self
.peerDisconnected
, WPAS_DBUS_GROUP
,
4054 def deviceFound(self
, path
):
4055 logger
.debug("deviceFound: path=%s" % path
)
4056 args
= { 'peer': path
, 'wps_method': 'keypad',
4057 'go_intent': 15, 'authorize_only': True }
4060 raise Exception("Invalid Connect accepted")
4061 except dbus
.exceptions
.DBusException
, e
:
4062 if "InvalidArgs" not in str(e
):
4063 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
4065 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4066 'go_intent': 15, 'authorize_only': True }
4069 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4070 if not dev1
.discover_peer(addr0
):
4071 raise Exception("Peer not found")
4072 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 display go_intent=0")
4073 ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15);
4075 raise Exception("Group formation timed out")
4076 self
.sta_group_ev
= ev
4078 def goNegotiationSuccess(self
, properties
):
4079 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4081 def groupStarted(self
, properties
):
4082 logger
.debug("groupStarted: " + str(properties
))
4083 self
.g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4084 properties
['interface_object'])
4085 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4086 dev1
.group_form_result(self
.sta_group_ev
)
4089 def staDeauthorized(self
, name
):
4090 logger
.debug("staDeuthorized: " + name
)
4091 group_p2p
= dbus
.Interface(self
.g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4092 group_p2p
.Disconnect()
4094 def peerJoined(self
, peer
):
4095 logger
.debug("peerJoined: " + peer
)
4096 self
.peer_joined
= True
4098 def peerDisconnected(self
, peer
):
4099 logger
.debug("peerDisconnected: " + peer
)
4100 self
.peer_disconnected
= True
4102 def groupFinished(self
, properties
):
4103 logger
.debug("groupFinished: " + str(properties
))
4107 def run_test(self
, *args
):
4108 logger
.debug("run_test")
4109 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4113 return self
.done
and self
.peer_joined
and self
.peer_disconnected
4115 with
TestDbusP2p(bus
) as t
:
4117 raise Exception("Expected signals not seen")
4119 def test_dbus_p2p_go_neg_init(dev
, apdev
):
4120 """D-Bus P2P GO Negotiation initiation"""
4121 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4122 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4123 addr0
= dev
[0].p2p_dev_addr()
4126 class TestDbusP2p(TestDbus
):
4127 def __init__(self
, bus
):
4128 TestDbus
.__init
__(self
, bus
)
4130 self
.peer_group_added
= False
4131 self
.peer_group_removed
= False
4133 def __enter__(self
):
4134 gobject
.timeout_add(1, self
.run_test
)
4135 gobject
.timeout_add(15000, self
.timeout
)
4136 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4138 self
.add_signal(self
.goNegotiationSuccess
,
4139 WPAS_DBUS_IFACE_P2PDEVICE
,
4140 "GONegotiationSuccess",
4142 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4144 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4146 self
.add_signal(self
.propertiesChanged
, dbus
.PROPERTIES_IFACE
,
4147 "PropertiesChanged")
4151 def deviceFound(self
, path
):
4152 logger
.debug("deviceFound: path=%s" % path
)
4153 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4154 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4158 ev
= dev1
.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout
=15)
4160 raise Exception("Timeout while waiting for GO Neg Request")
4161 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 display go_intent=15")
4162 ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15);
4164 raise Exception("Group formation timed out")
4165 self
.sta_group_ev
= ev
4167 def goNegotiationSuccess(self
, properties
):
4168 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4170 def groupStarted(self
, properties
):
4171 logger
.debug("groupStarted: " + str(properties
))
4172 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4173 properties
['interface_object'])
4174 group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4175 group_p2p
.Disconnect()
4176 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4177 dev1
.group_form_result(self
.sta_group_ev
)
4180 def groupFinished(self
, properties
):
4181 logger
.debug("groupFinished: " + str(properties
))
4184 def propertiesChanged(self
, interface_name
, changed_properties
,
4185 invalidated_properties
):
4186 logger
.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
4187 if interface_name
!= WPAS_DBUS_P2P_PEER
:
4189 if "Groups" not in changed_properties
:
4191 if len(changed_properties
["Groups"]) > 0:
4192 self
.peer_group_added
= True
4193 if len(changed_properties
["Groups"]) == 0:
4194 self
.peer_group_removed
= True
4197 def run_test(self
, *args
):
4198 logger
.debug("run_test")
4199 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4203 return self
.done
and self
.peer_group_added
and self
.peer_group_removed
4205 with
TestDbusP2p(bus
) as t
:
4207 raise Exception("Expected signals not seen")
4209 def test_dbus_p2p_group_termination_by_go(dev
, apdev
):
4210 """D-Bus P2P group removal on GO terminating the group"""
4211 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4212 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4213 addr0
= dev
[0].p2p_dev_addr()
4216 class TestDbusP2p(TestDbus
):
4217 def __init__(self
, bus
):
4218 TestDbus
.__init
__(self
, bus
)
4220 self
.peer_group_added
= False
4221 self
.peer_group_removed
= False
4223 def __enter__(self
):
4224 gobject
.timeout_add(1, self
.run_test
)
4225 gobject
.timeout_add(15000, self
.timeout
)
4226 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4228 self
.add_signal(self
.goNegotiationSuccess
,
4229 WPAS_DBUS_IFACE_P2PDEVICE
,
4230 "GONegotiationSuccess",
4232 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4234 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4236 self
.add_signal(self
.propertiesChanged
, dbus
.PROPERTIES_IFACE
,
4237 "PropertiesChanged")
4241 def deviceFound(self
, path
):
4242 logger
.debug("deviceFound: path=%s" % path
)
4243 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4244 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4248 ev
= dev1
.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout
=15)
4250 raise Exception("Timeout while waiting for GO Neg Request")
4251 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 display go_intent=15")
4252 ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15);
4254 raise Exception("Group formation timed out")
4255 self
.sta_group_ev
= ev
4257 def goNegotiationSuccess(self
, properties
):
4258 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4260 def groupStarted(self
, properties
):
4261 logger
.debug("groupStarted: " + str(properties
))
4262 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4263 properties
['interface_object'])
4264 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4265 dev1
.group_form_result(self
.sta_group_ev
)
4268 def groupFinished(self
, properties
):
4269 logger
.debug("groupFinished: " + str(properties
))
4272 def propertiesChanged(self
, interface_name
, changed_properties
,
4273 invalidated_properties
):
4274 logger
.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
4275 if interface_name
!= WPAS_DBUS_P2P_PEER
:
4277 if "Groups" not in changed_properties
:
4279 if len(changed_properties
["Groups"]) > 0:
4280 self
.peer_group_added
= True
4281 if len(changed_properties
["Groups"]) == 0 and self
.peer_group_added
:
4282 self
.peer_group_removed
= True
4285 def run_test(self
, *args
):
4286 logger
.debug("run_test")
4287 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4291 return self
.done
and self
.peer_group_added
and self
.peer_group_removed
4293 with
TestDbusP2p(bus
) as t
:
4295 raise Exception("Expected signals not seen")
4297 def test_dbus_p2p_group_idle_timeout(dev
, apdev
):
4298 """D-Bus P2P group removal on idle timeout"""
4300 dev
[0].global_request("SET p2p_group_idle 1")
4301 _test_dbus_p2p_group_idle_timeout(dev
, apdev
)
4303 dev
[0].global_request("SET p2p_group_idle 0")
4305 def _test_dbus_p2p_group_idle_timeout(dev
, apdev
):
4306 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4307 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4308 addr0
= dev
[0].p2p_dev_addr()
4311 class TestDbusP2p(TestDbus
):
4312 def __init__(self
, bus
):
4313 TestDbus
.__init
__(self
, bus
)
4315 self
.peer_group_added
= False
4316 self
.peer_group_removed
= False
4318 def __enter__(self
):
4319 gobject
.timeout_add(1, self
.run_test
)
4320 gobject
.timeout_add(15000, self
.timeout
)
4321 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4323 self
.add_signal(self
.goNegotiationSuccess
,
4324 WPAS_DBUS_IFACE_P2PDEVICE
,
4325 "GONegotiationSuccess",
4327 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4329 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4331 self
.add_signal(self
.propertiesChanged
, dbus
.PROPERTIES_IFACE
,
4332 "PropertiesChanged")
4336 def deviceFound(self
, path
):
4337 logger
.debug("deviceFound: path=%s" % path
)
4338 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4339 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4343 ev
= dev1
.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout
=15)
4345 raise Exception("Timeout while waiting for GO Neg Request")
4346 dev1
.global_request("P2P_CONNECT " + addr0
+ " 12345670 display go_intent=15")
4347 ev
= dev1
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15);
4349 raise Exception("Group formation timed out")
4350 self
.sta_group_ev
= ev
4352 def goNegotiationSuccess(self
, properties
):
4353 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4355 def groupStarted(self
, properties
):
4356 logger
.debug("groupStarted: " + str(properties
))
4357 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4358 properties
['interface_object'])
4359 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4360 dev1
.group_form_result(self
.sta_group_ev
)
4361 ifaddr
= dev1
.group_request("STA-FIRST").splitlines()[0]
4362 # Force disassociation with different reason code so that the
4363 # P2P Client using D-Bus does not get normal group termination event
4365 dev1
.group_request("DEAUTHENTICATE " + ifaddr
+ " reason=0 test=0")
4368 def groupFinished(self
, properties
):
4369 logger
.debug("groupFinished: " + str(properties
))
4372 def propertiesChanged(self
, interface_name
, changed_properties
,
4373 invalidated_properties
):
4374 logger
.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
4375 if interface_name
!= WPAS_DBUS_P2P_PEER
:
4377 if "Groups" not in changed_properties
:
4379 if len(changed_properties
["Groups"]) > 0:
4380 self
.peer_group_added
= True
4381 if len(changed_properties
["Groups"]) == 0:
4382 self
.peer_group_removed
= True
4385 def run_test(self
, *args
):
4386 logger
.debug("run_test")
4387 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4391 return self
.done
and self
.peer_group_added
and self
.peer_group_removed
4393 with
TestDbusP2p(bus
) as t
:
4395 raise Exception("Expected signals not seen")
4397 def test_dbus_p2p_wps_failure(dev
, apdev
):
4398 """D-Bus P2P WPS failure"""
4399 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4400 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4401 addr0
= dev
[0].p2p_dev_addr()
4403 class TestDbusP2p(TestDbus
):
4404 def __init__(self
, bus
):
4405 TestDbus
.__init
__(self
, bus
)
4406 self
.wps_failed
= False
4407 self
.formation_failure
= False
4409 def __enter__(self
):
4410 gobject
.timeout_add(1, self
.run_test
)
4411 gobject
.timeout_add(15000, self
.timeout
)
4412 self
.add_signal(self
.goNegotiationRequest
,
4413 WPAS_DBUS_IFACE_P2PDEVICE
,
4414 "GONegotiationRequest",
4416 self
.add_signal(self
.goNegotiationSuccess
,
4417 WPAS_DBUS_IFACE_P2PDEVICE
,
4418 "GONegotiationSuccess",
4420 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4422 self
.add_signal(self
.wpsFailed
, WPAS_DBUS_IFACE_P2PDEVICE
,
4424 self
.add_signal(self
.groupFormationFailure
,
4425 WPAS_DBUS_IFACE_P2PDEVICE
,
4426 "GroupFormationFailure")
4430 def goNegotiationRequest(self
, path
, dev_passwd_id
, go_intent
=0):
4431 logger
.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path
, dev_passwd_id
, go_intent
))
4432 if dev_passwd_id
!= 1:
4433 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id
)
4434 args
= { 'peer': path
, 'wps_method': 'display', 'pin': '12345670',
4438 def goNegotiationSuccess(self
, properties
):
4439 logger
.debug("goNegotiationSuccess: properties=%s" % str(properties
))
4441 def groupStarted(self
, properties
):
4442 logger
.debug("groupStarted: " + str(properties
))
4443 raise Exception("Unexpected GroupStarted")
4445 def wpsFailed(self
, name
, args
):
4446 logger
.debug("wpsFailed - name=%s args=%s" % (name
, str(args
)))
4447 self
.wps_failed
= True
4448 if self
.formation_failure
:
4451 def groupFormationFailure(self
, reason
):
4452 logger
.debug("groupFormationFailure - reason=%s" % reason
)
4453 self
.formation_failure
= True
4457 def run_test(self
, *args
):
4458 logger
.debug("run_test")
4460 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4461 if not dev1
.discover_peer(addr0
):
4462 raise Exception("Peer not found")
4463 dev1
.global_request("P2P_CONNECT " + addr0
+ " 87654321 enter")
4467 return self
.wps_failed
and self
.formation_failure
4469 with
TestDbusP2p(bus
) as t
:
4471 raise Exception("Expected signals not seen")
4473 def test_dbus_p2p_two_groups(dev
, apdev
):
4474 """D-Bus P2P with two concurrent groups"""
4475 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4476 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4478 dev
[0].request("SET p2p_no_group_iface 0")
4479 addr0
= dev
[0].p2p_dev_addr()
4480 addr1
= dev
[1].p2p_dev_addr()
4481 addr2
= dev
[2].p2p_dev_addr()
4482 dev
[1].p2p_start_go(freq
=2412)
4483 dev1_group_ifname
= dev
[1].group_ifname
4485 class TestDbusP2p(TestDbus
):
4486 def __init__(self
, bus
):
4487 TestDbus
.__init
__(self
, bus
)
4493 self
.groups_removed
= False
4495 def __enter__(self
):
4496 gobject
.timeout_add(1, self
.run_test
)
4497 gobject
.timeout_add(15000, self
.timeout
)
4498 self
.add_signal(self
.propertiesChanged
, dbus
.PROPERTIES_IFACE
,
4499 "PropertiesChanged", byte_arrays
=True)
4500 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4502 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
4504 self
.add_signal(self
.groupFinished
, WPAS_DBUS_IFACE_P2PDEVICE
,
4506 self
.add_signal(self
.peerJoined
, WPAS_DBUS_GROUP
,
4511 def propertiesChanged(self
, interface_name
, changed_properties
,
4512 invalidated_properties
):
4513 logger
.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
4515 def deviceFound(self
, path
):
4516 logger
.debug("deviceFound: path=%s" % path
)
4517 if addr2
.replace(':','') in path
:
4519 elif addr1
.replace(':','') in path
:
4521 if self
.go
and not self
.group1
:
4522 logger
.info("Join the group")
4525 dev1
= WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
4526 dev1
.group_ifname
= dev1_group_ifname
4527 dev1
.group_request("WPS_PIN any " + pin
)
4528 args
= { 'peer': self
.go
,
4530 'wps_method': 'pin',
4535 def groupStarted(self
, properties
):
4536 logger
.debug("groupStarted: " + str(properties
))
4537 prop
= if_obj
.GetAll(WPAS_DBUS_IFACE_P2PDEVICE
,
4538 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4539 logger
.debug("p2pdevice properties: " + str(prop
))
4541 g_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4542 properties
['group_object'])
4543 res
= g_obj
.GetAll(WPAS_DBUS_GROUP
,
4544 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4546 logger
.debug("Group properties: " + str(res
))
4549 self
.group1
= properties
['group_object']
4550 self
.group1iface
= properties
['interface_object']
4551 self
.g1_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4554 logger
.info("Start autonomous GO")
4555 params
= dbus
.Dictionary({ 'frequency': 2412 })
4556 p2p
.GroupAdd(params
)
4557 elif not self
.group2
:
4558 self
.group2
= properties
['group_object']
4559 self
.group2iface
= properties
['interface_object']
4560 self
.g2_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
4562 self
.g2_bssid
= res
['BSSID']
4564 if self
.group1
and self
.group2
:
4565 logger
.info("Authorize peer to join the group")
4566 a2
= binascii
.unhexlify(addr2
.replace(':',''))
4567 params
= { 'Role': 'enrollee',
4568 'P2PDeviceAddress': dbus
.ByteArray(a2
),
4569 'Bssid': dbus
.ByteArray(a2
),
4572 g_wps
= dbus
.Interface(self
.g2_if_obj
, WPAS_DBUS_IFACE_WPS
)
4575 bssid
= ':'.join([binascii
.hexlify(l
) for l
in self
.g2_bssid
])
4576 dev2
= WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
4577 dev2
.scan_for_bss(bssid
, freq
=2412)
4578 dev2
.global_request("P2P_CONNECT " + bssid
+ " 12345670 join freq=2412")
4579 ev
= dev2
.wait_global_event(["P2P-GROUP-STARTED"], timeout
=15);
4581 raise Exception("Group join timed out")
4582 self
.dev2_group_ev
= ev
4584 def groupFinished(self
, properties
):
4585 logger
.debug("groupFinished: " + str(properties
))
4587 if self
.group1
== properties
['group_object']:
4589 elif self
.group2
== properties
['group_object']:
4592 if not self
.group1
and not self
.group2
:
4596 def peerJoined(self
, peer
):
4597 logger
.debug("peerJoined: " + peer
)
4598 if self
.groups_removed
:
4600 self
.check_results()
4602 dev2
= WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
4603 dev2
.group_form_result(self
.dev2_group_ev
)
4606 logger
.info("Disconnect group2")
4607 group_p2p
= dbus
.Interface(self
.g2_if_obj
,
4608 WPAS_DBUS_IFACE_P2PDEVICE
)
4609 group_p2p
.Disconnect()
4611 logger
.info("Disconnect group1")
4612 group_p2p
= dbus
.Interface(self
.g1_if_obj
,
4613 WPAS_DBUS_IFACE_P2PDEVICE
)
4614 group_p2p
.Disconnect()
4615 self
.groups_removed
= True
4617 def check_results(self
):
4618 logger
.info("Check results with two concurrent groups in operation")
4620 g1_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.group1
)
4621 res1
= g1_obj
.GetAll(WPAS_DBUS_GROUP
,
4622 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4625 g2_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, self
.group2
)
4626 res2
= g2_obj
.GetAll(WPAS_DBUS_GROUP
,
4627 dbus_interface
=dbus
.PROPERTIES_IFACE
,
4630 logger
.info("group1 = " + self
.group1
)
4631 logger
.debug("Group properties: " + str(res1
))
4633 logger
.info("group2 = " + self
.group2
)
4634 logger
.debug("Group properties: " + str(res2
))
4636 prop
= if_obj
.GetAll(WPAS_DBUS_IFACE_P2PDEVICE
,
4637 dbus_interface
=dbus
.PROPERTIES_IFACE
)
4638 logger
.debug("p2pdevice properties: " + str(prop
))
4640 if res1
['Role'] != 'client':
4641 raise Exception("Group1 role reported incorrectly: " + res1
['Role'])
4642 if res2
['Role'] != 'GO':
4643 raise Exception("Group2 role reported incorrectly: " + res2
['Role'])
4644 if prop
['Role'] != 'device':
4645 raise Exception("p2pdevice role reported incorrectly: " + prop
['Role'])
4647 if len(res2
['Members']) != 1:
4648 raise Exception("Unexpected Members value for group 2")
4650 def run_test(self
, *args
):
4651 logger
.debug("run_test")
4652 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4658 with
TestDbusP2p(bus
) as t
:
4660 raise Exception("Expected signals not seen")
4662 dev
[1].remove_group()
4664 def test_dbus_p2p_cancel(dev
, apdev
):
4665 """D-Bus P2P Cancel"""
4666 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4667 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
4670 raise Exception("Unexpected p2p.Cancel() success")
4671 except dbus
.exceptions
.DBusException
, e
:
4674 addr0
= dev
[0].p2p_dev_addr()
4677 class TestDbusP2p(TestDbus
):
4678 def __init__(self
, bus
):
4679 TestDbus
.__init
__(self
, bus
)
4682 def __enter__(self
):
4683 gobject
.timeout_add(1, self
.run_test
)
4684 gobject
.timeout_add(15000, self
.timeout
)
4685 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
4690 def deviceFound(self
, path
):
4691 logger
.debug("deviceFound: path=%s" % path
)
4692 args
= { 'peer': path
, 'wps_method': 'keypad', 'pin': '12345670',
4699 def run_test(self
, *args
):
4700 logger
.debug("run_test")
4701 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social'}))
4707 with
TestDbusP2p(bus
) as t
:
4709 raise Exception("Expected signals not seen")
4711 def test_dbus_introspect(dev
, apdev
):
4712 """D-Bus introspection"""
4713 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4715 res
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
4716 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
4717 logger
.info("Initial Introspect: " + str(res
))
4718 if res
is None or "Introspectable" not in res
or "GroupStarted" not in res
:
4719 raise Exception("Unexpected initial Introspect response: " + str(res
))
4721 with
alloc_fail(dev
[0], 1, "wpa_dbus_introspect"):
4722 res2
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
4723 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
4724 logger
.info("Introspect: " + str(res2
))
4725 if res2
is not None:
4726 raise Exception("Unexpected Introspect response")
4728 with
alloc_fail(dev
[0], 1, "=add_interface;wpa_dbus_introspect"):
4729 res2
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
4730 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
4731 logger
.info("Introspect: " + str(res2
))
4733 raise Exception("No Introspect response")
4734 if len(res2
) >= len(res
):
4735 raise Exception("Unexpected Introspect response")
4737 with
alloc_fail(dev
[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"):
4738 res2
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
4739 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
4740 logger
.info("Introspect: " + str(res2
))
4742 raise Exception("No Introspect response")
4743 if len(res2
) >= len(res
):
4744 raise Exception("Unexpected Introspect response")
4746 with
alloc_fail(dev
[0], 2, "=add_interface;wpa_dbus_introspect"):
4747 res2
= if_obj
.Introspect(WPAS_DBUS_IFACE
,
4748 dbus_interface
=dbus
.INTROSPECTABLE_IFACE
)
4749 logger
.info("Introspect: " + str(res2
))
4751 raise Exception("No Introspect response")
4752 if len(res2
) >= len(res
):
4753 raise Exception("Unexpected Introspect response")
4755 def test_dbus_ap(dev
, apdev
):
4756 """D-Bus AddNetwork for AP mode"""
4757 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4758 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
4760 ssid
= "test-wpa2-psk"
4761 passphrase
= 'qwertyuiop'
4763 class TestDbusConnect(TestDbus
):
4764 def __init__(self
, bus
):
4765 TestDbus
.__init
__(self
, bus
)
4766 self
.started
= False
4768 def __enter__(self
):
4769 gobject
.timeout_add(1, self
.run_connect
)
4770 gobject
.timeout_add(15000, self
.timeout
)
4771 self
.add_signal(self
.networkAdded
, WPAS_DBUS_IFACE
, "NetworkAdded")
4772 self
.add_signal(self
.networkSelected
, WPAS_DBUS_IFACE
,
4774 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
4775 "PropertiesChanged")
4779 def networkAdded(self
, network
, properties
):
4780 logger
.debug("networkAdded: %s" % str(network
))
4781 logger
.debug(str(properties
))
4783 def networkSelected(self
, network
):
4784 logger
.debug("networkSelected: %s" % str(network
))
4785 self
.network_selected
= True
4787 def propertiesChanged(self
, properties
):
4788 logger
.debug("propertiesChanged: %s" % str(properties
))
4789 if 'State' in properties
and properties
['State'] == "completed":
4793 def run_connect(self
, *args
):
4794 logger
.debug("run_connect")
4795 args
= dbus
.Dictionary({ 'ssid': ssid
,
4796 'key_mgmt': 'WPA-PSK',
4799 'frequency': 2412 },
4801 self
.netw
= iface
.AddNetwork(args
)
4802 iface
.SelectNetwork(self
.netw
)
4808 with
TestDbusConnect(bus
) as t
:
4810 raise Exception("Expected signals not seen")
4811 dev
[1].connect(ssid
, psk
=passphrase
, scan_freq
="2412")
4813 def test_dbus_connect_wpa_eap(dev
, apdev
):
4814 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
4815 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4816 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
4818 ssid
= "test-wpa-eap"
4819 params
= hostapd
.wpa_eap_params(ssid
=ssid
)
4821 params
["rsn_pairwise"] = "CCMP"
4822 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
4824 class TestDbusConnect(TestDbus
):
4825 def __init__(self
, bus
):
4826 TestDbus
.__init
__(self
, bus
)
4829 def __enter__(self
):
4830 gobject
.timeout_add(1, self
.run_connect
)
4831 gobject
.timeout_add(15000, self
.timeout
)
4832 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
4833 "PropertiesChanged")
4834 self
.add_signal(self
.eap
, WPAS_DBUS_IFACE
, "EAP")
4838 def propertiesChanged(self
, properties
):
4839 logger
.debug("propertiesChanged: %s" % str(properties
))
4840 if 'State' in properties
and properties
['State'] == "completed":
4844 def eap(self
, status
, parameter
):
4845 logger
.debug("EAP: status=%s parameter=%s" % (status
, parameter
))
4847 def run_connect(self
, *args
):
4848 logger
.debug("run_connect")
4849 args
= dbus
.Dictionary({ 'ssid': ssid
,
4850 'key_mgmt': 'WPA-EAP',
4853 'password': 'password',
4854 'ca_cert': 'auth_serv/ca.pem',
4855 'phase2': 'auth=MSCHAPV2',
4856 'scan_freq': 2412 },
4858 self
.netw
= iface
.AddNetwork(args
)
4859 iface
.SelectNetwork(self
.netw
)
4865 with
TestDbusConnect(bus
) as t
:
4867 raise Exception("Expected signals not seen")
4869 def test_dbus_ap_scan_2_ap_mode_scan(dev
, apdev
):
4870 """AP_SCAN 2 AP mode and D-Bus Scan()"""
4872 _test_dbus_ap_scan_2_ap_mode_scan(dev
, apdev
)
4874 dev
[0].request("AP_SCAN 1")
4876 def _test_dbus_ap_scan_2_ap_mode_scan(dev
, apdev
):
4877 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4878 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
4880 if "OK" not in dev
[0].request("AP_SCAN 2"):
4881 raise Exception("Failed to set AP_SCAN 2")
4883 id = dev
[0].add_network()
4884 dev
[0].set_network(id, "mode", "2")
4885 dev
[0].set_network_quoted(id, "ssid", "wpas-ap-open")
4886 dev
[0].set_network(id, "key_mgmt", "NONE")
4887 dev
[0].set_network(id, "frequency", "2412")
4888 dev
[0].set_network(id, "scan_freq", "2412")
4889 dev
[0].set_network(id, "disabled", "0")
4890 dev
[0].select_network(id)
4891 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=5)
4893 raise Exception("AP failed to start")
4895 with
fail_test(dev
[0], 1, "wpa_driver_nl80211_scan"):
4896 iface
.Scan({'Type': 'active',
4898 'Channels': [(dbus
.UInt32(2412), dbus
.UInt32(20))]})
4899 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
4900 "AP-DISABLED"], timeout
=5)
4902 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
4903 if "AP-DISABLED" in ev
:
4904 raise Exception("Unexpected AP-DISABLED event")
4906 # Wait for the retry to scan happen
4907 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
4908 "AP-DISABLED"], timeout
=5)
4910 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
4911 if "AP-DISABLED" in ev
:
4912 raise Exception("Unexpected AP-DISABLED event - retry")
4914 dev
[1].connect("wpas-ap-open", key_mgmt
="NONE", scan_freq
="2412")
4915 dev
[1].request("DISCONNECT")
4916 dev
[1].wait_disconnected()
4917 dev
[0].request("DISCONNECT")
4918 dev
[0].wait_disconnected()