]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/hostapd.py
82d21567ecd89bb02a075f9de648ab70e3e1939c
[thirdparty/hostap.git] / tests / hwsim / hostapd.py
1 # Python class for controlling hostapd
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import os
8 import time
9 import logging
10 import binascii
11 import struct
12 import wpaspy
13 import remotehost
14 import utils
15 import subprocess
16
# Root logger shared by all helpers in this module
logger = logging.getLogger()
# Directory that holds the per-interface hostapd control sockets
hapd_ctrl = '/var/run/hostapd'
# Default path of the hostapd global control interface
hapd_global = '/var/run/hostapd-global'
20
def mac2tuple(mac):
    """Convert a colon-separated MAC address string into a tuple of six ints."""
    hex_digits = mac.replace(':', '')
    raw_octets = binascii.unhexlify(hex_digits)
    return struct.unpack('6B', raw_octets)
23
class HostapdGlobal:
    """Client for the hostapd global control interface.

    Holds one connection for commands (self.ctrl) and one attached
    connection for events (self.mon). The connections go either to a local
    control path or to a hostname/port pair taken from the AP device
    description.
    """

    def __init__(self, apdev=None, global_ctrl_override=None):
        """Open the command and monitor connections and attach the monitor.

        apdev -- AP device description; the remote connection is used only
                 when it provides both 'hostname' and 'port'
        global_ctrl_override -- alternative local control path to use
                 instead of the module default (local connections only)
        """
        try:
            hostname = apdev['hostname']
            port = apdev['port']
        except (KeyError, TypeError):
            # apdev is None, a plain interface name string, or a dict that
            # does not describe a remote host: fall back to local control.
            hostname = None
            port = 8878
        self.host = remotehost.Host(hostname)
        self.hostname = hostname
        self.port = port
        if hostname is None:
            global_ctrl = hapd_global
            if global_ctrl_override:
                global_ctrl = global_ctrl_override
            self.ctrl = wpaspy.Ctrl(global_ctrl)
            self.mon = wpaspy.Ctrl(global_ctrl)
            self.dbg = ""
        else:
            self.ctrl = wpaspy.Ctrl(hostname, port)
            self.mon = wpaspy.Ctrl(hostname, port)
            self.dbg = hostname + "/" + str(port)
        self.mon.attach()

    def cmd_execute(self, cmd_array, shell=False):
        """Run a command locally or through the remote host helper.

        Returns a (return code, decoded output) tuple for local execution
        (stderr is merged into the output); for a remote host the result of
        self.host.execute() is returned as is.
        """
        if self.hostname is None:
            if shell:
                cmd = ' '.join(cmd_array)
            else:
                cmd = cmd_array
            proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
                                    stdout=subprocess.PIPE, shell=shell)
            out = proc.communicate()[0]
            ret = proc.returncode
            return ret, out.decode()
        else:
            return self.host.execute(cmd_array)

    def request(self, cmd, timeout=10):
        """Send a global control interface command and return the reply."""
        logger.debug(self.dbg + ": CTRL(global): " + cmd)
        return self.ctrl.request(cmd, timeout)

    def wait_event(self, events, timeout):
        """Wait up to timeout seconds for an event containing any of events.

        Returns the matching event string, or None on timeout.
        """
        start = os.times()[4]
        while True:
            # Drain everything that is already queued before waiting again.
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.dbg + "(global): " + ev)
                for event in events:
                    if event in ev:
                        return ev
            now = os.times()[4]
            remaining = start + timeout - now
            if remaining <= 0:
                break
            if not self.mon.pending(timeout=remaining):
                break
        return None

    def add(self, ifname, driver=None):
        """Add an interface; raises Exception if hostapd does not reply OK."""
        cmd = "ADD " + ifname + " " + hapd_ctrl
        if driver:
            cmd += " " + driver
        res = self.request(cmd)
        if "OK" not in res:
            raise Exception("Could not add hostapd interface " + ifname)

    def add_iface(self, ifname, confname):
        """Add an interface from a configuration file; raises on failure."""
        res = self.request("ADD " + ifname + " config=" + confname)
        if "OK" not in res:
            raise Exception("Could not add hostapd interface")

    def add_bss(self, phy, confname, ignore_error=False):
        """Add a BSS on phy from a configuration file.

        Raises Exception on failure unless ignore_error is set.
        """
        res = self.request("ADD bss_config=" + phy + ":" + confname)
        if "OK" not in res:
            if not ignore_error:
                raise Exception("Could not add hostapd BSS")

    def remove(self, ifname):
        """Request removal of an interface (the reply is not checked)."""
        self.request("REMOVE " + ifname, timeout=30)

    def relog(self):
        """Send the RELOG command."""
        self.request("RELOG")

    def flush(self):
        """Send the FLUSH command."""
        self.request("FLUSH")

    def get_ctrl_iface_port(self, ifname):
        """Return the control interface port of ifname on a remote host.

        Returns None for local connections. Raises Exception when the
        interface is not listed in the "INTERFACES ctrl" reply or when its
        entry does not contain a "ctrl_iface=udp:" specification.
        """
        if self.hostname is None:
            return None

        res = self.request("INTERFACES ctrl")
        for line in res.splitlines():
            words = line.split()
            # Blank lines carry no interface name; skip them instead of
            # failing on words[0].
            if words and words[0] == ifname:
                break
        else:
            raise Exception("Could not find UDP port for " + ifname)
        if "ctrl_iface=udp:" not in line:
            raise Exception("Wrong ctrl_interface format")
        return int(line.split(":")[1])

    def terminate(self):
        """Detach and close the monitor, then terminate hostapd."""
        self.mon.detach()
        self.mon.close()
        self.mon = None
        self.ctrl.terminate()
        self.ctrl = None
