1 # wpa_supplicant D-Bus interface tests
2 # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
9 logger
= logging
.getLogger()
21 from wpasupplicant
import WpaSupplicant
22 from utils
import HwsimSkip
, alloc_fail
, fail_test
23 from p2p_utils
import *
24 from test_ap_tdls
import connect_2sta_open
25 from test_ap_eap
import check_altsubject_match_support
# D-Bus names used throughout these tests.  Every interface name is the
# service name plus a suffix, so each one is derived from the constant it
# extends instead of repeating the full dotted string.
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"

# Per-interface object and its sub-interfaces.
WPAS_DBUS_IFACE = "%s.Interface" % WPAS_DBUS_SERVICE
WPAS_DBUS_IFACE_WPS = "%s.WPS" % WPAS_DBUS_IFACE
WPAS_DBUS_IFACE_P2PDEVICE = "%s.P2PDevice" % WPAS_DBUS_IFACE

# Interfaces of the other object kinds referenced by the tests.
WPAS_DBUS_NETWORK = "%s.Network" % WPAS_DBUS_SERVICE
WPAS_DBUS_BSS = "%s.BSS" % WPAS_DBUS_SERVICE
WPAS_DBUS_P2P_PEER = "%s.Peer" % WPAS_DBUS_SERVICE
WPAS_DBUS_GROUP = "%s.Group" % WPAS_DBUS_SERVICE
WPAS_DBUS_PERSISTENT_GROUP = "%s.PersistentGroup" % WPAS_DBUS_SERVICE
38 def prepare_dbus(dev
):
40 logger
.info("No dbus module available")
41 raise HwsimSkip("No dbus module available")
43 from dbus
.mainloop
.glib
import DBusGMainLoop
44 dbus
.mainloop
.glib
.DBusGMainLoop(set_as_default
=True)
45 bus
= dbus
.SystemBus()
46 wpas_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, WPAS_DBUS_PATH
)
47 wpas
= dbus
.Interface(wpas_obj
, WPAS_DBUS_SERVICE
)
48 path
= wpas
.GetInterface(dev
.ifname
)
49 if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
50 return (bus
,wpas_obj
,path
,if_obj
)
52 raise HwsimSkip("Could not connect to D-Bus: %s" % e
)
54 class TestDbus(object):
55 def __init__(self
, bus
):
56 self
.loop
= gobject
.MainLoop()
60 def __exit__(self
, type, value
, traceback
):
61 for s
in self
.signals
:
64 def add_signal(self
, handler
, interface
, name
, byte_arrays
=False):
65 s
= self
.bus
.add_signal_receiver(handler
, dbus_interface
=interface
,
67 byte_arrays
=byte_arrays
)
68 self
.signals
.append(s
)
70 def timeout(self
, *args
):
71 logger
.debug("timeout")
75 class alloc_fail_dbus(object):
76 def __init__(self
, dev
, count
, funcs
, operation
="Operation",
81 self
._operation
= operation
82 self
._expected
= expected
84 cmd
= "TEST_ALLOC_FAIL %d:%s" % (self
._count
, self
._funcs
)
85 if "OK" not in self
._dev
.request(cmd
):
86 raise HwsimSkip("TEST_ALLOC_FAIL not supported")
87 def __exit__(self
, type, value
, traceback
):
89 raise Exception("%s succeeded during out-of-memory" % self
._operation
)
90 if type == dbus
.exceptions
.DBusException
and self
._expected
in str(value
):
92 if self
._dev
.request("GET_ALLOC_FAIL") != "0:%s" % self
._funcs
:
93 raise Exception("%s did not trigger allocation failure" % self
._operation
)
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Add an AP through hostapd.add_ap() with a fixed WPA2-PSK/WPS setup.

    ap: mapping describing the AP device; only its 'ifname' entry is read.
    ssid: SSID placed in the configuration.
    ap_uuid: value placed in the "uuid" configuration field.

    Returns whatever hostapd.add_ap() returns.
    """
    config = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "ap_pin": "12345670",
        "uuid": ap_uuid,
    }
    ifname = ap['ifname']
    return hostapd.add_ap(ifname, config)
104 def test_dbus_getall(dev
, apdev
):
106 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
108 props
= wpas_obj
.GetAll(WPAS_DBUS_SERVICE
,
109 dbus_interface
=dbus
.PROPERTIES_IFACE
)
110 logger
.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props
))
112 props
= if_obj
.GetAll(WPAS_DBUS_IFACE
,
113 dbus_interface
=dbus
.PROPERTIES_IFACE
)
114 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE
, path
, str(props
)))
116 props
= if_obj
.GetAll(WPAS_DBUS_IFACE_WPS
,
117 dbus_interface
=dbus
.PROPERTIES_IFACE
)
118 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS
, path
, str(props
)))
120 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
121 dbus_interface
=dbus
.PROPERTIES_IFACE
)
123 raise Exception("Unexpected BSSs entry: " + str(res
))
125 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
126 dbus_interface
=dbus
.PROPERTIES_IFACE
)
128 raise Exception("Unexpected Networks entry: " + str(res
))
130 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], { "ssid": "open" })
131 bssid
= apdev
[0]['bssid']
132 dev
[0].scan_for_bss(bssid
, freq
=2412)
133 id = dev
[0].add_network()
134 dev
[0].set_network(id, "disabled", "0")
135 dev
[0].set_network_quoted(id, "ssid", "test")
137 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
138 dbus_interface
=dbus
.PROPERTIES_IFACE
)
140 raise Exception("Missing BSSs entry: " + str(res
))
141 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
142 props
= bss_obj
.GetAll(WPAS_DBUS_BSS
, dbus_interface
=dbus
.PROPERTIES_IFACE
)
143 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS
, res
[0], str(props
)))
145 for item
in props
['BSSID']:
146 if len(bssid_str
) > 0:
148 bssid_str
+= '%02x' % item
149 if bssid_str
!= bssid
:
150 raise Exception("Unexpected BSSID in BSSs entry")
152 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'Networks',
153 dbus_interface
=dbus
.PROPERTIES_IFACE
)
155 raise Exception("Missing Networks entry: " + str(res
))
156 net_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
157 props
= net_obj
.GetAll(WPAS_DBUS_NETWORK
,
158 dbus_interface
=dbus
.PROPERTIES_IFACE
)
159 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK
, res
[0], str(props
)))
160 ssid
= props
['Properties']['ssid']
162 raise Exception("Unexpected SSID in network entry")
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Fetch a WPAS_DBUS_SERVICE property and optionally check its value.

    dbus: object providing PROPERTIES_IFACE (the dbus module in the callers
        visible in this file).
    wpas_obj: proxy object whose Get() method is invoked.
    prop: name of the property to read.
    expect: when not None, the value the property must compare equal to.
        A property value of None can therefore not be verified this way.
    byte_arrays: forwarded unchanged to Get() as the byte_arrays keyword.

    Returns the value obtained from Get().  Callers assign the result and
    inspect it (e.g. len(res), "TTLS" in res), so it has to be returned.

    Raises Exception if expect is given and the fetched value differs.
    """
    val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                       dbus_interface=dbus.PROPERTIES_IFACE,
                       byte_arrays=byte_arrays)
    if expect is not None and val != expect:
        raise Exception("Unexpected %s: %s (expected: %s)" %
                        (prop, str(val), str(expect)))
    return val
def dbus_set(dbus, wpas_obj, prop, val):
    """Write val to the WPAS_DBUS_SERVICE property named prop.

    dbus: object providing PROPERTIES_IFACE.
    wpas_obj: proxy object whose Set() method is invoked.

    Nothing is returned; any exception raised by Set() propagates.
    """
    properties_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val,
                 dbus_interface=properties_iface)
