# See README for more details.
import os
+import re
import time
import logging
import binascii
hapd_global = '/var/run/hostapd-global'
def mac2tuple(mac):
- return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
+ return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))
class HostapdGlobal:
def __init__(self, apdev=None, global_ctrl_override=None):
if driver:
cmd += " " + driver
res = self.request(cmd)
- if not "OK" in res:
+ if "OK" not in res:
raise Exception("Could not add hostapd interface " + ifname)
def add_iface(self, ifname, confname):
res = self.request("ADD " + ifname + " config=" + confname)
- if not "OK" in res:
+ if "OK" not in res:
raise Exception("Could not add hostapd interface")
def add_bss(self, phy, confname, ignore_error=False):
res = self.request("ADD bss_config=" + phy + ":" + confname)
- if not "OK" in res:
+ if "OK" not in res:
if not ignore_error:
raise Exception("Could not add hostapd BSS")
self.ctrl.terminate()
self.ctrl = None
+ def send_file(self, src, dst):
+ self.host.send_file(src, dst)
+
class Hostapd:
def __init__(self, ifname, bssidx=0, hostname=None, port=8877):
self.hostname = hostname
self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
return self.bssid
+ def get_addr(self, group=False):
+ return self.own_addr()
+
def request(self, cmd):
logger.debug(self.dbg + ": CTRL: " + cmd)
return self.ctrl.request(cmd)
return "PONG" in self.request("PING")
def set(self, field, value):
- if not "OK" in self.request("SET " + field + " " + value):
+ if "OK" not in self.request("SET " + field + " " + value):
raise Exception("Failed to set hostapd parameter " + field)
def set_defaults(self):
self.set("wep_key0", key)
def enable(self):
- if not "OK" in self.request("ENABLE"):
+ if "OK" not in self.request("ENABLE"):
raise Exception("Failed to enable hostapd interface " + self.ifname)
def disable(self):
- if not "OK" in self.request("DISABLE"):
+ if "OK" not in self.request("DISABLE"):
raise Exception("Failed to disable hostapd interface " + self.ifname)
def dump_monitor(self):
logger.debug(self.dbg + ": " + ev)
def wait_event(self, events, timeout):
+ if not isinstance(events, list):
+ raise Exception("Hostapd.wait_event() called with incorrect events argument type")
start = os.times()[4]
while True:
while self.mon.pending():
break
return None
+ def wait_sta(self, addr=None, timeout=2):
+ ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
+ if ev is None:
+ raise Exception("AP did not report STA connection")
+ if addr and addr not in ev:
+ raise Exception("Unexpected STA address in connection event: " + ev)
+
+ def wait_ptkinitdone(self, addr, timeout=2):
+ while timeout > 0:
+ sta = self.get_sta(addr)
+ if 'hostapdWPAPTKState' not in sta:
+ raise Exception("GET_STA did not return hostapdWPAPTKState")
+ state = sta['hostapdWPAPTKState']
+ if state == "11":
+ return
+ time.sleep(0.1)
+ timeout -= 0.1
+ raise Exception("Timeout while waiting for PTKINITDONE")
+
def get_status(self):
res = self.request("STATUS")
lines = res.splitlines()
vals = dict()
for l in lines:
- [name,value] = l.split('=', 1)
+ [name, value] = l.split('=', 1)
vals[name] = value
return vals
lines = res.splitlines()
vals = dict()
for l in lines:
- [name,value] = l.split('=', 1)
+ [name, value] = l.split('=', 1)
vals[name] = value
return vals
lines = res.splitlines()
vals = dict()
for l in lines:
- [name,value] = l.split('=', 1)
+ [name, value] = l.split('=', 1)
vals[name] = value
return vals
vals['addr'] = l
first = False
else:
- [name,value] = l.split('=', 1)
+ [name, value] = l.split('=', 1)
vals[name] = value
return vals
if addr not in l:
continue
vals = dict()
- [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
+ [index, aa, pmkid, expiration, opportunistic] = l.split(' ')
vals['index'] = index
vals['pmkid'] = pmkid
vals['expiration'] = expiration
raise Exception("Failed to parse QR Code URI")
return int(res)
+ def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
+ curve=None, key=None):
+ cmd = "DPP_BOOTSTRAP_GEN type=" + type
+ if chan:
+ cmd += " chan=" + chan
+ if mac:
+ if mac is True:
+ mac = self.own_addr()
+ cmd += " mac=" + mac.replace(':', '')
+ if info:
+ cmd += " info=" + info
+ if curve:
+ cmd += " curve=" + curve
+ if key:
+ cmd += " key=" + key
+ res = self.request(cmd)
+ if "FAIL" in res:
+ raise Exception("Failed to generate bootstrapping info")
+ return int(res)
+
+ def dpp_listen(self, freq, netrole=None, qr=None, role=None):
+ cmd = "DPP_LISTEN " + str(freq)
+ if netrole:
+ cmd += " netrole=" + netrole
+ if qr:
+ cmd += " qr=" + qr
+ if role:
+ cmd += " role=" + role
+ if "OK" not in self.request(cmd):
+ raise Exception("Failed to start listen operation")
+
+ def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
+ extra=None, own=None, role=None, neg_freq=None,
+ ssid=None, passphrase=None, expect_fail=False):
+ cmd = "DPP_AUTH_INIT"
+ if peer is None:
+ peer = self.dpp_qr_code(uri)
+ cmd += " peer=%d" % peer
+ if own is not None:
+ cmd += " own=%d" % own
+ if role:
+ cmd += " role=" + role
+ if extra:
+ cmd += " " + extra
+ if conf:
+ cmd += " conf=" + conf
+ if configurator is not None:
+ cmd += " configurator=%d" % configurator
+ if neg_freq:
+ cmd += " neg_freq=%d" % neg_freq
+ if ssid:
+ cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
+ if passphrase:
+ cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
+ res = self.request(cmd)
+ if expect_fail:
+ if "FAIL" not in res:
+ raise Exception("DPP authentication started unexpectedly")
+ return
+ if "OK" not in res:
+ raise Exception("Failed to initiate DPP Authentication")
+
+ def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
+ extra=None, use_id=None):
+ if use_id is None:
+ id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
+ else:
+ id1 = use_id
+ cmd = "own=%d " % id1
+ if identifier:
+ cmd += "identifier=%s " % identifier
+ cmd += "init=1 "
+ if role:
+ cmd += "role=%s " % role
+ if extra:
+ cmd += extra + " "
+ cmd += "code=%s" % code
+ res = self.request("DPP_PKEX_ADD " + cmd)
+ if "FAIL" in res:
+ raise Exception("Failed to set PKEX data (initiator)")
+ return id1
+
+ def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
+ listen_role=None):
+ id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
+ cmd = "own=%d " % id0
+ if identifier:
+ cmd += "identifier=%s " % identifier
+ cmd += "code=%s" % code
+ res = self.request("DPP_PKEX_ADD " + cmd)
+ if "FAIL" in res:
+ raise Exception("Failed to set PKEX data (responder)")
+ self.dpp_listen(freq, role=listen_role)
+
+ def dpp_configurator_add(self, curve=None, key=None):
+ cmd = "DPP_CONFIGURATOR_ADD"
+ if curve:
+ cmd += " curve=" + curve
+ if key:
+ cmd += " key=" + key
+ res = self.request(cmd)
+ if "FAIL" in res:
+ raise Exception("Failed to add configurator")
+ return int(res)
+
+ def dpp_configurator_remove(self, conf_id):
+ res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
+ if "OK" not in res:
+ raise Exception("DPP_CONFIGURATOR_REMOVE failed")
+
+ def note(self, txt):
+ self.request("NOTE " + txt)
+
+ def send_file(self, src, dst):
+ self.host.send_file(src, dst)
+
def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
- global_ctrl_override=None):
+ global_ctrl_override=None, driver=False):
if isinstance(apdev, dict):
ifname = apdev['ifname']
try:
hapd_global = HostapdGlobal(apdev,
global_ctrl_override=global_ctrl_override)
hapd_global.remove(ifname)
- hapd_global.add(ifname)
+ hapd_global.add(ifname, driver=driver)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port)
if not hapd.ping():
raise Exception("Could not ping hostapd")
hapd.set_defaults()
- fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
- "wpa",
- "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
- "acct_server_addr", "osu_server_uri" ]
+ fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
+ "wpa", "wpa_deny_ptk0_rekey",
+ "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
+ "acct_server_addr", "osu_server_uri"]
for field in fields:
if field in params:
hapd.set(field, params[field])
- for f,v in list(params.items()):
+ for f, v in list(params.items()):
if f in fields:
continue
if isinstance(v, list):
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
+ confname = cfg_file(apdev, confname, ifname)
+ hapd_global.send_file(confname, confname)
hapd_global.add_bss(phy, confname, ignore_error)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port)
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
+ confname = cfg_file(apdev, confname, ifname)
+ hapd_global.send_file(confname, confname)
hapd_global.add_iface(ifname, confname)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port)
hapd_global = HostapdGlobal(apdev)
hapd_global.terminate()
-def wpa2_params(ssid=None, passphrase=None):
- params = { "wpa": "2",
- "wpa_key_mgmt": "WPA-PSK",
- "rsn_pairwise": "CCMP" }
+def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
+ ieee80211w=None):
+ params = {"wpa": "2",
+ "wpa_key_mgmt": wpa_key_mgmt,
+ "rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
+ if ieee80211w is not None:
+ params["ieee80211w"] = ieee80211w
return params
def wpa_params(ssid=None, passphrase=None):
- params = { "wpa": "1",
- "wpa_key_mgmt": "WPA-PSK",
- "wpa_pairwise": "TKIP" }
+ params = {"wpa": "1",
+ "wpa_key_mgmt": "WPA-PSK",
+ "wpa_pairwise": "TKIP"}
if ssid:
params["ssid"] = ssid
if passphrase:
return params
def wpa_mixed_params(ssid=None, passphrase=None):
- params = { "wpa": "3",
- "wpa_key_mgmt": "WPA-PSK",
- "wpa_pairwise": "TKIP",
- "rsn_pairwise": "CCMP" }
+ params = {"wpa": "3",
+ "wpa_key_mgmt": "WPA-PSK",
+ "wpa_pairwise": "TKIP",
+ "rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if passphrase:
return params
def radius_params():
- params = { "auth_server_addr": "127.0.0.1",
- "auth_server_port": "1812",
- "auth_server_shared_secret": "radius",
- "nas_identifier": "nas.w1.fi" }
+ params = {"auth_server_addr": "127.0.0.1",
+ "auth_server_port": "1812",
+ "auth_server_shared_secret": "radius",
+ "nas_identifier": "nas.w1.fi"}
return params
def wpa_eap_params(ssid=None):
return params
def b_only_params(channel="1", ssid=None, country=None):
- params = { "hw_mode" : "b",
- "channel" : channel }
+ params = {"hw_mode": "b",
+ "channel": channel}
if ssid:
params["ssid"] = ssid
if country:
return params
def g_only_params(channel="1", ssid=None, country=None):
- params = { "hw_mode" : "g",
- "channel" : channel }
+ params = {"hw_mode": "g",
+ "channel": channel}
if ssid:
params["ssid"] = ssid
if country:
return params
def a_only_params(channel="36", ssid=None, country=None):
- params = { "hw_mode" : "a",
- "channel" : channel }
+ params = {"hw_mode": "a",
+ "channel": channel}
if ssid:
params["ssid"] = ssid
if country:
return params
def ht20_params(channel="1", ssid=None, country=None):
- params = { "ieee80211n" : "1",
- "channel" : channel,
- "hw_mode" : "g" }
+ params = {"ieee80211n": "1",
+ "channel": channel,
+ "hw_mode": "g"}
if int(channel) > 14:
params["hw_mode"] = "a"
if ssid:
def cmd_execute(apdev, cmd, shell=False):
hapd_global = HostapdGlobal(apdev)
return hapd_global.cmd_execute(cmd, shell=shell)
+
+def send_file(apdev, src, dst):
+ hapd_global = HostapdGlobal(apdev)
+ return hapd_global.send_file(src, dst)
+
+def acl_file(dev, apdev, conf):
+ filename = os.path.join("/tmp", conf)
+
+ if conf == 'hostapd.macaddr':
+ with open(filename, 'w') as f:
+ mac0 = dev[0].get_status_field("address")
+ f.write(mac0 + '\n')
+ f.write("02:00:00:00:00:12\n")
+ f.write("02:00:00:00:00:34\n")
+ f.write("-02:00:00:00:00:12\n")
+ f.write("-02:00:00:00:00:34\n")
+ f.write("01:01:01:01:01:01\n")
+ f.write("03:01:01:01:01:03\n")
+ elif conf == 'hostapd.accept':
+ with open(filename, 'w') as f:
+ mac0 = dev[0].get_status_field("address")
+ mac1 = dev[1].get_status_field("address")
+ f.write(mac0 + " 1\n")
+ f.write(mac1 + " 2\n")
+ elif conf == 'hostapd.accept2':
+ with open(filename, 'w') as f:
+ mac0 = dev[0].get_status_field("address")
+ mac1 = dev[1].get_status_field("address")
+ mac2 = dev[2].get_status_field("address")
+ f.write(mac0 + " 1\n")
+ f.write(mac1 + " 2\n")
+ f.write(mac2 + " 3\n")
+ else:
+ return conf
+
+ return filename
+
+def bssid_inc(apdev, inc=1):
+ parts = apdev['bssid'].split(':')
+ parts[5] = '%02x' % (int(parts[5], 16) + int(inc))
+ bssid = '%s:%s:%s:%s:%s:%s' % (parts[0], parts[1], parts[2],
+ parts[3], parts[4], parts[5])
+ return bssid
+
+def cfg_file(apdev, conf, ifname=None):
+ # put cfg file in /tmp directory
+ fname = os.path.join("/tmp", conf)
+
+ match = re.search(r'^bss-\d+', conf)
+ if match:
+ with open(fname, 'w') as f:
+ idx = ''.join(filter(str.isdigit, conf))
+ if ifname is None:
+ ifname = apdev['ifname']
+ if idx != '1':
+ ifname = ifname + '-' + idx
+
+ f.write("driver=nl80211\n")
+ f.write("ctrl_interface=/var/run/hostapd\n")
+ f.write("hw_mode=g\n")
+ f.write("channel=1\n")
+ f.write("ieee80211n=1\n")
+ f.write("interface=%s\n" % ifname)
+
+ f.write("ssid=bss-%s\n" % idx)
+ if conf == 'bss-2-dup.conf':
+ bssid = apdev['bssid']
+ else:
+ bssid = bssid_inc(apdev, int(idx) - 1)
+ f.write("bssid=%s\n" % bssid)
+ else:
+ return conf
+
+ return fname