137
class Hostapd:
    """Control interface client for a single hostapd interface.

    Holds one connection for commands (self.ctrl) and one attached
    connection for events (self.mon), and provides helpers for setting
    parameters, querying status, exchanging management frames, and issuing
    DPP commands.
    """

    def __init__(self, ifname, bssidx=0, hostname=None, port=8877):
        """Open the command and monitor connections for ifname.

        Without a hostname the control path under the module-level control
        directory is used; otherwise the connections go to hostname/port.
        """
        self.hostname = hostname
        self.host = remotehost.Host(hostname, ifname)
        self.ifname = ifname
        if hostname is None:
            self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
            self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
            self.dbg = ifname
        else:
            self.ctrl = wpaspy.Ctrl(hostname, port)
            self.mon = wpaspy.Ctrl(hostname, port)
            self.dbg = hostname + "/" + ifname
        self.mon.attach()
        self.bssid = None
        self.bssidx = bssidx

    def cmd_execute(self, cmd_array, shell=False):
        """Run a command locally or through the remote host helper.

        Returns a (return code, decoded output) tuple for local execution
        (stderr is merged into the output); for a remote host the result of
        self.host.execute() is returned as is.
        """
        if self.hostname is None:
            if shell:
                cmd = ' '.join(cmd_array)
            else:
                cmd = cmd_array
            proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
                                    stdout=subprocess.PIPE, shell=shell)
            out = proc.communicate()[0]
            ret = proc.returncode
            return ret, out.decode()
        else:
            return self.host.execute(cmd_array)

    def close_ctrl(self):
        """Detach and close both connections; safe to call more than once."""
        if self.mon is not None:
            self.mon.detach()
            self.mon.close()
            self.mon = None
            self.ctrl.close()
            self.ctrl = None

    def own_addr(self):
        """Return the BSSID of this BSS, fetching it from STATUS on first use."""
        if self.bssid is None:
            self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
        return self.bssid

    def request(self, cmd):
        """Send a control interface command and return the reply."""
        logger.debug(self.dbg + ": CTRL: " + cmd)
        return self.ctrl.request(cmd)

    def ping(self):
        """Return True if hostapd answers PING with PONG."""
        return "PONG" in self.request("PING")

    def set(self, field, value):
        """Set a configuration parameter; raises Exception on failure."""
        if "OK" not in self.request("SET " + field + " " + value):
            raise Exception("Failed to set hostapd parameter " + field)

    def set_defaults(self):
        """Apply the baseline configuration used by the test helpers."""
        self.set("driver", "nl80211")
        self.set("hw_mode", "g")
        self.set("channel", "1")
        self.set("ieee80211n", "1")
        self.set("logger_stdout", "-1")
        self.set("logger_stdout_level", "0")

    def set_open(self, ssid):
        """Configure defaults plus the given SSID (no security settings)."""
        self.set_defaults()
        self.set("ssid", ssid)

    def set_wpa2_psk(self, ssid, passphrase):
        """Configure defaults plus wpa=2, WPA-PSK, and CCMP for RSN."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "2")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("rsn_pairwise", "CCMP")

    def set_wpa_psk(self, ssid, passphrase):
        """Configure defaults plus wpa=1, WPA-PSK, and TKIP for WPA."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "1")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("wpa_pairwise", "TKIP")

    def set_wpa_psk_mixed(self, ssid, passphrase):
        """Configure defaults plus wpa=3, WPA-PSK, TKIP (WPA), CCMP (RSN)."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "3")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("wpa_pairwise", "TKIP")
        self.set("rsn_pairwise", "CCMP")

    def set_wep(self, ssid, key):
        """Configure defaults plus the SSID and wep_key0."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wep_key0", key)

    def enable(self):
        """Send ENABLE; raises Exception if hostapd does not reply OK."""
        if "OK" not in self.request("ENABLE"):
            raise Exception("Failed to enable hostapd interface " + self.ifname)

    def disable(self):
        """Send DISABLE; raises Exception if hostapd does not reply OK."""
        if "OK" not in self.request("DISABLE"):
            raise Exception("Failed to disable hostapd interface " + self.ifname)

    def dump_monitor(self):
        """Read and log all pending events from the monitor connection."""
        while self.mon.pending():
            ev = self.mon.recv()
            logger.debug(self.dbg + ": " + ev)

    def wait_event(self, events, timeout):
        """Wait up to timeout seconds for an event containing any of events.

        Returns the matching event string, or None on timeout.
        """
        start = os.times()[4]
        while True:
            # Drain everything that is already queued before waiting again.
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.dbg + ": " + ev)
                for event in events:
                    if event in ev:
                        return ev
            now = os.times()[4]
            remaining = start + timeout - now
            if remaining <= 0:
                break
            if not self.mon.pending(timeout=remaining):
                break
        return None

    @staticmethod
    def _parse_key_values(res):
        """Parse 'name=value' lines into a dict, skipping lines without '='."""
        vals = dict()
        for line in res.splitlines():
            name, sep, value = line.partition('=')
            if sep:
                vals[name] = value
        return vals

    def get_status(self):
        """Return the STATUS reply as a dict of name/value strings.

        Lines without '=' (for example a blank line or a bare FAIL reply)
        are ignored instead of raising ValueError.
        """
        return self._parse_key_values(self.request("STATUS"))

    def get_status_field(self, field):
        """Return one STATUS value, or None if the field is not reported."""
        vals = self.get_status()
        if field in vals:
            return vals[field]
        return None

    def get_driver_status(self):
        """Return the STATUS-DRIVER reply as a dict of name/value strings.

        Lines without '=' are ignored.
        """
        return self._parse_key_values(self.request("STATUS-DRIVER"))

    def get_driver_status_field(self, field):
        """Return one STATUS-DRIVER value, or None if it is not reported."""
        vals = self.get_driver_status()
        if field in vals:
            return vals[field]
        return None

    def get_config(self):
        """Return the GET_CONFIG reply as a dict of name/value strings.

        Lines without '=' are ignored.
        """
        return self._parse_key_values(self.request("GET_CONFIG"))

    def mgmt_rx(self, timeout=5):
        """Wait for a MGMT-RX event and decode the frame header.

        Returns None on timeout. Otherwise returns a dict with the raw
        'frame', the header fields 'fc', 'subtype', 'duration', 'da', 'sa',
        'bssid', 'seq_ctrl', and the remaining bytes as 'payload'.
        """
        ev = self.wait_event(["MGMT-RX"], timeout=timeout)
        if ev is None:
            return None
        msg = {}
        # The hex dump of the frame is the second space-separated token.
        frame = binascii.unhexlify(ev.split(' ')[1])
        msg['frame'] = frame

        # 24-octet header: frame control, duration, three addresses, and
        # sequence control (little endian).
        hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
        mac_fmt = "%02x:%02x:%02x:%02x:%02x:%02x"
        msg['fc'] = hdr[0]
        msg['subtype'] = (hdr[0] >> 4) & 0xf
        msg['duration'] = hdr[1]
        msg['da'] = mac_fmt % hdr[2:8]
        msg['sa'] = mac_fmt % hdr[8:14]
        msg['bssid'] = mac_fmt % hdr[14:20]
        msg['seq_ctrl'] = hdr[20]
        msg['payload'] = frame[24:]

        return msg

    def mgmt_tx(self, msg):
        """Transmit a management frame described by msg.

        msg provides 'fc', 'da', 'sa', 'bssid', and 'payload'; duration and
        sequence control are sent as zero. Raises Exception on failure.
        """
        t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
        hdr = struct.pack('<HH6B6B6BH', *t)
        res = self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']).decode())
        if "OK" not in res:
            raise Exception("MGMT_TX command to hostapd failed")

    def get_sta(self, addr, info=None, next=False):
        """Query station information and return it as a dict.

        addr None requests the first station (STA-FIRST); next=True uses
        STA-NEXT instead of STA. A leading reply line without '=' is stored
        under the 'addr' key.
        """
        cmd = "STA-NEXT " if next else "STA "
        if addr is None:
            res = self.request("STA-FIRST")
        elif info:
            res = self.request(cmd + addr + " " + info)
        else:
            res = self.request(cmd + addr)
        vals = dict()
        first = True
        for line in res.splitlines():
            if first and '=' not in line:
                vals['addr'] = line
                first = False
            else:
                [name, value] = line.split('=', 1)
                vals[name] = value
        return vals

    def get_mib(self, param=None):
        """Return the MIB reply (optionally for param) as a dict.

        Lines without '=' are ignored.
        """
        if param:
            res = self.request("MIB " + param)
        else:
            res = self.request("MIB")
        return self._parse_key_values(res)

    def get_pmksa(self, addr):
        """Return the first PMKSA cache entry mentioning addr, or None.

        The entry is a dict with 'index', 'pmkid', 'expiration', and
        'opportunistic' string values.
        """
        res = self.request("PMKSA")
        for line in res.splitlines():
            if addr not in line:
                continue
            vals = dict()
            [index, aa, pmkid, expiration, opportunistic] = line.split(' ')
            vals['index'] = index
            vals['pmkid'] = pmkid
            vals['expiration'] = expiration
            vals['opportunistic'] = opportunistic
            return vals
        return None

    def dpp_qr_code(self, uri):
        """Register a DPP QR Code URI and return the integer from the reply."""
        res = self.request("DPP_QR_CODE " + uri)
        if "FAIL" in res:
            raise Exception("Failed to parse QR Code URI")
        return int(res)

    def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
                          curve=None, key=None):
        """Generate DPP bootstrapping info and return the integer reply.

        mac=True uses this BSS's own address. Raises Exception on failure.
        """
        cmd = "DPP_BOOTSTRAP_GEN type=" + type
        if chan:
            cmd += " chan=" + chan
        if mac:
            if mac is True:
                mac = self.own_addr()
            cmd += " mac=" + mac.replace(':', '')
        if info:
            cmd += " info=" + info
        if curve:
            cmd += " curve=" + curve
        if key:
            cmd += " key=" + key
        res = self.request(cmd)
        if "FAIL" in res:
            raise Exception("Failed to generate bootstrapping info")
        return int(res)

    def dpp_listen(self, freq, netrole=None, qr=None, role=None):
        """Start a DPP listen operation on freq; raises on failure."""
        cmd = "DPP_LISTEN " + str(freq)
        if netrole:
            cmd += " netrole=" + netrole
        if qr:
            cmd += " qr=" + qr
        if role:
            cmd += " role=" + role
        if "OK" not in self.request(cmd):
            raise Exception("Failed to start listen operation")

    def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
                      extra=None, own=None, role=None, neg_freq=None,
                      ssid=None, passphrase=None, expect_fail=False):
        """Initiate DPP Authentication.

        When peer is None, uri is first registered with dpp_qr_code() to
        obtain the peer id. With expect_fail set, a reply without FAIL
        raises; otherwise a reply without OK raises.
        """
        cmd = "DPP_AUTH_INIT"
        if peer is None:
            peer = self.dpp_qr_code(uri)
        cmd += " peer=%d" % peer
        if own is not None:
            cmd += " own=%d" % own
        if role:
            cmd += " role=" + role
        if extra:
            cmd += " " + extra
        if conf:
            cmd += " conf=" + conf
        if configurator is not None:
            cmd += " configurator=%d" % configurator
        if neg_freq:
            cmd += " neg_freq=%d" % neg_freq
        if ssid:
            # SSID and passphrase are passed hex encoded.
            cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
        if passphrase:
            cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
        res = self.request(cmd)
        if expect_fail:
            if "FAIL" not in res:
                raise Exception("DPP authentication started unexpectedly")
            return
        if "OK" not in res:
            raise Exception("Failed to initiate DPP Authentication")

    def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
                      extra=None, use_id=None):
        """Add PKEX data as initiator and return the own bootstrap id.

        A new "pkex" bootstrap entry is generated unless use_id is given.
        """
        if use_id is None:
            id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
        else:
            id1 = use_id
        cmd = "own=%d " % id1
        if identifier:
            cmd += "identifier=%s " % identifier
        cmd += "init=1 "
        if role:
            cmd += "role=%s " % role
        if extra:
            cmd += extra + " "
        cmd += "code=%s" % code
        res = self.request("DPP_PKEX_ADD " + cmd)
        if "FAIL" in res:
            raise Exception("Failed to set PKEX data (initiator)")
        return id1

    def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
                      listen_role=None):
        """Add PKEX data as responder and start listening on freq."""
        id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
        cmd = "own=%d " % id0
        if identifier:
            cmd += "identifier=%s " % identifier
        cmd += "code=%s" % code
        res = self.request("DPP_PKEX_ADD " + cmd)
        if "FAIL" in res:
            raise Exception("Failed to set PKEX data (responder)")
        self.dpp_listen(freq, role=listen_role)

    def dpp_configurator_add(self, curve=None, key=None):
        """Add a DPP configurator and return the integer from the reply."""
        cmd = "DPP_CONFIGURATOR_ADD"
        if curve:
            cmd += " curve=" + curve
        if key:
            cmd += " key=" + key
        res = self.request(cmd)
        if "FAIL" in res:
            raise Exception("Failed to add configurator")
        return int(res)

    def dpp_configurator_remove(self, conf_id):
        """Remove a DPP configurator; raises Exception on failure."""
        res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
        if "OK" not in res:
            raise Exception("DPP_CONFIGURATOR_REMOVE failed")
