]> git.ipfire.org Git - thirdparty/hostap.git/blobdiff - tests/hwsim/hostapd.py
tests: remote: Generate and send BSS configuration files
[thirdparty/hostap.git] / tests / hwsim / hostapd.py
index 7cd454291ed1b2d66b28075f9d337dc36893e35b..04e460c1d73669a065c4d20900ade368bb186e46 100644 (file)
@@ -5,6 +5,7 @@
 # See README for more details.
 
 import os
+import re
 import time
 import logging
 import binascii
@@ -19,7 +20,7 @@ hapd_ctrl = '/var/run/hostapd'
 hapd_global = '/var/run/hostapd-global'
 
 def mac2tuple(mac):
-    return struct.unpack('6B', binascii.unhexlify(mac.replace(':','')))
+    return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))
 
 class HostapdGlobal:
     def __init__(self, apdev=None, global_ctrl_override=None):
@@ -85,17 +86,17 @@ class HostapdGlobal:
         if driver:
             cmd += " " + driver
         res = self.request(cmd)
-        if not "OK" in res:
+        if "OK" not in res:
             raise Exception("Could not add hostapd interface " + ifname)
 
     def add_iface(self, ifname, confname):
         res = self.request("ADD " + ifname + " config=" + confname)
-        if not "OK" in res:
+        if "OK" not in res:
             raise Exception("Could not add hostapd interface")
 
     def add_bss(self, phy, confname, ignore_error=False):
         res = self.request("ADD bss_config=" + phy + ":" + confname)
-        if not "OK" in res:
+        if "OK" not in res:
             if not ignore_error:
                 raise Exception("Could not add hostapd BSS")
 
@@ -135,6 +136,9 @@ class HostapdGlobal:
         self.ctrl.terminate()
         self.ctrl = None
 
+    def send_file(self, src, dst):
+        self.host.send_file(src, dst)
+
 class Hostapd:
     def __init__(self, ifname, bssidx=0, hostname=None, port=8877):
         self.hostname = hostname
@@ -179,6 +183,9 @@ class Hostapd:
             self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
         return self.bssid
 
+    def get_addr(self, group=False):
+        return self.own_addr()
+
     def request(self, cmd):
         logger.debug(self.dbg + ": CTRL: " + cmd)
         return self.ctrl.request(cmd)
@@ -187,7 +194,7 @@ class Hostapd:
         return "PONG" in self.request("PING")
 
     def set(self, field, value):
-        if not "OK" in self.request("SET " + field + " " + value):
+        if "OK" not in self.request("SET " + field + " " + value):
             raise Exception("Failed to set hostapd parameter " + field)
 
     def set_defaults(self):
@@ -233,11 +240,11 @@ class Hostapd:
         self.set("wep_key0", key)
 
     def enable(self):
-        if not "OK" in self.request("ENABLE"):
+        if "OK" not in self.request("ENABLE"):
             raise Exception("Failed to enable hostapd interface " + self.ifname)
 
     def disable(self):
-        if not "OK" in self.request("DISABLE"):
+        if "OK" not in self.request("DISABLE"):
             raise Exception("Failed to disable hostapd interface " + self.ifname)
 
     def dump_monitor(self):
@@ -246,6 +253,8 @@ class Hostapd:
             logger.debug(self.dbg + ": " + ev)
 
     def wait_event(self, events, timeout):
+        if not isinstance(events, list):
+            raise Exception("Hostapd.wait_event() called with incorrect events argument type")
         start = os.times()[4]
         while True:
             while self.mon.pending():
@@ -262,12 +271,31 @@ class Hostapd:
                 break
         return None
 
+    def wait_sta(self, addr=None, timeout=2):
+        ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
+        if ev is None:
+            raise Exception("AP did not report STA connection")
+        if addr and addr not in ev:
+            raise Exception("Unexpected STA address in connection event: " + ev)
+
+    def wait_ptkinitdone(self, addr, timeout=2):
+        while timeout > 0:
+            sta = self.get_sta(addr)
+            if 'hostapdWPAPTKState' not in sta:
+                raise Exception("GET_STA did not return hostapdWPAPTKState")
+            state = sta['hostapdWPAPTKState']
+            if state == "11":
+                return
+            time.sleep(0.1)
+            timeout -= 0.1
+        raise Exception("Timeout while waiting for PTKINITDONE")
+
     def get_status(self):
         res = self.request("STATUS")
         lines = res.splitlines()
         vals = dict()
         for l in lines:
-            [name,value] = l.split('=', 1)
+            [name, value] = l.split('=', 1)
             vals[name] = value
         return vals
 
@@ -282,7 +310,7 @@ class Hostapd:
         lines = res.splitlines()
         vals = dict()
         for l in lines:
