]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/test_ap_wps.py
tests: WPS PIN request file
[thirdparty/hostap.git] / tests / hwsim / test_ap_wps.py
1 # WPS tests
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import subprocess
10 import logging
11 logger = logging.getLogger()
12 import re
13 import socket
14 import httplib
15 import urlparse
16 import urllib
17 import xml.etree.ElementTree as ET
18 import StringIO
19
20 import hwsim_utils
21 import hostapd
22
def test_ap_wps_init(dev, apdev):
    """Initial AP configuration with first WPS Enrollee"""
    ssid = "test-wps"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   {"ssid": ssid, "eap_server": "1", "wps_state": "1"})
    hapd = hostapd.Hostapd(ifname)

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    pbc_report = hapd.request("WPS_GET_STATUS")
    if "PBC Status: Active" not in pbc_report:
        raise Exception("PBC status not shown correctly")

    enrollee = dev[0]
    enrollee.dump_monitor()
    enrollee.request("WPS_PBC")
    if enrollee.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Station view of the resulting connection.
    sta_status = enrollee.get_status()
    if not (sta_status['wpa_state'] == 'COMPLETED' and
            sta_status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    sta_expectations = (
        ('ssid', ssid, "Unexpected SSID"),
        ('pairwise_cipher', 'CCMP', "Unexpected encryption configuration"),
        ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt"),
    )
    for field, wanted, failure in sta_expectations:
        if sta_status[field] != wanted:
            raise Exception(failure)

    # AP view: WPS status report after the exchange.
    wps_report = hapd.request("WPS_GET_STATUS")
    if "PBC Status: Disabled" not in wps_report:
        raise Exception("PBC status not shown correctly")
    if "Last WPS result: Success" not in wps_report:
        raise Exception("Last WPS result not shown correctly")
    peer_entry = "Peer Address: " + enrollee.p2p_interface_addr()
    if peer_entry not in wps_report:
        raise Exception("Peer address not shown correctly")

    # AP view: configuration reported by GET_CONFIG after provisioning.
    conf = hapd.request("GET_CONFIG")
    conf_expectations = (
        ("wps_state=configured", "AP not in WPS configured state"),
        ("rsn_pairwise_cipher=CCMP TKIP", "Unexpected rsn_pairwise_cipher"),
        ("wpa_pairwise_cipher=CCMP TKIP", "Unexpected wpa_pairwise_cipher"),
        ("group_cipher=TKIP", "Unexpected group_cipher"),
    )
    for needle, failure in conf_expectations:
        if needle not in conf:
            raise Exception(failure)
64
def test_ap_wps_init_2ap_pbc(dev, apdev):
    """Initial two-radio AP configuration with first WPS PBC Enrollee"""
    ssid = "test-wps"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "1"}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    # PBC is requested on the first AP only; the checks below nevertheless
    # expect both BSSes to advertise it.
    hapd.request("WPS_PBC")
    labelled_aps = (("AP1", apdev[0]), ("AP2", apdev[1]))

    dev[0].scan(freq="2412")
    for label, ap in labelled_aps:
        bss = dev[0].get_bss(ap['bssid'])
        if "[WPS-PBC]" not in bss['flags']:
            raise Exception("WPS-PBC flag missing from " + label)

    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # After the enrollee has connected, a second station must no longer see
    # the PBC flag on either BSS.
    dev[1].scan(freq="2412")
    for label, ap in labelled_aps:
        bss = dev[1].get_bss(ap['bssid'])
        if "[WPS-PBC]" in bss['flags']:
            raise Exception("WPS-PBC flag not cleared from " + label)
94
def test_ap_wps_init_2ap_pin(dev, apdev):
    """Initial two-radio AP configuration with first WPS PIN Enrollee"""
    ssid = "test-wps"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "1"}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hostapd.add_ap(apdev[1]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    pin = dev[0].wps_read_pin()
    # The PIN is enabled on the first AP only; the checks below nevertheless
    # expect both BSSes to advertise an authorized registrar.
    hapd.request("WPS_PIN any " + pin)
    labelled_aps = (("AP1", apdev[0]), ("AP2", apdev[1]))

    dev[0].scan(freq="2412")
    for label, ap in labelled_aps:
        bss = dev[0].get_bss(ap['bssid'])
        if "[WPS-AUTH]" not in bss['flags']:
            raise Exception("WPS-AUTH flag missing from " + label)

    dev[0].dump_monitor()
    dev[0].request("WPS_PIN any " + pin)
    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    if ev is None:
        raise Exception("Association with the AP timed out")

    # After the enrollee has connected, a second station must no longer see
    # the WPS-AUTH flag on either BSS.
    dev[1].scan(freq="2412")
    for label, ap in labelled_aps:
        bss = dev[1].get_bss(ap['bssid'])
        if "[WPS-AUTH]" in bss['flags']:
            raise Exception("WPS-AUTH flag not cleared from " + label)
125
def test_ap_wps_init_through_wps_config(dev, apdev):
    """Initial AP configuration using wps_config command"""
    ssid = "test-wps-init-config"
    passphrase = "12345678"
    ifname = apdev[0]['ifname']
    hostapd.add_ap(ifname,
                   {"ssid": ssid, "eap_server": "1", "wps_state": "1"})
    hapd = hostapd.Hostapd(ifname)

    # WPS_CONFIG takes the SSID and the key as hex strings.
    command = "WPS_CONFIG {0} WPA2PSK CCMP {1}".format(
        ssid.encode("hex"), passphrase.encode("hex"))
    if "FAIL" in hapd.request(command):
        raise Exception("WPS_CONFIG command failed")
    if hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5) is None:
        raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")

    # The AP needs a moment to refresh its Beacon and Probe Response frames.
    # Pausing before the scan avoids picking up obsolete scan results, which
    # would cost the test an extra five second wait.
    hapd.ping()
    time.sleep(0.2)
    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", proto="WPA2",
                   pairwise="CCMP", group="CCMP")
144
def test_ap_wps_conf(dev, apdev):
    """WPS PBC provisioning with configured AP"""
    ssid = "test-wps-conf"
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
    }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    enrollee = dev[0]
    enrollee.dump_monitor()
    enrollee.request("WPS_PBC")
    if enrollee.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    # Each entry: status field, expected value, error raised on mismatch.
    status = enrollee.get_status()
    cipher_failure = "Unexpected encryption configuration"
    expectations = (
        ('wpa_state', 'COMPLETED', "Not fully connected"),
        ('bssid', apdev[0]['bssid'], "Unexpected BSSID"),
        ('ssid', ssid, "Unexpected SSID"),
        ('pairwise_cipher', 'CCMP', cipher_failure),
        ('group_cipher', 'CCMP', cipher_failure),
        ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt"),
    )
    for field, wanted, failure in expectations:
        if status[field] != wanted:
            raise Exception(failure)

    # The AP's STA entry is expected to carry the enrollee's WPS device name.
    sta = hapd.get_sta(enrollee.p2p_interface_addr())
    if not ('wpsDeviceName' in sta and sta['wpsDeviceName'] == "Device A"):
        raise Exception("Device name not available in STA command")
175
def test_ap_wps_twice(dev, apdev):
    """WPS provisioning with twice to change passphrase"""
    ssid = "test-wps-twice"
    ifname = apdev[0]['ifname']
    params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
    }

    def start_ap_and_provision():
        # Bring up the AP with the current params and run one PBC exchange.
        hostapd.add_ap(ifname, params)
        ap = hostapd.Hostapd(ifname)
        logger.info("WPS provisioning step")
        ap.request("WPS_PBC")
        dev[0].dump_monitor()
        dev[0].request("WPS_PBC")
        if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
            raise Exception("Association with the AP timed out")

    start_ap_and_provision()
    dev[0].request("DISCONNECT")

    logger.info("Restart AP with different passphrase and re-run WPS")
    hostapd.HostapdGlobal().remove(ifname)
    params['wpa_passphrase'] = 'another passphrase'
    start_ap_and_provision()

    # The second run must not leave more than one network block behind.
    if len(dev[0].list_networks()) > 1:
        raise Exception("Unexpected duplicated network block present")