499
def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
           global_ctrl_override=None):
    """Create, configure, and (optionally) enable an AP interface.

    apdev -- AP device description dict (with 'ifname' and optionally
             'hostname'/'port'), or an interface name string (old style)
    params -- hostapd parameters; a list value results in one SET command
              per list item
    wait_enabled -- wait for AP-ENABLED/AP-DISABLED after enabling
    no_enable -- return after configuration without enabling the interface
    timeout -- seconds to wait for the enable result
    global_ctrl_override -- alternative global control path

    Returns the Hostapd object. Raises Exception if hostapd cannot be
    pinged, a parameter cannot be set, or AP startup fails or times out.
    """
    if isinstance(apdev, dict):
        ifname = apdev['ifname']
        try:
            hostname = apdev['hostname']
            port = apdev['port']
            # str() keeps an integer port from raising here and silently
            # turning a remote AP into a local one.
            logger.info("Starting AP " + hostname + "/" + str(port) + " " + ifname)
        except (KeyError, TypeError):
            logger.info("Starting AP " + ifname)
            hostname = None
    else:
        ifname = apdev
        logger.info("Starting AP " + ifname + " (old add_ap argument type)")
        hostname = None
    hapd_global = HostapdGlobal(apdev,
                                global_ctrl_override=global_ctrl_override)
    # Remove any stale instance of the interface before adding it again.
    hapd_global.remove(ifname)
    hapd_global.add(ifname)
    port = hapd_global.get_ctrl_iface_port(ifname)
    hapd = Hostapd(ifname, hostname=hostname, port=port)
    if not hapd.ping():
        raise Exception("Could not ping hostapd")
    hapd.set_defaults()
    # These fields are applied first, in this fixed order, before the
    # remaining parameters.
    fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
              "wpa",
              "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
              "acct_server_addr", "osu_server_uri"]
    for field in fields:
        if field in params:
            hapd.set(field, params[field])
    for f, v in list(params.items()):
        if f in fields:
            continue
        if isinstance(v, list):
            for val in v:
                hapd.set(f, val)
        else:
            hapd.set(f, v)
    if no_enable:
        return hapd
    hapd.enable()
    if wait_enabled:
        ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
        if ev is None:
            raise Exception("AP startup timed out")
        if "AP-ENABLED" not in ev:
            raise Exception("AP startup failed")
    return hapd