-            [name,value] = l.split('=', 1)
+            [name, value] = l.split('=', 1)
             vals[name] = value
         return vals
 
@@ -297,7 +325,7 @@ class Hostapd:
         lines = res.splitlines()
         vals = dict()
         for l in lines:
-            [name,value] = l.split('=', 1)
+            [name, value] = l.split('=', 1)
             vals[name] = value
         return vals
 
@@ -349,7 +377,7 @@ class Hostapd:
                 vals['addr'] = l
                 first = False
             else:
-                [name,value] = l.split('=', 1)
+                [name, value] = l.split('=', 1)
                 vals[name] = value
         return vals
 
@@ -373,7 +401,7 @@ class Hostapd:
             if addr not in l:
                 continue
             vals = dict()
-            [index,aa,pmkid,expiration,opportunistic] = l.split(' ')
+            [index, aa, pmkid, expiration, opportunistic] = l.split(' ')
             vals['index'] = index
             vals['pmkid'] = pmkid
             vals['expiration'] = expiration
@@ -387,8 +415,124 @@ class Hostapd:
             raise Exception("Failed to parse QR Code URI")
         return int(res)
 
+    def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
+                          curve=None, key=None):
+        cmd = "DPP_BOOTSTRAP_GEN type=" + type
+        if chan:
+            cmd += " chan=" + chan
+        if mac:
+            if mac is True:
+                mac = self.own_addr()
+            cmd += " mac=" + mac.replace(':', '')
+        if info:
+            cmd += " info=" + info
+        if curve:
+            cmd += " curve=" + curve
+        if key:
+            cmd += " key=" + key
+        res = self.request(cmd)
+        if "FAIL" in res:
+            raise Exception("Failed to generate bootstrapping info")
+        return int(res)
+
+    def dpp_listen(self, freq, netrole=None, qr=None, role=None):
+        cmd = "DPP_LISTEN " + str(freq)
+        if netrole:
+            cmd += " netrole=" + netrole
+        if qr:
+            cmd += " qr=" + qr
+        if role:
+            cmd += " role=" + role
+        if "OK" not in self.request(cmd):
+            raise Exception("Failed to start listen operation")
+
+    def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
+                      extra=None, own=None, role=None, neg_freq=None,
+                      ssid=None, passphrase=None, expect_fail=False):
+        cmd = "DPP_AUTH_INIT"
+        if peer is None:
+            peer = self.dpp_qr_code(uri)
+        cmd += " peer=%d" % peer
+        if own is not None:
+            cmd += " own=%d" % own
+        if role:
+            cmd += " role=" + role
+        if extra:
+            cmd += " " + extra
+        if conf:
+            cmd += " conf=" + conf
+        if configurator is not None:
+            cmd += " configurator=%d" % configurator
+        if neg_freq:
+            cmd += " neg_freq=%d" % neg_freq
+        if ssid:
+            cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
+        if passphrase:
+            cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
+        res = self.request(cmd)
+        if expect_fail:
+            if "FAIL" not in res:
+                raise Exception("DPP authentication started unexpectedly")
+            return
+        if "OK" not in res:
+            raise Exception("Failed to initiate DPP Authentication")
+
+    def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
+                      extra=None, use_id=None):
+        if use_id is None:
+            id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
+        else:
+            id1 = use_id
+        cmd = "own=%d " % id1
+        if identifier:
+            cmd += "identifier=%s " % identifier
+        cmd += "init=1 "
+        if role:
+            cmd += "role=%s " % role
+        if extra:
+            cmd += extra + " "
+        cmd += "code=%s" % code
+        res = self.request("DPP_PKEX_ADD " + cmd)
+        if "FAIL" in res:
+            raise Exception("Failed to set PKEX data (initiator)")
+        return id1
+
+    def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
+                      listen_role=None):
+        id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
+        cmd = "own=%d " % id0
+        if identifier:
+            cmd += "identifier=%s " % identifier
+        cmd += "code=%s" % code
+        res = self.request("DPP_PKEX_ADD " + cmd)
+        if "FAIL" in res:
+            raise Exception("Failed to set PKEX data (responder)")
+        self.dpp_listen(freq, role=listen_role)
+
+    def dpp_configurator_add(self, curve=None, key=None):
+        cmd = "DPP_CONFIGURATOR_ADD"
+        if curve:
+            cmd += " curve=" + curve
+        if key:
+            cmd += " key=" + key
+        res = self.request(cmd)
+        if "FAIL" in res:
+            raise Exception("Failed to add configurator")
+        return int(res)
+
+    def dpp_configurator_remove(self, conf_id):
+        res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
+        if "OK" not in res:
+            raise Exception("DPP_CONFIGURATOR_REMOVE failed")
+
+    def note(self, txt):
+        self.request("NOTE " + txt)
+
+    def send_file(self, src, dst):
+        self.host.send_file(src, dst)
+
 def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