209
def test_ap_wps_incorrect_pin(dev, apdev):
    """WPS PIN provisioning with incorrect PIN"""
    ssid = "test-wps-incorrect-pin"
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
    }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    sta = dev[0]

    def expect_pin_failure(title, sta_pin, msg_marker):
        # The AP always enables PIN 12345670; the station tries sta_pin and
        # the WPS-FAIL event must carry config_error=18 and msg_marker.
        logger.info(title)
        hapd.request("WPS_PIN any 12345670")
        sta.dump_monitor()
        sta.request("WPS_PIN any " + sta_pin)
        failure = sta.wait_event(["WPS-FAIL"], timeout=30)
        if failure is None:
            raise Exception("WPS operation timed out")
        if "config_error=18" not in failure:
            raise Exception("Incorrect config_error reported")
        if msg_marker not in failure:
            raise Exception("PIN error detected on incorrect message")
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")

    # First attempt: the station PIN already differs in its first four digits.
    expect_pin_failure("WPS provisioning attempt 1", "55554444", "msg=8")
    sta.request("WPS_CANCEL")
    # if a scan was in progress, wait for it to complete before trying WPS
    # again (the result of this wait is deliberately not checked)
    sta.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)

    ap_report = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in ap_report:
        raise Exception("WPS failure result not shown correctly")

    # Second attempt: only the last four digits of the station PIN differ.
    expect_pin_failure("WPS provisioning attempt 2", "12344444", "msg=10")
255
def test_ap_wps_conf_pin(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    ssid = "test-wps-conf-pin"
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
    }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    first, second = dev[0], dev[1]

    logger.info("WPS provisioning step")
    pin = first.wps_read_pin()
    pin_command = "WPS_PIN any " + pin
    hapd.request(pin_command)
    first.dump_monitor()
    first.request(pin_command)
    if first.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    status = first.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    if status['ssid'] != ssid:
        raise Exception("Unexpected SSID")
    if not (status['pairwise_cipher'] == 'CCMP' and
            status['group_cipher'] == 'CCMP'):
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")

    # Once the first station is provisioned the AP must stop advertising
    # the WPS-AUTH flag.
    second.scan(freq="2412")
    flags = second.get_bss(apdev[0]['bssid'])['flags']
    if "[WPS-AUTH]" in flags:
        raise Exception("WPS-AUTH flag not cleared")

    # A second station reusing the same PIN must get M2D, not a connection.
    logger.info("Try to connect from another station using the same PIN")
    second.request(pin_command)
    outcome = second.wait_event(["WPS-M2D", "CTRL-EVENT-CONNECTED"],
                                timeout=30)
    if outcome is None:
        raise Exception("Operation timed out")
    if "WPS-M2D" not in outcome:
        raise Exception("Unexpected WPS operation started")
293
def test_ap_wps_conf_pin_2sta(dev, apdev):
    """Two stations trying to use WPS PIN at the same time"""
    ssid = "test-wps-conf-pin2"
    hostapd.add_ap(apdev[0]['ifname'],
                   {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                    "wpa_passphrase": "12345678", "wpa": "2",
                    "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    logger.info("WPS provisioning step")
    # Both stations use the same PIN value; on the AP it is registered
    # separately for each station's UUID. (The original also assigned an
    # unused second PIN local, which has been dropped.)
    pin = "12345670"
    stations = (dev[0], dev[1])
    for sta in stations:
        hapd.request("WPS_PIN " + sta.get_status_field("uuid") + " " + pin)
    for sta in stations:
        sta.dump_monitor()
    # Start both enrollees before waiting on either so that the two WPS
    # exchanges run concurrently.
    for sta in stations:
        sta.request("WPS_PIN any " + pin)
    for sta in stations:
        ev = sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
        if ev is None:
            raise Exception("Association with the AP timed out")
317
def test_ap_wps_reg_connect(dev, apdev):
    """WPS registrar using AP PIN to connect"""
    ssid = "test-wps-reg-ap-pin"
    appin = "12345670"
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "ap_pin": appin,
    }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)

    logger.info("WPS provisioning step")
    registrar = dev[0]
    registrar.dump_monitor()
    registrar.wps_reg(apdev[0]['bssid'], appin)

    status = registrar.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == apdev[0]['bssid']):
        raise Exception("Not fully connected")
    cipher_failure = "Unexpected encryption configuration"
    expectations = (
        ('ssid', ssid, "Unexpected SSID"),
        ('pairwise_cipher', 'CCMP', cipher_failure),
        ('group_cipher', 'CCMP', cipher_failure),
        ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt"),
    )
    for field, wanted, failure in expectations:
        if status[field] != wanted:
            raise Exception(failure)
339
def check_wps_reg_failure(dev, ap, appin):
    # Run WPS_REG against ap['bssid'] with the given AP PIN and require that
    # it ends in a WPS-FAIL event carrying config_error=15. Raises Exception
    # on timeout, on unexpected success, or on any other failure event.
    command = " ".join(["WPS_REG", ap['bssid'], appin])
    dev.request(command)
    outcome = dev.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout=15)
    if outcome is None:
        raise Exception("WPS operation timed out")
    if "WPS-SUCCESS" in outcome:
        raise Exception("WPS operation succeeded unexpectedly")
    if "config_error=15" in outcome:
        return
    raise Exception("WPS setup locked state was not reported correctly")
349
def test_ap_wps_random_ap_pin(dev, apdev):
    """WPS registrar using random AP PIN"""
    ssid = "test-wps-reg-random-ap-pin"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    ap = apdev[0]
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "device_name": "Wireless AP",
        "manufacturer": "Company",
        "model_name": "WAP",
        "model_number": "123",
        "serial_number": "12345",
        "device_type": "6-0050F204-1",
        "os_version": "01020300",
        "config_methods": "label push_button",
        "uuid": ap_uuid,
        "upnp_iface": "lo",
    }
    hostapd.add_ap(ap['ifname'], ap_params)
    hapd = hostapd.Hostapd(ap['ifname'])

    # A freshly generated random PIN must be retrievable and usable.
    appin = hapd.request("WPS_AP_PIN random")
    if "FAIL" in appin:
        raise Exception("Could not generate random AP PIN")
    current = hapd.request("WPS_AP_PIN get")
    if appin not in current:
        raise Exception("Could not fetch current AP PIN")
    logger.info("WPS provisioning step")
    dev[0].wps_reg(ap['bssid'], appin)

    # With the AP PIN disabled, the previously valid PIN must be rejected.
    hapd.request("WPS_AP_PIN disable")
    logger.info("WPS provisioning step with AP PIN disabled")
    check_wps_reg_failure(dev[1], ap, appin)

    # Setting a static AP PIN makes registrar access work again.
    logger.info("WPS provisioning step with AP PIN reset")
    appin = "12345670"
    hapd.request("WPS_AP_PIN set " + appin)
    dev[1].wps_reg(ap['bssid'], appin)
    for sta in (dev[0], dev[1]):
        sta.request("REMOVE_NETWORK all")
    for sta in (dev[0], dev[1]):
        sta.wait_event(["CTRL-EVENT-DISCONNECTED"])

    # Random PIN given a trailing argument of 1; after sleeping 1.1 seconds
    # the PIN is expected to be gone.
    logger.info("WPS provisioning step after AP PIN timeout")
    hapd.request("WPS_AP_PIN disable")
    appin = hapd.request("WPS_AP_PIN random 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[0], ap, appin)

    # Same expiry check for a statically set PIN.
    logger.info("WPS provisioning step after AP PIN timeout(2)")
    hapd.request("WPS_AP_PIN disable")
    appin = "12345670"
    hapd.request("WPS_AP_PIN set " + appin + " 1")
    time.sleep(1.1)
    if "FAIL" not in hapd.request("WPS_AP_PIN get"):
        raise Exception("AP PIN unexpectedly still enabled")
    check_wps_reg_failure(dev[1], ap, appin)
402
def test_ap_wps_reg_config(dev, apdev):
    """WPS registrar configuring and AP using AP PIN"""
    ssid = "test-wps-init-ap-pin"
    appin = "12345670"
    bssid = apdev[0]['bssid']
    hostapd.add_ap(apdev[0]['ifname'],
                   {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                    "ap_pin": appin})
    registrar = dev[0]

    logger.info("WPS configuration step")
    registrar.dump_monitor()
    new_ssid = "wps-new-ssid"
    new_passphrase = "1234567890"
    registrar.wps_reg(bssid, appin, new_ssid, "WPA2PSK", "CCMP",
                      new_passphrase)
    status = registrar.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == bssid):
        raise Exception("Not fully connected")
    cipher_failure = "Unexpected encryption configuration"
    for field, wanted, failure in (
            ('ssid', new_ssid, "Unexpected SSID"),
            ('pairwise_cipher', 'CCMP', cipher_failure),
            ('group_cipher', 'CCMP', cipher_failure),
            ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt")):
        if status[field] != wanted:
            raise Exception(failure)

    logger.info("Re-configure back to open")
    # Drop the network and flush cached BSS entries, then request a scan
    # with only_new=1 before running the registrar again.
    for command in ("REMOVE_NETWORK all", "BSS_FLUSH 0",
                    "SCAN freq=2412 only_new=1"):
        registrar.request(command)
    if registrar.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
    registrar.dump_monitor()
    registrar.wps_reg(bssid, appin, "wps-open", "OPEN", "NONE", "")
    status = registrar.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == bssid):
        raise Exception("Not fully connected")
    if status['ssid'] != "wps-open":
        raise Exception("Unexpected SSID")
    if status['key_mgmt'] != 'NONE':
        raise Exception("Unexpected key_mgmt")
