]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/hostapd.py
tests: PSK file error cases
[thirdparty/hostap.git] / tests / hwsim / hostapd.py
CommitLineData
b8cd4c54 1# Python class for controlling hostapd
36408936 2# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
b8cd4c54
JM
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import os
8import time
9import logging
bfe375ec
JM
10import binascii
11import struct
b8cd4c54
JM
12import wpaspy
13
c9aa4308 14logger = logging.getLogger()
b8cd4c54 15hapd_ctrl = '/var/run/hostapd'
8c87f65f 16hapd_global = '/var/run/hostapd-global'
b8cd4c54 17
bfe375ec
JM
18def mac2tuple(mac):
19 return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
20
b8cd4c54
JM
21class HostapdGlobal:
22 def __init__(self):
23 self.ctrl = wpaspy.Ctrl(hapd_global)
24
25 def add(self, ifname):
26 res = self.ctrl.request("ADD " + ifname + " " + hapd_ctrl)
27 if not "OK" in res:
28 raise Exception("Could not add hostapd interface " + ifname)
29
77990cd7
JM
30 def add_iface(self, ifname, confname):
31 res = self.ctrl.request("ADD " + ifname + " config=" + confname)
32 if not "OK" in res:
33 raise Exception("Could not add hostapd interface")
34
a6333977
JM
35 def add_bss(self, phy, confname, ignore_error=False):
36 res = self.ctrl.request("ADD bss_config=" + phy + ":" + confname)
37 if not "OK" in res:
38 if not ignore_error:
39 raise Exception("Could not add hostapd BSS")
40
b8cd4c54 41 def remove(self, ifname):
c2149b08 42 self.ctrl.request("REMOVE " + ifname, timeout=30)
b8cd4c54 43
75428961
JM
44 def relog(self):
45 self.ctrl.request("RELOG")
46
f8949f5f
JM
47 def flush(self):
48 self.ctrl.request("FLUSH")
49
b8cd4c54
JM
50
51class Hostapd:
52 def __init__(self, ifname):
53 self.ifname = ifname
54 self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
b47750be
JM
55 self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
56 self.mon.attach()
b8cd4c54
JM
57
58 def request(self, cmd):
59 logger.debug(self.ifname + ": CTRL: " + cmd)
60 return self.ctrl.request(cmd)
61
62 def ping(self):
63 return "PONG" in self.request("PING")
64
65 def set(self, field, value):
b8cd4c54
JM
66 if not "OK" in self.request("SET " + field + " " + value):
67 raise Exception("Failed to set hostapd parameter " + field)
68
69 def set_defaults(self):
70 self.set("driver", "nl80211")
71 self.set("hw_mode", "g")
72 self.set("channel", "1")
73 self.set("ieee80211n", "1")
789b9f1d
JM
74 self.set("logger_stdout", "-1")
75 self.set("logger_stdout_level", "0")
b8cd4c54
JM
76
77 def set_open(self, ssid):
78 self.set_defaults()
79 self.set("ssid", ssid)
80
81 def set_wpa2_psk(self, ssid, passphrase):
82 self.set_defaults()
83 self.set("ssid", ssid)
84 self.set("wpa_passphrase", passphrase)
85 self.set("wpa", "2")
86 self.set("wpa_key_mgmt", "WPA-PSK")
87 self.set("rsn_pairwise", "CCMP")
88
e492837b
JM
89 def set_wpa_psk(self, ssid, passphrase):
90 self.set_defaults()
91 self.set("ssid", ssid)
92 self.set("wpa_passphrase", passphrase)
93 self.set("wpa", "1")
94 self.set("wpa_key_mgmt", "WPA-PSK")
95 self.set("wpa_pairwise", "TKIP")
96
97 def set_wpa_psk_mixed(self, ssid, passphrase):
98 self.set_defaults()
99 self.set("ssid", ssid)
100 self.set("wpa_passphrase", passphrase)
101 self.set("wpa", "3")
102 self.set("wpa_key_mgmt", "WPA-PSK")
103 self.set("wpa_pairwise", "TKIP")
104 self.set("rsn_pairwise", "CCMP")
105
0165c4be
JM
106 def set_wep(self, ssid, key):
107 self.set_defaults()
108 self.set("ssid", ssid)
109 self.set("wep_key0", key)
110
b8cd4c54 111 def enable(self):
d45e417f 112 if not "OK" in self.request("ENABLE"):
b8cd4c54
JM
113 raise Exception("Failed to enable hostapd interface " + self.ifname)
114
115 def disable(self):
00f74dbd 116 if not "OK" in self.request("DISABLE"):
b8cd4c54 117 raise Exception("Failed to disable hostapd interface " + self.ifname)
e259d186 118
b47750be
JM
119 def dump_monitor(self):
120 while self.mon.pending():
121 ev = self.mon.recv()
122 logger.debug(self.ifname + ": " + ev)
123
124 def wait_event(self, events, timeout):
36408936
JM
125 start = os.times()[4]
126 while True:
b47750be
JM
127 while self.mon.pending():
128 ev = self.mon.recv()
129 logger.debug(self.ifname + ": " + ev)
130 for event in events:
131 if event in ev:
132 return ev
36408936
JM
133 now = os.times()[4]
134 remaining = start + timeout - now
135 if remaining <= 0:
136 break
137 if not self.mon.pending(timeout=remaining):
138 break
b47750be
JM
139 return None
140
141 def get_status(self):
142 res = self.request("STATUS")
143 lines = res.splitlines()
144 vals = dict()
145 for l in lines:
146 [name,value] = l.split('=', 1)
147 vals[name] = value
148 return vals
149
150 def get_status_field(self, field):
151 vals = self.get_status()
152 if field in vals:
153 return vals[field]
154 return None
155
a36158be
JM
156 def get_driver_status(self):
157 res = self.request("STATUS-DRIVER")
158 lines = res.splitlines()
159 vals = dict()
160 for l in lines:
161 [name,value] = l.split('=', 1)
162 vals[name] = value
163 return vals
164
165 def get_driver_status_field(self, field):
166 vals = self.get_driver_status()
167 if field in vals:
168 return vals[field]
169 return None
170
65038313
JM
171 def get_config(self):
172 res = self.request("GET_CONFIG")
173 lines = res.splitlines()
174 vals = dict()
175 for l in lines:
176 [name,value] = l.split('=', 1)
177 vals[name] = value
178 return vals
179
bfe375ec
JM
180 def mgmt_rx(self, timeout=5):
181 ev = self.wait_event(["MGMT-RX"], timeout=timeout)
182 if ev is None:
183 return None
184 msg = {}
185 frame = binascii.unhexlify(ev.split(' ')[1])
186 msg['frame'] = frame
187
188 hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
189 msg['fc'] = hdr[0]
190 msg['subtype'] = (hdr[0] >> 4) & 0xf
191 hdr = hdr[1:]
192 msg['duration'] = hdr[0]
193 hdr = hdr[1:]
194 msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
195 hdr = hdr[6:]
196 msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
197 hdr = hdr[6:]
198 msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
199 hdr = hdr[6:]
200 msg['seq_ctrl'] = hdr[0]
201 msg['payload'] = frame[24:]
202
203 return msg
204
205 def mgmt_tx(self, msg):
206 t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
207 hdr = struct.pack('<HH6B6B6BH', *t)
208 self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']))
209
cce26eb4
JM
210 def get_sta(self, addr, info=None, next=False):
211 cmd = "STA-NEXT " if next else "STA "
212 if addr is None:
213 res = self.request("STA-FIRST")
214 elif info:
215 res = self.request(cmd + addr + " " + info)
5dec879d 216 else:
cce26eb4 217 res = self.request(cmd + addr)
6435799b
JM
218 lines = res.splitlines()
219 vals = dict()
220 first = True
221 for l in lines:
222 if first:
223 vals['addr'] = l
224 first = False
225 else:
226 [name,value] = l.split('=', 1)
227 vals[name] = value
228 return vals
229
4fcee244
JM
230 def get_mib(self, param=None):
231 if param:
232 res = self.request("MIB " + param)
233 else:
234 res = self.request("MIB")
7fd15145
JM
235 lines = res.splitlines()
236 vals = dict()
237 for l in lines:
4fcee244
JM
238 name_val = l.split('=', 1)
239 if len(name_val) > 1:
240 vals[name_val[0]] = name_val[1]
7fd15145
JM
241 return vals
242
138ec97e 243def add_ap(ifname, params, wait_enabled=True, no_enable=False):
e259d186
JM
244 logger.info("Starting AP " + ifname)
245 hapd_global = HostapdGlobal()
246 hapd_global.remove(ifname)
247 hapd_global.add(ifname)
248 hapd = Hostapd(ifname)
249 if not hapd.ping():
250 raise Exception("Could not ping hostapd")
251 hapd.set_defaults()
cd7f1b9a
JM
252 fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
253 "wpa",
7fd15145 254 "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
97de642a 255 "acct_server_addr", "osu_server_uri" ]
e259d186
JM
256 for field in fields:
257 if field in params:
258 hapd.set(field, params[field])
93a06242
JM
259 for f,v in params.items():
260 if f in fields:
261 continue
262 if isinstance(v, list):
263 for val in v:
264 hapd.set(f, val)
265 else:
266 hapd.set(f, v)
138ec97e
JM
267 if no_enable:
268 return hapd
e259d186 269 hapd.enable()
629dbdd3 270 if wait_enabled:
f8ad9dc2 271 ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=30)
629dbdd3
JM
272 if ev is None:
273 raise Exception("AP startup timed out")
f8ad9dc2
JM
274 if "AP-ENABLED" not in ev:
275 raise Exception("AP startup failed")
b47750be 276 return hapd
e259d186 277
a6333977
JM
278def add_bss(phy, ifname, confname, ignore_error=False):
279 logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
280 hapd_global = HostapdGlobal()
281 hapd_global.add_bss(phy, confname, ignore_error)
282 hapd = Hostapd(ifname)
283 if not hapd.ping():
284 raise Exception("Could not ping hostapd")
285
77990cd7
JM
286def add_iface(ifname, confname):
287 logger.info("Starting interface " + ifname)
288 hapd_global = HostapdGlobal()
289 hapd_global.add_iface(ifname, confname)
290 hapd = Hostapd(ifname)
291 if not hapd.ping():
292 raise Exception("Could not ping hostapd")
293
a6333977
JM
294def remove_bss(ifname):
295 logger.info("Removing BSS " + ifname)
296 hapd_global = HostapdGlobal()
297 hapd_global.remove(ifname)
298
e259d186
JM
299def wpa2_params(ssid=None, passphrase=None):
300 params = { "wpa": "2",
301 "wpa_key_mgmt": "WPA-PSK",
302 "rsn_pairwise": "CCMP" }
303 if ssid:
304 params["ssid"] = ssid
305 if passphrase:
306 params["wpa_passphrase"] = passphrase
307 return params
308
309def wpa_params(ssid=None, passphrase=None):
310 params = { "wpa": "1",
311 "wpa_key_mgmt": "WPA-PSK",
312 "wpa_pairwise": "TKIP" }
313 if ssid:
314 params["ssid"] = ssid
315 if passphrase:
316 params["wpa_passphrase"] = passphrase
317 return params
318
319def wpa_mixed_params(ssid=None, passphrase=None):
320 params = { "wpa": "3",
321 "wpa_key_mgmt": "WPA-PSK",
322 "wpa_pairwise": "TKIP",
323 "rsn_pairwise": "CCMP" }
324 if ssid:
325 params["ssid"] = ssid
326 if passphrase:
327 params["wpa_passphrase"] = passphrase
328 return params
9626962d
JM
329
330def radius_params():
331 params = { "auth_server_addr": "127.0.0.1",
332 "auth_server_port": "1812",
333 "auth_server_shared_secret": "radius",
334 "nas_identifier": "nas.w1.fi" }
335 return params
336
71390dc8
JM
337def wpa_eap_params(ssid=None):
338 params = radius_params()
339 params["wpa"] = "1"
340 params["wpa_key_mgmt"] = "WPA-EAP"
341 params["wpa_pairwise"] = "TKIP"
342 params["ieee8021x"] = "1"
343 if ssid:
344 params["ssid"] = ssid
345 return params
346
9626962d
JM
347def wpa2_eap_params(ssid=None):
348 params = radius_params()
349 params["wpa"] = "2"
350 params["wpa_key_mgmt"] = "WPA-EAP"
351 params["rsn_pairwise"] = "CCMP"
352 params["ieee8021x"] = "1"
353 if ssid:
354 params["ssid"] = ssid
355 return params