-           global_ctrl_override=None):
+           global_ctrl_override=None, driver=False):
         if isinstance(apdev, dict):
             ifname = apdev['ifname']
             try:
@@ -407,20 +551,20 @@ def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
         hapd_global = HostapdGlobal(apdev,
                                     global_ctrl_override=global_ctrl_override)
         hapd_global.remove(ifname)
-        hapd_global.add(ifname)
+        hapd_global.add(ifname, driver=driver)
         port = hapd_global.get_ctrl_iface_port(ifname)
         hapd = Hostapd(ifname, hostname=hostname, port=port)
         if not hapd.ping():
             raise Exception("Could not ping hostapd")
         hapd.set_defaults()
-        fields = [ "ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
-                   "wpa",
-                   "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
-                   "acct_server_addr", "osu_server_uri" ]
+        fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
+                  "wpa", "wpa_deny_ptk0_rekey",
+                  "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
+                  "acct_server_addr", "osu_server_uri"]
         for field in fields:
             if field in params:
                 hapd.set(field, params[field])
-        for f,v in list(params.items()):
+        for f, v in list(params.items()):
             if f in fields:
                 continue
             if isinstance(v, list):
@@ -450,6 +594,8 @@ def add_bss(apdev, ifname, confname, ignore_error=False):
         hostname = None
         port = 8878
     hapd_global = HostapdGlobal(apdev)
+    confname = cfg_file(apdev, confname, ifname)
+    hapd_global.send_file(confname, confname)
     hapd_global.add_bss(phy, confname, ignore_error)
     port = hapd_global.get_ctrl_iface_port(ifname)
     hapd = Hostapd(ifname, hostname=hostname, port=port)
@@ -468,6 +614,8 @@ def add_iface(apdev, confname):
         hostname = None
         port = 8878
     hapd_global = HostapdGlobal(apdev)
+    confname = cfg_file(apdev, confname, ifname)
+    hapd_global.send_file(confname, confname)
     hapd_global.add_iface(ifname, confname)
     port = hapd_global.get_ctrl_iface_port(ifname)
     hapd = Hostapd(ifname, hostname=hostname, port=port)
@@ -497,20 +645,23 @@ def terminate(apdev):
     hapd_global = HostapdGlobal(apdev)
     hapd_global.terminate()
 
-def wpa2_params(ssid=None, passphrase=None):
-    params = { "wpa": "2",
-               "wpa_key_mgmt": "WPA-PSK",
-               "rsn_pairwise": "CCMP" }
+def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
+                ieee80211w=None):
+    params = {"wpa": "2",
+              "wpa_key_mgmt": wpa_key_mgmt,
+              "rsn_pairwise": "CCMP"}
     if ssid:
         params["ssid"] = ssid
     if passphrase:
         params["wpa_passphrase"] = passphrase
+    if ieee80211w is not None:
+        params["ieee80211w"] = ieee80211w
     return params
 
 def wpa_params(ssid=None, passphrase=None):
-    params = { "wpa": "1",
-               "wpa_key_mgmt": "WPA-PSK",
-               "wpa_pairwise": "TKIP" }
+    params = {"wpa": "1",
+              "wpa_key_mgmt": "WPA-PSK",
+              "wpa_pairwise": "TKIP"}
     if ssid:
         params["ssid"] = ssid
     if passphrase:
@@ -518,10 +669,10 @@ def wpa_params(ssid=None, passphrase=None):
     return params
 
 def wpa_mixed_params(ssid=None, passphrase=None):
-    params = { "wpa": "3",
-               "wpa_key_mgmt": "WPA-PSK",
-               "wpa_pairwise": "TKIP",
-               "rsn_pairwise": "CCMP" }
+    params = {"wpa": "3",
+              "wpa_key_mgmt": "WPA-PSK",
+              "wpa_pairwise": "TKIP",
+              "rsn_pairwise": "CCMP"}
     if ssid:
         params["ssid"] = ssid
     if passphrase:
@@ -529,10 +680,10 @@ def wpa_mixed_params(ssid=None, passphrase=None):
     return params
 
 def radius_params():
-    params = { "auth_server_addr": "127.0.0.1",
-               "auth_server_port": "1812",
-               "auth_server_shared_secret": "radius",
-               "nas_identifier": "nas.w1.fi" }
+    params = {"auth_server_addr": "127.0.0.1",
+              "auth_server_port": "1812",
+              "auth_server_shared_secret": "radius",
+              "nas_identifier": "nas.w1.fi"}
     return params
 
 def wpa_eap_params(ssid=None):