442
def test_ap_wps_reg_config_tkip(dev, apdev):
    """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP"""
    ssid = "test-wps-init-ap"
    appin = "12345670"
    bssid = apdev[0]['bssid']
    hostapd.add_ap(apdev[0]['ifname'],
                   {"ssid": ssid, "eap_server": "1", "wps_state": "1",
                    "ap_pin": appin})
    registrar = dev[0]

    logger.info("WPS configuration step")
    # The station's WPS version number is set to 0x10 before asking for a
    # WPAPSK/TKIP configuration.
    registrar.request("SET wps_version_number 0x10")
    registrar.dump_monitor()
    new_ssid = "wps-new-ssid-with-tkip"
    new_passphrase = "1234567890"
    registrar.wps_reg(bssid, appin, new_ssid, "WPAPSK", "TKIP",
                      new_passphrase)

    logger.info("Re-connect to verify WPA2 mixed mode")
    registrar.request("DISCONNECT")
    # Network id 0 is reconfigured to RSN/CCMP for the reconnection.
    network_id = 0
    registrar.set_network(network_id, "pairwise", "CCMP")
    registrar.set_network(network_id, "proto", "RSN")
    registrar.connect_network(network_id)

    status = registrar.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == bssid):
        raise Exception("Not fully connected")
    if status['ssid'] != new_ssid:
        raise Exception("Unexpected SSID")
    # Mixed mode: CCMP pairwise cipher with a TKIP group cipher.
    if not (status['pairwise_cipher'] == 'CCMP' and
            status['group_cipher'] == 'TKIP'):
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
472
def test_ap_wps_setup_locked(dev, apdev):
    """WPS registrar locking up AP setup on AP PIN failures"""
    ssid = "test-wps-incorrect-ap-pin"
    appin = "12345670"
    ifname = apdev[0]['ifname']
    bssid = apdev[0]['bssid']
    hostapd.add_ap(ifname,
                   {"ssid": ssid, "eap_server": "1", "wps_state": "2",
                    "wpa_passphrase": "12345678", "wpa": "2",
                    "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                    "ap_pin": appin})
    new_ssid = "wps-new-ssid-test"
    new_passphrase = "1234567890"
    sta = dev[0]

    # Try a series of wrong AP PINs. Each attempt must fail with
    # config_error=18 (or 15 once the AP reports setup locked). All five
    # PINs are tried even after the locked state has been seen.
    locked = False
    wrong_pins = ("55554444", "1234", "12345678", "00000000", "11111111")
    for wrong_pin in wrong_pins:
        sta.dump_monitor()
        logger.info("Try incorrect AP PIN - attempt " + wrong_pin)
        sta.wps_reg(bssid, wrong_pin, new_ssid, "WPA2PSK",
                    "CCMP", new_passphrase, no_wait=True)
        outcome = sta.wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
        if outcome is None:
            raise Exception("Timeout on receiving WPS operation failure event")
        if "CTRL-EVENT-CONNECTED" in outcome:
            raise Exception("Unexpected connection")
        if "config_error=15" in outcome:
            logger.info("AP Setup Locked")
            locked = True
        elif "config_error=18" not in outcome:
            raise Exception("config_error=18 not reported")
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"]) is None:
            raise Exception("Timeout on disconnection event")
        time.sleep(0.1)
    if not locked:
        raise Exception("AP setup was not locked")

    hapd = hostapd.Hostapd(ifname)
    report = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in report:
        raise Exception("WPS failure result not shown correctly")
    if "Peer Address: " + sta.p2p_interface_addr() not in report:
        raise Exception("Peer address not shown correctly")

    # Enrollee PIN provisioning is expected to keep working after the
    # failed AP PIN attempts.
    time.sleep(0.5)
    sta.dump_monitor()
    logger.info("WPS provisioning step")
    pin = sta.wps_read_pin()
    # The Hostapd handle is created a second time here, as in the original.
    hapd = hostapd.Hostapd(ifname)
    hapd.request("WPS_PIN any " + pin)
    sta.request("WPS_PIN any " + pin)
    if sta.wait_event(["WPS-SUCCESS"], timeout=30) is None:
        raise Exception("WPS success was not reported")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
528
def test_ap_wps_pbc_overlap_2ap(dev, apdev):
    """WPS PBC session overlap with two active APs"""
    # Two separately configured APs, both with wps_independent=1.
    first_params = {
        "ssid": "wps1",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "wps_independent": "1",
    }
    second_params = {
        "ssid": "wps2",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "123456789",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "wps_independent": "1",
    }
    hostapd.add_ap(apdev[0]['ifname'], first_params)
    hostapd.add_ap(apdev[1]['ifname'], second_params)

    # Activate PBC on both APs so the station sees two PBC sessions.
    for ap in (apdev[0], apdev[1]):
        hostapd.Hostapd(ap['ifname']).request("WPS_PBC")

    logger.info("WPS provisioning step")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["WPS-OVERLAP-DETECTED"], timeout=15) is None:
        raise Exception("PBC session overlap not detected")
551
def test_ap_wps_pbc_overlap_2sta(dev, apdev):
    """WPS PBC session overlap with two active STAs"""
    ssid = "test-wps-pbc-overlap"
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
    }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    stations = (("dev0", dev[0]), ("dev1", dev[1]))
    for _, sta in stations:
        sta.dump_monitor()
    for _, sta in stations:
        sta.request("WPS_PBC")

    # Each station must receive M2D carrying config_error=12.
    for tag, sta in stations:
        m2d = sta.wait_event(["WPS-M2D"], timeout=15)
        if m2d is None:
            raise Exception("PBC session overlap not detected (" + tag + ")")
        if "config_error=12" not in m2d:
            raise Exception("PBC session overlap not correctly reported (" +
                            tag + ")")

    # Even after WPS_CANCEL, a new PBC request on the AP is expected to fail.
    hapd.request("WPS_CANCEL")
    if "FAIL" not in hapd.request("WPS_PBC"):
        raise Exception("PBC mode allowed to be started while PBC overlap still active")
580
def test_ap_wps_cancel(dev, apdev):
    """WPS AP cancelling enabled config method"""
    ssid = "test-wps-ap-cancel"
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
    }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    bssid = apdev[0]['bssid']
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    # Each phase: log line, hostapd command that enables the method, and
    # the scan-result flag that must appear and then vanish after cancel.
    phases = (
        ("Verify PBC enable/cancel", "WPS_PBC", "WPS-PBC"),
        ("Verify PIN enable/cancel", "WPS_PIN any 12345670", "WPS-AUTH"),
    )
    for title, enable_command, flag_name in phases:
        flag = "[" + flag_name + "]"
        logger.info(title)
        hapd.request(enable_command)
        dev[0].scan(freq="2412")
        if flag not in dev[0].get_bss(bssid)['flags']:
            raise Exception(flag_name + " flag missing")
        if "FAIL" in hapd.request("WPS_CANCEL"):
            raise Exception("WPS_CANCEL failed")
        dev[0].scan(freq="2412")
        if flag in dev[0].get_bss(bssid)['flags']:
            raise Exception(flag_name + " flag not cleared")