551
def add_bss(apdev, ifname, confname, ignore_error=False):
    """Add a BSS from a configuration file and return its Hostapd object.

    The phy is looked up from apdev. Raises Exception if the BSS cannot be
    added (unless ignore_error is set) or if hostapd cannot be pinged.
    """
    phy = utils.get_phy(apdev)
    try:
        hostname = apdev['hostname']
        port = apdev['port']
        # str() keeps an integer port from raising here and silently
        # turning a remote AP into a local one.
        logger.info("Starting BSS " + hostname + "/" + str(port) + " phy=" + phy + " ifname=" + ifname)
    except (KeyError, TypeError):
        logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
        hostname = None
    hapd_global = HostapdGlobal(apdev)
    hapd_global.add_bss(phy, confname, ignore_error)
    port = hapd_global.get_ctrl_iface_port(ifname)
    hapd = Hostapd(ifname, hostname=hostname, port=port)
    if not hapd.ping():
        raise Exception("Could not ping hostapd")
    return hapd
569
def add_iface(apdev, confname):
    """Add an interface from a configuration file and return its Hostapd object.

    Raises Exception if the interface cannot be added or hostapd cannot be
    pinged.
    """
    ifname = apdev['ifname']
    try:
        hostname = apdev['hostname']
        port = apdev['port']
        # str() keeps an integer port from raising here and silently
        # turning a remote AP into a local one.
        logger.info("Starting interface " + hostname + "/" + str(port) + " " + ifname)
    except (KeyError, TypeError):
        logger.info("Starting interface " + ifname)
        hostname = None
    hapd_global = HostapdGlobal(apdev)
    hapd_global.add_iface(ifname, confname)
    port = hapd_global.get_ctrl_iface_port(ifname)
    hapd = Hostapd(ifname, hostname=hostname, port=port)
    if not hapd.ping():
        raise Exception("Could not ping hostapd")
    return hapd