177 def test_dbus_properties(dev
, apdev
):
178 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
179 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
181 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="msgdump")
182 dbus_set(dbus
, wpas_obj
, "DebugLevel", "debug")
183 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="debug")
184 for (val
,err
) in [ (3, "Error.Failed: wrong property type"),
185 ("foo", "Error.Failed: wrong debug level value") ]:
187 dbus_set(dbus
, wpas_obj
, "DebugLevel", val
)
188 raise Exception("Invalid DebugLevel value accepted: " + str(val
))
189 except dbus
.exceptions
.DBusException
, e
:
190 if err
not in str(e
):
191 raise Exception("Unexpected error message: " + str(e
))
192 dbus_set(dbus
, wpas_obj
, "DebugLevel", "msgdump")
193 dbus_get(dbus
, wpas_obj
, "DebugLevel", expect
="msgdump")
195 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=True)
196 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", False)
197 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=False)
199 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", "foo")
200 raise Exception("Invalid DebugTimestamp value accepted")
201 except dbus
.exceptions
.DBusException
, e
:
202 if "Error.Failed: wrong property type" not in str(e
):
203 raise Exception("Unexpected error message: " + str(e
))
204 dbus_set(dbus
, wpas_obj
, "DebugTimestamp", True)
205 dbus_get(dbus
, wpas_obj
, "DebugTimestamp", expect
=True)
207 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=True)
208 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", False)
209 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=False)
211 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", "foo")
212 raise Exception("Invalid DebugShowKeys value accepted")
213 except dbus
.exceptions
.DBusException
, e
:
214 if "Error.Failed: wrong property type" not in str(e
):
215 raise Exception("Unexpected error message: " + str(e
))
216 dbus_set(dbus
, wpas_obj
, "DebugShowKeys", True)
217 dbus_get(dbus
, wpas_obj
, "DebugShowKeys", expect
=True)
219 res
= dbus_get(dbus
, wpas_obj
, "Interfaces")
221 raise Exception("Unexpected Interfaces value: " + str(res
))
223 res
= dbus_get(dbus
, wpas_obj
, "EapMethods")
224 if len(res
) < 5 or "TTLS" not in res
:
225 raise Exception("Unexpected EapMethods value: " + str(res
))
227 res
= dbus_get(dbus
, wpas_obj
, "Capabilities")
228 if len(res
) < 2 or "p2p" not in res
:
229 raise Exception("Unexpected Capabilities value: " + str(res
))
231 dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
232 val
= binascii
.unhexlify("010006020304050608")
233 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(val
))
234 res
= dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
236 raise Exception("WFDIEs value changed")
238 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray('\x00'))
239 raise Exception("Invalid WFDIEs value accepted")
240 except dbus
.exceptions
.DBusException
, e
:
241 if "InvalidArgs" not in str(e
):
242 raise Exception("Unexpected error message: " + str(e
))
243 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(''))
244 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(val
))
245 dbus_set(dbus
, wpas_obj
, "WFDIEs", dbus
.ByteArray(''))
246 res
= dbus_get(dbus
, wpas_obj
, "WFDIEs", byte_arrays
=True)
248 raise Exception("WFDIEs not cleared properly")
250 res
= dbus_get(dbus
, wpas_obj
, "EapMethods")
252 dbus_set(dbus
, wpas_obj
, "EapMethods", res
)
253 raise Exception("Invalid Set accepted")
254 except dbus
.exceptions
.DBusException
, e
:
255 if "InvalidArgs: Property is read-only" not in str(e
):
256 raise Exception("Unexpected error message: " + str(e
))
259 wpas_obj
.SetFoo(WPAS_DBUS_SERVICE
, "DebugShowKeys", True,
260 dbus_interface
=dbus
.PROPERTIES_IFACE
)
261 raise Exception("Unknown method accepted")
262 except dbus
.exceptions
.DBusException
, e
:
263 if "UnknownMethod" not in str(e
):
264 raise Exception("Unexpected error message: " + str(e
))
267 wpas_obj
.Get("foo", "DebugShowKeys",
268 dbus_interface
=dbus
.PROPERTIES_IFACE
)
269 raise Exception("Invalid Get accepted")
270 except dbus
.exceptions
.DBusException
, e
:
271 if "InvalidArgs: No such property" not in str(e
):
272 raise Exception("Unexpected error message: " + str(e
))
274 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, WPAS_DBUS_PATH
,
277 test_obj
.Get(123, "DebugShowKeys",
278 dbus_interface
=dbus
.PROPERTIES_IFACE
)
279 raise Exception("Invalid Get accepted")
280 except dbus
.exceptions
.DBusException
, e
:
281 if "InvalidArgs: Invalid arguments" not in str(e
):
282 raise Exception("Unexpected error message: " + str(e
))
284 test_obj
.Get(WPAS_DBUS_SERVICE
, 123,
285 dbus_interface
=dbus
.PROPERTIES_IFACE
)
286 raise Exception("Invalid Get accepted")
287 except dbus
.exceptions
.DBusException
, e
:
288 if "InvalidArgs: Invalid arguments" not in str(e
):
289 raise Exception("Unexpected error message: " + str(e
))
292 wpas_obj
.Set(WPAS_DBUS_SERVICE
, "WFDIEs",
293 dbus
.ByteArray('', variant_level
=2),
294 dbus_interface
=dbus
.PROPERTIES_IFACE
)
295 raise Exception("Invalid Set accepted")
296 except dbus
.exceptions
.DBusException
, e
:
297 if "InvalidArgs: invalid message format" not in str(e
):
298 raise Exception("Unexpected error message: " + str(e
))
300 def test_dbus_set_global_properties(dev
, apdev
):
301 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
302 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
304 props
= [ ('Okc', '0', '1'), ('ModelName', '', 'blahblahblah') ]
307 res
= if_obj
.Get(WPAS_DBUS_IFACE
, p
[0],
308 dbus_interface
=dbus
.PROPERTIES_IFACE
)
310 raise Exception("Unexpected " + p
[0] + " value: " + str(res
))
312 if_obj
.Set(WPAS_DBUS_IFACE
, p
[0], p
[2],
313 dbus_interface
=dbus
.PROPERTIES_IFACE
)
315 res
= if_obj
.Get(WPAS_DBUS_IFACE
, p
[0],
316 dbus_interface
=dbus
.PROPERTIES_IFACE
)
318 raise Exception("Unexpected " + p
[0] + " value after set: " + str(res
))
320 def test_dbus_invalid_method(dev
, apdev
):
321 """D-Bus invalid method"""
322 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
323 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
327 raise Exception("Unknown method accepted")
328 except dbus
.exceptions
.DBusException
, e
:
329 if "UnknownMethod" not in str(e
):
330 raise Exception("Unexpected error message: " + str(e
))
332 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
, introspect
=False)
333 test_wps
= dbus
.Interface(test_obj
, WPAS_DBUS_IFACE_WPS
)
336 raise Exception("WPS.Start with incorrect signature accepted")
337 except dbus
.exceptions
.DBusException
, e
:
338 if "InvalidArgs: Invalid arg" not in str(e
):
339 raise Exception("Unexpected error message: " + str(e
))
341 def test_dbus_get_set_wps(dev
, apdev
):
342 """D-Bus Get/Set for WPS properties"""
344 _test_dbus_get_set_wps(dev
, apdev
)
346 dev
[0].request("SET wps_cred_processing 0")
347 dev
[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps")
349 def _test_dbus_get_set_wps(dev
, apdev
):
350 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
352 if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
353 dbus_interface
=dbus
.PROPERTIES_IFACE
)
355 val
= "display keypad virtual_display nfc_interface"
356 dev
[0].request("SET config_methods " + val
)
358 config
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
359 dbus_interface
=dbus
.PROPERTIES_IFACE
)
361 raise Exception("Unexpected Get(ConfigMethods) result: " + config
)
363 val2
= "push_button display"
364 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ConfigMethods", val2
,
365 dbus_interface
=dbus
.PROPERTIES_IFACE
)
366 config
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ConfigMethods",
367 dbus_interface
=dbus
.PROPERTIES_IFACE
)
369 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config
)
371 dev
[0].request("SET config_methods " + val
)
374 dev
[0].request("SET wps_cred_processing " + str(i
))
375 val
= if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
376 dbus_interface
=dbus
.PROPERTIES_IFACE
)
377 expected_val
= False if i
== 1 else True
378 if val
!= expected_val
:
379 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i
, val
))
381 class TestDbusGetSet(TestDbus
):
382 def __init__(self
, bus
):
383 TestDbus
.__init
__(self
, bus
)
384 self
.signal_received
= False
385 self
.signal_received_deprecated
= False
386 self
.sets_done
= False
389 gobject
.timeout_add(1, self
.run_sets
)
390 gobject
.timeout_add(1000, self
.timeout
)
391 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE_WPS
,
393 self
.add_signal(self
.propertiesChanged2
, dbus
.PROPERTIES_IFACE
,
398 def propertiesChanged(self
, properties
):
399 logger
.debug("PropertiesChanged: " + str(properties
))
400 if properties
.has_key("ProcessCredentials"):
401 self
.signal_received_deprecated
= True
402 if self
.sets_done
and self
.signal_received
:
405 def propertiesChanged2(self
, interface_name
, changed_properties
,
406 invalidated_properties
):
407 logger
.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name
, str(changed_properties
), str(invalidated_properties
)))
408 if interface_name
!= WPAS_DBUS_IFACE_WPS
:
410 if changed_properties
.has_key("ProcessCredentials"):
411 self
.signal_received
= True
412 if self
.sets_done
and self
.signal_received_deprecated
:
415 def run_sets(self
, *args
):
416 logger
.debug("run_sets")
417 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
419 dbus_interface
=dbus
.PROPERTIES_IFACE
)
420 if if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
421 dbus_interface
=dbus
.PROPERTIES_IFACE
) != True:
422 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
423 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
425 dbus_interface
=dbus
.PROPERTIES_IFACE
)
426 if if_obj
.Get(WPAS_DBUS_IFACE_WPS
, "ProcessCredentials",
427 dbus_interface
=dbus
.PROPERTIES_IFACE
) != False:
428 raise Exception("Unexpected Get(ProcessCredentials) result after Set");
430 self
.dbus_sets_done
= True
434 return self
.signal_received
and self
.signal_received_deprecated
436 with
TestDbusGetSet(bus
) as t
:
438 raise Exception("No signal received for ProcessCredentials change")
440 def test_dbus_wps_invalid(dev
, apdev
):
441 """D-Bus invalid WPS operation"""
442 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
443 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
445 failures
= [ {'Role': 'foo', 'Type': 'pbc'},
446 {'Role': 123, 'Type': 'pbc'},
448 {'Role': 'enrollee'},
449 {'Role': 'registrar'},
450 {'Role': 'enrollee', 'Type': 123},
451 {'Role': 'enrollee', 'Type': 'foo'},
452 {'Role': 'enrollee', 'Type': 'pbc',
453 'Bssid': '02:33:44:55:66:77'},
454 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
455 {'Role': 'enrollee', 'Type': 'pbc',
456 'Bssid': dbus
.ByteArray('12345')},
457 {'Role': 'enrollee', 'Type': 'pbc',
458 'P2PDeviceAddress': 12345},
459 {'Role': 'enrollee', 'Type': 'pbc',
460 'P2PDeviceAddress': dbus
.ByteArray('12345')},
461 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'} ]
462 for args
in failures
:
465 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args
))
466 except dbus
.exceptions
.DBusException
, e
:
467 if not str(e
).startswith("fi.w1.wpa_supplicant1.InvalidArgs"):
468 raise Exception("Unexpected error message: " + str(e
))
470 def test_dbus_wps_oom(dev
, apdev
):
471 """D-Bus WPS operation (OOM)"""
472 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
473 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
475 with
alloc_fail_dbus(dev
[0], 1, "=wpas_dbus_getter_state", "Get"):
476 if_obj
.Get(WPAS_DBUS_IFACE
, "State",
477 dbus_interface
=dbus
.PROPERTIES_IFACE
)
479 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], { "ssid": "open" })
480 bssid
= apdev
[0]['bssid']
481 dev
[0].scan_for_bss(bssid
, freq
=2412)
483 for i
in range(1, 3):
484 with
alloc_fail_dbus(dev
[0], i
, "=wpas_dbus_getter_bsss", "Get"):
485 if_obj
.Get(WPAS_DBUS_IFACE
, "BSSs",
486 dbus_interface
=dbus
.PROPERTIES_IFACE
)
488 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
489 dbus_interface
=dbus
.PROPERTIES_IFACE
)
490 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
491 with
alloc_fail_dbus(dev
[0], 1, "=wpas_dbus_getter_bss_rates", "Get"):
492 bss_obj
.Get(WPAS_DBUS_BSS
, "Rates",
493 dbus_interface
=dbus
.PROPERTIES_IFACE
)
495 id = dev
[0].add_network()
496 dev
[0].set_network(id, "disabled", "0")
497 dev
[0].set_network_quoted(id, "ssid", "test")
499 for i
in range(1, 3):
500 with
alloc_fail_dbus(dev
[0], i
, "=wpas_dbus_getter_networks", "Get"):
501 if_obj
.Get(WPAS_DBUS_IFACE
, "Networks",
502 dbus_interface
=dbus
.PROPERTIES_IFACE
)
504 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_getter_interfaces", "Get"):
505 dbus_get(dbus
, wpas_obj
, "Interfaces")
507 for i
in range(1, 6):
508 with
alloc_fail_dbus(dev
[0], i
, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
509 dbus_get(dbus
, wpas_obj
, "EapMethods")
511 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_setter_config_methods", "Set",
512 expected
="Error.Failed: Failed to set property"):
513 val2
= "push_button display"
514 if_obj
.Set(WPAS_DBUS_IFACE_WPS
, "ConfigMethods", val2
,
515 dbus_interface
=dbus
.PROPERTIES_IFACE
)
517 with
alloc_fail_dbus(dev
[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
519 expected
="UnknownError: WPS start failed"):
520 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
522 def test_dbus_wps_pbc(dev
, apdev
):
523 """D-Bus WPS/PBC operation and signals"""
525 _test_dbus_wps_pbc(dev
, apdev
)
527 dev
[0].request("SET wps_cred_processing 0")
529 def _test_dbus_wps_pbc(dev
, apdev
):
530 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
531 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
533 hapd
= start_ap(apdev
[0])
534 hapd
.request("WPS_PBC")
535 bssid
= apdev
[0]['bssid']
536 dev
[0].scan_for_bss(bssid
, freq
="2412")
537 dev
[0].request("SET wps_cred_processing 2")
539 res
= if_obj
.Get(WPAS_DBUS_IFACE
, 'BSSs',
540 dbus_interface
=dbus
.PROPERTIES_IFACE
)
542 raise Exception("Missing BSSs entry: " + str(res
))
543 bss_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, res
[0])
544 props
= bss_obj
.GetAll(WPAS_DBUS_BSS
, dbus_interface
=dbus
.PROPERTIES_IFACE
)
545 logger
.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS
, res
[0], str(props
)))
546 if 'WPS' not in props
:
547 raise Exception("No WPS information in the BSS entry")
548 if 'Type' not in props
['WPS']:
549 raise Exception("No Type field in the WPS dictionary")
550 if props
['WPS']['Type'] != 'pbc':
551 raise Exception("Unexpected WPS Type: " + props
['WPS']['Type'])
553 class TestDbusWps(TestDbus
):
554 def __init__(self
, bus
, wps
):
555 TestDbus
.__init
__(self
, bus
)
556 self
.success_seen
= False
557 self
.credentials_received
= False
561 gobject
.timeout_add(1, self
.start_pbc
)
562 gobject
.timeout_add(15000, self
.timeout
)
563 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
564 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
569 def wpsEvent(self
, name
, args
):
570 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
571 if name
== "success":
572 self
.success_seen
= True
573 if self
.credentials_received
:
576 def credentials(self
, args
):
577 logger
.debug("credentials: " + str(args
))
578 self
.credentials_received
= True
579 if self
.success_seen
:
582 def start_pbc(self
, *args
):
583 logger
.debug("start_pbc")
584 self
.wps
.Start({'Role': 'enrollee', 'Type': 'pbc'})
588 return self
.success_seen
and self
.credentials_received
590 with
TestDbusWps(bus
, wps
) as t
:
592 raise Exception("Failure in D-Bus operations")
594 dev
[0].wait_connected(timeout
=10)
595 dev
[0].request("DISCONNECT")
597 dev
[0].flush_scan_cache()
599 def test_dbus_wps_pbc_overlap(dev
, apdev
):
600 """D-Bus WPS/PBC operation and signal for PBC overlap"""
601 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
602 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
604 hapd
= start_ap(apdev
[0])
605 hapd2
= start_ap(apdev
[1], ssid
="test-wps2",
606 ap_uuid
="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
607 hapd
.request("WPS_PBC")
608 hapd2
.request("WPS_PBC")
609 bssid
= apdev
[0]['bssid']
610 dev
[0].scan_for_bss(bssid
, freq
="2412")
611 bssid2
= apdev
[1]['bssid']
612 dev
[0].scan_for_bss(bssid2
, freq
="2412")
614 class TestDbusWps(TestDbus
):
615 def __init__(self
, bus
, wps
):
616 TestDbus
.__init
__(self
, bus
)
617 self
.overlap_seen
= False
621 gobject
.timeout_add(1, self
.start_pbc
)
622 gobject
.timeout_add(15000, self
.timeout
)
623 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
627 def wpsEvent(self
, name
, args
):
628 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
629 if name
== "pbc-overlap":
630 self
.overlap_seen
= True
633 def start_pbc(self
, *args
):
634 logger
.debug("start_pbc")
635 self
.wps
.Start({'Role': 'enrollee', 'Type': 'pbc'})
639 return self
.overlap_seen
641 with
TestDbusWps(bus
, wps
) as t
:
643 raise Exception("Failure in D-Bus operations")
645 dev
[0].request("WPS_CANCEL")
646 dev
[0].request("DISCONNECT")
648 dev
[0].flush_scan_cache()
650 def test_dbus_wps_pin(dev
, apdev
):
651 """D-Bus WPS/PIN operation and signals"""
653 _test_dbus_wps_pin(dev
, apdev
)
655 dev
[0].request("SET wps_cred_processing 0")
657 def _test_dbus_wps_pin(dev
, apdev
):
658 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
659 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
661 hapd
= start_ap(apdev
[0])
662 hapd
.request("WPS_PIN any 12345670")
663 bssid
= apdev
[0]['bssid']
664 dev
[0].scan_for_bss(bssid
, freq
="2412")
665 dev
[0].request("SET wps_cred_processing 2")
667 class TestDbusWps(TestDbus
):
668 def __init__(self
, bus
):
669 TestDbus
.__init
__(self
, bus
)
670 self
.success_seen
= False
671 self
.credentials_received
= False
674 gobject
.timeout_add(1, self
.start_pin
)
675 gobject
.timeout_add(15000, self
.timeout
)
676 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
677 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
682 def wpsEvent(self
, name
, args
):
683 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
684 if name
== "success":
685 self
.success_seen
= True
686 if self
.credentials_received
:
689 def credentials(self
, args
):
690 logger
.debug("credentials: " + str(args
))
691 self
.credentials_received
= True
692 if self
.success_seen
:
695 def start_pin(self
, *args
):
696 logger
.debug("start_pin")
697 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
698 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
703 return self
.success_seen
and self
.credentials_received
705 with
TestDbusWps(bus
) as t
:
707 raise Exception("Failure in D-Bus operations")
709 dev
[0].wait_connected(timeout
=10)
711 def test_dbus_wps_pin2(dev
, apdev
):
712 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
714 _test_dbus_wps_pin2(dev
, apdev
)
716 dev
[0].request("SET wps_cred_processing 0")
718 def _test_dbus_wps_pin2(dev
, apdev
):
719 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
720 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
722 hapd
= start_ap(apdev
[0])
723 bssid
= apdev
[0]['bssid']
724 dev
[0].scan_for_bss(bssid
, freq
="2412")
725 dev
[0].request("SET wps_cred_processing 2")
727 class TestDbusWps(TestDbus
):
728 def __init__(self
, bus
):
729 TestDbus
.__init
__(self
, bus
)
730 self
.success_seen
= False
734 gobject
.timeout_add(1, self
.start_pin
)
735 gobject
.timeout_add(15000, self
.timeout
)
736 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
737 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
742 def wpsEvent(self
, name
, args
):
743 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
744 if name
== "success":
745 self
.success_seen
= True
746 if self
.credentials_received
:
749 def credentials(self
, args
):
750 logger
.debug("credentials: " + str(args
))
751 self
.credentials_received
= True
752 if self
.success_seen
:
755 def start_pin(self
, *args
):
756 logger
.debug("start_pin")
757 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
758 res
= wps
.Start({'Role': 'enrollee', 'Type': 'pin',
761 h
= hostapd
.Hostapd(apdev
[0]['ifname'])
762 h
.request("WPS_PIN any " + pin
)
766 return self
.success_seen
and self
.credentials_received
768 with
TestDbusWps(bus
) as t
:
770 raise Exception("Failure in D-Bus operations")
772 dev
[0].wait_connected(timeout
=10)
774 def test_dbus_wps_pin_m2d(dev
, apdev
):
775 """D-Bus WPS/PIN operation and signals with M2D"""
777 _test_dbus_wps_pin_m2d(dev
, apdev
)
779 dev
[0].request("SET wps_cred_processing 0")
781 def _test_dbus_wps_pin_m2d(dev
, apdev
):
782 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
783 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
785 hapd
= start_ap(apdev
[0])
786 bssid
= apdev
[0]['bssid']
787 dev
[0].scan_for_bss(bssid
, freq
="2412")
788 dev
[0].request("SET wps_cred_processing 2")
790 class TestDbusWps(TestDbus
):
791 def __init__(self
, bus
):
792 TestDbus
.__init
__(self
, bus
)
793 self
.success_seen
= False
794 self
.credentials_received
= False
797 gobject
.timeout_add(1, self
.start_pin
)
798 gobject
.timeout_add(15000, self
.timeout
)
799 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
800 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
805 def wpsEvent(self
, name
, args
):
806 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
807 if name
== "success":
808 self
.success_seen
= True
809 if self
.credentials_received
:
812 h
= hostapd
.Hostapd(apdev
[0]['ifname'])
813 h
.request("WPS_PIN any 12345670")
815 def credentials(self
, args
):
816 logger
.debug("credentials: " + str(args
))
817 self
.credentials_received
= True
818 if self
.success_seen
:
821 def start_pin(self
, *args
):
822 logger
.debug("start_pin")
823 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
824 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
829 return self
.success_seen
and self
.credentials_received
831 with
TestDbusWps(bus
) as t
:
833 raise Exception("Failure in D-Bus operations")
835 dev
[0].wait_connected(timeout
=10)
837 def test_dbus_wps_reg(dev
, apdev
):
838 """D-Bus WPS/Registrar operation and signals"""
840 _test_dbus_wps_reg(dev
, apdev
)
842 dev
[0].request("SET wps_cred_processing 0")
844 def _test_dbus_wps_reg(dev
, apdev
):
845 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
846 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
848 hapd
= start_ap(apdev
[0])
849 hapd
.request("WPS_PIN any 12345670")
850 bssid
= apdev
[0]['bssid']
851 dev
[0].scan_for_bss(bssid
, freq
="2412")
852 dev
[0].request("SET wps_cred_processing 2")
854 class TestDbusWps(TestDbus
):
855 def __init__(self
, bus
):
856 TestDbus
.__init
__(self
, bus
)
857 self
.credentials_received
= False
860 gobject
.timeout_add(100, self
.start_reg
)
861 gobject
.timeout_add(15000, self
.timeout
)
862 self
.add_signal(self
.wpsEvent
, WPAS_DBUS_IFACE_WPS
, "Event")
863 self
.add_signal(self
.credentials
, WPAS_DBUS_IFACE_WPS
,
868 def wpsEvent(self
, name
, args
):
869 logger
.debug("wpsEvent: %s args='%s'" % (name
, str(args
)))
871 def credentials(self
, args
):
872 logger
.debug("credentials: " + str(args
))
873 self
.credentials_received
= True
876 def start_reg(self
, *args
):
877 logger
.debug("start_reg")
878 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
879 wps
.Start({'Role': 'registrar', 'Type': 'pin',
880 'Pin': '12345670', 'Bssid': bssid_ay
})
884 return self
.credentials_received
886 with
TestDbusWps(bus
) as t
:
888 raise Exception("Failure in D-Bus operations")
890 dev
[0].wait_connected(timeout
=10)
892 def test_dbus_wps_cancel(dev
, apdev
):
893 """D-Bus WPS Cancel operation"""
894 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
895 wps
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_WPS
)
897 hapd
= start_ap(apdev
[0])
898 bssid
= apdev
[0]['bssid']
901 dev
[0].scan_for_bss(bssid
, freq
="2412")
902 bssid_ay
= dbus
.ByteArray(bssid
.replace(':','').decode('hex'))
903 wps
.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
906 dev
[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
908 def test_dbus_scan_invalid(dev
, apdev
):
909 """D-Bus invalid scan method"""
910 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
911 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
913 tests
= [ ({}, "InvalidArgs"),
914 ({'Type': 123}, "InvalidArgs"),
915 ({'Type': 'foo'}, "InvalidArgs"),
916 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"),
917 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"),
918 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"),
920 'SSIDs': [ dbus
.ByteArray("1"), dbus
.ByteArray("2"),
921 dbus
.ByteArray("3"), dbus
.ByteArray("4"),
922 dbus
.ByteArray("5"), dbus
.ByteArray("6"),
923 dbus
.ByteArray("7"), dbus
.ByteArray("8"),
924 dbus
.ByteArray("9"), dbus
.ByteArray("10"),
925 dbus
.ByteArray("11"), dbus
.ByteArray("12"),
926 dbus
.ByteArray("13"), dbus
.ByteArray("14"),
927 dbus
.ByteArray("15"), dbus
.ByteArray("16"),
928 dbus
.ByteArray("17") ]},
931 'SSIDs': [ dbus
.ByteArray("1234567890abcdef1234567890abcdef1") ]},
933 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"),
934 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"),
935 ({'Type': 'active', 'Channels': 2412 }, "InvalidArgs"),
936 ({'Type': 'active', 'Channels': [ 2412 ] }, "InvalidArgs"),
938 'Channels': [ (dbus
.Int32(2412), dbus
.UInt32(20)) ] },
941 'Channels': [ (dbus
.UInt32(2412), dbus
.Int32(20)) ] },
943 ({'Type': 'active', 'AllowRoam': "yes" }, "InvalidArgs"),
944 ({'Type': 'passive', 'IEs': [ dbus
.ByteArray("\xdd\x00") ]},
946 ({'Type': 'passive', 'SSIDs': [ dbus
.ByteArray("foo") ]},
948 for (t
,err
) in tests
:
951 raise Exception("Invalid Scan() arguments accepted: " + str(t
))
952 except dbus
.exceptions
.DBusException
, e
:
953 if err
not in str(e
):
954 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t
), str(e
)))
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    sta = dev[0]

    def chan_2412():
        # Fresh single-entry channel list, (2412, 20), for every request
        return [(dbus.UInt32(2412), dbus.UInt32(20))]

    # Allocation failure while cloning the scan parameters
    with alloc_fail_dbus(sta, 1,
                         "wpa_scan_clone_params;wpas_dbus_handler_scan",
                         "Scan", expected="ScanError: Scan request rejected"):
        iface.Scan({'Type': 'passive', 'Channels': chan_2412()})

    # Allocation failure while parsing the Channels argument
    with alloc_fail_dbus(sta, 1,
                         "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'passive', 'Channels': chan_2412()})

    # Allocation failure while parsing the IEs argument
    with alloc_fail_dbus(sta, 1,
                         "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'active',
                    'IEs': [dbus.ByteArray("\xdd\x00")],
                    'Channels': chan_2412()})

    # Allocation failure while parsing the SSIDs argument
    with alloc_fail_dbus(sta, 1,
                         "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
                         "Scan"):
        iface.Scan({'Type': 'active',
                    'SSIDs': [dbus.ByteArray("open"), dbus.ByteArray()],
                    'Channels': chan_2412()})
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })

    class TestDbusScan(TestDbus):
        # Runs three scans back to back; each ScanDone signal triggers the
        # next request and the loop ends after the third one once a BSS has
        # been reported.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = 0
            self.bss_added = False
            self.fail_reason = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in ((self.scanDone, "ScanDone"),
                                    (self.bssAdded, "BSSAdded"),
                                    (self.bssRemoved, "BSSRemoved")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            completed = self.scan_completed
            if completed == 1:
                request = {'Type': 'passive',
                           'AllowRoam': True,
                           'Channels': [(dbus.UInt32(2412),
                                         dbus.UInt32(20))]}
                iface.Scan(request)
            elif completed == 2:
                request = {'Type': 'passive',
                           'AllowRoam': False}
                iface.Scan(request)
            elif completed == 3 and self.bss_added:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            # The AP is an open network without WPS, so a WPS Type entry
            # is treated as a failure.
            if 'WPS' in properties and 'Type' in properties['WPS']:
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            request = {'Type': 'active',
                       'SSIDs': [dbus.ByteArray("open"),
                                 dbus.ByteArray()],
                       'IEs': [dbus.ByteArray("\xdd\x00"),
                               dbus.ByteArray()],
                       'AllowRoam': False,
                       'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(request)
            # False: do not re-arm this timeout callback
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as t:
        if t.fail_reason:
            raise Exception(t.fail_reason)
        if not t.success():
            raise Exception("Expected signals not seen")

    def get_bsss():
        return if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    if len(get_bsss()) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    if len(get_bsss()) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan through the control interface so that the D-Bus request
    # below arrives while that scan is still in progress.
    if "OK" not in dev[0].request("SCAN freq=2412-2462"):
        raise Exception("Failed to start scan")
    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15)
    if ev is None:
        raise Exception("Scan start timed out")

    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    # Wait for the first scan to complete before returning.
    ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
    if ev is None:
        raise Exception("Scan timed out")
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # self.state walks 0..9; every "completed"/"disconnected" State
        # change moves it forward by one and issues the next D-Bus request.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in (
                    (self.networkAdded, "NetworkAdded"),
                    (self.networkRemoved, "NetworkRemoved"),
                    (self.networkSelected, "NetworkSelected"),
                    (self.propertiesChanged, "PropertiesChanged")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')

            if new_state == "completed":
                # current step -> (next step, request to issue)
                steps = {0: (1, iface.Disconnect),
                         2: (3, iface.Disconnect),
                         4: (5, iface.Reattach),
                         5: (6, iface.Disconnect)}
                if self.state in steps:
                    self.state, request = steps[self.state]
                    request()
                elif self.state == 7:
                    self.state = 8
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        # -1 never matches a later step, so success()
                        # ends up reporting a failure
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    iface.Reconnect()
                elif self.state == 8:
                    self.state = 9
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-PSK',
                       'psk': passphrase,
                       'scan_freq': 2412}
            args = dbus.Dictionary(network, signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # False: do not re-arm this timeout callback
            return False

        def success(self):
            signals_seen = (self.network_added and self.network_removed and
                            self.network_selected)
            if not signals_seen:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # The network is added without a PSK; the passphrase is supplied
        # only in reply to the NetworkRequest signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal in (
                    (self.propertiesChanged, "PropertiesChanged"),
                    (self.networkRequest, "NetworkRequest")):
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') == "completed":
                self.connected = True
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PSK_PASSPHRASE":
                return
            # The passphrase is sent wrapped in double quotation marks
            iface.NetworkReply(path, field, '"' + passphrase + '"')

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-PSK',
                       'mem_only_psk': 1,
                       'scan_freq': 2412}
            args = dbus.Dictionary(network, signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # False: do not re-arm this timeout callback
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # Same kind of step-by-step connection sequence as used in
        # test_dbus_connect, but shorter (final state 7) and tolerant of
        # D-Bus requests failing because of the injected allocation failure.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            # Short timeout since most iterations are expected to get stuck
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({ 'ssid': ssid,
                                     'key_mgmt': 'WPA-PSK',
                                     'psk': passphrase,
                                     'scan_freq': 2412 },
                                   signature='sv')
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            # False: do not re-arm this timeout callback
            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Fail the i-th allocation (counted below main()) in each iteration and
    # stop after five iterations in which the failure was not consumed.
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if i < 3:
                raise Exception("Connection succeeded during out-of-memory")
            if not state.startswith('0:'):
                count += 1
                if count == 5:
                    break
        except Exception:
            # Best effort: any failure within an iteration is ignored.
            # Catching Exception instead of using a bare except keeps
            # KeyboardInterrupt/SystemExit able to stop this long loop.
            # NOTE(review): this also hides the "Connection succeeded
            # during out-of-memory" exception raised above - confirm whether
            # that is intended.
            pass

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Both methods have to be rejected with a NotConnected error.
    for method in ("Disconnect", "Reattach"):
        try:
            getattr(iface, method)()
        except dbus.exceptions.DBusException as e:
            if "NotConnected" not in str(e):
                raise Exception("Unexpected error message for invalid %s: %s" % (method, str(e)))
        else:
            raise Exception("%s() accepted when not connected" % method)
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        # Connects, reconnects with a matching altsubject_match constraint
        # and finally with a non-matching one; self.state tracks the step
        # and reaches 5 when the mismatch is reported through the EAP signal.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def set_altsubject_match(self, value):
            # Update altsubject_match of the network added in run_connect()
            net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
            args = dbus.Dictionary({ 'altsubject_match': value },
                                   signature='sv')
            net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                        dbus_interface=dbus.PROPERTIES_IFACE)

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')

            if new_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    self.set_altsubject_match(self.server_dnsname)
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    self.set_altsubject_match(self.server_dnsname + "FOO")
            elif new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                if self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            altsubject = args['altsubject']
            if len(altsubject) < 1:
                raise Exception("Missing dNSName")
            dnsname = altsubject[0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            mismatch = (status == 'remote certificate verification' and
                        parameter == 'AltSubject mismatch')
            if self.state == 4 and mismatch:
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field == "PASSWORD":
                iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'IEEE8021X',
                       'eapol_flags': 0,
                       'eap': 'TTLS',
                       'anonymous_identity': 'ttls',
                       'identity': 'pap user',
                       'ca_cert': 'auth_serv/ca.pem',
                       'phase2': 'auth=PAP',
                       'scan_freq': 2412}
            args = dbus.Dictionary(network, signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # False: do not re-arm this timeout callback
            return False

        def success(self):
            if not (self.eap_status and self.certification_received):
                return False
            return self.state == 5

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Network parameters given with explicit D-Bus types
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'WPA-PSK',
                             'psk': "12345678",
                             'identity': dbus.ByteArray([ 1, 2 ]),
                             'priority': dbus.Int32(0),
                             'scan_freq': dbus.UInt32(2412) },
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # Frequency lists given as space-separated strings
    args = dbus.Dictionary({ 'ssid': "foo",
                             'key_mgmt': 'NONE',
                             'scan_freq': "2412 2432",
                             'freq_list': "2412 2417 2432" },
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(net_id, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # The network object is gone now, so both calls have to be rejected.
    try:
        iface.RemoveNetwork(netw)
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    else:
        raise Exception("Invalid RemoveNetwork() accepted")
    try:
        iface.SelectNetwork(netw)
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))
    else:
        raise Exception("Invalid SelectNetwork() accepted")

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: initial value, both transitions, and a wrong type
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))
    else:
        raise Exception("Invalid Set(Enabled,1) accepted")

    # Updating one parameter must not touch the others
    args = dbus.Dictionary({ 'ssid': "foo1new" }, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # Second call with no networks left
    iface.RemoveAllNetworks()

    # Parameter dictionaries that AddNetwork() has to reject
    tests = [ dbus.Dictionary({ 'psk': "1234567" }, signature='sv'),
              dbus.Dictionary({ 'identity': dbus.ByteArray() },
                              signature='sv'),
              dbus.Dictionary({ 'identity': dbus.Byte(1) }, signature='sv'),
              dbus.Dictionary({ 'identity': "" }, signature='sv') ]
    for args in tests:
        try:
            iface.AddNetwork(args)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
        else:
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    args = dbus.Dictionary({ 'ssid': "foo1", 'key_mgmt': 'NONE',
                             'identity': "testuser", 'scan_freq': '2412' },
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)

    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"))

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                               signature='sv')
        try:
            netw = iface.AddNetwork(args)
            # Currently, AddNetwork() succeeds even if os_strdup() for path
            # fails, so remove the network if that occurs.
            iface.RemoveNetwork(netw)
        except dbus.exceptions.DBusException:
            # A D-Bus error is acceptable here as well
            pass

    for i in range(1, 3):
        with alloc_fail(dev[0], i, "=wpas_dbus_register_network"):
            try:
                netw = iface.AddNetwork(args)
                # Currently, AddNetwork() succeeds even if network registration
                # fails, so remove the network if that occurs.
                iface.RemoveNetwork(netw)
            except dbus.exceptions.DBusException:
                # A D-Bus error is acceptable here as well
                pass

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        args = dbus.Dictionary({ 'ssid': "foo2", 'key_mgmt': 'NONE' },
                               signature='sv')
        netw = iface.AddNetwork(args)

    # Each entry: (allocation failure count, function match pattern,
    # AddNetwork() parameters)
    tests = [ (1,
               'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
                               signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'ssid': 'foo' }, signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'eap': 'foo' }, signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'priority': dbus.UInt32(1) },
                               signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'priority': dbus.Int32(1) },
                               signature='sv')),
              (1, '=set_network_properties;wpas_dbus_handler_add_network',
               dbus.Dictionary({ 'ssid': dbus.ByteArray(' ') },
                               signature='sv')) ]
    for (count, funcs, args) in tests:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            netw = iface.AddNetwork(args)

    # None of the failed attempts may leave a network block behind, neither
    # in the D-Bus property nor in the control interface listing.
    if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                      dbus_interface=dbus.PROPERTIES_IFACE)) > 0:
        raise Exception("Unexpected network block added")
    if len(dev[0].list_networks()) > 0:
        raise Exception("Unexpected network block visible")
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    def set_country(code):
        # Change the country and wait up to a second for the regulatory
        # domain event, first on the interface and then on the global
        # monitor.
        dev[0].request("SET country " + code)
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
        if ev is None:
            ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                          timeout=1)

    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        set_country("US")
        set_country("00")
        subprocess.call(['iw', 'reg', 'set', '00'])