616
def test_ap_wps_er_add_enrollee(dev, apdev):
    """WPS ER configuring AP and adding a new enrollee using PIN"""
    ssid = "wps-er-add-enrollee"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    bssid = apdev[0]['bssid']
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "1",
        "device_name": "Wireless AP",
        "manufacturer": "Company",
        "model_name": "WAP",
        "model_number": "123",
        "serial_number": "12345",
        "device_type": "6-0050F204-1",
        "os_version": "01020300",
        "config_methods": "label push_button",
        "ap_pin": ap_pin,
        "uuid": ap_uuid,
        "upnp_iface": "lo",
    }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    er, enrollee1, enrollee2 = dev[0], dev[1], dev[2]

    logger.info("WPS configuration step")
    new_passphrase = "1234567890"
    er.dump_monitor()
    er.wps_reg(bssid, ap_pin, ssid, "WPA2PSK", "CCMP", new_passphrase)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == bssid):
        raise Exception("Not fully connected")
    cipher_failure = "Unexpected encryption configuration"
    for field, wanted, failure in (
            ('ssid', ssid, "Unexpected SSID"),
            ('pairwise_cipher', 'CCMP', cipher_failure),
            ('group_cipher', 'CCMP', cipher_failure),
            ('key_mgmt', 'WPA2-PSK', "Unexpected key_mgmt")):
        if status[field] != wanted:
            raise Exception(failure)

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    discovered = er.wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if discovered is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in discovered:
        raise Exception("Expected AP UUID not found")

    logger.info("Learn AP configuration through UPnP")
    er.dump_monitor()
    er.request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
    settings = er.wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
    if settings is None:
        raise Exception("AP learn timed out")
    for needle, failure in (
            (ap_uuid, "Expected AP UUID not in settings"),
            ("ssid=" + ssid, "Expected SSID not in settings"),
            ("key=" + new_passphrase, "Expected passphrase not in settings")):
        if needle not in settings:
            raise Exception(failure)

    logger.info("Add Enrollee using ER")
    pin = enrollee1.wps_read_pin()
    er.dump_monitor()
    er.request("WPS_ER_PIN any " + pin + " " + enrollee1.p2p_interface_addr())
    enrollee1.dump_monitor()
    enrollee1.request("WPS_PIN any " + pin)
    if enrollee1.wait_event(["WPS-SUCCESS"], timeout=30) is None:
        raise Exception("Enrollee did not report success")
    if enrollee1.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")
    hwsim_utils.test_connectivity_sta(er, enrollee1)

    logger.info("Add a specific Enrollee using ER")
    pin = enrollee2.wps_read_pin()
    addr2 = enrollee2.p2p_interface_addr()
    er.dump_monitor()
    enrollee2.dump_monitor()
    # Here the enrollee starts first; the ER supplies the PIN only after it
    # has reported the enrollee's address.
    enrollee2.request("WPS_PIN any " + pin)
    seen = er.wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10)
    if seen is None:
        raise Exception("Enrollee not seen")
    if addr2 not in seen:
        raise Exception("Unexpected Enrollee MAC address")
    er.request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2)
    if enrollee2.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")

    logger.info("Verify registrar selection behavior")
    # With a PIN pending on the ER, the AP is expected to advertise WPS-AUTH.
    er.request("WPS_ER_PIN any " + pin + " " + enrollee1.p2p_interface_addr())
    enrollee1.request("DISCONNECT")
    enrollee1.wait_event(["CTRL-EVENT-DISCONNECTED"])
    enrollee1.scan(freq="2412")
    if "[WPS-AUTH]" not in enrollee1.get_bss(bssid)['flags']:
        raise Exception("WPS-AUTH flag missing")

    logger.info("Stop ER")
    er.dump_monitor()
    er.request("WPS_ER_STOP")
    if er.wait_event(["WPS-ER-AP-REMOVE"]) is None:
        raise Exception("WPS ER unsubscription timed out")
    # The UPnP UNSUBSCRIBE command takes some time to go through, so pause
    # briefly before verifying that the scan results have changed.
    time.sleep(0.2)

    enrollee1.scan(freq="2412")
    if "[WPS-AUTH]" in enrollee1.get_bss(bssid)['flags']:
        raise Exception("WPS-AUTH flag not removed")
725
def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
    """WPS ER connected to AP and adding a new enrollee using PBC"""
    ssid = "wps-er-add-enrollee-pbc"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    bssid = apdev[0]['bssid']
    ap_params = {
        "ssid": ssid,
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "device_name": "Wireless AP",
        "manufacturer": "Company",
        "model_name": "WAP",
        "model_number": "123",
        "serial_number": "12345",
        "device_type": "6-0050F204-1",
        "os_version": "01020300",
        "config_methods": "label push_button",
        "ap_pin": ap_pin,
        "uuid": ap_uuid,
        "upnp_iface": "lo",
    }
    hostapd.add_ap(apdev[0]['ifname'], ap_params)
    er, sta = dev[0], dev[1]

    logger.info("Learn AP configuration")
    er.dump_monitor()
    er.wps_reg(bssid, ap_pin)
    status = er.get_status()
    if not (status['wpa_state'] == 'COMPLETED' and
            status['bssid'] == bssid):
        raise Exception("Not fully connected")

    logger.info("Start ER")
    er.request("WPS_ER_START ifname=lo")
    discovered = er.wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if discovered is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in discovered:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    er.request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PBC")
    er.dump_monitor()
    enrollee = sta.p2p_interface_addr()
    sta.dump_monitor()
    sta.request("WPS_PBC")

    # Accept the expected enrollee in either of the first two
    # WPS-ER-ENROLLEE-ADD events; give up after the second.
    for _ in range(2):
        added = er.wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=15)
        if added is None:
            raise Exception("Enrollee discovery timed out")
        if enrollee in added:
            break
    else:
        raise Exception("Expected Enrollee not found")
    er.request("WPS_ER_PBC " + enrollee)

    if sta.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("Enrollee did not report success")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=15) is None:
        raise Exception("Association with the AP timed out")
    if er.wait_event(["WPS-SUCCESS"], timeout=15) is None:
        raise Exception("WPS ER did not report success")
    hwsim_utils.test_connectivity_sta(er, sta)

    # verify BSSID selection of the AP instead of UUID
    if "FAIL" in er.request("WPS_ER_SET_CONFIG " + bssid + " 0"):
        raise Exception("Could not select AP based on BSSID")
