]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/test_ap_wps.py
tests: Increase Wi-Fi Display testing coverage
[thirdparty/hostap.git] / tests / hwsim / test_ap_wps.py
CommitLineData
302b7a1b 1# WPS tests
8674c022 2# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
302b7a1b
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
2035b170 7import os
302b7a1b
JM
8import time
9import subprocess
10import logging
c9aa4308 11logger = logging.getLogger()
1013a576 12import re
44ff0400 13import socket
47c549fd
JM
14import httplib
15import urlparse
16import urllib
17import xml.etree.ElementTree as ET
18import StringIO
302b7a1b
JM
19
20import hwsim_utils
21import hostapd
22
ae3ad328 23def test_ap_wps_init(dev, apdev):
302b7a1b
JM
24 """Initial AP configuration with first WPS Enrollee"""
25 ssid = "test-wps"
ae3ad328 26 hostapd.add_ap(apdev[0]['ifname'],
302b7a1b 27 { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
ae3ad328 28 hapd = hostapd.Hostapd(apdev[0]['ifname'])
302b7a1b
JM
29 logger.info("WPS provisioning step")
30 hapd.request("WPS_PBC")
d671a420
JM
31 if "PBC Status: Active" not in hapd.request("WPS_GET_STATUS"):
32 raise Exception("PBC status not shown correctly")
302b7a1b
JM
33 dev[0].dump_monitor()
34 dev[0].request("WPS_PBC")
853b49a0 35 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
302b7a1b
JM
36 if ev is None:
37 raise Exception("Association with the AP timed out")
38 status = dev[0].get_status()
ae3ad328 39 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
302b7a1b
JM
40 raise Exception("Not fully connected")
41 if status['ssid'] != ssid:
42 raise Exception("Unexpected SSID")
43 if status['pairwise_cipher'] != 'CCMP':
44 raise Exception("Unexpected encryption configuration")
45 if status['key_mgmt'] != 'WPA2-PSK':
46 raise Exception("Unexpected key_mgmt")
47
d671a420
JM
48 status = hapd.request("WPS_GET_STATUS")
49 if "PBC Status: Disabled" not in status:
50 raise Exception("PBC status not shown correctly")
51 if "Last WPS result: Success" not in status:
52 raise Exception("Last WPS result not shown correctly")
53 if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
54 raise Exception("Peer address not shown correctly")
75b25ece
JM
55 conf = hapd.request("GET_CONFIG")
56 if "wps_state=configured" not in conf:
57 raise Exception("AP not in WPS configured state")
58 if "rsn_pairwise_cipher=CCMP TKIP" not in conf:
59 raise Exception("Unexpected rsn_pairwise_cipher")
60 if "wpa_pairwise_cipher=CCMP TKIP" not in conf:
61 raise Exception("Unexpected wpa_pairwise_cipher")
62 if "group_cipher=TKIP" not in conf:
63 raise Exception("Unexpected group_cipher")
d671a420 64
18030dc0
JM
65def test_ap_wps_init_2ap_pbc(dev, apdev):
66 """Initial two-radio AP configuration with first WPS PBC Enrollee"""
67 ssid = "test-wps"
68 params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
69 hostapd.add_ap(apdev[0]['ifname'], params)
70 hostapd.add_ap(apdev[1]['ifname'], params)
71 hapd = hostapd.Hostapd(apdev[0]['ifname'])
72 logger.info("WPS provisioning step")
73 hapd.request("WPS_PBC")
18030dc0
JM
74 dev[0].scan(freq="2412")
75 bss = dev[0].get_bss(apdev[0]['bssid'])
76 if "[WPS-PBC]" not in bss['flags']:
77 raise Exception("WPS-PBC flag missing from AP1")
78 bss = dev[0].get_bss(apdev[1]['bssid'])
79 if "[WPS-PBC]" not in bss['flags']:
80 raise Exception("WPS-PBC flag missing from AP2")
81 dev[0].dump_monitor()
82 dev[0].request("WPS_PBC")
83 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
84 if ev is None:
85 raise Exception("Association with the AP timed out")
86
18030dc0
JM
87 dev[1].scan(freq="2412")
88 bss = dev[1].get_bss(apdev[0]['bssid'])
89 if "[WPS-PBC]" in bss['flags']:
90 raise Exception("WPS-PBC flag not cleared from AP1")
91 bss = dev[1].get_bss(apdev[1]['bssid'])
92 if "[WPS-PBC]" in bss['flags']:
93 raise Exception("WPS-PBC flag bit ckeared from AP2")
94
95def test_ap_wps_init_2ap_pin(dev, apdev):
96 """Initial two-radio AP configuration with first WPS PIN Enrollee"""
97 ssid = "test-wps"
98 params = { "ssid": ssid, "eap_server": "1", "wps_state": "1" }
99 hostapd.add_ap(apdev[0]['ifname'], params)
100 hostapd.add_ap(apdev[1]['ifname'], params)
101 hapd = hostapd.Hostapd(apdev[0]['ifname'])
102 logger.info("WPS provisioning step")
103 pin = dev[0].wps_read_pin()
104 hapd.request("WPS_PIN any " + pin)
18030dc0
JM
105 dev[0].scan(freq="2412")
106 bss = dev[0].get_bss(apdev[0]['bssid'])
107 if "[WPS-AUTH]" not in bss['flags']:
108 raise Exception("WPS-AUTH flag missing from AP1")
109 bss = dev[0].get_bss(apdev[1]['bssid'])
110 if "[WPS-AUTH]" not in bss['flags']:
111 raise Exception("WPS-AUTH flag missing from AP2")
112 dev[0].dump_monitor()
113 dev[0].request("WPS_PIN any " + pin)
114 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
115 if ev is None:
116 raise Exception("Association with the AP timed out")
117
18030dc0
JM
118 dev[1].scan(freq="2412")
119 bss = dev[1].get_bss(apdev[0]['bssid'])
120 if "[WPS-AUTH]" in bss['flags']:
121 raise Exception("WPS-AUTH flag not cleared from AP1")
122 bss = dev[1].get_bss(apdev[1]['bssid'])
123 if "[WPS-AUTH]" in bss['flags']:
124 raise Exception("WPS-AUTH flag bit ckeared from AP2")
125
35831e94
JM
126def test_ap_wps_init_through_wps_config(dev, apdev):
127 """Initial AP configuration using wps_config command"""
128 ssid = "test-wps-init-config"
129 hostapd.add_ap(apdev[0]['ifname'],
130 { "ssid": ssid, "eap_server": "1", "wps_state": "1" })
131 hapd = hostapd.Hostapd(apdev[0]['ifname'])
132 if "FAIL" in hapd.request("WPS_CONFIG " + ssid.encode("hex") + " WPA2PSK CCMP " + "12345678".encode("hex")):
133 raise Exception("WPS_CONFIG command failed")
180cd73d
JM
134 ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=5)
135 if ev is None:
136 raise Exception("Timeout on WPS-NEW-AP-SETTINGS events")
137 # It takes some time for the AP to update Beacon and Probe Response frames,
138 # so wait here before requesting the scan to be started to avoid adding
139 # extra five second wait to the test due to fetching obsolete scan results.
140 hapd.ping()
141 time.sleep(0.2)
35831e94
JM
142 dev[0].connect(ssid, psk="12345678", scan_freq="2412", proto="WPA2",
143 pairwise="CCMP", group="CCMP")
144
ae3ad328 145def test_ap_wps_conf(dev, apdev):
302b7a1b
JM
146 """WPS PBC provisioning with configured AP"""
147 ssid = "test-wps-conf"
ae3ad328 148 hostapd.add_ap(apdev[0]['ifname'],
302b7a1b
JM
149 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
150 "wpa_passphrase": "12345678", "wpa": "2",
151 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
ae3ad328 152 hapd = hostapd.Hostapd(apdev[0]['ifname'])
302b7a1b
JM
153 logger.info("WPS provisioning step")
154 hapd.request("WPS_PBC")
155 dev[0].dump_monitor()
156 dev[0].request("WPS_PBC")
7e3f110b 157 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
302b7a1b
JM
158 if ev is None:
159 raise Exception("Association with the AP timed out")
160 status = dev[0].get_status()
ae3ad328 161 if status['wpa_state'] != 'COMPLETED':
302b7a1b 162 raise Exception("Not fully connected")
ae3ad328
JM
163 if status['bssid'] != apdev[0]['bssid']:
164 raise Exception("Unexpected BSSID")
302b7a1b
JM
165 if status['ssid'] != ssid:
166 raise Exception("Unexpected SSID")
167 if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
168 raise Exception("Unexpected encryption configuration")
169 if status['key_mgmt'] != 'WPA2-PSK':
170 raise Exception("Unexpected key_mgmt")
171
097cd9cd
JM
172 sta = hapd.get_sta(dev[0].p2p_interface_addr())
173 if 'wpsDeviceName' not in sta or sta['wpsDeviceName'] != "Device A":
174 raise Exception("Device name not available in STA command")
175
04e62788
JM
176def test_ap_wps_twice(dev, apdev):
177 """WPS provisioning with twice to change passphrase"""
178 ssid = "test-wps-twice"
179 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
180 "wpa_passphrase": "12345678", "wpa": "2",
181 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" }
182 hostapd.add_ap(apdev[0]['ifname'], params)
183 hapd = hostapd.Hostapd(apdev[0]['ifname'])
184 logger.info("WPS provisioning step")
185 hapd.request("WPS_PBC")
04e62788
JM
186 dev[0].dump_monitor()
187 dev[0].request("WPS_PBC")
188 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
189 if ev is None:
190 raise Exception("Association with the AP timed out")
191 dev[0].request("DISCONNECT")
192
193 logger.info("Restart AP with different passphrase and re-run WPS")
194 hapd_global = hostapd.HostapdGlobal()
195 hapd_global.remove(apdev[0]['ifname'])
196 params['wpa_passphrase'] = 'another passphrase'
197 hostapd.add_ap(apdev[0]['ifname'], params)
198 hapd = hostapd.Hostapd(apdev[0]['ifname'])
199 logger.info("WPS provisioning step")
200 hapd.request("WPS_PBC")
201 dev[0].dump_monitor()
202 dev[0].request("WPS_PBC")
203 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
204 if ev is None:
205 raise Exception("Association with the AP timed out")
206 networks = dev[0].list_networks()
207 if len(networks) > 1:
208 raise Exception("Unexpected duplicated network block present")
209
d658205a
JM
210def test_ap_wps_incorrect_pin(dev, apdev):
211 """WPS PIN provisioning with incorrect PIN"""
212 ssid = "test-wps-incorrect-pin"
213 hostapd.add_ap(apdev[0]['ifname'],
214 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
215 "wpa_passphrase": "12345678", "wpa": "2",
216 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
217 hapd = hostapd.Hostapd(apdev[0]['ifname'])
218
219 logger.info("WPS provisioning attempt 1")
220 hapd.request("WPS_PIN any 12345670")
d658205a
JM
221 dev[0].dump_monitor()
222 dev[0].request("WPS_PIN any 55554444")
223 ev = dev[0].wait_event(["WPS-FAIL"], timeout=30)
224 if ev is None:
225 raise Exception("WPS operation timed out")
226 if "config_error=18" not in ev:
227 raise Exception("Incorrect config_error reported")
228 if "msg=8" not in ev:
229 raise Exception("PIN error detected on incorrect message")
230 ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
231 if ev is None:
232 raise Exception("Timeout on disconnection event")
233 dev[0].request("WPS_CANCEL")
234 # if a scan was in progress, wait for it to complete before trying WPS again
235 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)
236
d671a420
JM
237 status = hapd.request("WPS_GET_STATUS")
238 if "Last WPS result: Failed" not in status:
239 raise Exception("WPS failure result not shown correctly")
240
d658205a
JM
241 logger.info("WPS provisioning attempt 2")
242 hapd.request("WPS_PIN any 12345670")
243 dev[0].dump_monitor()
244 dev[0].request("WPS_PIN any 12344444")
245 ev = dev[0].wait_event(["WPS-FAIL"], timeout=30)
246 if ev is None:
247 raise Exception("WPS operation timed out")
248 if "config_error=18" not in ev:
249 raise Exception("Incorrect config_error reported")
250 if "msg=10" not in ev:
251 raise Exception("PIN error detected on incorrect message")
252 ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
253 if ev is None:
254 raise Exception("Timeout on disconnection event")
255
ae3ad328 256def test_ap_wps_conf_pin(dev, apdev):
302b7a1b
JM
257 """WPS PIN provisioning with configured AP"""
258 ssid = "test-wps-conf-pin"
ae3ad328 259 hostapd.add_ap(apdev[0]['ifname'],
302b7a1b
JM
260 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
261 "wpa_passphrase": "12345678", "wpa": "2",
262 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
ae3ad328 263 hapd = hostapd.Hostapd(apdev[0]['ifname'])
302b7a1b
JM
264 logger.info("WPS provisioning step")
265 pin = dev[0].wps_read_pin()
266 hapd.request("WPS_PIN any " + pin)
267 dev[0].dump_monitor()
268 dev[0].request("WPS_PIN any " + pin)
7e3f110b 269 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
302b7a1b
JM
270 if ev is None:
271 raise Exception("Association with the AP timed out")
272 status = dev[0].get_status()
ae3ad328 273 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
302b7a1b
JM
274 raise Exception("Not fully connected")
275 if status['ssid'] != ssid:
276 raise Exception("Unexpected SSID")
277 if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
278 raise Exception("Unexpected encryption configuration")
279 if status['key_mgmt'] != 'WPA2-PSK':
280 raise Exception("Unexpected key_mgmt")
281
362ba6de
JM
282 dev[1].scan(freq="2412")
283 bss = dev[1].get_bss(apdev[0]['bssid'])
284 if "[WPS-AUTH]" in bss['flags']:
285 raise Exception("WPS-AUTH flag not cleared")
a60a6d6b
JM
286 logger.info("Try to connect from another station using the same PIN")
287 dev[1].request("WPS_PIN any " + pin)
288 ev = dev[1].wait_event(["WPS-M2D","CTRL-EVENT-CONNECTED"], timeout=30)
289 if ev is None:
290 raise Exception("Operation timed out")
291 if "WPS-M2D" not in ev:
292 raise Exception("Unexpected WPS operation started")
362ba6de 293
e9129860
JM
294def test_ap_wps_conf_pin_2sta(dev, apdev):
295 """Two stations trying to use WPS PIN at the same time"""
296 ssid = "test-wps-conf-pin2"
297 hostapd.add_ap(apdev[0]['ifname'],
298 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
299 "wpa_passphrase": "12345678", "wpa": "2",
300 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
301 hapd = hostapd.Hostapd(apdev[0]['ifname'])
302 logger.info("WPS provisioning step")
303 pin = "12345670"
304 pin2 = "55554444"
305 hapd.request("WPS_PIN " + dev[0].get_status_field("uuid") + " " + pin)
306 hapd.request("WPS_PIN " + dev[1].get_status_field("uuid") + " " + pin)
e9129860 307 dev[0].dump_monitor()
e9129860
JM
308 dev[1].dump_monitor()
309 dev[0].request("WPS_PIN any " + pin)
310 dev[1].request("WPS_PIN any " + pin)
311 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
312 if ev is None:
313 raise Exception("Association with the AP timed out")
314 ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
315 if ev is None:
316 raise Exception("Association with the AP timed out")
0489e880
JM
317
318def test_ap_wps_conf_pin_timeout(dev, apdev):
319 """WPS PIN provisioning with configured AP timing out PIN"""
320 ssid = "test-wps-conf-pin"
321 hostapd.add_ap(apdev[0]['ifname'],
322 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
323 "wpa_passphrase": "12345678", "wpa": "2",
324 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
325 hapd = hostapd.Hostapd(apdev[0]['ifname'])
326 addr = dev[0].p2p_interface_addr()
327 pin = dev[0].wps_read_pin()
328 if "FAIL" not in hapd.request("WPS_PIN "):
329 raise Exception("Unexpected success on invalid WPS_PIN")
330 hapd.request("WPS_PIN any " + pin + " 1")
331 time.sleep(1.1)
332 dev[0].request("WPS_PIN any " + pin)
333 ev = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=20)
334 if ev is None:
335 raise Exception("WPS-PIN-NEEDED event timed out")
336 ev = dev[0].wait_event(["WPS-M2D"])
337 if ev is None:
338 raise Exception("M2D not reported")
339 dev[0].request("WPS_CANCEL")
340
341 hapd.request("WPS_PIN any " + pin + " 20 " + addr)
342 dev[0].request("WPS_PIN any " + pin)
343 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
344 if ev is None:
345 raise Exception("Association with the AP timed out")
e9129860 346
ae3ad328 347def test_ap_wps_reg_connect(dev, apdev):
302b7a1b 348 """WPS registrar using AP PIN to connect"""
803edd1c 349 ssid = "test-wps-reg-ap-pin"
302b7a1b 350 appin = "12345670"
ae3ad328 351 hostapd.add_ap(apdev[0]['ifname'],
302b7a1b
JM
352 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
353 "wpa_passphrase": "12345678", "wpa": "2",
354 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
355 "ap_pin": appin})
356 logger.info("WPS provisioning step")
302b7a1b 357 dev[0].dump_monitor()
6edaee9c 358 dev[0].wps_reg(apdev[0]['bssid'], appin)
302b7a1b 359 status = dev[0].get_status()
ae3ad328 360 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
302b7a1b
JM
361 raise Exception("Not fully connected")
362 if status['ssid'] != ssid:
363 raise Exception("Unexpected SSID")
364 if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
365 raise Exception("Unexpected encryption configuration")
366 if status['key_mgmt'] != 'WPA2-PSK':
367 raise Exception("Unexpected key_mgmt")
368
9488858f
JM
369def check_wps_reg_failure(dev, ap, appin):
370 dev.request("WPS_REG " + ap['bssid'] + " " + appin)
371 ev = dev.wait_event(["WPS-SUCCESS", "WPS-FAIL"], timeout=15)
372 if ev is None:
373 raise Exception("WPS operation timed out")
374 if "WPS-SUCCESS" in ev:
375 raise Exception("WPS operation succeeded unexpectedly")
376 if "config_error=15" not in ev:
377 raise Exception("WPS setup locked state was not reported correctly")
378
e4357b19
JM
379def test_ap_wps_random_ap_pin(dev, apdev):
380 """WPS registrar using random AP PIN"""
381 ssid = "test-wps-reg-random-ap-pin"
382 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
383 hostapd.add_ap(apdev[0]['ifname'],
384 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
385 "wpa_passphrase": "12345678", "wpa": "2",
386 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
387 "device_name": "Wireless AP", "manufacturer": "Company",
388 "model_name": "WAP", "model_number": "123",
389 "serial_number": "12345", "device_type": "6-0050F204-1",
390 "os_version": "01020300",
391 "config_methods": "label push_button",
392 "uuid": ap_uuid, "upnp_iface": "lo" })
393 hapd = hostapd.Hostapd(apdev[0]['ifname'])
394 appin = hapd.request("WPS_AP_PIN random")
395 if "FAIL" in appin:
396 raise Exception("Could not generate random AP PIN")
397 if appin not in hapd.request("WPS_AP_PIN get"):
398 raise Exception("Could not fetch current AP PIN")
399 logger.info("WPS provisioning step")
e4357b19
JM
400 dev[0].wps_reg(apdev[0]['bssid'], appin)
401
402 hapd.request("WPS_AP_PIN disable")
403 logger.info("WPS provisioning step with AP PIN disabled")
9488858f
JM
404 check_wps_reg_failure(dev[1], apdev[0], appin)
405
406 logger.info("WPS provisioning step with AP PIN reset")
407 appin = "12345670"
408 hapd.request("WPS_AP_PIN set " + appin)
409 dev[1].wps_reg(apdev[0]['bssid'], appin)
410 dev[0].request("REMOVE_NETWORK all")
411 dev[1].request("REMOVE_NETWORK all")
412 dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
413 dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
414
415 logger.info("WPS provisioning step after AP PIN timeout")
416 hapd.request("WPS_AP_PIN disable")
417 appin = hapd.request("WPS_AP_PIN random 1")
418 time.sleep(1.1)
419 if "FAIL" not in hapd.request("WPS_AP_PIN get"):
420 raise Exception("AP PIN unexpectedly still enabled")
421 check_wps_reg_failure(dev[0], apdev[0], appin)
422
423 logger.info("WPS provisioning step after AP PIN timeout(2)")
424 hapd.request("WPS_AP_PIN disable")
425 appin = "12345670"
426 hapd.request("WPS_AP_PIN set " + appin + " 1")
427 time.sleep(1.1)
428 if "FAIL" not in hapd.request("WPS_AP_PIN get"):
429 raise Exception("AP PIN unexpectedly still enabled")
430 check_wps_reg_failure(dev[1], apdev[0], appin)
e4357b19 431
ae3ad328 432def test_ap_wps_reg_config(dev, apdev):
4b727c5c 433 """WPS registrar configuring an AP using AP PIN"""
302b7a1b
JM
434 ssid = "test-wps-init-ap-pin"
435 appin = "12345670"
ae3ad328 436 hostapd.add_ap(apdev[0]['ifname'],
302b7a1b
JM
437 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
438 "ap_pin": appin})
439 logger.info("WPS configuration step")
302b7a1b
JM
440 dev[0].dump_monitor()
441 new_ssid = "wps-new-ssid"
442 new_passphrase = "1234567890"
6edaee9c
JM
443 dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
444 new_passphrase)
302b7a1b 445 status = dev[0].get_status()
ae3ad328 446 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
302b7a1b
JM
447 raise Exception("Not fully connected")
448 if status['ssid'] != new_ssid:
449 raise Exception("Unexpected SSID")
450 if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
451 raise Exception("Unexpected encryption configuration")
452 if status['key_mgmt'] != 'WPA2-PSK':
453 raise Exception("Unexpected key_mgmt")
454
375afd7c
JM
455 logger.info("Re-configure back to open")
456 dev[0].request("REMOVE_NETWORK all")
457 dev[0].request("BSS_FLUSH 0")
458 dev[0].request("SCAN freq=2412 only_new=1")
459 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
460 if ev is None:
461 raise Exception("Scan timed out")
462 dev[0].dump_monitor()
463 dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-open", "OPEN", "NONE", "")
464 status = dev[0].get_status()
465 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
466 raise Exception("Not fully connected")
467 if status['ssid'] != "wps-open":
468 raise Exception("Unexpected SSID")
469 if status['key_mgmt'] != 'NONE':
470 raise Exception("Unexpected key_mgmt")
471
4b727c5c
JM
472def test_ap_wps_reg_config_ext_processing(dev, apdev):
473 """WPS registrar configuring an AP with external config processing"""
474 ssid = "test-wps-init-ap-pin"
475 appin = "12345670"
476 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
477 "wps_cred_processing": "1", "ap_pin": appin}
478 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
479 new_ssid = "wps-new-ssid"
480 new_passphrase = "1234567890"
481 dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPA2PSK", "CCMP",
482 new_passphrase, no_wait=True)
483 ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
484 if ev is None:
485 raise Exception("WPS registrar operation timed out")
486 ev = hapd.wait_event(["WPS-NEW-AP-SETTINGS"], timeout=15)
487 if ev is None:
488 raise Exception("WPS configuration timed out")
489 if "1026" not in ev:
490 raise Exception("AP Settings missing from event")
491 hapd.request("SET wps_cred_processing 0")
492 if "FAIL" in hapd.request("WPS_CONFIG " + new_ssid.encode("hex") + " WPA2PSK CCMP " + new_passphrase.encode("hex")):
493 raise Exception("WPS_CONFIG command failed")
494 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
495 if ev is None:
496 raise Exception("Association with the AP timed out")
497
eeefe187
JM
498def test_ap_wps_reg_config_tkip(dev, apdev):
499 """WPS registrar configuring AP to use TKIP and AP upgrading to TKIP+CCMP"""
500 ssid = "test-wps-init-ap"
501 appin = "12345670"
502 hostapd.add_ap(apdev[0]['ifname'],
503 { "ssid": ssid, "eap_server": "1", "wps_state": "1",
504 "ap_pin": appin})
505 logger.info("WPS configuration step")
eeefe187
JM
506 dev[0].request("SET wps_version_number 0x10")
507 dev[0].dump_monitor()
508 new_ssid = "wps-new-ssid-with-tkip"
509 new_passphrase = "1234567890"
510 dev[0].wps_reg(apdev[0]['bssid'], appin, new_ssid, "WPAPSK", "TKIP",
511 new_passphrase)
512 logger.info("Re-connect to verify WPA2 mixed mode")
513 dev[0].request("DISCONNECT")
514 id = 0
515 dev[0].set_network(id, "pairwise", "CCMP")
516 dev[0].set_network(id, "proto", "RSN")
517 dev[0].connect_network(id)
518 status = dev[0].get_status()
519 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
520 raise Exception("Not fully connected")
521 if status['ssid'] != new_ssid:
522 raise Exception("Unexpected SSID")
523 if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'TKIP':
524 raise Exception("Unexpected encryption configuration")
525 if status['key_mgmt'] != 'WPA2-PSK':
526 raise Exception("Unexpected key_mgmt")
527
6645ff50
JM
528def test_ap_wps_setup_locked(dev, apdev):
529 """WPS registrar locking up AP setup on AP PIN failures"""
530 ssid = "test-wps-incorrect-ap-pin"
531 appin = "12345670"
532 hostapd.add_ap(apdev[0]['ifname'],
533 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
534 "wpa_passphrase": "12345678", "wpa": "2",
535 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
536 "ap_pin": appin})
6645ff50
JM
537 new_ssid = "wps-new-ssid-test"
538 new_passphrase = "1234567890"
539
540 ap_setup_locked=False
541 for pin in ["55554444", "1234", "12345678", "00000000", "11111111"]:
542 dev[0].dump_monitor()
543 logger.info("Try incorrect AP PIN - attempt " + pin)
544 dev[0].wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
545 "CCMP", new_passphrase, no_wait=True)
546 ev = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
547 if ev is None:
548 raise Exception("Timeout on receiving WPS operation failure event")
549 if "CTRL-EVENT-CONNECTED" in ev:
550 raise Exception("Unexpected connection")
551 if "config_error=15" in ev:
552 logger.info("AP Setup Locked")
553 ap_setup_locked=True
554 elif "config_error=18" not in ev:
555 raise Exception("config_error=18 not reported")
556 ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
557 if ev is None:
558 raise Exception("Timeout on disconnection event")
559 time.sleep(0.1)
560 if not ap_setup_locked:
561 raise Exception("AP setup was not locked")
562
d671a420
JM
563 hapd = hostapd.Hostapd(apdev[0]['ifname'])
564 status = hapd.request("WPS_GET_STATUS")
565 if "Last WPS result: Failed" not in status:
566 raise Exception("WPS failure result not shown correctly")
567 if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
568 raise Exception("Peer address not shown correctly")
569
6645ff50
JM
570 time.sleep(0.5)
571 dev[0].dump_monitor()
572 logger.info("WPS provisioning step")
573 pin = dev[0].wps_read_pin()
574 hapd = hostapd.Hostapd(apdev[0]['ifname'])
575 hapd.request("WPS_PIN any " + pin)
576 dev[0].request("WPS_PIN any " + pin)
577 ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=30)
578 if ev is None:
579 raise Exception("WPS success was not reported")
580 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
581 if ev is None:
582 raise Exception("Association with the AP timed out")
583
c1cec68b
JM
584 appin = hapd.request("WPS_AP_PIN random")
585 if "FAIL" in appin:
586 raise Exception("Could not generate random AP PIN")
587 ev = hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=10)
588 if ev is None:
589 raise Exception("Failed to unlock AP PIN")
590
33c9b8d8
JM
591def test_ap_wps_setup_locked_timeout(dev, apdev):
592 """WPS re-enabling AP PIN after timeout"""
593 ssid = "test-wps-incorrect-ap-pin"
594 appin = "12345670"
595 hostapd.add_ap(apdev[0]['ifname'],
596 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
597 "wpa_passphrase": "12345678", "wpa": "2",
598 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
599 "ap_pin": appin})
600 new_ssid = "wps-new-ssid-test"
601 new_passphrase = "1234567890"
602
603 ap_setup_locked=False
604 for pin in ["55554444", "1234", "12345678", "00000000", "11111111"]:
605 dev[0].dump_monitor()
606 logger.info("Try incorrect AP PIN - attempt " + pin)
607 dev[0].wps_reg(apdev[0]['bssid'], pin, new_ssid, "WPA2PSK",
608 "CCMP", new_passphrase, no_wait=True)
609 ev = dev[0].wait_event(["WPS-FAIL", "CTRL-EVENT-CONNECTED"])
610 if ev is None:
611 raise Exception("Timeout on receiving WPS operation failure event")
612 if "CTRL-EVENT-CONNECTED" in ev:
613 raise Exception("Unexpected connection")
614 if "config_error=15" in ev:
615 logger.info("AP Setup Locked")
616 ap_setup_locked=True
617 break
618 elif "config_error=18" not in ev:
619 raise Exception("config_error=18 not reported")
620 ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"])
621 if ev is None:
622 raise Exception("Timeout on disconnection event")
623 time.sleep(0.1)
624 if not ap_setup_locked:
625 raise Exception("AP setup was not locked")
626 hapd = hostapd.Hostapd(apdev[0]['ifname'])
627 ev = hapd.wait_event(["WPS-AP-SETUP-UNLOCKED"], timeout=80)
628 if ev is None:
629 raise Exception("AP PIN did not get unlocked on 60 second timeout")
630
ae3ad328 631def test_ap_wps_pbc_overlap_2ap(dev, apdev):
302b7a1b 632 """WPS PBC session overlap with two active APs"""
ae3ad328 633 hostapd.add_ap(apdev[0]['ifname'],
302b7a1b
JM
634 { "ssid": "wps1", "eap_server": "1", "wps_state": "2",
635 "wpa_passphrase": "12345678", "wpa": "2",
636 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
637 "wps_independent": "1"})
ae3ad328 638 hostapd.add_ap(apdev[1]['ifname'],
302b7a1b
JM
639 { "ssid": "wps2", "eap_server": "1", "wps_state": "2",
640 "wpa_passphrase": "123456789", "wpa": "2",
641 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
642 "wps_independent": "1"})
ae3ad328 643 hapd = hostapd.Hostapd(apdev[0]['ifname'])
302b7a1b 644 hapd.request("WPS_PBC")
ae3ad328 645 hapd2 = hostapd.Hostapd(apdev[1]['ifname'])
302b7a1b
JM
646 hapd2.request("WPS_PBC")
647 logger.info("WPS provisioning step")
648 dev[0].dump_monitor()
649 dev[0].request("WPS_PBC")
650 ev = dev[0].wait_event(["WPS-OVERLAP-DETECTED"], timeout=15)
651 if ev is None:
652 raise Exception("PBC session overlap not detected")
653
ae3ad328 654def test_ap_wps_pbc_overlap_2sta(dev, apdev):
302b7a1b
JM
655 """WPS PBC session overlap with two active STAs"""
656 ssid = "test-wps-pbc-overlap"
ae3ad328 657 hostapd.add_ap(apdev[0]['ifname'],
302b7a1b
JM
658 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
659 "wpa_passphrase": "12345678", "wpa": "2",
660 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
ae3ad328 661 hapd = hostapd.Hostapd(apdev[0]['ifname'])
302b7a1b
JM
662 logger.info("WPS provisioning step")
663 hapd.request("WPS_PBC")
302b7a1b
JM
664 dev[0].dump_monitor()
665 dev[1].dump_monitor()
666 dev[0].request("WPS_PBC")
667 dev[1].request("WPS_PBC")
668 ev = dev[0].wait_event(["WPS-M2D"], timeout=15)
669 if ev is None:
670 raise Exception("PBC session overlap not detected (dev0)")
671 if "config_error=12" not in ev:
672 raise Exception("PBC session overlap not correctly reported (dev0)")
673 ev = dev[1].wait_event(["WPS-M2D"], timeout=15)
674 if ev is None:
675 raise Exception("PBC session overlap not detected (dev1)")
676 if "config_error=12" not in ev:
677 raise Exception("PBC session overlap not correctly reported (dev1)")
11e7eeba
JM
678 hapd.request("WPS_CANCEL")
679 ret = hapd.request("WPS_PBC")
680 if "FAIL" not in ret:
681 raise Exception("PBC mode allowed to be started while PBC overlap still active")
6edaee9c 682
71afe834
JM
683def test_ap_wps_cancel(dev, apdev):
684 """WPS AP cancelling enabled config method"""
685 ssid = "test-wps-ap-cancel"
686 hostapd.add_ap(apdev[0]['ifname'],
687 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
688 "wpa_passphrase": "12345678", "wpa": "2",
689 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
690 bssid = apdev[0]['bssid']
691 hapd = hostapd.Hostapd(apdev[0]['ifname'])
692
693 logger.info("Verify PBC enable/cancel")
694 hapd.request("WPS_PBC")
71afe834
JM
695 dev[0].scan(freq="2412")
696 bss = dev[0].get_bss(apdev[0]['bssid'])
697 if "[WPS-PBC]" not in bss['flags']:
698 raise Exception("WPS-PBC flag missing")
699 if "FAIL" in hapd.request("WPS_CANCEL"):
700 raise Exception("WPS_CANCEL failed")
701 dev[0].scan(freq="2412")
702 bss = dev[0].get_bss(apdev[0]['bssid'])
703 if "[WPS-PBC]" in bss['flags']:
704 raise Exception("WPS-PBC flag not cleared")
705
706 logger.info("Verify PIN enable/cancel")
707 hapd.request("WPS_PIN any 12345670")
708 dev[0].scan(freq="2412")
709 bss = dev[0].get_bss(apdev[0]['bssid'])
710 if "[WPS-AUTH]" not in bss['flags']:
711 raise Exception("WPS-AUTH flag missing")
712 if "FAIL" in hapd.request("WPS_CANCEL"):
713 raise Exception("WPS_CANCEL failed")
714 dev[0].scan(freq="2412")
715 bss = dev[0].get_bss(apdev[0]['bssid'])
716 if "[WPS-AUTH]" in bss['flags']:
717 raise Exception("WPS-AUTH flag not cleared")
718
6edaee9c
JM
719def test_ap_wps_er_add_enrollee(dev, apdev):
720 """WPS ER configuring AP and adding a new enrollee using PIN"""
721 ssid = "wps-er-add-enrollee"
722 ap_pin = "12345670"
723 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
724 hostapd.add_ap(apdev[0]['ifname'],
725 { "ssid": ssid, "eap_server": "1", "wps_state": "1",
726 "device_name": "Wireless AP", "manufacturer": "Company",
727 "model_name": "WAP", "model_number": "123",
728 "serial_number": "12345", "device_type": "6-0050F204-1",
729 "os_version": "01020300",
730 "config_methods": "label push_button",
731 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
732 logger.info("WPS configuration step")
733 new_passphrase = "1234567890"
734 dev[0].dump_monitor()
6edaee9c
JM
735 dev[0].wps_reg(apdev[0]['bssid'], ap_pin, ssid, "WPA2PSK", "CCMP",
736 new_passphrase)
737 status = dev[0].get_status()
738 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
739 raise Exception("Not fully connected")
740 if status['ssid'] != ssid:
741 raise Exception("Unexpected SSID")
742 if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'CCMP':
743 raise Exception("Unexpected encryption configuration")
744 if status['key_mgmt'] != 'WPA2-PSK':
745 raise Exception("Unexpected key_mgmt")
746
747 logger.info("Start ER")
748 dev[0].request("WPS_ER_START ifname=lo")
749 ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
750 if ev is None:
751 raise Exception("AP discovery timed out")
752 if ap_uuid not in ev:
753 raise Exception("Expected AP UUID not found")
754
755 logger.info("Learn AP configuration through UPnP")
756 dev[0].dump_monitor()
757 dev[0].request("WPS_ER_LEARN " + ap_uuid + " " + ap_pin)
758 ev = dev[0].wait_event(["WPS-ER-AP-SETTINGS"], timeout=15)
759 if ev is None:
760 raise Exception("AP learn timed out")
761 if ap_uuid not in ev:
762 raise Exception("Expected AP UUID not in settings")
763 if "ssid=" + ssid not in ev:
764 raise Exception("Expected SSID not in settings")
765 if "key=" + new_passphrase not in ev:
766 raise Exception("Expected passphrase not in settings")
767
768 logger.info("Add Enrollee using ER")
769 pin = dev[1].wps_read_pin()
770 dev[0].dump_monitor()
771 dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
6edaee9c
JM
772 dev[1].dump_monitor()
773 dev[1].request("WPS_PIN any " + pin)
846be889 774 ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=30)
6edaee9c
JM
775 if ev is None:
776 raise Exception("Enrollee did not report success")
777 ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
778 if ev is None:
779 raise Exception("Association with the AP timed out")
780 ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
781 if ev is None:
782 raise Exception("WPS ER did not report success")
783 hwsim_utils.test_connectivity_sta(dev[0], dev[1])
784
11c26f1b
JM
785 logger.info("Add a specific Enrollee using ER")
786 pin = dev[2].wps_read_pin()
787 addr2 = dev[2].p2p_interface_addr()
788 dev[0].dump_monitor()
789 dev[2].dump_monitor()
790 dev[2].request("WPS_PIN any " + pin)
791 ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=10)
792 if ev is None:
793 raise Exception("Enrollee not seen")
794 if addr2 not in ev:
795 raise Exception("Unexpected Enrollee MAC address")
796 dev[0].request("WPS_ER_PIN " + addr2 + " " + pin + " " + addr2)
797 ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
798 if ev is None:
799 raise Exception("Association with the AP timed out")
800 ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
801 if ev is None:
802 raise Exception("WPS ER did not report success")
803
38ae43de
JM
804 logger.info("Verify registrar selection behavior")
805 dev[0].request("WPS_ER_PIN any " + pin + " " + dev[1].p2p_interface_addr())
806 dev[1].request("DISCONNECT")
807 dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
808 dev[1].scan(freq="2412")
809 bss = dev[1].get_bss(apdev[0]['bssid'])
810 if "[WPS-AUTH]" not in bss['flags']:
811 raise Exception("WPS-AUTH flag missing")
812
813 logger.info("Stop ER")
814 dev[0].dump_monitor()
815 dev[0].request("WPS_ER_STOP")
816 ev = dev[0].wait_event(["WPS-ER-AP-REMOVE"])
817 if ev is None:
818 raise Exception("WPS ER unsubscription timed out")
8697cbc0
JM
819 # It takes some time for the UPnP UNSUBSCRIBE command to go through, so wait
820 # a bit before verifying that the scan results have change.
821 time.sleep(0.2)
38ae43de
JM
822
823 dev[1].scan(freq="2412")
824 bss = dev[1].get_bss(apdev[0]['bssid'])
825 if "[WPS-AUTH]" in bss['flags']:
826 raise Exception("WPS-AUTH flag not removed")
827
6edaee9c
JM
828def test_ap_wps_er_add_enrollee_pbc(dev, apdev):
829 """WPS ER connected to AP and adding a new enrollee using PBC"""
830 ssid = "wps-er-add-enrollee-pbc"
831 ap_pin = "12345670"
832 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
833 hostapd.add_ap(apdev[0]['ifname'],
834 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
835 "wpa_passphrase": "12345678", "wpa": "2",
836 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
837 "device_name": "Wireless AP", "manufacturer": "Company",
838 "model_name": "WAP", "model_number": "123",
839 "serial_number": "12345", "device_type": "6-0050F204-1",
840 "os_version": "01020300",
841 "config_methods": "label push_button",
842 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
843 logger.info("Learn AP configuration")
844 dev[0].dump_monitor()
6edaee9c
JM
845 dev[0].wps_reg(apdev[0]['bssid'], ap_pin)
846 status = dev[0].get_status()
847 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
848 raise Exception("Not fully connected")
849
850 logger.info("Start ER")
851 dev[0].request("WPS_ER_START ifname=lo")
852 ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
853 if ev is None:
854 raise Exception("AP discovery timed out")
855 if ap_uuid not in ev:
856 raise Exception("Expected AP UUID not found")
857
858 logger.info("Use learned network configuration on ER")
859 dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")
860
861 logger.info("Add Enrollee using ER and PBC")
862 dev[0].dump_monitor()
863 enrollee = dev[1].p2p_interface_addr()
6edaee9c
JM
864 dev[1].dump_monitor()
865 dev[1].request("WPS_PBC")
866
8674c022
JM
867 for i in range(0, 2):
868 ev = dev[0].wait_event(["WPS-ER-ENROLLEE-ADD"], timeout=15)
869 if ev is None:
870 raise Exception("Enrollee discovery timed out")
871 if enrollee in ev:
872 break
873 if i == 1:
874 raise Exception("Expected Enrollee not found")
6edaee9c
JM
875 dev[0].request("WPS_ER_PBC " + enrollee)
876
877 ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=15)
878 if ev is None:
879 raise Exception("Enrollee did not report success")
880 ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
881 if ev is None:
882 raise Exception("Association with the AP timed out")
883 ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
884 if ev is None:
885 raise Exception("WPS ER did not report success")
886 hwsim_utils.test_connectivity_sta(dev[0], dev[1])
bff3ac5b 887
800bcf4e
JM
888 # verify BSSID selection of the AP instead of UUID
889 if "FAIL" in dev[0].request("WPS_ER_SET_CONFIG " + apdev[0]['bssid'] + " 0"):
890 raise Exception("Could not select AP based on BSSID")
891
1f020f5e
JM
892def test_ap_wps_er_v10_add_enrollee_pin(dev, apdev):
893 """WPS v1.0 ER connected to AP and adding a new enrollee using PIN"""
894 ssid = "wps-er-add-enrollee-pbc"
895 ap_pin = "12345670"
896 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
897 hostapd.add_ap(apdev[0]['ifname'],
898 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
899 "wpa_passphrase": "12345678", "wpa": "2",
900 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
901 "device_name": "Wireless AP", "manufacturer": "Company",
902 "model_name": "WAP", "model_number": "123",
903 "serial_number": "12345", "device_type": "6-0050F204-1",
904 "os_version": "01020300",
905 "config_methods": "label push_button",
906 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
907 logger.info("Learn AP configuration")
908 dev[0].request("SET wps_version_number 0x10")
909 dev[0].dump_monitor()
910 dev[0].wps_reg(apdev[0]['bssid'], ap_pin)
911 status = dev[0].get_status()
912 if status['wpa_state'] != 'COMPLETED' or status['bssid'] != apdev[0]['bssid']:
913 raise Exception("Not fully connected")
914
915 logger.info("Start ER")
916 dev[0].request("WPS_ER_START ifname=lo")
917 ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
918 if ev is None:
919 raise Exception("AP discovery timed out")
920 if ap_uuid not in ev:
921 raise Exception("Expected AP UUID not found")
922
923 logger.info("Use learned network configuration on ER")
924 dev[0].request("WPS_ER_SET_CONFIG " + ap_uuid + " 0")
925
926 logger.info("Add Enrollee using ER and PIN")
927 enrollee = dev[1].p2p_interface_addr()
928 pin = dev[1].wps_read_pin()
929 dev[0].dump_monitor()
930 dev[0].request("WPS_ER_PIN any " + pin + " " + enrollee)
931 dev[1].dump_monitor()
932 dev[1].request("WPS_PIN any " + pin)
933 ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"], timeout=15)
934 if ev is None:
935 raise Exception("Association with the AP timed out")
936 ev = dev[0].wait_event(["WPS-SUCCESS"], timeout=15)
937 if ev is None:
938 raise Exception("WPS ER did not report success")
939
be923570
JM
940def test_ap_wps_er_config_ap(dev, apdev):
941 """WPS ER configuring AP over UPnP"""
942 ssid = "wps-er-ap-config"
943 ap_pin = "12345670"
944 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
945 hostapd.add_ap(apdev[0]['ifname'],
946 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
947 "wpa_passphrase": "12345678", "wpa": "2",
948 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
949 "device_name": "Wireless AP", "manufacturer": "Company",
950 "model_name": "WAP", "model_number": "123",
951 "serial_number": "12345", "device_type": "6-0050F204-1",
952 "os_version": "01020300",
953 "config_methods": "label push_button",
954 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo"})
955
956 logger.info("Connect ER to the AP")
957 dev[0].connect(ssid, psk="12345678", scan_freq="2412")
958
959 logger.info("WPS configuration step")
960 dev[0].request("WPS_ER_START ifname=lo")
961 ev = dev[0].wait_event(["WPS-ER-AP-ADD"], timeout=15)
962 if ev is None:
963 raise Exception("AP discovery timed out")
964 if ap_uuid not in ev:
965 raise Exception("Expected AP UUID not found")
966 new_passphrase = "1234567890"
967 dev[0].request("WPS_ER_CONFIG " + apdev[0]['bssid'] + " " + ap_pin + " " +
968 ssid.encode("hex") + " WPA2PSK CCMP " +
969 new_passphrase.encode("hex"))
970 ev = dev[0].wait_event(["WPS-SUCCESS"])
971 if ev is None:
972 raise Exception("WPS ER configuration operation timed out")
973 dev[1].wait_event(["CTRL-EVENT-DISCONNECTED"])
974 dev[0].connect(ssid, psk="1234567890", scan_freq="2412")
975
bff3ac5b
JM
976def test_ap_wps_fragmentation(dev, apdev):
977 """WPS with fragmentation in EAP-WSC and mixed mode WPA+WPA2"""
978 ssid = "test-wps-fragmentation"
979 hostapd.add_ap(apdev[0]['ifname'],
980 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
981 "wpa_passphrase": "12345678", "wpa": "3",
982 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
983 "wpa_pairwise": "TKIP",
984 "fragment_size": "50" })
985 hapd = hostapd.Hostapd(apdev[0]['ifname'])
986 logger.info("WPS provisioning step")
987 hapd.request("WPS_PBC")
bff3ac5b
JM
988 dev[0].dump_monitor()
989 dev[0].request("SET wps_fragment_size 50")
990 dev[0].request("WPS_PBC")
991 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
992 if ev is None:
993 raise Exception("Association with the AP timed out")
994 status = dev[0].get_status()
995 if status['wpa_state'] != 'COMPLETED':
996 raise Exception("Not fully connected")
997 if status['pairwise_cipher'] != 'CCMP' or status['group_cipher'] != 'TKIP':
998 raise Exception("Unexpected encryption configuration")
999 if status['key_mgmt'] != 'WPA2-PSK':
1000 raise Exception("Unexpected key_mgmt")
10ea6848
JM
1001
1002def test_ap_wps_new_version_sta(dev, apdev):
1003 """WPS compatibility with new version number on the station"""
1004 ssid = "test-wps-ver"
1005 hostapd.add_ap(apdev[0]['ifname'],
1006 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1007 "wpa_passphrase": "12345678", "wpa": "2",
1008 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1009 hapd = hostapd.Hostapd(apdev[0]['ifname'])
1010 logger.info("WPS provisioning step")
1011 hapd.request("WPS_PBC")
10ea6848
JM
1012 dev[0].dump_monitor()
1013 dev[0].request("SET wps_version_number 0x43")
dccafedb 1014 dev[0].request("SET wps_vendor_ext_m1 000137100100020001")
10ea6848
JM
1015 dev[0].request("WPS_PBC")
1016 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1017 if ev is None:
1018 raise Exception("Association with the AP timed out")
1019
1020def test_ap_wps_new_version_ap(dev, apdev):
1021 """WPS compatibility with new version number on the AP"""
1022 ssid = "test-wps-ver"
1023 hostapd.add_ap(apdev[0]['ifname'],
1024 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1025 "wpa_passphrase": "12345678", "wpa": "2",
1026 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1027 hapd = hostapd.Hostapd(apdev[0]['ifname'])
1028 logger.info("WPS provisioning step")
1029 if "FAIL" in hapd.request("SET wps_version_number 0x43"):
1030 raise Exception("Failed to enable test functionality")
1031 hapd.request("WPS_PBC")
10ea6848
JM
1032 dev[0].dump_monitor()
1033 dev[0].request("WPS_PBC")
1034 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1035 hapd.request("SET wps_version_number 0x20")
1036 if ev is None:
1037 raise Exception("Association with the AP timed out")
3bdf7d7f
JM
1038
1039def test_ap_wps_check_pin(dev, apdev):
1040 """Verify PIN checking through control interface"""
1041 hostapd.add_ap(apdev[0]['ifname'],
1042 { "ssid": "wps", "eap_server": "1", "wps_state": "2",
1043 "wpa_passphrase": "12345678", "wpa": "2",
1044 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP" })
1045 hapd = hostapd.Hostapd(apdev[0]['ifname'])
1046 for t in [ ("12345670", "12345670"),
1047 ("12345678", "FAIL-CHECKSUM"),
1048 ("1234-5670", "12345670"),
1049 ("1234 5670", "12345670"),
1050 ("1-2.3:4 5670", "12345670") ]:
1051 res = hapd.request("WPS_CHECK_PIN " + t[0]).rstrip('\n')
1052 res2 = dev[0].request("WPS_CHECK_PIN " + t[0]).rstrip('\n')
1053 if res != res2:
1054 raise Exception("Unexpected difference in WPS_CHECK_PIN responses")
1055 if res != t[1]:
1056 raise Exception("Incorrect WPS_CHECK_PIN response {} (expected {})".format(res, t[1]))
9ba1fcb0 1057
ac786d67
JM
1058 if "FAIL" not in hapd.request("WPS_CHECK_PIN 12345"):
1059 raise Exception("Unexpected WPS_CHECK_PIN success")
1060 if "FAIL" not in hapd.request("WPS_CHECK_PIN 123456789"):
1061 raise Exception("Unexpected WPS_CHECK_PIN success")
1062
9ba1fcb0
JM
1063def test_ap_wps_wep_config(dev, apdev):
1064 """WPS 2.0 AP rejecting WEP configuration"""
1065 ssid = "test-wps-config"
1066 appin = "12345670"
1067 hostapd.add_ap(apdev[0]['ifname'],
1068 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1069 "ap_pin": appin})
1070 hapd = hostapd.Hostapd(apdev[0]['ifname'])
1071 dev[0].wps_reg(apdev[0]['bssid'], appin, "wps-new-ssid-wep", "OPEN", "WEP",
1072 "hello", no_wait=True)
1073 ev = hapd.wait_event(["WPS-FAIL"], timeout=15)
1074 if ev is None:
1075 raise Exception("WPS-FAIL timed out")
1076 if "reason=2" not in ev:
1077 raise Exception("Unexpected reason code in WPS-FAIL")
1078 status = hapd.request("WPS_GET_STATUS")
1079 if "Last WPS result: Failed" not in status:
1080 raise Exception("WPS failure result not shown correctly")
1081 if "Failure Reason: WEP Prohibited" not in status:
1082 raise Exception("Failure reason not reported correctly")
1083 if "Peer Address: " + dev[0].p2p_interface_addr() not in status:
1084 raise Exception("Peer address not shown correctly")
1013a576 1085
11d78bb1
JM
1086def test_ap_wps_wep_enroll(dev, apdev):
1087 """WPS 2.0 STA rejecting WEP configuration"""
1088 ssid = "test-wps-wep"
1089 hostapd.add_ap(apdev[0]['ifname'],
1090 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1091 "skip_cred_build": "1", "extra_cred": "wps-wep-cred" })
1092 hapd = hostapd.Hostapd(apdev[0]['ifname'])
1093 hapd.request("WPS_PBC")
1094 dev[0].request("WPS_PBC")
1095 ev = dev[0].wait_event(["WPS-FAIL"], timeout=15)
1096 if ev is None:
1097 raise Exception("WPS-FAIL event timed out")
1098 if "msg=12" not in ev or "reason=2 (WEP Prohibited)" not in ev:
1099 raise Exception("Unexpected WPS-FAIL event: " + ev)
1100
1013a576
JM
1101def test_ap_wps_ie_fragmentation(dev, apdev):
1102 """WPS AP using fragmented WPS IE"""
1103 ssid = "test-wps-ie-fragmentation"
1104 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1105 "wpa_passphrase": "12345678", "wpa": "2",
1106 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1107 "device_name": "1234567890abcdef1234567890abcdef",
1108 "manufacturer": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
1109 "model_name": "1234567890abcdef1234567890abcdef",
1110 "model_number": "1234567890abcdef1234567890abcdef",
1111 "serial_number": "1234567890abcdef1234567890abcdef" }
1112 hostapd.add_ap(apdev[0]['ifname'], params)
1113 hapd = hostapd.Hostapd(apdev[0]['ifname'])
1114 hapd.request("WPS_PBC")
1115 dev[0].request("WPS_PBC")
1116 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1117 if ev is None:
1118 raise Exception("Association with the AP timed out")
1119 bss = dev[0].get_bss(apdev[0]['bssid'])
1120 if "wps_device_name" not in bss or bss['wps_device_name'] != "1234567890abcdef1234567890abcdef":
d7a68ad6 1121 logger.info(bss)
1013a576
JM
1122 raise Exception("Device Name not received correctly")
1123 if len(re.findall("dd..0050f204", bss['ie'])) != 2:
1124 raise Exception("Unexpected number of WPS IEs")
44ff0400 1125
2035b170
JM
1126def get_psk(pskfile):
1127 psks = {}
1128 with open(pskfile, "r") as f:
1129 lines = f.read().splitlines()
1130 for l in lines:
1131 if l == "# WPA PSKs":
1132 continue
1133 (addr,psk) = l.split(' ')
1134 psks[addr] = psk
1135 return psks
1136
1137def test_ap_wps_per_station_psk(dev, apdev):
1138 """WPS PBC provisioning with per-station PSK"""
1139 addr0 = dev[0].p2p_dev_addr()
1140 addr1 = dev[1].p2p_dev_addr()
1141 addr2 = dev[2].p2p_dev_addr()
1142 ssid = "wps"
1143 appin = "12345670"
1144 pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
1145 try:
1146 os.remove(pskfile)
1147 except:
1148 pass
1149
1150 try:
1151 with open(pskfile, "w") as f:
1152 f.write("# WPA PSKs\n")
1153
1154 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1155 "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
1156 "rsn_pairwise": "CCMP", "ap_pin": appin,
1157 "wpa_psk_file": pskfile }
1158 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1159
1160 logger.info("First enrollee")
1161 hapd.request("WPS_PBC")
1162 dev[0].request("WPS_PBC")
1163 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
1164 if ev is None:
1165 raise Exception("Association with the AP timed out (1)")
1166
1167 logger.info("Second enrollee")
1168 hapd.request("WPS_PBC")
1169 dev[1].request("WPS_PBC")
1170 ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
1171 if ev is None:
1172 raise Exception("Association with the AP timed out (2)")
1173
1174 logger.info("External registrar")
1175 dev[2].wps_reg(apdev[0]['bssid'], appin)
1176
1177 logger.info("Verifying PSK results")
1178 psks = get_psk(pskfile)
1179 if addr0 not in psks:
1180 raise Exception("No PSK recorded for sta0")
1181 if addr1 not in psks:
1182 raise Exception("No PSK recorded for sta1")
1183 if addr2 not in psks:
1184 raise Exception("No PSK recorded for sta2")
1185 if psks[addr0] == psks[addr1]:
1186 raise Exception("Same PSK recorded for sta0 and sta1")
1187 if psks[addr0] == psks[addr2]:
1188 raise Exception("Same PSK recorded for sta0 and sta2")
1189 if psks[addr1] == psks[addr2]:
1190 raise Exception("Same PSK recorded for sta1 and sta2")
1191
1192 dev[0].request("REMOVE_NETWORK all")
1193 logger.info("Second external registrar")
1194 dev[0].wps_reg(apdev[0]['bssid'], appin)
1195 psks2 = get_psk(pskfile)
1196 if addr0 not in psks2:
1197 raise Exception("No PSK recorded for sta0(reg)")
1198 if psks[addr0] == psks2[addr0]:
1199 raise Exception("Same PSK recorded for sta0(enrollee) and sta0(reg)")
1200 finally:
1201 os.remove(pskfile)
1202
373cce55
JM
1203def test_ap_wps_per_station_psk_failure(dev, apdev):
1204 """WPS PBC provisioning with per-station PSK (file not writable)"""
1205 addr0 = dev[0].p2p_dev_addr()
1206 addr1 = dev[1].p2p_dev_addr()
1207 addr2 = dev[2].p2p_dev_addr()
1208 ssid = "wps"
1209 appin = "12345670"
1210 pskfile = "/tmp/ap_wps_per_enrollee_psk.psk_file"
1211 try:
1212 os.remove(pskfile)
1213 except:
1214 pass
1215
1216 try:
1217 with open(pskfile, "w") as f:
1218 f.write("# WPA PSKs\n")
1219
1220 params = { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1221 "wpa": "2", "wpa_key_mgmt": "WPA-PSK",
1222 "rsn_pairwise": "CCMP", "ap_pin": appin,
1223 "wpa_psk_file": pskfile }
1224 hapd = hostapd.add_ap(apdev[0]['ifname'], params)
1225 if "FAIL" in hapd.request("SET wpa_psk_file /tmp/does/not/exists/ap_wps_per_enrollee_psk_failure.psk_file"):
1226 raise Exception("Failed to set wpa_psk_file")
1227
1228 logger.info("First enrollee")
1229 hapd.request("WPS_PBC")
1230 dev[0].request("WPS_PBC")
1231 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"])
1232 if ev is None:
1233 raise Exception("Association with the AP timed out (1)")
1234
1235 logger.info("Second enrollee")
1236 hapd.request("WPS_PBC")
1237 dev[1].request("WPS_PBC")
1238 ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
1239 if ev is None:
1240 raise Exception("Association with the AP timed out (2)")
1241
1242 logger.info("External registrar")
1243 dev[2].wps_reg(apdev[0]['bssid'], appin)
1244
1245 logger.info("Verifying PSK results")
1246 psks = get_psk(pskfile)
1247 if len(psks) > 0:
1248 raise Exception("PSK recorded unexpectedly")
1249 finally:
1250 os.remove(pskfile)
1251
e8518757
JM
1252def test_ap_wps_pin_request_file(dev, apdev):
1253 """WPS PIN provisioning with configured AP"""
1254 ssid = "wps"
1255 pinfile = "/tmp/ap_wps_pin_request_file.log"
1256 if os.path.exists(pinfile):
1257 subprocess.call(['sudo', 'rm', pinfile])
1258 hostapd.add_ap(apdev[0]['ifname'],
1259 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1260 "wps_pin_requests": pinfile,
1261 "wpa_passphrase": "12345678", "wpa": "2",
1262 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP"})
1263 hapd = hostapd.Hostapd(apdev[0]['ifname'])
1264 uuid = dev[0].get_status_field("uuid")
1265 pin = dev[0].wps_read_pin()
1266 try:
1267 dev[0].request("WPS_PIN any " + pin)
1268 ev = hapd.wait_event(["WPS-PIN-NEEDED"], timeout=15)
1269 if ev is None:
1270 raise Exception("PIN needed event not shown")
1271 if uuid not in ev:
1272 raise Exception("UUID mismatch")
1273 dev[0].request("WPS_CANCEL")
1274 success = False
1275 with open(pinfile, "r") as f:
1276 lines = f.readlines()
1277 for l in lines:
1278 if uuid in l:
1279 success = True
1280 break
1281 if not success:
1282 raise Exception("PIN request entry not in the log file")
1283 finally:
1284 subprocess.call(['sudo', 'rm', pinfile])
1285
56887c35
JM
1286def test_ap_wps_auto_setup_with_config_file(dev, apdev):
1287 """WPS auto-setup with configuration file"""
1288 conffile = "/tmp/ap_wps_auto_setup_with_config_file.conf"
1289 ifname = apdev[0]['ifname']
1290 try:
1291 with open(conffile, "w") as f:
1292 f.write("driver=nl80211\n")
1293 f.write("hw_mode=g\n")
1294 f.write("channel=1\n")
1295 f.write("ieee80211n=1\n")
1296 f.write("interface=%s\n" % ifname)
1297 f.write("ctrl_interface=/var/run/hostapd\n")
1298 f.write("ssid=wps\n")
1299 f.write("eap_server=1\n")
1300 f.write("wps_state=1\n")
1301 hostapd.add_bss('phy3', ifname, conffile)
1302 hapd = hostapd.Hostapd(ifname)
1303 hapd.request("WPS_PBC")
1304 dev[0].request("WPS_PBC")
1305 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=30)
1306 if ev is None:
1307 raise Exception("Association with the AP timed out")
1308 with open(conffile, "r") as f:
1309 lines = f.read().splitlines()
1310 vals = dict()
1311 for l in lines:
1312 try:
1313 [name,value] = l.split('=', 1)
1314 vals[name] = value
1315 except ValueError, e:
1316 if "# WPS configuration" in l:
1317 pass
1318 else:
1319 raise Exception("Unexpected configuration line: " + l)
1320 if vals['ieee80211n'] != '1' or vals['wps_state'] != '2' or "WPA-PSK" not in vals['wpa_key_mgmt']:
1321 raise Exception("Incorrect configuration: " + str(vals))
1322 finally:
1323 subprocess.call(['sudo', 'rm', conffile])
1324
44ff0400
JM
1325def add_ssdp_ap(ifname, ap_uuid):
1326 ssid = "wps-ssdp"
1327 ap_pin = "12345670"
1328 hostapd.add_ap(ifname,
1329 { "ssid": ssid, "eap_server": "1", "wps_state": "2",
1330 "wpa_passphrase": "12345678", "wpa": "2",
1331 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
1332 "device_name": "Wireless AP", "manufacturer": "Company",
1333 "model_name": "WAP", "model_number": "123",
1334 "serial_number": "12345", "device_type": "6-0050F204-1",
1335 "os_version": "01020300",
1336 "config_methods": "label push_button",
1337 "ap_pin": ap_pin, "uuid": ap_uuid, "upnp_iface": "lo",
1338 "friendly_name": "WPS Access Point",
1339 "manufacturer_url": "http://www.example.com/",
1340 "model_description": "Wireless Access Point",
1341 "model_url": "http://www.example.com/model/",
1342 "upc": "123456789012" })
1343
1344def ssdp_send(msg, no_recv=False):
1345 socket.setdefaulttimeout(1)
1346 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1347 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1348 sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1349 sock.bind(("127.0.0.1", 0))
1350 sock.sendto(msg, ("239.255.255.250", 1900))
1351 if no_recv:
1352 return None
1353 return sock.recv(1000)
1354
1355def ssdp_send_msearch(st):
1356 msg = '\r\n'.join([
1357 'M-SEARCH * HTTP/1.1',
1358 'HOST: 239.255.255.250:1900',
1359 'MX: 1',
1360 'MAN: "ssdp:discover"',
1361 'ST: ' + st,
1362 '', ''])
47c549fd 1363 return ssdp_send(msg)
44ff0400
JM
1364
1365def test_ap_wps_ssdp_msearch(dev, apdev):
1366 """WPS AP and SSDP M-SEARCH messages"""
1367 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1368 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1369
1370 msg = '\r\n'.join([
1371 'M-SEARCH * HTTP/1.1',
1372 'Host: 239.255.255.250:1900',
1373 'Mx: 1',
1374 'Man: "ssdp:discover"',
1375 'St: urn:schemas-wifialliance-org:device:WFADevice:1',
1376 '', ''])
1377 ssdp_send(msg)
1378
1379 msg = '\r\n'.join([
1380 'M-SEARCH * HTTP/1.1',
1381 'host:\t239.255.255.250:1900\t\t\t\t \t\t',
1382 'mx: \t1\t\t ',
1383 'man: \t \t "ssdp:discover" ',
1384 'st: urn:schemas-wifialliance-org:device:WFADevice:1\t\t',
1385 '', ''])
1386 ssdp_send(msg)
1387
1388 ssdp_send_msearch("ssdp:all")
1389 ssdp_send_msearch("upnp:rootdevice")
1390 ssdp_send_msearch("uuid:" + ap_uuid)
1391 ssdp_send_msearch("urn:schemas-wifialliance-org:service:WFAWLANConfig:1")
1392 ssdp_send_msearch("urn:schemas-wifialliance-org:device:WFADevice:1");
1393
1394 msg = '\r\n'.join([
1395 'M-SEARCH * HTTP/1.1',
1396 'HOST:\t239.255.255.250:1900',
1397 'MAN: "ssdp:discover"',
1398 'MX: 130',
1399 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1400 '', ''])
1401 ssdp_send(msg, no_recv=True)
1402
1403def test_ap_wps_ssdp_invalid_msearch(dev, apdev):
1404 """WPS AP and invalid SSDP M-SEARCH messages"""
1405 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1406 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1407
1408 socket.setdefaulttimeout(1)
1409 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1410 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1411 sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1412 sock.bind(("127.0.0.1", 0))
1413
1414 logger.debug("Missing MX")
1415 msg = '\r\n'.join([
1416 'M-SEARCH * HTTP/1.1',
1417 'HOST: 239.255.255.250:1900',
1418 'MAN: "ssdp:discover"',
1419 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1420 '', ''])
1421 sock.sendto(msg, ("239.255.255.250", 1900))
1422
1423 logger.debug("Negative MX")
1424 msg = '\r\n'.join([
1425 'M-SEARCH * HTTP/1.1',
1426 'HOST: 239.255.255.250:1900',
1427 'MX: -1',
1428 'MAN: "ssdp:discover"',
1429 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1430 '', ''])
1431 sock.sendto(msg, ("239.255.255.250", 1900))
1432
1433 logger.debug("Invalid MX")
1434 msg = '\r\n'.join([
1435 'M-SEARCH * HTTP/1.1',
1436 'HOST: 239.255.255.250:1900',
1437 'MX; 1',
1438 'MAN: "ssdp:discover"',
1439 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1440 '', ''])
1441 sock.sendto(msg, ("239.255.255.250", 1900))
1442
1443 logger.debug("Missing MAN")
1444 msg = '\r\n'.join([
1445 'M-SEARCH * HTTP/1.1',
1446 'HOST: 239.255.255.250:1900',
1447 'MX: 1',
1448 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1449 '', ''])
1450 sock.sendto(msg, ("239.255.255.250", 1900))
1451
1452 logger.debug("Invalid MAN")
1453 msg = '\r\n'.join([
1454 'M-SEARCH * HTTP/1.1',
1455 'HOST: 239.255.255.250:1900',
1456 'MX: 1',
1457 'MAN: foo',
1458 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1459 '', ''])
1460 sock.sendto(msg, ("239.255.255.250", 1900))
1461 msg = '\r\n'.join([
1462 'M-SEARCH * HTTP/1.1',
1463 'HOST: 239.255.255.250:1900',
1464 'MX: 1',
1465 'MAN; "ssdp:discover"',
1466 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1467 '', ''])
1468 sock.sendto(msg, ("239.255.255.250", 1900))
1469
1470 logger.debug("Missing HOST")
1471 msg = '\r\n'.join([
1472 'M-SEARCH * HTTP/1.1',
1473 'MAN: "ssdp:discover"',
1474 'MX: 1',
1475 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1476 '', ''])
1477 sock.sendto(msg, ("239.255.255.250", 1900))
1478
1479 logger.debug("Missing ST")
1480 msg = '\r\n'.join([
1481 'M-SEARCH * HTTP/1.1',
1482 'HOST: 239.255.255.250:1900',
1483 'MAN: "ssdp:discover"',
1484 'MX: 1',
1485 '', ''])
1486 sock.sendto(msg, ("239.255.255.250", 1900))
1487
1488 logger.debug("Mismatching ST")
1489 msg = '\r\n'.join([
1490 'M-SEARCH * HTTP/1.1',
1491 'HOST: 239.255.255.250:1900',
1492 'MAN: "ssdp:discover"',
1493 'MX: 1',
1494 'ST: uuid:16d5f8a9-4ee4-4f5e-81f9-cc6e2f47f42d',
1495 '', ''])
1496 sock.sendto(msg, ("239.255.255.250", 1900))
1497 msg = '\r\n'.join([
1498 'M-SEARCH * HTTP/1.1',
1499 'HOST: 239.255.255.250:1900',
1500 'MAN: "ssdp:discover"',
1501 'MX: 1',
1502 'ST: foo:bar',
1503 '', ''])
1504 sock.sendto(msg, ("239.255.255.250", 1900))
1505 msg = '\r\n'.join([
1506 'M-SEARCH * HTTP/1.1',
1507 'HOST: 239.255.255.250:1900',
1508 'MAN: "ssdp:discover"',
1509 'MX: 1',
1510 'ST: foobar',
1511 '', ''])
1512 sock.sendto(msg, ("239.255.255.250", 1900))
1513
1514 logger.debug("Invalid ST")
1515 msg = '\r\n'.join([
1516 'M-SEARCH * HTTP/1.1',
1517 'HOST: 239.255.255.250:1900',
1518 'MAN: "ssdp:discover"',
1519 'MX: 1',
1520 'ST; urn:schemas-wifialliance-org:device:WFADevice:1',
1521 '', ''])
1522 sock.sendto(msg, ("239.255.255.250", 1900))
1523
1524 logger.debug("Invalid M-SEARCH")
1525 msg = '\r\n'.join([
1526 'M+SEARCH * HTTP/1.1',
1527 'HOST: 239.255.255.250:1900',
1528 'MAN: "ssdp:discover"',
1529 'MX: 1',
1530 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1531 '', ''])
1532 sock.sendto(msg, ("239.255.255.250", 1900))
1533 msg = '\r\n'.join([
1534 'M-SEARCH-* HTTP/1.1',
1535 'HOST: 239.255.255.250:1900',
1536 'MAN: "ssdp:discover"',
1537 'MX: 1',
1538 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1539 '', ''])
1540 sock.sendto(msg, ("239.255.255.250", 1900))
1541
1542 logger.debug("Invalid message format")
1543 sock.sendto("NOTIFY * HTTP/1.1", ("239.255.255.250", 1900))
1544 msg = '\r'.join([
1545 'M-SEARCH * HTTP/1.1',
1546 'HOST: 239.255.255.250:1900',
1547 'MAN: "ssdp:discover"',
1548 'MX: 1',
1549 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1550 '', ''])
1551 sock.sendto(msg, ("239.255.255.250", 1900))
1552
1553 try:
1554 r = sock.recv(1000)
1555 raise Exception("Unexpected M-SEARCH response: " + r)
1556 except socket.timeout:
1557 pass
1558
1559 logger.debug("Valid M-SEARCH")
1560 msg = '\r\n'.join([
1561 'M-SEARCH * HTTP/1.1',
1562 'HOST: 239.255.255.250:1900',
1563 'MAN: "ssdp:discover"',
1564 'MX: 1',
1565 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1566 '', ''])
1567 sock.sendto(msg, ("239.255.255.250", 1900))
1568
1569 try:
1570 r = sock.recv(1000)
1571 pass
1572 except socket.timeout:
1573 raise Exception("No SSDP response")
1574
1575def test_ap_wps_ssdp_burst(dev, apdev):
1576 """WPS AP and SSDP burst"""
1577 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1578 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1579
1580 msg = '\r\n'.join([
1581 'M-SEARCH * HTTP/1.1',
1582 'HOST: 239.255.255.250:1900',
1583 'MAN: "ssdp:discover"',
1584 'MX: 1',
1585 'ST: urn:schemas-wifialliance-org:device:WFADevice:1',
1586 '', ''])
1587 socket.setdefaulttimeout(1)
1588 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1589 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1590 sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1591 sock.bind(("127.0.0.1", 0))
1592 for i in range(0, 25):
1593 sock.sendto(msg, ("239.255.255.250", 1900))
1594 resp = 0
1595 while True:
1596 try:
1597 r = sock.recv(1000)
1598 if not r.startswith("HTTP/1.1 200 OK\r\n"):
1599 raise Exception("Unexpected message: " + r)
1600 resp += 1
1601 except socket.timeout:
1602 break
1603 if resp < 20:
1604 raise Exception("Too few SSDP responses")
1605
1606 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
1607 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1608 sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
1609 sock.bind(("127.0.0.1", 0))
1610 for i in range(0, 25):
1611 sock.sendto(msg, ("239.255.255.250", 1900))
1612 while True:
1613 try:
1614 r = sock.recv(1000)
1615 if ap_uuid in r:
1616 break
1617 except socket.timeout:
1618 raise Exception("No SSDP response")
47c549fd
JM
1619
1620def ssdp_get_location(uuid):
1621 res = ssdp_send_msearch("uuid:" + uuid)
1622 location = None
1623 for l in res.splitlines():
1624 if l.lower().startswith("location:"):
1625 location = l.split(':', 1)[1].strip()
1626 break
1627 if location is None:
1628 raise Exception("No UPnP location found")
1629 return location
1630
1631def upnp_get_urls(location):
1632 conn = urllib.urlopen(location)
1633 tree = ET.parse(conn)
1634 root = tree.getroot()
1635 urn = '{urn:schemas-upnp-org:device-1-0}'
1636 service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
1637 res = {}
1638 res['scpd_url'] = urlparse.urljoin(location, service.find(urn + 'SCPDURL').text)
1639 res['control_url'] = urlparse.urljoin(location, service.find(urn + 'controlURL').text)
1640 res['event_sub_url'] = urlparse.urljoin(location, service.find(urn + 'eventSubURL').text)
1641 return res
1642
1643def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
1644 soapns = 'http://schemas.xmlsoap.org/soap/envelope/'
1645 wpsns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
1646 ET.register_namespace('soapenv', soapns)
1647 ET.register_namespace('wfa', wpsns)
1648 attrib = {}
1649 attrib['{%s}encodingStyle' % soapns] = 'http://schemas.xmlsoap.org/soap/encoding/'
1650 root = ET.Element("{%s}Envelope" % soapns, attrib=attrib)
1651 body = ET.SubElement(root, "{%s}Body" % soapns)
1652 act = ET.SubElement(body, "{%s}%s" % (wpsns, action))
1653 tree = ET.ElementTree(root)
1654 soap = StringIO.StringIO()
1655 tree.write(soap, xml_declaration=True, encoding='utf-8')
1656
1657 headers = { "Content-type": 'text/xml; charset="utf-8"' }
1658 if include_soap_action:
1659 headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
1660 elif soap_action_override:
1661 headers["SOAPAction"] = soap_action_override
1662 conn.request("POST", path, soap.getvalue(), headers)
1663 return conn.getresponse()
1664
1665def test_ap_wps_upnp(dev, apdev):
1666 """WPS AP and UPnP operations"""
1667 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1668 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1669
1670 location = ssdp_get_location(ap_uuid)
1671 urls = upnp_get_urls(location)
1672
1673 conn = urllib.urlopen(urls['scpd_url'])
1674 scpd = conn.read()
1675
1676 conn = urllib.urlopen(urlparse.urljoin(location, "unknown.html"))
1677 if conn.getcode() != 404:
1678 raise Exception("Unexpected HTTP response to GET unknown URL")
1679
1680 url = urlparse.urlparse(location)
1681 conn = httplib.HTTPConnection(url.netloc)
1682 #conn.set_debuglevel(1)
1683 headers = { "Content-type": 'text/xml; charset="utf-8"',
1684 "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
1685 conn.request("POST", "hello", "\r\n\r\n", headers)
1686 resp = conn.getresponse()
1687 if resp.status != 404:
1688 raise Exception("Unexpected HTTP response: %s" % resp.status)
1689
1690 conn.request("UNKNOWN", "hello", "\r\n\r\n", headers)
1691 resp = conn.getresponse()
1692 if resp.status != 501:
1693 raise Exception("Unexpected HTTP response: %s" % resp.status)
1694
1695 headers = { "Content-type": 'text/xml; charset="utf-8"',
1696 "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
1697 ctrlurl = urlparse.urlparse(urls['control_url'])
1698 conn.request("POST", ctrlurl.path, "\r\n\r\n", headers)
1699 resp = conn.getresponse()
1700 if resp.status != 401:
1701 raise Exception("Unexpected HTTP response: %s" % resp.status)
1702
1703 logger.debug("GetDeviceInfo without SOAPAction header")
1704 resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
1705 include_soap_action=False)
1706 if resp.status != 401:
1707 raise Exception("Unexpected HTTP response: %s" % resp.status)
1708
1709 logger.debug("GetDeviceInfo with invalid SOAPAction header")
1710 for act in [ "foo",
1711 "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
1712 '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
1713 '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']:
1714 resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
1715 include_soap_action=False,
1716 soap_action_override=act)
1717 if resp.status != 401:
1718 raise Exception("Unexpected HTTP response: %s" % resp.status)
1719
1720 resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo")
1721 if resp.status != 200:
1722 raise Exception("Unexpected HTTP response: %s" % resp.status)
1723 dev = resp.read()
1724 if "NewDeviceInfo" not in dev:
1725 raise Exception("Unexpected GetDeviceInfo response")
1726
1727 logger.debug("PutMessage without required parameters")
1728 resp = upnp_soap_action(conn, ctrlurl.path, "PutMessage")
1729 if resp.status != 600:
1730 raise Exception("Unexpected HTTP response: %s" % resp.status)
1731
1732 logger.debug("PutWLANResponse without required parameters")
1733 resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse")
1734 if resp.status != 600:
1735 raise Exception("Unexpected HTTP response: %s" % resp.status)
1736
1737 logger.debug("SetSelectedRegistrar from unregistered ER")
1738 resp = upnp_soap_action(conn, ctrlurl.path, "SetSelectedRegistrar")
1739 if resp.status != 501:
1740 raise Exception("Unexpected HTTP response: %s" % resp.status)
1741
1742 logger.debug("Unknown action")
1743 resp = upnp_soap_action(conn, ctrlurl.path, "Unknown")
1744 if resp.status != 401:
1745 raise Exception("Unexpected HTTP response: %s" % resp.status)
1746
1747def test_ap_wps_upnp_subscribe(dev, apdev):
1748 """WPS AP and UPnP event subscription"""
1749 ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
1750 add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
1751
1752 location = ssdp_get_location(ap_uuid)
1753 urls = upnp_get_urls(location)
1754 eventurl = urlparse.urlparse(urls['event_sub_url'])
1755
1756 url = urlparse.urlparse(location)
1757 conn = httplib.HTTPConnection(url.netloc)
1758 #conn.set_debuglevel(1)
1759 headers = { "callback": '<http://127.0.0.1:12345/event>',
1760 "timeout": "Second-1234" }
1761 conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers)
1762 resp = conn.getresponse()
1763 if resp.status != 412:
1764 raise Exception("Unexpected HTTP response: %s" % resp.status)
1765
1766 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1767 resp = conn.getresponse()
1768 if resp.status != 412:
1769 raise Exception("Unexpected HTTP response: %s" % resp.status)
1770
1771 headers = { "NT": "upnp:event",
1772 "timeout": "Second-1234" }
1773 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1774 resp = conn.getresponse()
1775 if resp.status != 412:
1776 raise Exception("Unexpected HTTP response: %s" % resp.status)
1777
1778 headers = { "callback": '<http://127.0.0.1:12345/event>',
1779 "NT": "upnp:foobar",
1780 "timeout": "Second-1234" }
1781 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1782 resp = conn.getresponse()
1783 if resp.status != 400:
1784 raise Exception("Unexpected HTTP response: %s" % resp.status)
1785
1786 logger.debug("Valid subscription")
1787 headers = { "callback": '<http://127.0.0.1:12345/event>',
1788 "NT": "upnp:event",
1789 "timeout": "Second-1234" }
1790 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1791 resp = conn.getresponse()
1792 if resp.status != 200:
1793 raise Exception("Unexpected HTTP response: %s" % resp.status)
1794 sid = resp.getheader("sid")
1795 logger.debug("Subscription SID " + sid)
1796
1797 logger.debug("Invalid re-subscription")
1798 headers = { "NT": "upnp:event",
1799 "sid": "123456734567854",
1800 "timeout": "Second-1234" }
1801 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1802 resp = conn.getresponse()
1803 if resp.status != 400:
1804 raise Exception("Unexpected HTTP response: %s" % resp.status)
1805
1806 logger.debug("Invalid re-subscription")
1807 headers = { "NT": "upnp:event",
1808 "sid": "uuid:123456734567854",
1809 "timeout": "Second-1234" }
1810 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1811 resp = conn.getresponse()
1812 if resp.status != 400:
1813 raise Exception("Unexpected HTTP response: %s" % resp.status)
1814
1815 logger.debug("Invalid re-subscription")
1816 headers = { "callback": '<http://127.0.0.1:12345/event>',
1817 "NT": "upnp:event",
1818 "sid": sid,
1819 "timeout": "Second-1234" }
1820 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1821 resp = conn.getresponse()
1822 if resp.status != 400:
1823 raise Exception("Unexpected HTTP response: %s" % resp.status)
1824
1825 logger.debug("SID mismatch in re-subscription")
1826 headers = { "NT": "upnp:event",
1827 "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
1828 "timeout": "Second-1234" }
1829 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1830 resp = conn.getresponse()
1831 if resp.status != 412:
1832 raise Exception("Unexpected HTTP response: %s" % resp.status)
1833
1834 logger.debug("Valid re-subscription")
1835 headers = { "NT": "upnp:event",
1836 "sid": sid,
1837 "timeout": "Second-1234" }
1838 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1839 resp = conn.getresponse()
1840 if resp.status != 200:
1841 raise Exception("Unexpected HTTP response: %s" % resp.status)
1842 sid2 = resp.getheader("sid")
1843 logger.debug("Subscription SID " + sid2)
1844
1845 if sid != sid2:
1846 raise Exception("Unexpected SID change")
1847
1848 logger.debug("Valid re-subscription")
1849 headers = { "NT": "upnp:event",
1850 "sid": "uuid: \t \t" + sid.split(':')[1],
1851 "timeout": "Second-1234" }
1852 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1853 resp = conn.getresponse()
1854 if resp.status != 200:
1855 raise Exception("Unexpected HTTP response: %s" % resp.status)
1856
1857 logger.debug("Invalid unsubscription")
1858 headers = { "sid": sid }
1859 conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers)
1860 resp = conn.getresponse()
1861 if resp.status != 412:
1862 raise Exception("Unexpected HTTP response: %s" % resp.status)
1863 headers = { "foo": "bar" }
1864 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1865 resp = conn.getresponse()
1866 if resp.status != 412:
1867 raise Exception("Unexpected HTTP response: %s" % resp.status)
1868
1869 logger.debug("Valid unsubscription")
1870 headers = { "sid": sid }
1871 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1872 resp = conn.getresponse()
1873 if resp.status != 200:
1874 raise Exception("Unexpected HTTP response: %s" % resp.status)
1875
1876 logger.debug("Unsubscription for not existing SID")
1877 headers = { "sid": sid }
1878 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1879 resp = conn.getresponse()
1880 if resp.status != 412:
1881 raise Exception("Unexpected HTTP response: %s" % resp.status)
1882
1883 logger.debug("Invalid unsubscription")
1884 headers = { "sid": " \t \tfoo" }
1885 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1886 resp = conn.getresponse()
1887 if resp.status != 400:
1888 raise Exception("Unexpected HTTP response: %s" % resp.status)
1889
1890 logger.debug("Invalid unsubscription")
1891 headers = { "sid": "uuid:\t \tfoo" }
1892 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1893 resp = conn.getresponse()
1894 if resp.status != 400:
1895 raise Exception("Unexpected HTTP response: %s" % resp.status)
1896
1897 logger.debug("Invalid unsubscription")
1898 headers = { "NT": "upnp:event",
1899 "sid": sid }
1900 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1901 resp = conn.getresponse()
1902 if resp.status != 400:
1903 raise Exception("Unexpected HTTP response: %s" % resp.status)
1904 headers = { "callback": '<http://127.0.0.1:12345/event>',
1905 "sid": sid }
1906 conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1907 resp = conn.getresponse()
1908 if resp.status != 400:
1909 raise Exception("Unexpected HTTP response: %s" % resp.status)
1910
1911 logger.debug("Valid subscription with multiple callbacks")
1912 headers = { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
1913 "NT": "upnp:event",
1914 "timeout": "Second-1234" }
1915 conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
1916 resp = conn.getresponse()
1917 if resp.status != 200:
1918 raise Exception("Unexpected HTTP response: %s" % resp.status)
1919 sid = resp.getheader("sid")
1920 logger.debug("Subscription SID " + sid)