def _test_dbus_interface(dev, apdev):
    """Exercise CreateInterface/GetInterface/RemoveInterface on 'lo'.

    Creates an interface for 'lo' with driver 'none', verifies that
    duplicate creation, repeated removal, invalid/missing parameters and
    lookup after removal are all rejected with the expected D-Bus errors.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # Second CreateInterface() for the same ifname
    params = dbus.Dictionary({ 'Ifname': 'lo',
                               'Driver': 'none',
                               'ConfigFile': 'foo',
                               'BridgeIfname': 'foo', },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # First removal succeeds, the second one has to fail
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))
    else:
        raise Exception("Invalid RemoveInterface() accepted")

    # Unknown dictionary entry
    params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none',
                               'Foo': 123 },
                             signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # Missing Ifname
    params = dbus.Dictionary({ 'Driver': 'none' }, signature='sv')
    try:
        wpas.CreateInterface(params)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))
    else:
        raise Exception("Invalid CreateInterface() accepted")

    # The interface was removed above, so the lookup has to fail
    try:
        wpas.GetInterface("lo")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
    else:
        raise Exception("Invalid GetInterface() accepted")
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        wpas.CreateInterface(params)

    # Fail the i-th matching allocation on each iteration until
    # CreateInterface() succeeds without the failure having been consumed.
    for i in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i)
        params = dbus.Dictionary({ 'Ifname': 'lo', 'Driver': 'none' },
                                 signature='sv')
        try:
            npath = wpas.CreateInterface(params)
            wpas.RemoveInterface(npath)
            logger.info("CreateInterface succeeds after %d allocation failures" % i)
            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if i < 5:
                raise Exception("CreateInterface succeeded during out-of-memory")
            if not state.startswith('0:'):
                break
        except dbus.exceptions.DBusException:
            # Expected outcome while the injected failure is being hit
            pass

    for arg in [ 'Driver', 'Ifname', 'ConfigFile', 'BridgeIfname' ]:
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            params = dbus.Dictionary({ arg: 'foo' }, signature='sv')
            wpas.CreateInterface(params)
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/GetBlob/RemoveBlob parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    blob = dbus.ByteArray("\x01\x02\x03")
    iface.AddBlob('blob1', blob)
    # Adding a second blob under the same name has to fail
    try:
        iface.AddBlob('blob1', dbus.ByteArray("\x01\x02\x04"))
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
    else:
        raise Exception("Invalid AddBlob() accepted")

    # The stored data has to match what was added first
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for (got, expected) in zip(res, blob):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected blob data")
    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    # After removal, both RemoveBlob() and GetBlob() have to fail
    iface.RemoveBlob('blob1')
    try:
        iface.RemoveBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    else:
        raise Exception("Invalid RemoveBlob() accepted")
    try:
        iface.GetBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
    else:
        raise Exception("Invalid GetBlob() accepted")

    class TestDbusBlob(TestDbus):
        # Adds and removes 'blob2' and waits for both signals
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray("\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # False: do not re-arm this timeout callback
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Fail the first, second and third allocation within the AddBlob handler
    for i in range(1, 4):
        with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', dbus.ByteArray("\x01\x02\x03\x04"))
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # An unknown module name, a periodic configuration and an empty string
    for autoscan_arg in ("foo", "periodic:1", ""):
        iface.AutoScan(autoscan_arg)
    # Clear the autoscan parameter through the control interface
    dev[0].request("AUTOSCAN ")
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    sta = dev[0]

    # First allocation within the AutoScan handler fails
    with alloc_fail_dbus(sta, 1, "wpas_dbus_handler_autoscan", "AutoScan"):
        iface.AutoScan("foo")
    # Clear the autoscan parameter through the control interface
    sta.request("AUTOSCAN ")
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "test-open" })
    connect_2sta_open(dev, hapd)
    addr1 = dev[1].p2p_interface_addr()

    def expect_invalid_args(method):
        # Call the named TDLS method with a string that is not a MAC address
        # and require an InvalidArgs error.
        try:
            getattr(iface, method)("foo")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid %s: %s" % (method, str(e)))
        else:
            raise Exception("Invalid %s() accepted" % method)

    expect_invalid_args("TDLSDiscover")
    expect_invalid_args("TDLSStatus")

    # Valid address, but no TDLS peer entry exists for it
    res = iface.TDLSStatus(addr1)
    if res != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    expect_invalid_args("TDLSSetup")
    expect_invalid_args("TDLSTeardown")

    # Well-formed address of a peer that is not known
    try:
        iface.TDLSTeardown("00:11:22:33:44:55")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: error performing TDLS teardown" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("TDLSTeardown accepted for unknown peer")
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    # Only the interface object (last element of the tuple) is needed here.
    if_obj = prepare_dbus(dev[0])[3]
    tdls_iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Error text passed to alloc_fail_dbus() for the failing TDLSSetup call
    expected_error = "UnknownError: error performing TDLS setup"
    with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected_error):
        tdls_iface.TDLSSetup("00:11:22:33:44:55")
1989 def test_dbus_tdls(dev
, apdev
):
1991 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
1992 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
1994 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], { "ssid": "test-open" })
1995 connect_2sta_open(dev
, hapd
)
1997 addr1
= dev
[1].p2p_interface_addr()
1999 class TestDbusTdls(TestDbus
):
2000 def __init__(self
, bus
):
2001 TestDbus
.__init
__(self
, bus
)
2002 self
.tdls_setup
= False
2003 self
.tdls_teardown
= False
2005 def __enter__(self
):
2006 gobject
.timeout_add(1, self
.run_tdls
)
2007 gobject
.timeout_add(15000, self
.timeout
)
2008 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
2009 "PropertiesChanged")
2013 def propertiesChanged(self
, properties
):
2014 logger
.debug("propertiesChanged: %s" % str(properties
))
2016 def run_tdls(self
, *args
):
2017 logger
.debug("run_tdls")
2018 iface
.TDLSDiscover(addr1
)
2019 gobject
.timeout_add(100, self
.run_tdls2
)
2022 def run_tdls2(self
, *args
):
2023 logger
.debug("run_tdls2")
2024 iface
.TDLSSetup(addr1
)
2025 gobject
.timeout_add(500, self
.run_tdls3
)
2028 def run_tdls3(self
, *args
):
2029 logger
.debug("run_tdls3")
2030 res
= iface
.TDLSStatus(addr1
)
2031 if res
== "connected":
2032 self
.tdls_setup
= True
2034 logger
.info("Unexpected TDLSStatus: " + res
)
2035 iface
.TDLSTeardown(addr1
)
2036 gobject
.timeout_add(200, self
.run_tdls4
)
2039 def run_tdls4(self
, *args
):
2040 logger
.debug("run_tdls4")
2041 res
= iface
.TDLSStatus(addr1
)
2042 if res
== "peer does not exist":
2043 self
.tdls_teardown
= True
2045 logger
.info("Unexpected TDLSStatus: " + res
)
2050 return self
.tdls_setup
and self
.tdls_teardown
2052 with
TestDbusTdls(bus
) as t
:
2054 raise Exception("Expected signals not seen")
2056 def test_dbus_pkcs11(dev
, apdev
):
2057 """D-Bus SetPKCS11EngineAndModulePath()"""
2058 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2059 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2062 iface
.SetPKCS11EngineAndModulePath("foo", "bar")
2063 except dbus
.exceptions
.DBusException
, e
:
2064 if "Error.Failed: Reinit of the EAPOL" not in str(e
):
2065 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e
))
2068 iface
.SetPKCS11EngineAndModulePath("foo", "")
2069 except dbus
.exceptions
.DBusException
, e
:
2070 if "Error.Failed: Reinit of the EAPOL" not in str(e
):
2071 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e
))
2073 iface
.SetPKCS11EngineAndModulePath("", "bar")
2074 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11EnginePath",
2075 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2077 raise Exception("Unexpected PKCS11EnginePath value: " + res
)
2078 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11ModulePath",
2079 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2081 raise Exception("Unexpected PKCS11ModulePath value: " + res
)
2083 iface
.SetPKCS11EngineAndModulePath("", "")
2084 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11EnginePath",
2085 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2087 raise Exception("Unexpected PKCS11EnginePath value: " + res
)
2088 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "PKCS11ModulePath",
2089 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2091 raise Exception("Unexpected PKCS11ModulePath value: " + res
)
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    # The actual checks live in the helper; the control interface command
    # below is issued whether or not the helper raises.
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")
2100 def _test_dbus_apscan(dev
, apdev
):
2101 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2103 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "ApScan",
2104 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2106 raise Exception("Unexpected initial ApScan value: %d" % res
)
2109 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(i
),
2110 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2111 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "ApScan",
2112 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2114 raise Exception("Unexpected ApScan value %d (expected %d)" % (res
, i
))
2117 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.Int16(-1),
2118 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2119 raise Exception("Invalid Set(ApScan,-1) accepted")
2120 except dbus
.exceptions
.DBusException
, e
:
2121 if "Error.Failed: wrong property type" not in str(e
):
2122 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e
))
2125 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(123),
2126 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2127 raise Exception("Invalid Set(ApScan,123) accepted")
2128 except dbus
.exceptions
.DBusException
, e
:
2129 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e
):
2130 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e
))
2132 if_obj
.Set(WPAS_DBUS_IFACE
, "ApScan", dbus
.UInt32(1),
2133 dbus_interface
=dbus
.PROPERTIES_IFACE
)
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # The property is expected to start out enabled.
    res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Write both boolean values and verify each one by reading it back.
    for i in [ False, True ]:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != i:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i))

    # A non-boolean value must be rejected with a property type error.
    try:
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(FastReauth,-1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            # Name the property actually being tested (the message used to
            # refer to ApScan).
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))

    # Leave the property enabled for the following test cases.
    if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True),
               dbus_interface=dbus.PROPERTIES_IFACE)
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # Valid values: write each property and verify it by reading it back.
    # The expected value is named explicitly in the failure message; the
    # previous code formatted an undefined variable there, which would have
    # raised NameError instead of the intended exception.
    for (prop, expected) in [ ("BSSExpireAge", 179), ("BSSExpireCount", 3) ]:
        if_obj.Set(WPAS_DBUS_IFACE, prop, dbus.UInt32(expected),
                   dbus_interface=dbus.PROPERTIES_IFACE)
        res = if_obj.Get(WPAS_DBUS_IFACE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if res != expected:
            raise Exception("Unexpected %s value %d (expected %d)" % (prop, res, expected))

    # Invalid values: (property, value, value as shown in messages, text
    # that must be present in the D-Bus error).
    invalid = [ ("BSSExpireAge", dbus.Int16(-1), "-1",
                 "Error.Failed: wrong property type"),
                ("BSSExpireAge", dbus.UInt32(9), "9",
                 "Error.Failed: BSSExpireAge must be >= 10"),
                ("BSSExpireCount", dbus.Int16(-1), "-1",
                 "Error.Failed: wrong property type"),
                ("BSSExpireCount", dbus.UInt32(0), "0",
                 "Error.Failed: BSSExpireCount must be > 0") ]
    for (prop, value, label, error) in invalid:
        desc = "Set(%s,%s)" % (prop, label)
        try:
            if_obj.Set(WPAS_DBUS_IFACE, prop, value,
                       dbus_interface=dbus.PROPERTIES_IFACE)
            raise Exception("Invalid %s accepted" % desc)
        except dbus.exceptions.DBusException as e:
            if error not in str(e):
                raise Exception(("Unexpected error message for invalid %s: " % desc) + str(e))

    # Final values written before returning (presumably the defaults -
    # TODO confirm).
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180),
               dbus_interface=dbus.PROPERTIES_IFACE)
    if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2),
               dbus_interface=dbus.PROPERTIES_IFACE)
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    # The actual checks live in the helper; the two commands below set the
    # country/regulatory domain back to "00" whether or not the helper
    # raises.
    try:
        _test_dbus_country(dev, apdev)
    finally:
        dev[0].request("SET country 00")
        subprocess.call(['iw', 'reg', 'set', '00'])
2226 def _test_dbus_country(dev
, apdev
):
2227 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2229 # work around issues with possible pending regdom event from the end of
2230 # the previous test case
2232 dev
[0].dump_monitor()
2234 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "FI",
2235 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2236 res
= if_obj
.Get(WPAS_DBUS_IFACE
, "Country",
2237 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2239 raise Exception("Unexpected Country value %s (expected FI)" % res
)
2241 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2243 # For now, work around separate P2P Device interface event delivery
2244 ev
= dev
[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
2246 raise Exception("regdom change event not seen")
2247 if "init=USER type=COUNTRY alpha2=FI" not in ev
:
2248 raise Exception("Unexpected event contents: " + ev
)
2251 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", dbus
.Int16(-1),
2252 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2253 raise Exception("Invalid Set(Country,-1) accepted")
2254 except dbus
.exceptions
.DBusException
, e
:
2255 if "Error.Failed: wrong property type" not in str(e
):
2256 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e
))
2259 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "F",
2260 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2261 raise Exception("Invalid Set(Country,F) accepted")
2262 except dbus
.exceptions
.DBusException
, e
:
2263 if "Error.Failed: invalid country code" not in str(e
):
2264 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e
))
2266 if_obj
.Set(WPAS_DBUS_IFACE
, "Country", "00",
2267 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2269 ev
= dev
[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
2271 # For now, work around separate P2P Device interface event delivery
2272 ev
= dev
[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout
=1)
2274 raise Exception("regdom change event not seen")
2275 if "init=CORE type=WORLD" not in ev
:
2276 raise Exception("Unexpected event contents: " + ev
)
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    # The actual checks live in the helper; the control interface command
    # below is issued whether or not the helper raises.
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        dev[0].request("SCAN_INTERVAL 5")
def _test_dbus_scan_interval(dev, apdev):
    """Exercise valid and invalid writes of the ScanInterval property."""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])

    # Valid value: write it and verify it by reading it back. The expected
    # value is named explicitly in the failure message; the previous code
    # formatted an undefined variable there, which would have raised
    # NameError instead of the intended exception.
    expected = 3
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(expected),
               dbus_interface=dbus.PROPERTIES_IFACE)
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    # Invalid values: (value, value as shown in messages, text that must be
    # present in the D-Bus error).
    invalid = [ (dbus.UInt16(100), "100",
                 "Error.Failed: wrong property type"),
                (dbus.Int32(-1), "-1",
                 "Error.Failed: scan_interval must be >= 0") ]
    for (value, label, error) in invalid:
        desc = "Set(ScanInterval,%s)" % label
        try:
            if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", value,
                       dbus_interface=dbus.PROPERTIES_IFACE)
            raise Exception("Invalid %s accepted" % desc)
        except dbus.exceptions.DBusException as e:
            if error not in str(e):
                raise Exception(("Unexpected error message for invalid %s: " % desc) + str(e))

    # Final value written before returning; it matches the SCAN_INTERVAL
    # value that test_dbus_scan_interval() sets in its cleanup.
    if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5),
               dbus_interface=dbus.PROPERTIES_IFACE)
2314 def test_dbus_probe_req_reporting(dev
, apdev
):
2315 """D-Bus Probe Request reporting"""
2316 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2318 dev
[1].p2p_find(social
=True)
2320 class TestDbusProbe(TestDbus
):
2321 def __init__(self
, bus
):
2322 TestDbus
.__init
__(self
, bus
)
2323 self
.reported
= False
2325 def __enter__(self
):
2326 gobject
.timeout_add(1, self
.run_test
)
2327 gobject
.timeout_add(15000, self
.timeout
)
2328 self
.add_signal(self
.groupStarted
, WPAS_DBUS_IFACE_P2PDEVICE
,
2330 self
.add_signal(self
.probeRequest
, WPAS_DBUS_IFACE
, "ProbeRequest",
2335 def groupStarted(self
, properties
):
2336 logger
.debug("groupStarted: " + str(properties
))
2337 g_if_obj
= bus
.get_object(WPAS_DBUS_SERVICE
,
2338 properties
['interface_object'])
2339 self
.iface
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE
)
2340 self
.iface
.SubscribeProbeReq()
2341 self
.group_p2p
= dbus
.Interface(g_if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2343 def probeRequest(self
, args
):
2344 logger
.debug("probeRequest: args=%s" % str(args
))
2345 self
.reported
= True
2348 def run_test(self
, *args
):
2349 logger
.debug("run_test")
2350 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2351 params
= dbus
.Dictionary({ 'frequency': 2412 })
2352 p2p
.GroupAdd(params
)
2356 return self
.reported
2358 with
TestDbusProbe(bus
) as t
:
2360 raise Exception("Expected signals not seen")
2361 t
.iface
.UnsubscribeProbeReq()
2363 t
.iface
.UnsubscribeProbeReq()
2364 raise Exception("Invalid UnsubscribeProbeReq() accepted")
2365 except dbus
.exceptions
.DBusException
, e
:
2366 if "NoSubscription" not in str(e
):
2367 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e
))
2368 t
.group_p2p
.Disconnect()
2370 with
TestDbusProbe(bus
) as t
:
2372 raise Exception("Expected signals not seen")
2373 # On purpose, leave ProbeReq subscription in place to test automatic
2376 dev
[1].p2p_stop_find()
2378 def test_dbus_probe_req_reporting_oom(dev
, apdev
):
2379 """D-Bus Probe Request reporting (OOM)"""
2380 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2381 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
2383 # Need to make sure this process has not already subscribed to avoid false
2384 # failures due to the operation succeeding due to os_strdup() not even
2387 iface
.UnsubscribeProbeReq()
2388 was_subscribed
= True
2389 except dbus
.exceptions
.DBusException
, e
:
2390 was_subscribed
= False
2393 with
alloc_fail_dbus(dev
[0], 1, "wpas_dbus_handler_subscribe_preq",
2394 "SubscribeProbeReq"):
2395 iface
.SubscribeProbeReq()
2398 # On purpose, leave ProbeReq subscription in place to test automatic
2400 iface
.SubscribeProbeReq()
2402 def test_dbus_p2p_invalid(dev
, apdev
):
2403 """D-Bus invalid P2P operations"""
2404 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2405 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2408 p2p
.RejectPeer(path
+ "/Peers/00112233445566")
2409 raise Exception("Invalid RejectPeer accepted")
2410 except dbus
.exceptions
.DBusException
, e
:
2411 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e
):
2412 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
2415 p2p
.RejectPeer("/foo")
2416 raise Exception("Invalid RejectPeer accepted")
2417 except dbus
.exceptions
.DBusException
, e
:
2418 if "InvalidArgs" not in str(e
):
2419 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
2429 raise Exception("Invalid RemoveClient accepted")
2430 except dbus
.exceptions
.DBusException
, e
:
2431 if "InvalidArgs" not in str(e
):
2432 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e
))
2434 tests
= [ {'DiscoveryType': 'foo'},
2435 {'RequestedDeviceTypes': 'foo'},
2436 {'RequestedDeviceTypes': ['foo']},
2437 {'RequestedDeviceTypes': ['1','2','3','4','5','6','7','8','9',
2438 '10','11','12','13','14','15','16',
2440 {'RequestedDeviceTypes': dbus
.Array([], signature
="s")},
2441 {'RequestedDeviceTypes': dbus
.Array([['foo']], signature
="as")},
2442 {'RequestedDeviceTypes': dbus
.Array([], signature
="i")},
2443 {'RequestedDeviceTypes': [dbus
.ByteArray('12345678'),
2444 dbus
.ByteArray('1234567')]},
2445 {'Foo': dbus
.Int16(1)},
2446 {'Foo': dbus
.UInt16(1)},
2447 {'Foo': dbus
.Int64(1)},
2448 {'Foo': dbus
.UInt64(1)},
2449 {'Foo': dbus
.Double(1.23)},
2450 {'Foo': dbus
.Signature('s')},
2454 p2p
.Find(dbus
.Dictionary(t
))
2455 raise Exception("Invalid Find accepted")
2456 except dbus
.exceptions
.DBusException
, e
:
2457 if "InvalidArgs" not in str(e
):
2458 raise Exception("Unexpected error message for invalid Find(): " + str(e
))
2461 "/fi/w1/wpa_supplicant1/Interfaces/1234",
2462 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234" ]:
2464 p2p
.RemovePersistentGroup(dbus
.ObjectPath(p
))
2465 raise Exception("Invalid RemovePersistentGroup accepted")
2466 except dbus
.exceptions
.DBusException
, e
:
2467 if "InvalidArgs" not in str(e
):
2468 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e
))
2471 dev
[0].request("P2P_SET disabled 1")
2473 raise Exception("Invalid Listen accepted")
2474 except dbus
.exceptions
.DBusException
, e
:
2475 if "UnknownError: Could not start P2P listen" not in str(e
):
2476 raise Exception("Unexpected error message for invalid Listen: " + str(e
))
2478 dev
[0].request("P2P_SET disabled 0")
2480 test_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
, introspect
=False)
2481 test_p2p
= dbus
.Interface(test_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2483 test_p2p
.Listen("foo")
2484 raise Exception("Invalid Listen accepted")
2485 except dbus
.exceptions
.DBusException
, e
:
2486 if "InvalidArgs" not in str(e
):
2487 raise Exception("Unexpected error message for invalid Listen: " + str(e
))
2490 dev
[0].request("P2P_SET disabled 1")
2491 p2p
.ExtendedListen(dbus
.Dictionary({}))
2492 raise Exception("Invalid ExtendedListen accepted")
2493 except dbus
.exceptions
.DBusException
, e
:
2494 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e
):
2495 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e
))
2497 dev
[0].request("P2P_SET disabled 0")
2500 dev
[0].request("P2P_SET disabled 1")
2501 args
= { 'duration1': 30000, 'interval1': 102400,
2502 'duration2': 20000, 'interval2': 102400 }
2503 p2p
.PresenceRequest(args
)
2504 raise Exception("Invalid PresenceRequest accepted")
2505 except dbus
.exceptions
.DBusException
, e
:
2506 if "UnknownError: Failed to invoke presence request" not in str(e
):
2507 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e
))
2509 dev
[0].request("P2P_SET disabled 0")
2512 params
= dbus
.Dictionary({'frequency': dbus
.Int32(-1)})
2513 p2p
.GroupAdd(params
)
2514 raise Exception("Invalid GroupAdd accepted")
2515 except dbus
.exceptions
.DBusException
, e
:
2516 if "InvalidArgs" not in str(e
):
2517 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e
))
2520 params
= dbus
.Dictionary({'persistent_group_object':
2521 dbus
.ObjectPath(path
),
2523 p2p
.GroupAdd(params
)
2524 raise Exception("Invalid GroupAdd accepted")
2525 except dbus
.exceptions
.DBusException
, e
:
2526 if "InvalidArgs" not in str(e
):
2527 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e
))
2531 raise Exception("Invalid Disconnect accepted")
2532 except dbus
.exceptions
.DBusException
, e
:
2533 if "UnknownError: failed to disconnect" not in str(e
):
2534 raise Exception("Unexpected error message for invalid Disconnect: " + str(e
))
2537 dev
[0].request("P2P_SET disabled 1")
2539 raise Exception("Invalid Flush accepted")
2540 except dbus
.exceptions
.DBusException
, e
:
2541 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2542 raise Exception("Unexpected error message for invalid Flush: " + str(e
))
2544 dev
[0].request("P2P_SET disabled 0")
2547 dev
[0].request("P2P_SET disabled 1")
2548 args
= { 'peer': path
,
2550 'wps_method': 'pbc',
2552 pin
= p2p
.Connect(args
)
2553 raise Exception("Invalid Connect accepted")
2554 except dbus
.exceptions
.DBusException
, e
:
2555 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2556 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2558 dev
[0].request("P2P_SET disabled 0")
2560 tests
= [ { 'frequency': dbus
.Int32(-1) },
2561 { 'wps_method': 'pbc' },
2562 { 'wps_method': 'foo' } ]
2565 pin
= p2p
.Connect(args
)
2566 raise Exception("Invalid Connect accepted")
2567 except dbus
.exceptions
.DBusException
, e
:
2568 if "InvalidArgs" not in str(e
):
2569 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2572 dev
[0].request("P2P_SET disabled 1")
2573 args
= { 'peer': path
}
2574 pin
= p2p
.Invite(args
)
2575 raise Exception("Invalid Invite accepted")
2576 except dbus
.exceptions
.DBusException
, e
:
2577 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2578 raise Exception("Unexpected error message for invalid Invite: " + str(e
))
2580 dev
[0].request("P2P_SET disabled 0")
2583 args
= { 'foo': 'bar' }
2584 pin
= p2p
.Invite(args
)
2585 raise Exception("Invalid Invite accepted")
2586 except dbus
.exceptions
.DBusException
, e
:
2587 if "InvalidArgs" not in str(e
):
2588 raise Exception("Unexpected error message for invalid Connect: " + str(e
))
2590 tests
= [ (path
, 'display', "InvalidArgs"),
2591 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2593 "UnknownError: Failed to send provision discovery request"),
2594 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2596 "UnknownError: Failed to send provision discovery request"),
2597 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2599 "UnknownError: Failed to send provision discovery request"),
2600 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2602 "UnknownError: Failed to send provision discovery request"),
2603 (dbus
.ObjectPath(path
+ "/Peers/00112233445566"),
2604 'foo', "InvalidArgs") ]
2605 for (p
,method
,err
) in tests
:
2607 p2p
.ProvisionDiscoveryRequest(p
, method
)
2608 raise Exception("Invalid ProvisionDiscoveryRequest accepted")
2609 except dbus
.exceptions
.DBusException
, e
:
2610 if err
not in str(e
):
2611 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e
))
2614 dev
[0].request("P2P_SET disabled 1")
2615 if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Peers",
2616 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2617 raise Exception("Invalid Get(Peers) accepted")
2618 except dbus
.exceptions
.DBusException
, e
:
2619 if "Error.Failed: P2P is not available for this interface" not in str(e
):
2620 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e
))
2622 dev
[0].request("P2P_SET disabled 0")
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def one_blob():
        return [ dbus.ByteArray('123') ]

    # Find() cases: (allocation failure count, function pattern given to
    # alloc_fail_dbus(), factory for the value stored under 'Foo'). The
    # factories are called inside the with block so that each argument is
    # built at the same point as before.
    find_cases = [
        (1, "_wpa_dbus_dict_entry_get_string_array",
         lambda: [ 'bar' ]),
        (2, "_wpa_dbus_dict_entry_get_string_array",
         lambda: [ 'bar' ]),
        (10, "_wpa_dbus_dict_entry_get_string_array",
         lambda: [ '1','2','3','4','5','6','7','8','9' ]),
        (1, ":=_wpa_dbus_dict_entry_get_binarray",
         one_blob),
        (1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         one_blob),
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         lambda: [ dbus.ByteArray('123') for _ in range(11) ]),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         one_blob),
        (1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         lambda: path),
    ]
    for (count, func, make_value) in find_cases:
        with alloc_fail_dbus(dev[0], count, func, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({ 'Foo': make_value() }))

    # AddService() cases: the same arguments with the failure armed at the
    # first and then the second matching allocation.
    args = { 'service_type': 'bonjour',
             'response': dbus.ByteArray(500*'b') }
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(args)
2681 def test_dbus_p2p_discovery(dev
, apdev
):
2682 """D-Bus P2P discovery"""
2683 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
2684 p2p
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE_P2PDEVICE
)
2686 addr0
= dev
[0].p2p_dev_addr()
2688 dev
[1].request("SET sec_device_type 1-0050F204-2")
2689 dev
[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
2691 addr1
= dev
[1].p2p_dev_addr()
2692 a1
= binascii
.unhexlify(addr1
.replace(':',''))
2694 wfd_devinfo
= "00001c440028"
2695 dev
[2].request("SET wifi_display 1")
2696 dev
[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo
)
2697 wfd
= binascii
.unhexlify('000006' + wfd_devinfo
)
2699 addr2
= dev
[2].p2p_dev_addr()
2700 a2
= binascii
.unhexlify(addr2
.replace(':',''))
2702 res
= if_obj
.GetAll(WPAS_DBUS_IFACE_P2PDEVICE
,
2703 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2704 if 'Peers' not in res
:
2705 raise Exception("GetAll result missing Peers")
2706 if len(res
['Peers']) != 0:
2707 raise Exception("Unexpected peer(s) in the list")
2709 args
= {'DiscoveryType': 'social',
2710 'RequestedDeviceTypes': [dbus
.ByteArray('12345678')],
2711 'Timeout': dbus
.Int32(1) }
2712 p2p
.Find(dbus
.Dictionary(args
))
2715 class TestDbusP2p(TestDbus
):
2716 def __init__(self
, bus
):
2717 TestDbus
.__init
__(self
, bus
)
2721 self
.find_stopped
= False
2723 def __enter__(self
):
2724 gobject
.timeout_add(1, self
.run_test
)
2725 gobject
.timeout_add(15000, self
.timeout
)
2726 self
.add_signal(self
.deviceFound
, WPAS_DBUS_IFACE_P2PDEVICE
,
2728 self
.add_signal(self
.deviceLost
, WPAS_DBUS_IFACE_P2PDEVICE
,
2730 self
.add_signal(self
.provisionDiscoveryResponseEnterPin
,
2731 WPAS_DBUS_IFACE_P2PDEVICE
,
2732 "ProvisionDiscoveryResponseEnterPin")
2733 self
.add_signal(self
.findStopped
, WPAS_DBUS_IFACE_P2PDEVICE
,
2738 def deviceFound(self
, path
):
2739 logger
.debug("deviceFound: path=%s" % path
)
2740 res
= if_obj
.Get(WPAS_DBUS_IFACE_P2PDEVICE
, "Peers",
2741 dbus_interface
=dbus
.PROPERTIES_IFACE
)
2743 raise Exception("Unexpected number of peers")
2745 raise Exception("Mismatch in peer object path")
2746 peer_obj
= bus
.get_object(WPAS_DBUS_SERVICE
, path
)
2747 res
= peer_obj
.GetAll(WPAS_DBUS_P2P_PEER
,
2748 dbus_interface
=dbus
.PROPERTIES_IFACE
,
2750 logger
.debug("peer properties: " + str(res
))
2752 if res
['DeviceAddress'] == a1
:
2753 if 'SecondaryDeviceTypes' not in res
:
2754 raise Exception("Missing SecondaryDeviceTypes")
2755 sec
= res
['SecondaryDeviceTypes']
2757 raise Exception("Secondary device type missing")
2758 if "\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec
:
2759 raise Exception("Secondary device type mismatch")
2761 if 'VendorExtension' not in res
:
2762 raise Exception("Missing VendorExtension")
2763 vendor
= res
['VendorExtension']
2765 raise Exception("Vendor extension missing")
2766 if "\x11\x22\x33\x44" not in vendor
:
2767 raise Exception("Secondary device type mismatch")
2770 elif res
['DeviceAddress'] == a2
:
2771 if 'IEs' not in res
:
2772 raise Exception("IEs missing")
2773 if res
['IEs'] != wfd
:
2774 raise Exception("IEs mismatch")
2777 raise Exception("Unexpected peer device address")
2779 if self
.found
and self
.found2
:
2781 p2p
.RejectPeer(path
)
2782 p2p
.ProvisionDiscoveryRequest(path
, 'display')
2784 def deviceLost(self
, path
):
2785 logger
.debug("deviceLost: path=%s" % path
)
2788 p2p
.RejectPeer(path
)
2789 raise Exception("Invalid RejectPeer accepted")
2790 except dbus
.exceptions
.DBusException
, e
:
2791 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e
):
2792 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e
))
2795 def provisionDiscoveryResponseEnterPin(self
, peer_object
):
2796 logger
.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object
)
2799 def findStopped(self
):
2800 logger
.debug("findStopped")
2801 self
.find_stopped
= True
2803 def run_test(self
, *args
):
2804 logger
.debug("run_test")
2805 p2p
.Find(dbus
.Dictionary({'DiscoveryType': 'social',
2806 'Timeout': dbus
.Int32(10)}))
2810 return self
.found
and self
.lost
and self
.found2
and self
.find_stopped
2812 with
TestDbusP2p(bus
) as t
:
2814 raise Exception("Expected signals not seen")
2816 dev
[1].request("VENDOR_ELEM_REMOVE 1 *")
2817 dev
[1].p2p_stop_find()
2820 dev
[2].p2p_stop_find()
2821 dev
[2].request("P2P_FLUSH")
2822 if not dev
[2].discover_peer(addr0
):
2823 raise Exception("Peer not found")
2825 dev
[2].p2p_stop_find()
2828 p2p
.ExtendedListen(dbus
.Dictionary({'foo': 100}))
2829 raise Exception("Invalid ExtendedListen accepted")
2830 except dbus
.exceptions
.DBusException
, e
:
2831 if "InvalidArgs" not in str(e
):
2832 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e
))
2834 p2p
.ExtendedListen(dbus
.Dictionary({'period': 100, 'interval': 1000}))
2835 p2p
.ExtendedListen(dbus
.Dictionary({}))
2836 dev
[0].global_request("P2P_EXT_LISTEN")
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery"""
    # Exercises the service discovery methods of the P2PDevice D-Bus
    # interface on dev[0] (AddService, FlushService, DeleteService,
    # ServiceDiscoveryRequest, ServiceDiscoveryCancelRequest and
    # ServiceDiscoveryResponse) and checks that invalid arguments are
    # rejected with an InvalidArgs error.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def expect_invalid_args(method, arg, name, accepted):
        # Call method(arg) and require a D-Bus error that mentions
        # InvalidArgs. 'name' goes into the message used when some other
        # D-Bus error is returned; 'accepted' is the message used when the
        # call unexpectedly succeeds.
        try:
            method(arg)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid " + name + ": " + str(e))
            return
        raise Exception(accepted)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))
    upnp_service = 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'

    args = { 'service_type': 'bonjour',
             'query': bonjour_query,
             'response': bonjour_response }
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # The argument set used for AddService() must not be accepted by
    # DeleteService().
    expect_invalid_args(p2p.DeleteService, args, "DeleteService()",
                        "Invalid DeleteService() accepted")

    # Deleting the Bonjour service works once; the second attempt has to fail.
    args = { 'service_type': 'bonjour',
             'query': bonjour_query }
    p2p.DeleteService(args)
    expect_invalid_args(p2p.DeleteService, args, "DeleteService()",
                        "Invalid DeleteService() accepted")

    # Same add / delete / delete-again sequence for a UPnP service.
    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': upnp_service }
    p2p.AddService(args)
    p2p.DeleteService(args)
    expect_invalid_args(p2p.DeleteService, args, "DeleteService()",
                        "Invalid DeleteService() accepted")

    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'foo', 'query': bonjour_query },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': upnp_service },
              { 'version': 0x10,
                'service': upnp_service },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(p2p.DeleteService, args, "DeleteService()",
                            "Invalid DeleteService() accepted")

    tests = [ { 'service_type': 'foo' },
              { 'service_type': 'upnp' },
              { 'service_type': 'upnp', 'version': 0x10 },
              { 'service_type': 'upnp',
                'service': upnp_service },
              { 'version': 0x10,
                'service': upnp_service },
              { 'service_type': 'upnp', 'foo': 'bar' },
              { 'service_type': 'bonjour' },
              { 'service_type': 'bonjour', 'query': 'foo' },
              { 'service_type': 'bonjour', 'response': 'foo' },
              { 'service_type': 'bonjour', 'query': bonjour_query },
              { 'service_type': 'bonjour', 'response': bonjour_response },
              { 'service_type': 'bonjour', 'query': dbus.ByteArray(500*'a') },
              { 'service_type': 'bonjour', 'foo': 'bar' } ]
    for args in tests:
        expect_invalid_args(p2p.AddService, args, "AddService()",
                            "Invalid AddService() accepted")

    # A request reference can be cancelled exactly once; an unknown
    # reference (0) has to be rejected as well.
    args = { 'tlv': dbus.ByteArray("\x02\x00\x00\x01") }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    for bad_ref in [ ref, dbus.UInt64(0) ]:
        expect_invalid_args(p2p.ServiceDiscoveryCancelRequest, bad_ref,
                            "ServiceDiscoveryCancelRequest()",
                            "Invalid ServiceDiscoveryCancelRequest() accepted")

    args = { 'service_type': 'upnp',
             'version': 0x10,
             'service': 'ssdp:foo' }
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    tests = [ { 'service_type': 'foo' },
              { 'foo': 'bar' },
              { 'tlv': 'foo' },
              { },
              { 'version': 0 },
              { 'service_type': 'upnp',
                'service': 'ssdp:foo' },
              { 'service_type': 'upnp',
                'version': 0x10 },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers") },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': path + "/Peers" },
              { 'service_type': 'upnp',
                'version': 0x10,
                'service': 'ssdp:foo',
                'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566") } ]
    for args in tests:
        expect_invalid_args(p2p.ServiceDiscoveryRequest, args,
                            "ServiceDiscoveryRequest()",
                            "Invalid ServiceDiscoveryRequest accepted")

    args = { 'foo': 'bar' }
    expect_invalid_args(p2p.ServiceDiscoveryResponse,
                        dbus.Dictionary(args, signature='sv'),
                        "ServiceDiscoveryResponse()",
                        "Invalid ServiceDiscoveryResponse accepted")
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    # dev[0] is driven over D-Bus. dev[1] gets a Bonjour service registered
    # and is the peer that the service discovery query is sent to.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once a ServiceDiscoveryResponse signal has been received
            self.done = False

        def __enter__(self):
            # Start the test body right away and arm the 15 second timeout
            # before entering the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.deviceFound, iface, "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse, iface,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Send a service discovery query to the peer that was just found
            query = dbus.ByteArray("\x02\x00\x00\x01")
            p2p.ServiceDiscoveryRequest({ 'peer_object': path,
                                          'tlv': query })

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_params = {'DiscoveryType': 'social',
                           'Timeout': dbus.Int32(10)}
            p2p.Find(dbus.Dictionary(find_params))
            # One-shot timer callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    # The test body lives in a helper so that external service discovery
    # processing is always switched back off on dev[0], whether the helper
    # returns or raises.
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    # dev[1] sends a service discovery request to dev[0]. The D-Bus client
    # on dev[0] answers it through ServiceDiscoveryResponse() and dev[1] is
    # then checked for having received exactly that response payload.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hexdump of the response TLVs that the external handler sends back
    resp = "0300000101"

    for cmd in [ "P2P_FLUSH",
                 "P2P_SERV_DISC_REQ " + addr0 + " 02000001" ]:
        dev[1].request(cmd)
    dev[1].p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once a ServiceDiscoveryRequest signal has been handled
            self.sd = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            iface = WPAS_DBUS_IFACE_P2PDEVICE
            self.add_signal(self.deviceFound, iface, "DeviceFound")
            self.add_signal(self.serviceDiscoveryRequest, iface,
                            "ServiceDiscoveryRequest")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Echo the addressing information from the request and attach
            # the response payload.
            args = {}
            for key in ('peer_object', 'frequency', 'dialog_token'):
                args[key] = sd_request[key]
            args['tlvs'] = dbus.ByteArray(binascii.unhexlify(resp))
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            # One-shot timer callback
            return False

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")

    ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    # The fifth space-separated field of the event is compared against the
    # response payload.
    if ev.split(' ')[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    dev[1].p2p_stop_find()

    p2p.FlushService()
    p2p.ServiceDiscoveryExternal(0)
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    # Sequence driven by the signal handlers below:
    #  1. run_test() adds a persistent group
    #  2. the first GroupStarted removes the group instance again
    #  3. GroupFinished re-starts the persistent group
    #  4. the second GroupStarted makes wlan1 join using a PIN
    #  5. ProvisionDiscoveryRequestDisplayPin authorizes the peer (WPS.Start)
    #  6. StaAuthorized verifies peer/group properties, exercises
    #     Set(WPSVendorExtensions), removes the client and disconnects
    #  7. GroupFinished removes the persistent group
    #  8. PersistentGroupRemoved ends the test
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            # Set by fail(); checked in success()
            self.exceptions = False
            self.deauthorized = False
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def fail(self, msg):
            # Record the failure so that success() reports it, then raise.
            # NOTE(review): the flag is presumably needed because an
            # exception raised in a signal handler does not reach the test
            # function - confirm against dbus-python behavior.
            self.exceptions = True
            raise Exception(msg)

        def expect_set_failure(self, g_obj, vals, marker):
            # Set(WPSVendorExtensions) with vals has to be rejected with a
            # D-Bus error whose text contains marker.
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException as e:
                if marker not in str(e):
                    self.fail("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
                return
            self.fail("Invalid Set(WPSVendorExtensions) accepted")

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self.fail("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self.fail("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                self.fail("Unexpected PeerGO value: " + str(go))
            if self.first:
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object

            # First an invalid WPS.Start() that has to be rejected ...
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin' }
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            try:
                wps.Start(params)
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.fail("Unexpected error message: " + str(e))
            else:
                self.fail("Invalid WPS.Start() accepted")

            # ... and then the valid one that lets the peer join.
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self.fail("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self.fail("Unexpected peer Groups[0] value")

            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self.fail("Unexpected number of group members")

            ext = dbus.ByteArray("\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding
            # these values. The current implementation maintains support for
            # that for backwards compatibility reasons. Verify that encoding
            # first.
            vals = dbus.Dictionary({ 'WPSVendorExtensions': [ ext ]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self.fail("Unexpected number of vendor extensions")
            if res[0] != ext:
                self.fail("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted
            # as well.
            res.append(dbus.ByteArray('\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            if len(res2) != 2:
                self.fail("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self.fail("Vendor extension value changed")

            # Invalid values: a list grown by ten more entries, a dictionary
            # with an unknown key, and arrays of the wrong element type.
            for i in range(10):
                res.append(dbus.ByteArray('\xaa\xbb'))
            self.expect_set_failure(g_obj, res, "Error.Failed")

            vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
            self.expect_set_failure(g_obj, vals, "InvalidArgs")

            self.expect_set_failure(g_obj, [ "foo" ], "Error.Failed")
            self.expect_set_failure(g_obj, [ [ "foo" ] ], "Error.Failed")

            p2p.RemoveClient({ 'peer': self.peer_path })

            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # One-shot timer callback
            return False

        def success(self):
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    # dev[0] starts an autonomous GO over D-Bus, wlan1 joins it using PBC,
    # and the group is torn down once the station has been authorized.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            # Set when the GroupFinished signal is seen
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            p2p_signals = [ (self.deviceFound, "DeviceFound"),
                            (self.groupStarted, "GroupStarted"),
                            (self.groupFinished, "GroupFinished"),
                            (self.provisionDiscoveryPBCRequest,
                             "ProvisionDiscoveryPBCRequest") ]
            for handler, signal_name in p2p_signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name)
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            iface_path = properties['interface_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, iface_path)
            # Have the second station join the newly started group
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Keep the peer properties for provisionDiscoveryPBCRequest()
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            raw = binascii.unhexlify(peer_object.split('/')[-1])
            # Colon-separated form of the last path component; not used
            # further below.
            addr = ':'.join(['%02x' % ord(octet) for octet in raw])
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Type': 'pbc' }
            logger.info("Authorize peer to connect to the group")
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # The join completed; tear the group down to finish the test
            dbus.Interface(self.g_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # One-shot timer callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
    dev[1].flush_scan_cache()
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    # dev[0] starts an autonomous GO over D-Bus and wlan1 connects to it
    # using WPS PIN (WPS_PIN on the station side) instead of P2P_CONNECT.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when the GroupFinished signal is seen
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            for handler, signal_name in [ (self.groupStarted, "GroupStarted"),
                                          (self.groupFinished, "GroupFinished") ]:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name)
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            group_props = g_obj.GetAll(WPAS_DBUS_GROUP,
                                       dbus_interface=dbus.PROPERTIES_IFACE,
                                       byte_arrays=True)
            # Convert the BSSID property into colon-separated hex form
            bssid = ':'.join([binascii.hexlify(octet)
                              for octet in group_props['BSSID']])

            pin = '12345670'
            params = { 'Role': 'enrollee',
                       'Type': 'pin',
                       'Pin': pin }
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.request("WPS_PIN " + bssid + " " + pin)
            # Kept for staAuthorized(), which disconnects the group
            self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # One-shot timer callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    # dev[1] runs an autonomous GO and dev[2] listens as a second peer.
    # Once dev[0] has found both over D-Bus it joins the group with a PIN,
    # verifies the client-side properties, and invites dev[2] to the group.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            # D-Bus object paths of the peer to invite (dev[2]) and of the
            # GO (dev[1]); filled in by deviceFound()
            self.peer = None
            self.go = None

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationResult")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # Peers are told apart by the address embedded in the object
            # path.
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path
            if self.peer and self.go:
                logger.info("Join the group")
                p2p.StopFind()
                args = { 'peer': self.go,
                         'join': True,
                         'wps_method': 'pin',
                         'frequency': 2412 }
                pin = p2p.Connect(args)

                # Enter the PIN returned by Connect() on the GO
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                dev1.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            args = { 'duration1': 30000, 'interval1': 102400,
                     'duration2': 20000, 'interval2': 102400 }
            group_p2p.PresenceRequest(args)

            args = { 'peer': self.peer }
            group_p2p.Invite(args)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Removing the group on the GO ends the test through the
            # GroupFinished signal.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_ifname = dev1_group_ifname
            dev1.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot timer callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    # A persistent group is formed between dev[0] and dev[1] first. With
    # persistent_reconnect disabled on dev[0], dev[1] then invites dev[0] to
    # re-invoke the group and the InvitationReceived signal is awaited.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    form(dev[0], dev[1])
    addr0 = dev[0].p2p_dev_addr()
    dev[0].p2p_listen()
    dev[0].global_request("SET persistent_reconnect 0")

    found = dev[1].discover_peer(addr0, social=True)
    if not found:
        raise Exception("Peer " + addr0 + " not found")
    peer = dev[1].get_peer(addr0)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when the InvitationReceived signal is seen
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationReceived")
            self.loop.run()
            return self

        def invitationReceived(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            inviter = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            inviter.global_request("P2P_INVITE persistent=" +
                                   peer['persistent'] + " peer=" + addr0)
            # One-shot timer callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")

    dev[0].p2p_stop_find()
    dev[1].p2p_stop_find()
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    # The test body lives in a helper so that the SSID postfix on dev[0] is
    # always cleared again, whether the helper returns or raises.
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        dev[0].request("P2P_SET ssid_postfix ")
def _test_dbus_p2p_config(dev, apdev):
    # Reads and writes the P2PDeviceConfig property of dev[0] over D-Bus:
    # round trip of the current value, setting and clearing list-valued
    # entries, access while P2P is disabled, and invalid value types.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def get_config():
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(values):
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", values,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Writing back what was read must not change anything.
    res = get_config()
    set_config(res)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    changes = { 'SsidPostfix': 'foo',
                'VendorExtension': [ dbus.ByteArray('\x11\x22\x33\x44') ],
                'SecondaryDeviceTypes': [ dbus.ByteArray('\x11\x22\x33\x44\x55\x66\x77\x88') ]}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Empty arrays remove the entries again.
    changes = { 'SsidPostfix': '',
                'VendorExtension': dbus.Array([], signature="ay"),
                'SecondaryDeviceTypes': dbus.Array([], signature="ay") }
    set_config(dbus.Dictionary(changes, signature='sv'))

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Both Get and Set have to fail while P2P is disabled; P2P is re-enabled
    # in all cases.
    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    try:
        dev[0].request("P2P_SET disabled 1")
        changes = { 'SsidPostfix': 'foo' }
        set_config(dbus.Dictionary(changes, signature='sv'))
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Wrong value types and an unknown key have to be rejected.
    tests = [ { 'DeviceName': 123 },
              { 'SsidPostfix': 123 },
              { 'Foo': 'Bar' } ]
    for changes in tests:
        try:
            set_config(dbus.Dictionary(changes, signature='sv'))
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        # Adds a persistent group, disconnects it as soon as GroupStarted is
        # signalled and remembers the object path reported by
        # PersistentGroupAdded.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # One-shot timeout callback
            return False

        def success(self):
            return True

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Changing the ssid property must leave every other property untouched.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({ 'ssid': 'DIRECT-foo' }, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # Add a second persistent group and verify both are listed.
    args = dbus.Dictionary({ 'ssid': 'DIRECT-testing',
                             'psk': '1234567890' }, signature='sv')
    p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The group was removed above, so removing it by path must now fail.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # First pass: start a persistent group and let wlan1 join it with a
        # PIN. Once that group has finished, invite the peer to the stored
        # persistent group (self.invited) and wait for the second group to
        # finish.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            self.done = False
            self.invited = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if not self.invited:
                # Initial group: have wlan1 find the group BSS and join it.
                g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
                res = g_obj.GetAll(WPAS_DBUS_GROUP,
                                   dbus_interface=dbus.PROPERTIES_IFACE,
                                   byte_arrays=True)
                bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.scan_for_bss(bssid, freq=2412)
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                self.done = True
                self.loop.quit()
            else:
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("SET persistent_reconnect 1")
                dev1.p2p_listen()

                # 'path' here is the interface object path from
                # prepare_dbus(), which the test expects Invite to reject as
                # a persistent group object.
                args = { 'persistent_group_object': dbus.ObjectPath(path),
                         'peer': self.peer_path }
                try:
                    p2p.Invite(args)
                    raise Exception("Invalid Invite accepted")
                except dbus.exceptions.DBusException as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected error message for invalid Invite: " + str(e))

                args = { 'persistent_group_object': self.persistent,
                         'peer': self.peer_path }
                p2p.Invite(args)
                self.invited = True

                self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                           timeout=15)
                if self.sta_group_ev is None:
                    raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            params = { 'Role': 'registrar',
                       'P2PDeviceAddress': self.peer['DeviceAddress'],
                       'Bssid': self.peer['DeviceAddress'],
                       'Type': 'pin',
                       'Pin': '12345670' }
            logger.info("Authorize peer to connect to the group")
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
            if ev is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # One-shot timeout callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # wlan1 initiates GO Negotiation; the D-Bus side answers the
        # GONegotiationRequest signal with Connect() and disconnects the
        # group once it has started.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationRequest,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationRequest",
                            byte_arrays=True)
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            # The test expects Connect() with frequency 5175 to be rejected
            # with ConnectChannelUnsupported.
            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15, 'persistent': False, 'frequency': 5175 }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "ConnectChannelUnsupported" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = { 'peer': path, 'wps_method': 'display', 'pin': '12345670',
                     'go_intent': 15, 'persistent': False }
            p2p.Connect(args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter")
            # One-shot timeout callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        # Pre-authorizes the discovered peer (authorize_only) and lets wlan1
        # start the negotiation. Success needs GroupFinished plus both the
        # PeerJoined and PeerDisconnected group signals.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # keypad method without a 'pin' entry: expected to fail with
            # InvalidArgs.
            args = { 'peer': path, 'wps_method': 'keypad',
                     'go_intent': 15, 'authorize_only': True }
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 15, 'authorize_only': True }
            p2p.Connect(args)
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def staDeauthorized(self, name):
            logger.debug("staDeuthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot timeout callback
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        # The D-Bus side initiates GO Negotiation towards the discovered peer.
        # Success needs GroupFinished and a Peer "Groups" property that was
        # seen both non-empty and empty.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, byte_arrays) on the P2PDevice interface
            p2p_signals = [
                (self.deviceFound, "DeviceFound", False),
                (self.goNegotiationSuccess, "GONegotiationSuccess", True),
                (self.groupStarted, "GroupStarted", False),
                (self.groupFinished, "GroupFinished", False),
            ]
            for handler, signal, as_bytes in p2p_signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                byte_arrays=as_bytes)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = { 'peer': path, 'wps_method': 'keypad',
                             'pin': '12345670', 'go_intent': 0 }
            p2p.Connect(connect_args)

            if peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                          timeout=15) is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer_dev.wait_global_event(["P2P-GROUP-STARTED"],
                                                 timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_if = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            dbus.Interface(group_if, WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer_dev.group_form_result(self.sta_group_ev)
            peer_dev.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only the Peer object's Groups property is of interest.
            if (interface_name != WPAS_DBUS_P2P_PEER or
                "Groups" not in changed_properties):
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            else:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot timeout callback
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        # wlan1 takes part with go_intent=15 and removes the group after it
        # has started. The main loop ends when the Peer "Groups" property
        # becomes empty after having been non-empty.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, byte_arrays) on the P2PDevice interface
            p2p_signals = [
                (self.deviceFound, "DeviceFound", False),
                (self.goNegotiationSuccess, "GONegotiationSuccess", True),
                (self.groupStarted, "GroupStarted", False),
                (self.groupFinished, "GroupFinished", False),
            ]
            for handler, signal, as_bytes in p2p_signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                byte_arrays=as_bytes)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = { 'peer': path, 'wps_method': 'keypad',
                             'pin': '12345670', 'go_intent': 0 }
            p2p.Connect(connect_args)

            if peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                          timeout=15) is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer_dev.wait_global_event(["P2P-GROUP-STARTED"],
                                                 timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The proxy object is fetched but not used further here.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer_dev.group_form_result(self.sta_group_ev)
            peer_dev.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only the Peer object's Groups property is of interest.
            if (interface_name != WPAS_DBUS_P2P_PEER or
                "Groups" not in changed_properties):
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            elif self.peer_group_added:
                # An empty list counts as removal only after an addition.
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot timeout callback
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    wpas = dev[0]
    try:
        # Use a p2p_group_idle value of 1 for the duration of the test.
        wpas.global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        # Always set p2p_group_idle back to 0, even when the test failed.
        wpas.global_request("SET p2p_group_idle 0")
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        # wlan1 takes part with go_intent=15, deauthenticates its first
        # station with reason=0 and then removes the group. Peer "Groups"
        # changes are only counted after GroupStarted has been seen.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.group_started = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, byte_arrays) on the P2PDevice interface
            p2p_signals = [
                (self.deviceFound, "DeviceFound", False),
                (self.goNegotiationSuccess, "GONegotiationSuccess", True),
                (self.groupStarted, "GroupStarted", False),
                (self.groupFinished, "GroupFinished", False),
            ]
            for handler, signal, as_bytes in p2p_signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                byte_arrays=as_bytes)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            connect_args = { 'peer': path, 'wps_method': 'keypad',
                             'pin': '12345670', 'go_intent': 0 }
            p2p.Connect(connect_args)

            if peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                          timeout=15) is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            started = peer_dev.wait_global_event(["P2P-GROUP-STARTED"],
                                                 timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = started

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            # The proxy object is fetched but not used further here.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            peer_dev.group_form_result(self.sta_group_ev)
            sta_addr = peer_dev.group_request("STA-FIRST").splitlines()[0]
            # Deauthenticate with a different reason code so that the P2P
            # Client using D-Bus does not get the normal group termination
            # event.
            peer_dev.group_request("DEAUTHENTICATE " + sta_addr + " reason=0 test=0")
            peer_dev.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            # Ignore property changes seen before the group has started.
            if not self.group_started or "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            else:
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot timeout callback
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        # The D-Bus side uses PIN 12345670 while wlan1 enters 87654321. The
        # test passes when both WpsFailed and GroupFormationFailure are
        # signalled, in either order; GroupStarted is treated as an error.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, byte_arrays) on the P2PDevice interface
            p2p_signals = [
                (self.goNegotiationRequest, "GONegotiationRequest", True),
                (self.goNegotiationSuccess, "GONegotiationSuccess", True),
                (self.groupStarted, "GroupStarted", False),
                (self.wpsFailed, "WpsFailed", False),
                (self.groupFormationFailure, "GroupFormationFailure", False),
            ]
            for handler, signal, as_bytes in p2p_signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                byte_arrays=as_bytes)
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            connect_args = { 'peer': path, 'wps_method': 'display',
                             'pin': '12345670', 'go_intent': 15 }
            p2p.Connect(connect_args)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            # Stop once the other expected signal has been seen as well.
            if self.formation_failure:
                self.loop.quit()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            # Stop once the other expected signal has been seen as well.
            if self.wps_failed:
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            peer_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not peer_dev.discover_peer(addr0):
                raise Exception("Peer not found")
            peer_dev.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            # One-shot timeout callback
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname

    class TestDbusP2p(TestDbus):
        # group1: the D-Bus side joins the group run by dev[1].
        # group2: the D-Bus side starts its own group with GroupAdd and
        #         wlan2 joins it.
        # Once a peer has joined, the results are checked and both groups
        # are disconnected.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer = None
            self.go = None
            self.group1 = None
            self.group2 = None
            self.groups_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged", byte_arrays=True)
            for handler, signal in ((self.deviceFound, "DeviceFound"),
                                    (self.groupStarted, "GroupStarted"),
                                    (self.groupFinished, "GroupFinished")):
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal)
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Classify the found device by the address in its object path.
            if addr2.replace(':','') in path:
                self.peer = path
            elif addr1.replace(':','') in path:
                self.go = path
            if not self.go or self.group1:
                return
            logger.info("Join the group")
            p2p.StopFind()
            pin = '12345670'
            go_dev = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            go_dev.group_ifname = dev1_group_ifname
            go_dev.group_request("WPS_PIN any " + pin)
            join_args = { 'peer': self.go,
                          'join': True,
                          'wps_method': 'pin',
                          'pin': pin,
                          'frequency': 2412 }
            p2p.Connect(join_args)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            dev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(dev_props))

            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(WPAS_DBUS_GROUP,
                                           dbus_interface=dbus.PROPERTIES_IFACE,
                                           byte_arrays=True)
            logger.debug("Group properties: " + str(group_props))

            if not self.group1:
                # First GroupStarted: record it as group1, then start our
                # own group.
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                p2p.GroupAdd(dbus.Dictionary({ 'frequency': 2412 }))
            elif not self.group2:
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = group_props['BSSID']

            if not (self.group1 and self.group2):
                return

            logger.info("Authorize peer to join the group")
            peer_addr = binascii.unhexlify(addr2.replace(':',''))
            wps_params = { 'Role': 'enrollee',
                           'P2PDeviceAddress': dbus.ByteArray(peer_addr),
                           'Bssid': dbus.ByteArray(peer_addr),
                           'Type': 'pin',
                           'Pin': '12345670' }
            dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS).Start(wps_params)

            bssid = ':'.join([binascii.hexlify(octet) for octet in self.g2_bssid])
            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.scan_for_bss(bssid, freq=2412)
            joiner.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
            started = joiner.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if started is None:
                raise Exception("Group join timed out")
            self.dev2_group_ev = started

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))

            finished = properties['group_object']
            if self.group1 == finished:
                self.group1 = None
            elif self.group2 == finished:
                self.group2 = None

            # Done once neither group is tracked any more.
            if not (self.group1 or self.group2):
                self.done = True
                self.loop.quit()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            if self.groups_removed:
                return
            self.check_results()

            joiner = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            joiner.group_form_result(self.dev2_group_ev)
            joiner.remove_group()

            logger.info("Disconnect group2")
            dbus.Interface(self.g2_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

            logger.info("Disconnect group1")
            dbus.Interface(self.g1_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()
            self.groups_removed = True

        def check_results(self):
            logger.info("Check results with two concurrent groups in operation")

            g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
            res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
            res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(res1))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(res2))

            dev_props = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(dev_props))

            # Expected roles: client in group1, GO in group2, 'device' on
            # the P2P device interface.
            if res1['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + res1['Role'])
            if res2['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + res2['Role'])
            if dev_props['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + dev_props['Role'])

            if len(res2['Members']) != 1:
                raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot timeout callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    # No Connect() has been issued yet, so the test expects Cancel() to fail
    # with a D-Bus error; the error text itself is not checked.
    try:
        p2p.Cancel()
        raise Exception("Unexpected p2p.Cancel() success")
    except dbus.exceptions.DBusException:
        pass

    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        # Starts a connection attempt towards the discovered peer and cancels
        # it immediately.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            args = { 'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                     'go_intent': 0 }
            p2p.Connect(args)
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # One-shot timeout callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def introspect():
        # Single place for the Introspect() call on the interface object.
        return if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)

    # Baseline reply without any injected failure; later replies are compared
    # against its length.
    baseline = introspect()
    logger.info("Initial Introspect: " + str(baseline))
    expected_tokens = ("Introspectable", "GroupStarted",
                       "FastReauth", "PassiveScan")
    if baseline is None or any(tok not in baseline for tok in expected_tokens):
        raise Exception("Unexpected initial Introspect response: " + str(baseline))

    # NOTE(review): alloc_fail comes from utils; presumably it makes the Nth
    # allocation in the named call path fail - confirm against utils.py.
    # For this call path no reply at all is expected.
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        reply = introspect()
        logger.info("Introspect: " + str(reply))
        if reply is not None:
            raise Exception("Unexpected Introspect response")

    # For these call paths a reply must still be returned, but it has to be
    # shorter than the baseline.
    partial_cases = (
        (1, "=add_interface;wpa_dbus_introspect"),
        (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
        (2, "=add_interface;wpa_dbus_introspect"),
    )
    for count, funcs in partial_cases:
        with alloc_fail(dev[0], count, funcs):
            reply = introspect()
            logger.info("Introspect: " + str(reply))
            if reply is None:
                raise Exception("No Introspect response")
            if len(reply) >= len(baseline):
                raise Exception("Unexpected Introspect response")
4862 def test_dbus_ap(dev
, apdev
):
4863 """D-Bus AddNetwork for AP mode"""
4864 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4865 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
4867 ssid
= "test-wpa2-psk"
4868 passphrase
= 'qwertyuiop'
4870 class TestDbusConnect(TestDbus
):
4871 def __init__(self
, bus
):
4872 TestDbus
.__init
__(self
, bus
)
4873 self
.started
= False
4875 def __enter__(self
):
4876 gobject
.timeout_add(1, self
.run_connect
)
4877 gobject
.timeout_add(15000, self
.timeout
)
4878 self
.add_signal(self
.networkAdded
, WPAS_DBUS_IFACE
, "NetworkAdded")
4879 self
.add_signal(self
.networkSelected
, WPAS_DBUS_IFACE
,
4881 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
4882 "PropertiesChanged")
4886 def networkAdded(self
, network
, properties
):
4887 logger
.debug("networkAdded: %s" % str(network
))
4888 logger
.debug(str(properties
))
4890 def networkSelected(self
, network
):
4891 logger
.debug("networkSelected: %s" % str(network
))
4892 self
.network_selected
= True
4894 def propertiesChanged(self
, properties
):
4895 logger
.debug("propertiesChanged: %s" % str(properties
))
4896 if 'State' in properties
and properties
['State'] == "completed":
4900 def run_connect(self
, *args
):
4901 logger
.debug("run_connect")
4902 args
= dbus
.Dictionary({ 'ssid': ssid
,
4903 'key_mgmt': 'WPA-PSK',
4906 'frequency': 2412 },
4908 self
.netw
= iface
.AddNetwork(args
)
4909 iface
.SelectNetwork(self
.netw
)
4915 with
TestDbusConnect(bus
) as t
:
4917 raise Exception("Expected signals not seen")
4918 dev
[1].connect(ssid
, psk
=passphrase
, scan_freq
="2412")
4920 def test_dbus_connect_wpa_eap(dev
, apdev
):
4921 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
4922 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4923 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
4925 ssid
= "test-wpa-eap"
4926 params
= hostapd
.wpa_eap_params(ssid
=ssid
)
4928 params
["rsn_pairwise"] = "CCMP"
4929 hapd
= hostapd
.add_ap(apdev
[0]['ifname'], params
)
4931 class TestDbusConnect(TestDbus
):
4932 def __init__(self
, bus
):
4933 TestDbus
.__init
__(self
, bus
)
4936 def __enter__(self
):
4937 gobject
.timeout_add(1, self
.run_connect
)
4938 gobject
.timeout_add(15000, self
.timeout
)
4939 self
.add_signal(self
.propertiesChanged
, WPAS_DBUS_IFACE
,
4940 "PropertiesChanged")
4941 self
.add_signal(self
.eap
, WPAS_DBUS_IFACE
, "EAP")
4945 def propertiesChanged(self
, properties
):
4946 logger
.debug("propertiesChanged: %s" % str(properties
))
4947 if 'State' in properties
and properties
['State'] == "completed":
4951 def eap(self
, status
, parameter
):
4952 logger
.debug("EAP: status=%s parameter=%s" % (status
, parameter
))
4954 def run_connect(self
, *args
):
4955 logger
.debug("run_connect")
4956 args
= dbus
.Dictionary({ 'ssid': ssid
,
4957 'key_mgmt': 'WPA-EAP',
4960 'password': 'password',
4961 'ca_cert': 'auth_serv/ca.pem',
4962 'phase2': 'auth=MSCHAPV2',
4963 'scan_freq': 2412 },
4965 self
.netw
= iface
.AddNetwork(args
)
4966 iface
.SelectNetwork(self
.netw
)
4972 with
TestDbusConnect(bus
) as t
:
4974 raise Exception("Expected signals not seen")
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    sta = dev[0]
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        # The helper switches the interface to AP_SCAN 2; always request
        # AP_SCAN 1 again, whether the helper passed or raised.
        sta.request("AP_SCAN 1")
4983 def _test_dbus_ap_scan_2_ap_mode_scan(dev
, apdev
):
4984 (bus
,wpas_obj
,path
,if_obj
) = prepare_dbus(dev
[0])
4985 iface
= dbus
.Interface(if_obj
, WPAS_DBUS_IFACE
)
4987 if "OK" not in dev
[0].request("AP_SCAN 2"):
4988 raise Exception("Failed to set AP_SCAN 2")
4990 id = dev
[0].add_network()
4991 dev
[0].set_network(id, "mode", "2")
4992 dev
[0].set_network_quoted(id, "ssid", "wpas-ap-open")
4993 dev
[0].set_network(id, "key_mgmt", "NONE")
4994 dev
[0].set_network(id, "frequency", "2412")
4995 dev
[0].set_network(id, "scan_freq", "2412")
4996 dev
[0].set_network(id, "disabled", "0")
4997 dev
[0].select_network(id)
4998 ev
= dev
[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout
=5)
5000 raise Exception("AP failed to start")
5002 with
fail_test(dev
[0], 1, "wpa_driver_nl80211_scan"):
5003 iface
.Scan({'Type': 'active',
5005 'Channels': [(dbus
.UInt32(2412), dbus
.UInt32(20))]})
5006 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5007 "AP-DISABLED"], timeout
=5)
5009 raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
5010 if "AP-DISABLED" in ev
:
5011 raise Exception("Unexpected AP-DISABLED event")
5013 # Wait for the retry to scan happen
5014 ev
= dev
[0].wait_event(["CTRL-EVENT-SCAN-FAILED",
5015 "AP-DISABLED"], timeout
=5)
5017 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
5018 if "AP-DISABLED" in ev
:
5019 raise Exception("Unexpected AP-DISABLED event - retry")
5021 dev
[1].connect("wpas-ap-open", key_mgmt
="NONE", scan_freq
="2412")
5022 dev
[1].request("DISCONNECT")
5023 dev
[1].wait_disconnected()
5024 dev
[0].request("DISCONNECT")
5025 dev
[0].wait_disconnected()
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    # ExpectDisconnect() is called on the service-level object, not on the
    # per-interface object.
    service = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    # Bring up an open AP and associate dev[0] with it.
    open_ssid = "test-open"
    hapd = hostapd.add_ap(apdev[0]['ifname'], {"ssid": open_ssid})
    sta = dev[0]
    sta.connect(open_ssid, key_mgmt="NONE", scan_freq="2412")

    # Nothing is asserted about the effect of ExpectDisconnect(); the call is
    # made purely to exercise that code path for coverage.
    service.ExpectDisconnect()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # SaveConfig() is expected to be refused here; the test passes only when
    # the call raises a DBusException carrying the exact error prefix below.
    expected_prefix = "fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        # "as" form works on Python 2.6+ and Python 3, unlike "except X, e".
        if not str(e).startswith(expected_prefix):
            raise Exception("Unexpected error message for SaveConfig(): " + str(e))
    else:
        # Reached only when the call returned without raising.
        raise Exception("SaveConfig() accepted unexpectedly")
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    sta = dev[0]
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        # The helper adds vendor elements under ID 1; issue the wildcard
        # removal for that ID whether the helper passed or raised.
        sta.request("VENDOR_ELEM_REMOVE 1 *")
def _test_dbus_vendor_elem(dev, apdev):
    """Exercise VendorElemAdd/VendorElemGet/VendorElemRem over D-Bus.

    First checks that invalid arguments are rejected with the expected
    InvalidArgs errors, then adds two elements under ID 1, verifies the
    bytes read back, removes them again and verifies that the ID is empty.

    Raises Exception with a descriptive message on the first mismatch.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def expect_rejected(method, args, accepted_msg, needle, label):
        # Call method(*args) and require a DBusException whose text contains
        # both "InvalidArgs" and needle. A call that returns normally is a
        # failure reported with accepted_msg.
        try:
            method(*args)
        except dbus.exceptions.DBusException as e:
            text = str(e)
            if "InvalidArgs" not in text or needle not in text:
                raise Exception("Unexpected error message for invalid " +
                                label + ": " + text)
        else:
            raise Exception(accepted_msg)

    def expect_bytes(actual, expected, suffix):
        # Compare a VendorElemGet() result with the expected byte string,
        # first by length and then element by element.
        if len(actual) != len(expected):
            raise Exception("Unexpected VendorElemGet length" + suffix)
        for got, want in zip(actual, expected):
            if got != dbus.Byte(want):
                raise Exception("Unexpected VendorElemGet data" + suffix)

    # Start from a clean slate for ID 1.
    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    # --- Invalid VendorElemAdd() arguments ---
    expect_rejected(iface.VendorElemAdd, (-1, dbus.ByteArray("\x00\x00")),
                    "Invalid VendorElemAdd() accepted",
                    "Invalid ID", "VendorElemAdd[1]")
    expect_rejected(iface.VendorElemAdd, (1, dbus.ByteArray("")),
                    "Invalid VendorElemAdd() accepted",
                    "Invalid value", "VendorElemAdd[2]")
    expect_rejected(iface.VendorElemAdd, (1, dbus.ByteArray("\x00\x01")),
                    "Invalid VendorElemAdd() accepted",
                    "Parse error", "VendorElemAdd[3]")

    # --- Invalid VendorElemGet() arguments (nothing stored under ID 1 yet) ---
    expect_rejected(iface.VendorElemGet, (-1,),
                    "Invalid VendorElemGet() accepted",
                    "Invalid ID", "VendorElemGet[1]")
    expect_rejected(iface.VendorElemGet, (1,),
                    "Invalid VendorElemGet() accepted",
                    "ID value does not exist", "VendorElemGet[2]")

    # --- Invalid VendorElemRem() arguments ---
    # Each check carries its own index so a failure identifies which one hit.
    expect_rejected(iface.VendorElemRem, (-1, dbus.ByteArray("\x00\x00")),
                    "Invalid VendorElemRemove() accepted",
                    "Invalid ID", "VendorElemRemove[1]")
    expect_rejected(iface.VendorElemRem, (1, dbus.ByteArray("")),
                    "Invalid VendorElemRemove() accepted",
                    "Invalid value", "VendorElemRemove[2]")

    iface.VendorElemRem(1, "*")

    # --- Add one element and read it back ---
    ie = dbus.ByteArray("\x00\x01\x00")
    iface.VendorElemAdd(1, ie)
    expect_bytes(iface.VendorElemGet(1), ie, "")

    # --- Add a second element; Get must return both, in insertion order ---
    ie2 = dbus.ByteArray("\xe0\x00")
    iface.VendorElemAdd(1, ie2)
    ies = ie + ie2
    expect_bytes(iface.VendorElemGet(1), ies, "[2]")

    # --- Removal with a malformed element must be rejected ---
    expect_rejected(iface.VendorElemRem, (1, dbus.ByteArray("\x01\x01")),
                    "Invalid VendorElemRemove() accepted",
                    "Parse error", "VendorElemRemove[3]")

    # --- Remove the first element; only ie2 should remain (length check) ---
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    # --- Wildcard removal leaves nothing to fetch ---
    iface.VendorElemRem(1, "*")
    expect_rejected(iface.VendorElemGet, (1,),
                    "Invalid VendorElemGet() accepted after removal",
                    "ID value does not exist", "VendorElemGet after removal")