587
def remove_bss(apdev, ifname=None):
    """Remove a BSS/interface through the global control interface.

    ifname defaults to the interface name in apdev.
    """
    if ifname is None:
        ifname = apdev['ifname']
    try:
        # str() keeps an integer port from raising and hiding the remote
        # host details from the log.
        logger.info("Removing BSS " + apdev['hostname'] + "/" + str(apdev['port']) + " " + ifname)
    except (KeyError, TypeError):
        logger.info("Removing BSS " + ifname)
    hapd_global = HostapdGlobal(apdev)
    hapd_global.remove(ifname)
599
def terminate(apdev):
    """Terminate the hostapd process that serves apdev."""
    try:
        # str() keeps an integer port from raising and hiding the remote
        # host details from the log.
        logger.info("Terminating hostapd " + apdev['hostname'] + "/" + str(apdev['port']))
    except (KeyError, TypeError):
        logger.info("Terminating hostapd")
    hapd_global = HostapdGlobal(apdev)
    hapd_global.terminate()
609
def wpa2_params(ssid=None, passphrase=None):
    """Return hostapd parameters for WPA2-PSK with CCMP.

    The SSID and passphrase are included only when given as non-empty
    values.
    """
    params = dict(wpa="2", wpa_key_mgmt="WPA-PSK", rsn_pairwise="CCMP")
    optional = (("ssid", ssid), ("wpa_passphrase", passphrase))
    params.update((key, value) for key, value in optional if value)
    return params