@@ -556,8 +707,8 @@ def wpa2_eap_params(ssid=None):
     return params
 
 def b_only_params(channel="1", ssid=None, country=None):
-    params = { "hw_mode" : "b",
-               "channel" : channel }
+    params = {"hw_mode": "b",
+              "channel": channel}
     if ssid:
         params["ssid"] = ssid
     if country:
@@ -565,8 +716,8 @@ def b_only_params(channel="1", ssid=None, country=None):
     return params
 
 def g_only_params(channel="1", ssid=None, country=None):
-    params = { "hw_mode" : "g",
-               "channel" : channel }
+    params = {"hw_mode": "g",
+              "channel": channel}
     if ssid:
         params["ssid"] = ssid
     if country:
@@ -574,8 +725,8 @@ def g_only_params(channel="1", ssid=None, country=None):
     return params
 
 def a_only_params(channel="36", ssid=None, country=None):
-    params = { "hw_mode" : "a",
-               "channel" : channel }
+    params = {"hw_mode": "a",
+              "channel": channel}
     if ssid:
         params["ssid"] = ssid
     if country:
@@ -583,9 +734,9 @@ def a_only_params(channel="36", ssid=None, country=None):
     return params
 
 def ht20_params(channel="1", ssid=None, country=None):
-    params = { "ieee80211n" : "1",
-               "channel" : channel,
-               "hw_mode" : "g" }
+    params = {"ieee80211n": "1",
+              "channel": channel,
+              "hw_mode": "g"}
     if int(channel) > 14:
         params["hw_mode"] = "a"
     if ssid:
@@ -607,3 +758,77 @@ def ht40_minus_params(channel="1", ssid=None, country=None):
 def cmd_execute(apdev, cmd, shell=False):
     hapd_global = HostapdGlobal(apdev)
     return hapd_global.cmd_execute(cmd, shell=shell)
+
+def send_file(apdev, src, dst):
+    hapd_global = HostapdGlobal(apdev)
+    return hapd_global.send_file(src, dst)
+
+def acl_file(dev, apdev, conf):
+    filename = os.path.join("/tmp", conf)
+
+    if conf == 'hostapd.macaddr':
+        with open(filename, 'w') as f:
+            mac0 = dev[0].get_status_field("address")
+            f.write(mac0 + '\n')
+            f.write("02:00:00:00:00:12\n")
+            f.write("02:00:00:00:00:34\n")
+            f.write("-02:00:00:00:00:12\n")
+            f.write("-02:00:00:00:00:34\n")
+            f.write("01:01:01:01:01:01\n")
+            f.write("03:01:01:01:01:03\n")
+    elif conf == 'hostapd.accept':
+        with open(filename, 'w') as f:
+            mac0 = dev[0].get_status_field("address")
+            mac1 = dev[1].get_status_field("address")
+            f.write(mac0 + "    1\n")
+            f.write(mac1 + "    2\n")
+    elif conf == 'hostapd.accept2':
+        with open(filename, 'w') as f:
+            mac0 = dev[0].get_status_field("address")
+            mac1 = dev[1].get_status_field("address")
+            mac2 = dev[2].get_status_field("address")
+            f.write(mac0 + "    1\n")
+            f.write(mac1 + "    2\n")
+            f.write(mac2 + "    3\n")
+    else:
+        return conf
+
+    return filename
+
+def bssid_inc(apdev, inc=1):
+    parts = apdev['bssid'].split(':')
+    parts[5] = '%02x' % (int(parts[5], 16) + int(inc))
+    bssid = '%s:%s:%s:%s:%s:%s' % (parts[0], parts[1], parts[2],
+                                   parts[3], parts[4], parts[5])
+    return bssid
+
+def cfg_file(apdev, conf, ifname=None):
+    # put cfg file in /tmp directory
+    fname = os.path.join("/tmp", conf)
+
+    match = re.search(r'^bss-\d+', conf)
+    if match:
+        with open(fname, 'w') as f:
+            idx = ''.join(filter(str.isdigit, conf))
+            if ifname is None:
+                ifname = apdev['ifname']
+                if idx != '1':
+                    ifname = ifname + '-' + idx
+
+            f.write("driver=nl80211\n")
+            f.write("ctrl_interface=/var/run/hostapd\n")
+            f.write("hw_mode=g\n")
+            f.write("channel=1\n")
+            f.write("ieee80211n=1\n")
+            f.write("interface=%s\n" % ifname)
+
+            f.write("ssid=bss-%s\n" % idx)
+            if conf == 'bss-2-dup.conf':
+                bssid = apdev['bssid']
+            else:
+                bssid = bssid_inc(apdev, int(idx) - 1)
+            f.write("bssid=%s\n" % bssid)
+    else:
+        return conf
+
+    return fname