789
def test_ap_wps_er_v10_add_enrollee_pin(dev, apdev):
    """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
    # dev[0] is used as the External Registrar (ER), dev[1] as the Enrollee.
    ssid = "wps-er-add-enrollee-pbc"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "wpa_passphrase": "12345678", "wpa": "2",
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
              "device_name": "Wireless AP", "manufacturer": "Company",
              "model_name": "WAP", "model_number": "123",
              "serial_number": "12345", "device_type": "6-0050F204-1",
              "os_version": "01020300",
              "config_methods": "label push_button",
              "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"}
    hostapd.add_ap(apdev[0]['ifname'], params)

    logger.info("Learn AP configuration")
    # Switch the ER to the WPS 1.0 version number before talking to the AP
    dev[0].request("SET wps_version_number 0x10")
    dev[0].dump_monitor()
    dev[0].wps_reg(apdev[0]['bssid'], ap_pin)
    status = dev[0].get_status()
    connected = (status['wpa_state'] == 'COMPLETED' and
                 status['bssid'] == apdev[0]['bssid'])
    if not connected:
        raise Exception("Not fully connected")

    logger.info("Start ER")
    dev[0].request("WPS_ER_START ifname=lo")
    ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ev is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")

    logger.info("Use learned network configuration on ER")
    dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")

    logger.info("Add Enrollee using ER and PIN")
    enrollee = dev[1].p2p_interface_addr()
    pin = dev[1].wps_read_pin()
    dev[0].dump_monitor()
    dev[0].request("WPS_ER_PIN any " + pin + " " + enrollee)
    dev[1].dump_monitor()
    dev[1].request("WPS_PIN any " + pin)

    # Events expected in this order: (device, event name, failure message)
    expected = [(dev[1], "CTRL-EVENT-CONNECTED",
                 "Association with the AP timed out"),
                (dev[0], "WPS-SUCCESS", "WPS ER did not report success")]
    for device, event, failure in expected:
        if device.wait_event([event], timeout=15) is None:
            raise Exception(failure)
837
def test_ap_wps_er_config_ap(dev, apdev):
    """WPS ER configuring AP over UPnP"""
    ssid = "wps-er-ap-config"
    ap_pin = "12345670"
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    hostapd.add_ap(apdev[0]['ifname'],
                   { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                     "wpa_passphrase": "12345678", "wpa": "2",
                     "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
                     "device_name": "Wireless AP", "manufacturer": "Company",
                     "model_name": "WAP", "model_number": "123",
                     "serial_number": "12345", "device_type": "6-0050F204-1",
                     "os_version": "01020300",
                     "config_methods": "label push_button",
                     "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})

    logger.info("Connect ER to the AP")
    dev[0].connect(ssid, psk="12345678", scan_freq="2412")

    logger.info("WPS configuration step")
    dev[0].request("WPS_ER_START ifname=lo")
    ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
    if ev is None:
        raise Exception("AP discovery timed out")
    if ap_uuid not in ev:
        raise Exception("Expected AP UUID not found")
    new_passphrase = "1234567890"
    # SSID and passphrase are passed to WPS_ER_CONFIG as hex strings
    dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
                   ssid.encode("hex") + " WPA2PSK CCMP " +
                   new_passphrase.encode("hex"))
    ev = dev[0].wait_event(["WPS-SUCCESS"])
    if ev is None:
        raise Exception("WPS ER configuration operation timed out")
    # dev[0] is the only station connected in this test, so it is the one
    # that can report the disconnection; dev[1] is never used here. The
    # result is deliberately not checked (best-effort wait, as before).
    dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
    # Reconnect using the passphrase that was just configured on the AP
    dev[0].connect(ssid, psk=new_passphrase, scan_freq="2412")
873
def test_ap_wps_fragmentation(dev, apdev):
    """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
    ssid = "test-wps-fragmentation"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "wpa_passphrase": "12345678", "wpa": "3",
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
              "wpa_pairwise": "TKIP",
              "fragment_size": "50"}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta = dev[0]
    sta.dump_monitor()
    # Use a 50-octet fragment size on the station side as well
    sta.request("SET wps_fragment_size 50")
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    status = sta.get_status()
    if status['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    ciphers_ok = (status['pairwise_cipher'] == 'CCMP' and
                  status['group_cipher'] == 'TKIP')
    if not ciphers_ok:
        raise Exception("Unexpected encryption configuration")
    if status['key_mgmt'] != 'WPA2-PSK':
        raise Exception("Unexpected key_mgmt")
899
def test_ap_wps_new_version_sta(dev, apdev):
    """WPS compatibility with new version number on the station"""
    ssid = "test-wps-ver"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "wpa_passphrase": "12345678", "wpa": "2",
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("WPS provisioning step")
    hapd.request("WPS_PBC")
    sta = dev[0]
    sta.dump_monitor()
    # Station-side test settings applied before starting PBC: a different
    # version number and a vendor extension value for M1.
    for setting in ["wps_version_number 0x43",
                    "wps_vendor_ext_m1 000137100100020001"]:
        sta.request("SET " + setting)
    sta.request("WPS_PBC")
    if sta.wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")
917
def test_ap_wps_new_version_ap(dev, apdev):
    """WPS compatibility with new version number on the AP"""
    ssid = "test-wps-ver"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "wpa_passphrase": "12345678", "wpa": "2",
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    logger.info("WPS provisioning step")
    res = hapd.request("SET wps_version_number 0x43")
    if "FAIL" in res:
        raise Exception("Failed to enable test functionality")
    hapd.request("WPS_PBC")
    dev[0].dump_monitor()
    dev[0].request("WPS_PBC")
    connected = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
    # Set the version number back to 0x20 before evaluating the result so
    # that this happens on both the success and the timeout path.
    hapd.request("SET wps_version_number 0x20")
    if connected is None:
        raise Exception("Association with the AP timed out")
936
def test_ap_wps_check_pin(dev, apdev):
    """Verify PIN checking through control interface"""
    params = {"ssid": "wps", "eap_server": "1", "wps_state": "2",
              "wpa_passphrase": "12345678", "wpa": "2",
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    # (PIN as entered, expected WPS_CHECK_PIN response)
    cases = [("12345670", "12345670"),
             ("12345678", "FAIL-CHECKSUM"),
             ("1234-5670", "12345670"),
             ("1234 5670", "12345670"),
             ("1-2.3:4 5670", "12345670")]
    for entered, expected in cases:
        cmd = "WPS_CHECK_PIN " + entered
        # The AP and the station are expected to give the same answer
        res = hapd.request(cmd).rstrip('\n')
        res2 = dev[0].request(cmd).rstrip('\n')
        if res != res2:
            raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
        if res != expected:
            raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(res, expected))
955
def test_ap_wps_wep_config(dev, apdev):
    """WPS 2.0 AP rejecting WEP configuration"""
    ssid = "test-wps-config"
    appin = "12345670"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "ap_pin": appin}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])

    # Try to configure the AP with a WEP network; do not wait for completion
    # since the AP is expected to report a failure.
    dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN",
                   "WEP", "hello", no_wait=True)
    failure = hapd.wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL timed out")
    if "reason=2" not in failure:
        raise Exception("Unexpected reason code in WPS-FAIL")

    status = hapd.request("WPS_GET_STATUS")
    if "Last WPS result: Failed" not in status:
        raise Exception("WPS failure result not shown correctly")
    if "Failure Reason: WEP Prohibited" not in status:
        raise Exception("Failure reason not reported correctly")
    peer = "Peer Address: " + dev[0].p2p_interface_addr()
    if peer not in status:
        raise Exception("Peer address not shown correctly")
978
def test_ap_wps_wep_enroll(dev, apdev):
    """WPS 2.0 STA rejecting WEP configuration"""
    ssid = "test-wps-wep"
    # The AP is set to skip building its own credential and to use the
    # contents of the "wps-wep-cred" file as the credential instead.
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "skip_cred_build": "1", "extra_cred": "wps-wep-cred"}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    hapd.request("WPS_PBC")
    dev[0].request("WPS_PBC")
    failure = dev[0].wait_event(["WPS-FAIL"], timeout=15)
    if failure is None:
        raise Exception("WPS-FAIL event timed out")
    as_expected = ("msg=12" in failure and
                   "reason=2 (WEP Prohibited)" in failure)
    if not as_expected:
        raise Exception("Unexpected WPS-FAIL event: " + failure)
993
def test_ap_wps_ie_fragmentation(dev, apdev):
    """WPS AP using fragmented WPS IE"""
    ssid = "test-wps-ie-fragmentation"
    # Long device attribute strings (32 characters each, 64 for manufacturer)
    name32 = "1234567890abcdef1234567890abcdef"
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "wpa_passphrase": "12345678", "wpa": "2",
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
              "device_name": name32,
              "manufacturer": name32 + name32,
              "model_name": name32,
              "model_number": name32,
              "serial_number": name32}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    hapd.request("WPS_PBC")
    dev[0].request("WPS_PBC")
    if dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30) is None:
        raise Exception("Association with the AP timed out")

    bss = dev[0].get_bss(apdev[0]['bssid'])
    name_ok = "wps_device_name" in bss and bss['wps_device_name'] == name32
    if not name_ok:
        raise Exception("Device Name not received correctly")
    # Count vendor specific elements (dd <len> 0050f204) in the hex dump of
    # the BSS IEs; exactly two are expected.
    wps_ies = re.findall("dd..0050f204", bss['ie'])
    if len(wps_ies) != 2:
        raise Exception("Unexpected number of WPS IEs")