619
def wpa_params(ssid=None, passphrase=None):
    """Return hostapd parameters for WPA-PSK with TKIP.

    The SSID and passphrase are included only when given as non-empty
    values.
    """
    params = {}
    params["wpa"] = "1"
    params["wpa_key_mgmt"] = "WPA-PSK"
    params["wpa_pairwise"] = "TKIP"
    for key, value in (("ssid", ssid), ("wpa_passphrase", passphrase)):
        if value:
            params[key] = value
    return params
629
def wpa_mixed_params(ssid=None, passphrase=None):
    """Return hostapd parameters for mixed WPA/WPA2-PSK (TKIP and CCMP).

    The SSID and passphrase are included only when given as non-empty
    values.
    """
    params = dict(wpa="3",
                  wpa_key_mgmt="WPA-PSK",
                  wpa_pairwise="TKIP",
                  rsn_pairwise="CCMP")
    extras = {"ssid": ssid, "wpa_passphrase": passphrase}
    for key in ("ssid", "wpa_passphrase"):
        if extras[key]:
            params[key] = extras[key]
    return params
640
def radius_params():
    """Return a fresh dict of RADIUS authentication server parameters."""
    return dict(auth_server_addr="127.0.0.1",
                auth_server_port="1812",
                auth_server_shared_secret="radius",
                nas_identifier="nas.w1.fi")
