]>
git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/hostapd.py
1 # Python class for controlling hostapd
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
17 logger
= logging
.getLogger()
18 hapd_ctrl
= '/var/run/hostapd'
19 hapd_global
= '/var/run/hostapd-global'
22 return struct
.unpack('6B', binascii
.unhexlify(mac
.replace(':', '')))
25 def __init__(self
, apdev
=None, global_ctrl_override
=None):
27 hostname
= apdev
['hostname']
32 self
.host
= remotehost
.Host(hostname
)
33 self
.hostname
= hostname
36 global_ctrl
= hapd_global
37 if global_ctrl_override
:
38 global_ctrl
= global_ctrl_override
39 self
.ctrl
= wpaspy
.Ctrl(global_ctrl
)
40 self
.mon
= wpaspy
.Ctrl(global_ctrl
)
43 self
.ctrl
= wpaspy
.Ctrl(hostname
, port
)
44 self
.mon
= wpaspy
.Ctrl(hostname
, port
)
45 self
.dbg
= hostname
+ "/" + str(port
)
48 def cmd_execute(self
, cmd_array
, shell
=False):
49 if self
.hostname
is None:
51 cmd
= ' '.join(cmd_array
)
54 proc
= subprocess
.Popen(cmd
, stderr
=subprocess
.STDOUT
,
55 stdout
=subprocess
.PIPE
, shell
=shell
)
56 out
= proc
.communicate()[0]
58 return ret
, out
.decode()
60 return self
.host
.execute(cmd_array
)
62 def request(self
, cmd
, timeout
=10):
63 logger
.debug(self
.dbg
+ ": CTRL(global): " + cmd
)
64 return self
.ctrl
.request(cmd
, timeout
)
66 def wait_event(self
, events
, timeout
):
69 while self
.mon
.pending():
71 logger
.debug(self
.dbg
+ "(global): " + ev
)
76 remaining
= start
+ timeout
- now
79 if not self
.mon
.pending(timeout
=remaining
):
83 def add(self
, ifname
, driver
=None):
84 cmd
= "ADD " + ifname
+ " " + hapd_ctrl
87 res
= self
.request(cmd
)
89 raise Exception("Could not add hostapd interface " + ifname
)
91 def add_iface(self
, ifname
, confname
):
92 res
= self
.request("ADD " + ifname
+ " config=" + confname
)
94 raise Exception("Could not add hostapd interface")
96 def add_bss(self
, phy
, confname
, ignore_error
=False):
97 res
= self
.request("ADD bss_config=" + phy
+ ":" + confname
)
100 raise Exception("Could not add hostapd BSS")
102 def remove(self
, ifname
):
103 self
.request("REMOVE " + ifname
, timeout
=30)
106 self
.request("RELOG")
109 self
.request("FLUSH")
111 def get_ctrl_iface_port(self
, ifname
):
112 if self
.hostname
is None:
115 res
= self
.request("INTERFACES ctrl")
116 lines
= res
.splitlines()
120 if words
[0] == ifname
:
124 raise Exception("Could not find UDP port for " + ifname
)
125 res
= line
.find("ctrl_iface=udp:")
127 raise Exception("Wrong ctrl_interface format")
128 words
= line
.split(":")
135 self
.ctrl
.terminate()
139 def __init__(self
, ifname
, bssidx
=0, hostname
=None, port
=8877):
140 self
.hostname
= hostname
141 self
.host
= remotehost
.Host(hostname
, ifname
)
144 self
.ctrl
= wpaspy
.Ctrl(os
.path
.join(hapd_ctrl
, ifname
))
145 self
.mon
= wpaspy
.Ctrl(os
.path
.join(hapd_ctrl
, ifname
))
148 self
.ctrl
= wpaspy
.Ctrl(hostname
, port
)
149 self
.mon
= wpaspy
.Ctrl(hostname
, port
)
150 self
.dbg
= hostname
+ "/" + ifname
155 def cmd_execute(self
, cmd_array
, shell
=False):
156 if self
.hostname
is None:
158 cmd
= ' '.join(cmd_array
)
161 proc
= subprocess
.Popen(cmd
, stderr
=subprocess
.STDOUT
,
162 stdout
=subprocess
.PIPE
, shell
=shell
)
163 out
= proc
.communicate()[0]
164 ret
= proc
.returncode
165 return ret
, out
.decode()
167 return self
.host
.execute(cmd_array
)
169 def close_ctrl(self
):
170 if self
.mon
is not None:
178 if self
.bssid
is None:
179 self
.bssid
= self
.get_status_field('bssid[%d]' % self
.bssidx
)
182 def request(self
, cmd
):
183 logger
.debug(self
.dbg
+ ": CTRL: " + cmd
)
184 return self
.ctrl
.request(cmd
)
187 return "PONG" in self
.request("PING")
189 def set(self
, field
, value
):
190 if "OK" not in self
.request("SET " + field
+ " " + value
):
191 raise Exception("Failed to set hostapd parameter " + field
)
193 def set_defaults(self
):
194 self
.set("driver", "nl80211")
195 self
.set("hw_mode", "g")
196 self
.set("channel", "1")
197 self
.set("ieee80211n", "1")
198 self
.set("logger_stdout", "-1")
199 self
.set("logger_stdout_level", "0")
201 def set_open(self
, ssid
):
203 self
.set("ssid", ssid
)
205 def set_wpa2_psk(self
, ssid
, passphrase
):
207 self
.set("ssid", ssid
)
208 self
.set("wpa_passphrase", passphrase
)
210 self
.set("wpa_key_mgmt", "WPA-PSK")
211 self
.set("rsn_pairwise", "CCMP")
213 def set_wpa_psk(self
, ssid
, passphrase
):
215 self
.set("ssid", ssid
)
216 self
.set("wpa_passphrase", passphrase
)
218 self
.set("wpa_key_mgmt", "WPA-PSK")
219 self
.set("wpa_pairwise", "TKIP")
221 def set_wpa_psk_mixed(self
, ssid
, passphrase
):
223 self
.set("ssid", ssid
)
224 self
.set("wpa_passphrase", passphrase
)
226 self
.set("wpa_key_mgmt", "WPA-PSK")
227 self
.set("wpa_pairwise", "TKIP")
228 self
.set("rsn_pairwise", "CCMP")
230 def set_wep(self
, ssid
, key
):
232 self
.set("ssid", ssid
)
233 self
.set("wep_key0", key
)
236 if "OK" not in self
.request("ENABLE"):
237 raise Exception("Failed to enable hostapd interface " + self
.ifname
)
240 if "OK" not in self
.request("DISABLE"):
241 raise Exception("Failed to disable hostapd interface " + self
.ifname
)
243 def dump_monitor(self
):
244 while self
.mon
.pending():
246 logger
.debug(self
.dbg
+ ": " + ev
)
248 def wait_event(self
, events
, timeout
):
249 start
= os
.times()[4]
251 while self
.mon
.pending():
253 logger
.debug(self
.dbg
+ ": " + ev
)
258 remaining
= start
+ timeout
- now
261 if not self
.mon
.pending(timeout
=remaining
):
265 def wait_sta(self
, addr
=None, timeout
=2):
266 ev
= self
.wait_event("AP-STA-CONNECT", timeout
=timeout
)
268 raise Exception("AP did not report STA connection")
269 if addr
and addr
not in ev
:
270 raise Exception("Unexpected STA address in connection event: " + ev
)
272 def get_status(self
):
273 res
= self
.request("STATUS")
274 lines
= res
.splitlines()
277 [name
, value
] = l
.split('=', 1)
281 def get_status_field(self
, field
):
282 vals
= self
.get_status()
287 def get_driver_status(self
):
288 res
= self
.request("STATUS-DRIVER")
289 lines
= res
.splitlines()
292 [name
, value
] = l
.split('=', 1)
296 def get_driver_status_field(self
, field
):
297 vals
= self
.get_driver_status()
302 def get_config(self
):
303 res
= self
.request("GET_CONFIG")
304 lines
= res
.splitlines()
307 [name
, value
] = l
.split('=', 1)
311 def mgmt_rx(self
, timeout
=5):
312 ev
= self
.wait_event(["MGMT-RX"], timeout
=timeout
)
316 frame
= binascii
.unhexlify(ev
.split(' ')[1])
319 hdr
= struct
.unpack('<HH6B6B6BH', frame
[0:24])
321 msg
['subtype'] = (hdr
[0] >> 4) & 0xf
323 msg
['duration'] = hdr
[0]
325 msg
['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr
[0:6]
327 msg
['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr
[0:6]
329 msg
['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr
[0:6]
331 msg
['seq_ctrl'] = hdr
[0]
332 msg
['payload'] = frame
[24:]
336 def mgmt_tx(self
, msg
):
337 t
= (msg
['fc'], 0) + mac2tuple(msg
['da']) + mac2tuple(msg
['sa']) + mac2tuple(msg
['bssid']) + (0,)
338 hdr
= struct
.pack('<HH6B6B6BH', *t
)
339 res
= self
.request("MGMT_TX " + binascii
.hexlify(hdr
+ msg
['payload']).decode())
341 raise Exception("MGMT_TX command to hostapd failed")
343 def get_sta(self
, addr
, info
=None, next
=False):
344 cmd
= "STA-NEXT " if next
else "STA "
346 res
= self
.request("STA-FIRST")
348 res
= self
.request(cmd
+ addr
+ " " + info
)
350 res
= self
.request(cmd
+ addr
)
351 lines
= res
.splitlines()
355 if first
and '=' not in l
:
359 [name
, value
] = l
.split('=', 1)
363 def get_mib(self
, param
=None):
365 res
= self
.request("MIB " + param
)
367 res
= self
.request("MIB")
368 lines
= res
.splitlines()
371 name_val
= l
.split('=', 1)
372 if len(name_val
) > 1:
373 vals
[name_val
[0]] = name_val
[1]
376 def get_pmksa(self
, addr
):
377 res
= self
.request("PMKSA")
378 lines
= res
.splitlines()
383 [index
, aa
, pmkid
, expiration
, opportunistic
] = l
.split(' ')
384 vals
['index'] = index
385 vals
['pmkid'] = pmkid
386 vals
['expiration'] = expiration
387 vals
['opportunistic'] = opportunistic
391 def dpp_qr_code(self
, uri
):
392 res
= self
.request("DPP_QR_CODE " + uri
)
394 raise Exception("Failed to parse QR Code URI")
397 def dpp_bootstrap_gen(self
, type="qrcode", chan
=None, mac
=None, info
=None,
398 curve
=None, key
=None):
399 cmd
= "DPP_BOOTSTRAP_GEN type=" + type
401 cmd
+= " chan=" + chan
404 mac
= self
.own_addr()
405 cmd
+= " mac=" + mac
.replace(':', '')
407 cmd
+= " info=" + info
409 cmd
+= " curve=" + curve
412 res
= self
.request(cmd
)
414 raise Exception("Failed to generate bootstrapping info")
417 def dpp_listen(self
, freq
, netrole
=None, qr
=None, role
=None):
418 cmd
= "DPP_LISTEN " + str(freq
)
420 cmd
+= " netrole=" + netrole
424 cmd
+= " role=" + role
425 if "OK" not in self
.request(cmd
):
426 raise Exception("Failed to start listen operation")
428 def dpp_auth_init(self
, peer
=None, uri
=None, conf
=None, configurator
=None,
429 extra
=None, own
=None, role
=None, neg_freq
=None,
430 ssid
=None, passphrase
=None, expect_fail
=False):
431 cmd
= "DPP_AUTH_INIT"
433 peer
= self
.dpp_qr_code(uri
)
434 cmd
+= " peer=%d" % peer
436 cmd
+= " own=%d" % own
438 cmd
+= " role=" + role
442 cmd
+= " conf=" + conf
443 if configurator
is not None:
444 cmd
+= " configurator=%d" % configurator
446 cmd
+= " neg_freq=%d" % neg_freq
448 cmd
+= " ssid=" + binascii
.hexlify(ssid
.encode()).decode()
450 cmd
+= " pass=" + binascii
.hexlify(passphrase
.encode()).decode()
451 res
= self
.request(cmd
)
453 if "FAIL" not in res
:
454 raise Exception("DPP authentication started unexpectedly")
457 raise Exception("Failed to initiate DPP Authentication")
459 def dpp_pkex_init(self
, identifier
, code
, role
=None, key
=None, curve
=None,
460 extra
=None, use_id
=None):
462 id1
= self
.dpp_bootstrap_gen(type="pkex", key
=key
, curve
=curve
)
465 cmd
= "own=%d " % id1
467 cmd
+= "identifier=%s " % identifier
470 cmd
+= "role=%s " % role
473 cmd
+= "code=%s" % code
474 res
= self
.request("DPP_PKEX_ADD " + cmd
)
476 raise Exception("Failed to set PKEX data (initiator)")
479 def dpp_pkex_resp(self
, freq
, identifier
, code
, key
=None, curve
=None,
481 id0
= self
.dpp_bootstrap_gen(type="pkex", key
=key
, curve
=curve
)
482 cmd
= "own=%d " % id0
484 cmd
+= "identifier=%s " % identifier
485 cmd
+= "code=%s" % code
486 res
= self
.request("DPP_PKEX_ADD " + cmd
)
488 raise Exception("Failed to set PKEX data (responder)")
489 self
.dpp_listen(freq
, role
=listen_role
)
491 def dpp_configurator_add(self
, curve
=None, key
=None):
492 cmd
= "DPP_CONFIGURATOR_ADD"
494 cmd
+= " curve=" + curve
497 res
= self
.request(cmd
)
499 raise Exception("Failed to add configurator")
502 def dpp_configurator_remove(self
, conf_id
):
503 res
= self
.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id
)
505 raise Exception("DPP_CONFIGURATOR_REMOVE failed")
508 self
.request("NOTE " + txt
)
510 def add_ap(apdev
, params
, wait_enabled
=True, no_enable
=False, timeout
=30,
511 global_ctrl_override
=None, driver
=False):
512 if isinstance(apdev
, dict):
513 ifname
= apdev
['ifname']
515 hostname
= apdev
['hostname']
517 logger
.info("Starting AP " + hostname
+ "/" + port
+ " " + ifname
)
519 logger
.info("Starting AP " + ifname
)
524 logger
.info("Starting AP " + ifname
+ " (old add_ap argument type)")
527 hapd_global
= HostapdGlobal(apdev
,
528 global_ctrl_override
=global_ctrl_override
)
529 hapd_global
.remove(ifname
)
530 hapd_global
.add(ifname
, driver
=driver
)
531 port
= hapd_global
.get_ctrl_iface_port(ifname
)
532 hapd
= Hostapd(ifname
, hostname
=hostname
, port
=port
)
534 raise Exception("Could not ping hostapd")
536 fields
= ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
538 "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
539 "acct_server_addr", "osu_server_uri"]
542 hapd
.set(field
, params
[field
])
543 for f
, v
in list(params
.items()):
546 if isinstance(v
, list):
555 ev
= hapd
.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout
=timeout
)
557 raise Exception("AP startup timed out")
558 if "AP-ENABLED" not in ev
:
559 raise Exception("AP startup failed")
562 def add_bss(apdev
, ifname
, confname
, ignore_error
=False):
563 phy
= utils
.get_phy(apdev
)
565 hostname
= apdev
['hostname']
567 logger
.info("Starting BSS " + hostname
+ "/" + port
+ " phy=" + phy
+ " ifname=" + ifname
)
569 logger
.info("Starting BSS phy=" + phy
+ " ifname=" + ifname
)
572 hapd_global
= HostapdGlobal(apdev
)
573 hapd_global
.add_bss(phy
, confname
, ignore_error
)
574 port
= hapd_global
.get_ctrl_iface_port(ifname
)
575 hapd
= Hostapd(ifname
, hostname
=hostname
, port
=port
)
577 raise Exception("Could not ping hostapd")
580 def add_iface(apdev
, confname
):
581 ifname
= apdev
['ifname']
583 hostname
= apdev
['hostname']
585 logger
.info("Starting interface " + hostname
+ "/" + port
+ " " + ifname
)
587 logger
.info("Starting interface " + ifname
)
590 hapd_global
= HostapdGlobal(apdev
)
591 hapd_global
.add_iface(ifname
, confname
)
592 port
= hapd_global
.get_ctrl_iface_port(ifname
)
593 hapd
= Hostapd(ifname
, hostname
=hostname
, port
=port
)
595 raise Exception("Could not ping hostapd")
598 def remove_bss(apdev
, ifname
=None):
600 ifname
= apdev
['ifname']
602 hostname
= apdev
['hostname']
604 logger
.info("Removing BSS " + hostname
+ "/" + port
+ " " + ifname
)
606 logger
.info("Removing BSS " + ifname
)
607 hapd_global
= HostapdGlobal(apdev
)
608 hapd_global
.remove(ifname
)
610 def terminate(apdev
):
612 hostname
= apdev
['hostname']
614 logger
.info("Terminating hostapd " + hostname
+ "/" + port
)
616 logger
.info("Terminating hostapd")
617 hapd_global
= HostapdGlobal(apdev
)
618 hapd_global
.terminate()
620 def wpa2_params(ssid
=None, passphrase
=None):
621 params
= {"wpa": "2",
622 "wpa_key_mgmt": "WPA-PSK",
623 "rsn_pairwise": "CCMP"}
625 params
["ssid"] = ssid
627 params
["wpa_passphrase"] = passphrase
630 def wpa_params(ssid
=None, passphrase
=None):
631 params
= {"wpa": "1",
632 "wpa_key_mgmt": "WPA-PSK",
633 "wpa_pairwise": "TKIP"}
635 params
["ssid"] = ssid
637 params
["wpa_passphrase"] = passphrase
640 def wpa_mixed_params(ssid
=None, passphrase
=None):
641 params
= {"wpa": "3",
642 "wpa_key_mgmt": "WPA-PSK",
643 "wpa_pairwise": "TKIP",
644 "rsn_pairwise": "CCMP"}
646 params
["ssid"] = ssid
648 params
["wpa_passphrase"] = passphrase
652 params
= {"auth_server_addr": "127.0.0.1",
653 "auth_server_port": "1812",
654 "auth_server_shared_secret": "radius",
655 "nas_identifier": "nas.w1.fi"}
658 def wpa_eap_params(ssid
=None):
659 params
= radius_params()
661 params
["wpa_key_mgmt"] = "WPA-EAP"
662 params
["wpa_pairwise"] = "TKIP"
663 params
["ieee8021x"] = "1"
665 params
["ssid"] = ssid
668 def wpa2_eap_params(ssid
=None):
669 params
= radius_params()
671 params
["wpa_key_mgmt"] = "WPA-EAP"
672 params
["rsn_pairwise"] = "CCMP"
673 params
["ieee8021x"] = "1"
675 params
["ssid"] = ssid
678 def b_only_params(channel
="1", ssid
=None, country
=None):
679 params
= {"hw_mode": "b",
682 params
["ssid"] = ssid
684 params
["country_code"] = country
687 def g_only_params(channel
="1", ssid
=None, country
=None):
688 params
= {"hw_mode": "g",
691 params
["ssid"] = ssid
693 params
["country_code"] = country
696 def a_only_params(channel
="36", ssid
=None, country
=None):
697 params
= {"hw_mode": "a",
700 params
["ssid"] = ssid
702 params
["country_code"] = country
705 def ht20_params(channel
="1", ssid
=None, country
=None):
706 params
= {"ieee80211n": "1",
709 if int(channel
) > 14:
710 params
["hw_mode"] = "a"
712 params
["ssid"] = ssid
714 params
["country_code"] = country
717 def ht40_plus_params(channel
="1", ssid
=None, country
=None):
718 params
= ht20_params(channel
, ssid
, country
)
719 params
['ht_capab'] = "[HT40+]"
722 def ht40_minus_params(channel
="1", ssid
=None, country
=None):
723 params
= ht20_params(channel
, ssid
, country
)
724 params
['ht_capab'] = "[HT40-]"
727 def cmd_execute(apdev
, cmd
, shell
=False):
728 hapd_global
= HostapdGlobal(apdev
)
729 return hapd_global
.cmd_execute(cmd
, shell
=shell
)