1017
def get_psk(pskfile):
    """Parse a PSK file into a dictionary.

    Each entry line has the form "<addr> <psk>"; everything after the first
    space is taken as the PSK so that values containing spaces are kept
    intact. Blank lines and lines starting with '#' are ignored.

    pskfile: path of the file to read
    Returns: dict mapping address string to PSK string
    Raises: ValueError if an entry line does not contain a space separator
    """
    psks = {}
    with open(pskfile, "r") as f:
        for raw in f:
            line = raw.rstrip("\r\n")
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            (addr, psk) = line.split(' ', 1)
            psks[addr] = psk
    return psks
1028
def test_ap_wps_per_station_psk(dev, apdev):
    """WPS PBC provisioning with per-station PSK"""
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    ssid = "wps"
    appin = "12345670"
    pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
    # Remove a stale file from an earlier run; only file system errors (for
    # example, the file not existing) are ignored here.
    try:
        os.remove(pskfile)
    except OSError:
        pass

    try:
        # Start from a PSK file that contains only the header line
        with open(pskfile, "w") as f:
            f.write("# WPA PSKs\n")

        params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
                   "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
                   "rsn_pairwise": "CCMP", "ap_pin": appin,
                   "wpa_psk_file": pskfile }
        hapd = hostapd.add_ap(apdev[0]['ifname'], params)

        logger.info("First enrollee")
        hapd.request("WPS_PBC")
        dev[0].request("WPS_PBC")
        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (1)")

        logger.info("Second enrollee")
        hapd.request("WPS_PBC")
        dev[1].request("WPS_PBC")
        ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
        if ev is None:
            raise Exception("Association with the AP timed out (2)")

        logger.info("External registrar")
        dev[2].wps_reg(apdev[0]['bssid'], appin)

        logger.info("Verifying PSK results")
        psks = get_psk(pskfile)
        if addr0 not in psks:
            raise Exception("No PSK recorded for sta0")
        if addr1 not in psks:
            raise Exception("No PSK recorded for sta1")
        if addr2 not in psks:
            raise Exception("No PSK recorded for sta2")
        # Every station must have been given a different PSK
        if psks[addr0] == psks[addr1]:
            raise Exception("Same PSK recorded for sta0 and sta1")
        if psks[addr0] == psks[addr2]:
            raise Exception("Same PSK recorded for sta0 and sta2")
        if psks[addr1] == psks[addr2]:
            raise Exception("Same PSK recorded for sta1 and sta2")

        dev[0].request("REMOVE_NETWORK all")
        logger.info("Second external registrar")
        dev[0].wps_reg(apdev[0]['bssid'], appin)
        # The PSK recorded for dev[0] must differ from the one it had
        # received as an enrollee
        psks2 = get_psk(pskfile)
        if addr0 not in psks2:
            raise Exception("No PSK recorded for sta0(reg)")
        if psks[addr0] == psks2[addr0]:
            raise Exception("Same PSK recorded for sta0(enrollee) and sta0(reg)")
    finally:
        os.remove(pskfile)
1094
def test_ap_wps_pin_request_file(dev, apdev):
    """WPS PIN provisioning with configured AP"""
    # NOTE(review): the summary above looks copied from another test; what is
    # verified here is that a PIN request from the station gets logged into
    # the file configured with the wps_pin_requests parameter.
    ssid = "wps"
    pinfile = "/tmp/ap_wps_pin_request_file.log"
    # Remove a stale log file from an earlier run (removed through sudo)
    if os.path.exists(pinfile):
        subprocess.call(['sudo', 'rm', pinfile])
    params = {"ssid": ssid, "eap_server": "1", "wps_state": "2",
              "wps_pin_requests": pinfile,
              "wpa_passphrase": "12345678", "wpa": "2",
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"}
    hostapd.add_ap(apdev[0]['ifname'], params)
    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    uuid = dev[0].get_status_field("uuid")
    pin = dev[0].wps_read_pin()
    try:
        # The PIN is started only on the station, so the AP is expected to
        # report that a PIN is needed for this UUID.
        dev[0].request("WPS_PIN any " + pin)
        needed = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=15)
        if needed is None:
            raise Exception("PIN needed event not shown")
        if uuid not in needed:
            raise Exception("UUID mismatch")
        dev[0].request("WPS_CANCEL")
        with open(pinfile, "r") as f:
            entries = f.readlines()
        logged = any(uuid in entry for entry in entries)
        if not logged:
            raise Exception("PIN request entry not in the log file")
    finally:
        subprocess.call(['sudo', 'rm', pinfile])
1128
def add_ssdp_ap(ifname, ap_uuid):
    """Start a WPS AP with UPnP enabled on the loopback interface.

    ifname: interface name passed to hostapd.add_ap()
    ap_uuid: value used for the "uuid" parameter of the AP
    """
    params = {
        "ssid": "wps-ssdp",
        "eap_server": "1",
        "wps_state": "2",
        "wpa_passphrase": "12345678",
        "wpa": "2",
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
        "device_name": "Wireless AP",
        "manufacturer": "Company",
        "model_name": "WAP",
        "model_number": "123",
        "serial_number": "12345",
        "device_type": "6-0050F204-1",
        "os_version": "01020300",
        "config_methods": "label push_button",
        "ap_pin": "12345670",
        "uuid": ap_uuid,
        "upnp_iface": "lo",
        "friendly_name": "WPS Access Point",
        "manufacturer_url": "http://www.example.com/",
        "model_description": "Wireless Access Point",
        "model_url": "http://www.example.com/model/",
        "upc": "123456789012",
    }
    hostapd.add_ap(ifname, params)
1147
def ssdp_send(msg, no_recv=False):
    """Send an SSDP message to the multicast group from 127.0.0.1.

    msg: complete message to send to 239.255.255.250:1900
    no_recv: if True, do not wait for a response
    Returns: up to 1000 bytes of the first response datagram, or None when
        no_recv is set
    Raises: socket.timeout if no response arrives within one second
    """
    # Note: this changes the process-wide default socket timeout
    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))
        sock.sendto(msg, ("239.255.255.250", 1900))
        if no_recv:
            return None
        return sock.recv(1000)
    finally:
        # Close explicitly on every path, including the timeout exception
        sock.close()
1158
def ssdp_send_msearch(st):
    """Send a well-formed SSDP M-SEARCH for search target st.

    Returns whatever ssdp_send() returns for the request.
    """
    header_lines = ['M-SEARCH * HTTP/1.1',
                    'HOST: 239.255.255.250:1900',
                    'MX: 1',
                    'MAN: "ssdp:discover"',
                    'ST: ' + st]
    # Header lines are CRLF separated and followed by an empty line
    request = '\r\n'.join(header_lines) + '\r\n\r\n'
    return ssdp_send(request)