647
def wpa_eap_params(ssid=None):
    """Return RADIUS parameters extended for WPA-EAP with TKIP and IEEE 802.1X.

    The SSID is included only when given as a non-empty value.
    """
    params = radius_params()
    params.update(wpa="1",
                  wpa_key_mgmt="WPA-EAP",
                  wpa_pairwise="TKIP",
                  ieee8021x="1")
    if not ssid:
        return params
    params["ssid"] = ssid
    return params
657
def wpa2_eap_params(ssid=None):
    """Return RADIUS parameters extended for WPA2-EAP with CCMP and IEEE 802.1X.

    The SSID is included only when given as a non-empty value.
    """
    params = radius_params()
    overrides = (("wpa", "2"),
                 ("wpa_key_mgmt", "WPA-EAP"),
                 ("rsn_pairwise", "CCMP"),
                 ("ieee8021x", "1"))
    for key, value in overrides:
        params[key] = value
    if ssid:
        params["ssid"] = ssid
    return params
667
def b_only_params(channel="1", ssid=None, country=None):
    """Return hostapd parameters for hw_mode=b on the given channel.

    The SSID and country code are included only when given as non-empty
    values.
    """
    params = dict(hw_mode="b", channel=channel)
    optional = (("ssid", ssid), ("country_code", country))
    params.update((key, value) for key, value in optional if value)
    return params
676
def g_only_params(channel="1", ssid=None, country=None):
    """Return hostapd parameters for hw_mode=g on the given channel.

    The SSID and country code are included only when given as non-empty
    values.
    """
    params = {}
    params["hw_mode"] = "g"
    params["channel"] = channel
    for key, value in (("ssid", ssid), ("country_code", country)):
        if value:
            params[key] = value
    return params
685
def a_only_params(channel="36", ssid=None, country=None):
    """Return hostapd parameters for hw_mode=a on the given channel.

    The SSID and country code are included only when given as non-empty
    values.
    """
    params = dict(hw_mode="a", channel=channel)
    extras = {"ssid": ssid, "country_code": country}
    for key in ("ssid", "country_code"):
        if extras[key]:
            params[key] = extras[key]
    return params
694
def ht20_params(channel="1", ssid=None, country=None):
    """Return hostapd parameters with ieee80211n enabled on the given channel.

    hw_mode is "a" for channel numbers above 14 and "g" otherwise. The SSID
    and country code are included only when given as non-empty values.
    """
    band = "a" if int(channel) > 14 else "g"
    params = dict(ieee80211n="1", channel=channel, hw_mode=band)
    optional = (("ssid", ssid), ("country_code", country))
    params.update((key, value) for key, value in optional if value)
    return params
706
def ht40_plus_params(channel="1", ssid=None, country=None):
    """Return the ht20_params() result with ht_capab set to [HT40+]."""
    return dict(ht20_params(channel, ssid, country), ht_capab="[HT40+]")
711
def ht40_minus_params(channel="1", ssid=None, country=None):
    """Return the ht20_params() result with ht_capab set to [HT40-]."""
    base = ht20_params(channel, ssid, country)
    base.update(ht_capab="[HT40-]")
    return base
716
def cmd_execute(apdev, cmd, shell=False):
    """Run cmd via a HostapdGlobal helper created for apdev.

    Returns whatever HostapdGlobal.cmd_execute() returns.
    """
    return HostapdGlobal(apdev).cmd_execute(cmd, shell=shell)