1168
def test_ap_wps_ssdp_msearch(dev, apdev):
    """WPS AP and SSDP M-SEARCH messages"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    def build(header_lines):
        # CRLF separated header lines followed by an empty line
        return '\r\n'.join(header_lines + ['', ''])

    # Header names in mixed case and in lower case with extra whitespace;
    # a response is expected for both.
    unusual_headers = [
        ['M-SEARCH * HTTP/1.1',
         'Host: 239.255.255.250:1900',
         'Mx: 1',
         'Man: "ssdp:discover"',
         'St: urn:schemas-wifialliance-org:device:WFADevice:1'],
        ['M-SEARCH * HTTP/1.1',
         'host:\t239.255.255.250:1900\t\t\t\t \t\t',
         'mx: \t1\t\t ',
         'man: \t \t "ssdp:discover" ',
         'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t'],
    ]
    for header_lines in unusual_headers:
        ssdp_send(build(header_lines))

    # Search targets for which a response is expected
    for target in ["ssdp:all",
                   "upnp:rootdevice",
                   "uuid:" + ap_uuid,
                   "urn:schemas-wifialliance-org:service:WFAWLANConfig:1",
                   "urn:schemas-wifialliance-org:device:WFADevice:1"]:
        ssdp_send_msearch(target)

    # Large MX value; sent without waiting for a response
    large_mx = build(['M-SEARCH * HTTP/1.1',
                      'HOST:\t239.255.255.250:1900',
                      'MAN: "ssdp:discover"',
                      'MX: 130',
                      'ST: urn:schemas-wifialliance-org:device:WFADevice:1'])
    ssdp_send(large_mx, no_recv=True)
1206
def test_ap_wps_ssdp_invalid_msearch(dev, apdev):
    """WPS AP and invalid SSDP M-SEARCH messages"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    dst = ("239.255.255.250", 1900)

    # Building blocks of a valid request
    req = 'M-SEARCH * HTTP/1.1'
    host = 'HOST: 239.255.255.250:1900'
    man = 'MAN: "ssdp:discover"'
    mx = 'MX: 1'
    st = 'ST: urn:schemas-wifialliance-org:device:WFADevice:1'

    # (debug log label, list of requests); each request is a list of header
    # lines. None of these is expected to result in a response.
    invalid = [
        ("Missing MX", [[req, host, man, st]]),
        ("Negative MX", [[req, host, 'MX: -1', man, st]]),
        ("Invalid MX", [[req, host, 'MX; 1', man, st]]),
        ("Missing MAN", [[req, host, mx, st]]),
        ("Invalid MAN", [[req, host, mx, 'MAN: foo', st],
                         [req, host, mx, 'MAN; "ssdp:discover"', st]]),
        ("Missing HOST", [[req, man, mx, st]]),
        ("Missing ST", [[req, host, man, mx]]),
        ("Mismatching ST",
         [[req, host, man, mx,
           'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d'],
          [req, host, man, mx, 'ST: foo:bar'],
          [req, host, man, mx, 'ST: foobar']]),
        ("Invalid ST",
         [[req, host, man, mx,
           'ST; urn:schemas-wifialliance-org:device:WFADevice:1']]),
        ("Invalid M-SEARCH", [['M+SEARCH * HTTP/1.1', host, man, mx, st],
                              ['M-SEARCH-* HTTP/1.1', host, man, mx, st]]),
    ]

    socket.setdefaulttimeout(1)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))

        for label, requests in invalid:
            logger.debug(label)
            for header_lines in requests:
                msg = '\r\n'.join(header_lines + ['', ''])
                sock.sendto(msg, dst)

        logger.debug("Invalid message format")
        sock.sendto("NOTIFY * HTTP/1.1", dst)
        # Otherwise valid request, but with bare CR as the line separator
        msg = '\r'.join([req, host, man, mx, st, '', ''])
        sock.sendto(msg, dst)

        # None of the messages above may have triggered a response
        try:
            r = sock.recv(1000)
            raise Exception("Unexpected M-SEARCH response: " + r)
        except socket.timeout:
            pass

        logger.debug("Valid M-SEARCH")
        msg = '\r\n'.join([req, host, man, mx, st, '', ''])
        sock.sendto(msg, dst)

        # The AP has to still be responding to a valid request
        try:
            r = sock.recv(1000)
        except socket.timeout:
            raise Exception("No SSDP response")
    finally:
        # Close explicitly on both the success and the failure paths
        sock.close()
1378
def test_ap_wps_ssdp_burst(dev, apdev):
    """WPS AP and SSDP burst"""
    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    msg = '\r\n'.join([
            'M-SEARCH * HTTP/1.1',
            'HOST: 239.255.255.250:1900',
            'MAN: "ssdp:discover"',
            'MX: 1',
            'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
            '', ''])
    dst = ("239.255.255.250", 1900)
    socket.setdefaulttimeout(1)

    # First burst: count the responses received before a one second timeout
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))
        for i in range(0, 25):
            sock.sendto(msg, dst)
        resp = 0
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                break
            if not r.startswith("HTTP/1.1 200 OK\r\n"):
                raise Exception("Unexpected message: " + r)
            resp += 1
    finally:
        # Close the first socket explicitly before opening the second one
        sock.close()
    if resp < 20:
        raise Exception("Too few SSDP responses")

    # Second burst from a new socket: stop at the first response that
    # includes the AP UUID
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(("127.0.0.1", 0))
        for i in range(0, 25):
            sock.sendto(msg, dst)
        while True:
            try:
                r = sock.recv(1000)
            except socket.timeout:
                raise Exception("No SSDP response")
            if ap_uuid in r:
                break
    finally:
        sock.close()
1423
def ssdp_get_location(uuid):
    """Return the LOCATION header value from an M-SEARCH response.

    The search is sent with "uuid:<uuid>" as the search target.

    Raises: Exception if the response has no LOCATION header
    """
    response = ssdp_send_msearch("uuid:" + uuid)
    for header in response.splitlines():
        # Header names are matched case-insensitively
        if header.lower().startswith("location:"):
            return header.split(':', 1)[1].strip()
    raise Exception("No UPnP location found")
1434
def upnp_get_urls(location):
    """Fetch the UPnP device description and return the service URLs.

    location: URL of the device description document
    Returns: dict with 'scpd_url', 'control_url' and 'event_sub_url' entries
        for the first service in the description, each resolved against
        location
    Raises: Exception if the description has no service element or the
        service is missing one of the URL elements
    """
    conn = urllib.urlopen(location)
    try:
        tree = ET.parse(conn)
    finally:
        # Release the connection also when the document cannot be parsed
        conn.close()
    root = tree.getroot()
    urn = '{urn:schemas-upnp-org:device-1-0}'
    service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
    if service is None:
        raise Exception("No UPnP service found in device description")
    res = {}
    for key, tag in [('scpd_url', 'SCPDURL'),
                     ('control_url', 'controlURL'),
                     ('event_sub_url', 'eventSubURL')]:
        elem = service.find(urn + tag)
        if elem is None:
            raise Exception("No " + tag + " found in UPnP service description")
        res[key] = urlparse.urljoin(location, elem.text)
    return res
1446
def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
    """POST a SOAP request for a WFAWLANConfig action and return the response.

    conn: connection object providing request() and getresponse()
    path: path used in the POST request line
    action: action name; used for the body element and the SOAPAction header
    include_soap_action: add the regular SOAPAction header for the action
    soap_action_override: SOAPAction header value to use instead; only
        applied when include_soap_action is False
    Returns: conn.getresponse()
    """
    soapns = 'http://schemas.xmlsoap.org/soap/envelope/'
    wpsns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
    ET.register_namespace('soapenv', soapns)
    ET.register_namespace('wfa', wpsns)

    # Envelope with a Body that contains only an empty action element
    envelope_attrs = {}
    envelope_attrs['{%s}encodingStyle' % soapns] = 'http://schemas.xmlsoap.org/soap/encoding/'
    envelope = ET.Element("{%s}Envelope" % soapns, attrib=envelope_attrs)
    soap_body = ET.SubElement(envelope, "{%s}Body" % soapns)
    ET.SubElement(soap_body, "{%s}%s" % (wpsns, action))
    payload = StringIO.StringIO()
    ET.ElementTree(envelope).write(payload, xml_declaration=True,
                                   encoding='utf-8')

    headers = { "Content-type": 'text/xml; charset="utf-8"' }
    if include_soap_action:
        headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
    elif soap_action_override:
        headers["SOAPAction"] = soap_action_override
    conn.request("POST", path, payload.getvalue(), headers)
    return conn.getresponse()
1468
def test_ap_wps_upnp(dev, apdev):
    """WPS AP and UPnP operations"""
    def check_status(response, expected):
        # Common verification of the HTTP status code of a response
        if response.status != expected:
            raise Exception("Unexpected HTTP response: %s" % response.status)

    ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
    add_ssdp_ap(apdev[0]['ifname'], ap_uuid)

    location = ssdp_get_location(ap_uuid)
    urls = upnp_get_urls(location)

    # Fetch the service description; the contents are not inspected
    conn = urllib.urlopen(urls['scpd_url'])
    conn.read()

    conn = urllib.urlopen(urlparse.urljoin(location, "unknown.html"))
    if conn.getcode() != 404:
        raise Exception("Unexpected HTTP response to GET unknown URL")

    url = urlparse.urlparse(location)
    conn = httplib.HTTPConnection(url.netloc)
    #conn.set_debuglevel(1)
    headers = { "Content-type": 'text/xml; charset="utf-8"',
                "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
    # Unknown path with a known method and with an unknown method
    for method, expected in [("POST", 404), ("UNKNOWN", 501)]:
        conn.request(method, "hello", "\r\n\r\n", headers)
        resp = conn.getresponse()
        check_status(resp, expected)

    # Control URL with a SOAPAction that uses an unknown service type
    headers = { "Content-type": 'text/xml; charset="utf-8"',
                "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
    ctrlurl = urlparse.urlparse(urls['control_url'])
    conn.request("POST", ctrlurl.path, "\r\n\r\n", headers)
    resp = conn.getresponse()
    check_status(resp, 401)

    logger.debug("GetDeviceInfo without SOAPAction header")
    resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
                            include_soap_action=False)
    check_status(resp, 401)

    logger.debug("GetDeviceInfo with invalid SOAPAction header")
    invalid_actions = [
        "foo",
        "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
        '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
        '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']
    for header_value in invalid_actions:
        resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
                                include_soap_action=False,
                                soap_action_override=header_value)
        check_status(resp, 401)

    resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo")
    check_status(resp, 200)
    device_info = resp.read()
    if "NewDeviceInfo" not in device_info:
        raise Exception("Unexpected GetDeviceInfo response")

    # (debug log label, action name, expected HTTP status)
    rejected = [
        ("PutMessage without required parameters", "PutMessage", 600),
        ("PutWLANResponse without required parameters",
         "PutWLANResponse", 600),
        ("SetSelectedRegistrar from unregistered ER",
         "SetSelectedRegistrar", 501),
        ("Unknown action", "Unknown", 401)]
    for label, action, expected in rejected:
        logger.debug(label)
        resp = upnp_soap_action(conn, ctrlurl.path, action)
        check_status(resp, expected)
1550
1551 def test_ap_wps_upnp_subscribe(dev, apdev):
1552 """WPS AP and UPnP event subscription"""
1553 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1554 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1555
1556 location = ssdp_get_location(ap_uuid)
1557 urls = upnp_get_urls(location)
1558 eventurl = urlparse.urlparse(urls['event_sub_url'])
1559
1560 url = urlparse.urlparse(location)
1561 conn = httplib.HTTPConnection(url.netloc)
1562 #conn.set_debuglevel(1)
1563 headers = { "callback": '<http://127.0.0.1:12345/event>',
1564 "timeout": "Second-1234" }
1565 conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers)
1566 resp = conn.getresponse()
1567 if resp.status != 412:
1568 raise Exception("Unexpected HTTP response: %s" % resp.status)
1569
1570 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1571 resp = conn.getresponse()
1572 if resp.status != 412:
1573 raise Exception("Unexpected HTTP response: %s" % resp.status)
1574
1575 headers = { "NT": "upnp:event",
1576 "timeout": "Second-1234" }
1577 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1578 resp = conn.getresponse()
1579 if resp.status != 412:
1580 raise Exception("Unexpected HTTP response: %s" % resp.status)
1581
1582 headers = { "callback": '<http://127.0.0.1:12345/event>',
1583 "NT": "upnp:foobar",
1584 "timeout": "Second-1234" }
1585 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1586 resp = conn.getresponse()
1587 if resp.status != 400:
1588 raise Exception("Unexpected HTTP response: %s" % resp.status)
1589
1590 logger.debug("Valid subscription")
1591 headers = { "callback": '<http://127.0.0.1:12345/event>',
1592 "NT": "upnp:event",
1593 "timeout": "Second-1234" }
1594 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1595 resp = conn.getresponse()
1596 if resp.status != 200:
1597 raise Exception("Unexpected HTTP response: %s" % resp.status)
1598 sid = resp.getheader("sid")
1599 logger.debug("Subscription SID " + sid)
1600
1601 logger.debug("Invalid re-subscription")
1602 headers = { "NT": "upnp:event",
1603 "sid": "123456734567854",
1604 "timeout": "Second-1234" }
1605 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1606 resp = conn.getresponse()
1607 if resp.status != 400:
1608 raise Exception("Unexpected HTTP response: %s" % resp.status)
1609
1610 logger.debug("Invalid re-subscription")
1611 headers = { "NT": "upnp:event",
1612 "sid": "uuid:123456734567854",
1613 "timeout": "Second-1234" }
1614 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1615 resp = conn.getresponse()
1616 if resp.status != 400:
1617 raise Exception("Unexpected HTTP response: %s" % resp.status)
1618
1619 logger.debug("Invalid re-subscription")
1620 headers = { "callback": '<http://127.0.0.1:12345/event>',
1621 "NT": "upnp:event",
1622 "sid": sid,
1623 "timeout": "Second-1234" }
1624 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1625 resp = conn.getresponse()
1626 if resp.status != 400:
1627 raise Exception("Unexpected HTTP response: %s" % resp.status)
1628
1629 logger.debug("SID mismatch in re-subscription")
1630 headers = { "NT": "upnp:event",
1631 "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
1632 "timeout": "Second-1234" }
1633 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1634 resp = conn.getresponse()
1635 if resp.status != 412:
1636 raise Exception("Unexpected HTTP response: %s" % resp.status)
1637
1638 logger.debug("Valid re-subscription")
1639 headers = { "NT": "upnp:event",
1640 "sid": sid,
1641 "timeout": "Second-1234" }
1642 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1643 resp = conn.getresponse()
1644 if resp.status != 200:
1645 raise Exception("Unexpected HTTP response: %s" % resp.status)
1646 sid2 = resp.getheader("sid")
1647 logger.debug("Subscription SID " + sid2)
1648
1649 if sid != sid2:
1650 raise Exception("Unexpected SID change")
1651
1652 logger.debug("Valid re-subscription")
1653 headers = { "NT": "upnp:event",
1654 "sid": "uuid: \t \t" + sid.split(':')[1],
1655 "timeout": "Second-1234" }
1656 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1657 resp = conn.getresponse()
1658 if resp.status != 200:
1659 raise Exception("Unexpected HTTP response: %s" % resp.status)
1660
1661 logger.debug("Invalid unsubscription")
1662 headers = { "sid": sid }
1663 conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers)
1664 resp = conn.getresponse()
1665 if resp.status != 412:
1666 raise Exception("Unexpected HTTP response: %s" % resp.status)
1667 headers = { "foo": "bar" }
1668 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1669 resp = conn.getresponse()
1670 if resp.status != 412:
1671 raise Exception("Unexpected HTTP response: %s" % resp.status)
1672
1673 logger.debug("Valid unsubscription")
1674 headers = { "sid": sid }
1675 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1676 resp = conn.getresponse()
1677 if resp.status != 200:
1678 raise Exception("Unexpected HTTP response: %s" % resp.status)
1679
1680 logger.debug("Unsubscription for not existing SID")
1681 headers = { "sid": sid }
1682 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1683 resp = conn.getresponse()
1684 if resp.status != 412:
1685 raise Exception("Unexpected HTTP response: %s" % resp.status)
1686
1687 logger.debug("Invalid unsubscription")
1688 headers = { "sid": " \t \tfoo" }
1689 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1690 resp = conn.getresponse()
1691 if resp.status != 400:
1692 raise Exception("Unexpected HTTP response: %s" % resp.status)
1693
1694 logger.debug("Invalid unsubscription")
1695 headers = { "sid": "uuid:\t \tfoo" }
1696 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1697 resp = conn.getresponse()
1698 if resp.status != 400:
1699 raise Exception("Unexpected HTTP response: %s" % resp.status)
1700
1701 logger.debug("Invalid unsubscription")
1702 headers = { "NT": "upnp:event",
1703 "sid": sid }
1704 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1705 resp = conn.getresponse()
1706 if resp.status != 400:
1707 raise Exception("Unexpected HTTP response: %s" % resp.status)
1708 headers = { "callback": '<http://127.0.0.1:12345/event>',
1709 "sid": sid }
1710 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1711 resp = conn.getresponse()
1712 if resp.status != 400:
1713 raise Exception("Unexpected HTTP response: %s" % resp.status)
1714
1715 logger.debug("Valid subscription with multiple callbacks")
1716 headers = { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
1717 "NT": "upnp:event",
1718 "timeout": "Second-1234" }
1719 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1720 resp = conn.getresponse()
1721 if resp.status != 200:
1722 raise Exception("Unexpected HTTP response: %s" % resp.status)
1723 sid = resp.getheader("sid")
1724 logger.debug("Subscription SID " + sid)