]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: Allow WlantestCapture to be used with context managers
authorJouni Malinen <j@w1.fi>
Sun, 17 Mar 2024 14:27:54 +0000 (16:27 +0200)
committerJouni Malinen <j@w1.fi>
Sun, 17 Mar 2024 15:10:33 +0000 (17:10 +0200)
In addition, convert many of the uses to do so.

Signed-off-by: Jouni Malinen <j@w1.fi>
tests/hwsim/test_ap_open.py
tests/hwsim/test_ap_psk.py
tests/hwsim/test_dpp.py
tests/hwsim/test_dpp3.py
tests/hwsim/wlantest.py

index fcec8721051ebc97d840bf1cd3d309339664f7bc..8c8a9c4444637802d66694e463e55851a866325d 100644 (file)
@@ -996,16 +996,12 @@ def test_ap_open_layer_2_update(dev, apdev, params):
     cap = os.path.join(params['logdir'], prefix + "." + ifname + ".pcap")
 
     hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
-    wt = WlantestCapture(ifname, cap)
-    time.sleep(1)
-
-    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
-    hapd.wait_sta()
-    hwsim_utils.test_connectivity(dev[0], hapd)
-    time.sleep(1)
-    hwsim_utils.test_connectivity(dev[0], hapd)
-    time.sleep(0.5)
-    wt.close()
+    with WlantestCapture(ifname, cap):
+        dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
+        hapd.wait_sta()
+        hwsim_utils.test_connectivity(dev[0], hapd)
+        time.sleep(1)
+        hwsim_utils.test_connectivity(dev[0], hapd)
 
     # Check for Layer 2 Update frame and unexpected frames from the station
     # that did not fully complete authentication.
index 157b863d799a69aaa54cc1706e9fc629bb43309b..9e34e7b6df99134947ab9f43aa7598faa879f25f 100644 (file)
@@ -3437,45 +3437,41 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params):
     params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
     params["wpa_key_mgmt"] = "WPA-PSK"
     hapd = hostapd.add_ap(apdev[0], params)
-    wt = WlantestCapture(ifname, cap)
-    time.sleep(1)
-
-    bssid = hapd.own_addr().replace(':', '')
-
-    hapd.request("SET ext_mgmt_frame_handling 1")
-    addr = "021122334455"
-    auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
-    res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth)
-    if "OK" not in res:
-        raise Exception("MGMT_RX_PROCESS failed")
-    ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
-    if ev is None:
-        raise Exception("No TX status seen")
-    ev = ev.replace("ok=0", "ok=1")
-    cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
-    if "OK" not in hapd.request(cmd):
-        raise Exception("MGMT_TX_STATUS_PROCESS failed")
+    with WlantestCapture(ifname, cap):
+        bssid = hapd.own_addr().replace(':', '')
 
-    assoc = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '000474657374' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000'
-    res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
-    if "OK" not in res:
-        raise Exception("MGMT_RX_PROCESS failed")
-    ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
-    if ev is None:
-        raise Exception("No TX status seen")
-    ev = ev.replace("ok=0", "ok=1")
-    cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
-    if "OK" not in hapd.request(cmd):
-        raise Exception("MGMT_TX_STATUS_PROCESS failed")
-    hapd.request("SET ext_mgmt_frame_handling 0")
-
-    dev[0].connect(ssid, psk="12345678", scan_freq="2412")
-    hapd.wait_sta()
-    hwsim_utils.test_connectivity(dev[0], hapd)
-    time.sleep(1)
-    hwsim_utils.test_connectivity(dev[0], hapd)
-    time.sleep(0.5)
-    wt.close()
+        hapd.request("SET ext_mgmt_frame_handling 1")
+        addr = "021122334455"
+        auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
+        res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth)
+        if "OK" not in res:
+            raise Exception("MGMT_RX_PROCESS failed")
+        ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
+        if ev is None:
+            raise Exception("No TX status seen")
+        ev = ev.replace("ok=0", "ok=1")
+        cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
+        if "OK" not in hapd.request(cmd):
+            raise Exception("MGMT_TX_STATUS_PROCESS failed")
+
+        assoc = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '000474657374' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000'
+        res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
+        if "OK" not in res:
+            raise Exception("MGMT_RX_PROCESS failed")
+        ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
+        if ev is None:
+            raise Exception("No TX status seen")
+        ev = ev.replace("ok=0", "ok=1")
+        cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
+        if "OK" not in hapd.request(cmd):
+            raise Exception("MGMT_TX_STATUS_PROCESS failed")
+        hapd.request("SET ext_mgmt_frame_handling 0")
+
+        dev[0].connect(ssid, psk="12345678", scan_freq="2412")
+        hapd.wait_sta()
+        hwsim_utils.test_connectivity(dev[0], hapd)
+        time.sleep(1)
+        hwsim_utils.test_connectivity(dev[0], hapd)
     time.sleep(0.5)
 
     # Check for Layer 2 Update frame and unexpected frames from the station
index af8eb05406f3e6bf25d41995e9966d05a689fb7f..518983bd048096e84962b710553b8742122104af 100644 (file)
@@ -5781,8 +5781,12 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False, discover=False,
     check_dpp_capab(dev[1], min_ver=2)
     cap_lo = params['prefix'] + ".lo.pcap"
 
-    wt = WlantestCapture('lo', cap_lo)
+    with WlantestCapture('lo', cap_lo):
+        run_dpp_controller_relay2(dev, apdev, params, chirp, discover,
+                                  duplicate)
 
+def run_dpp_controller_relay2(dev, apdev, params, chirp=False, discover=False,
+                              duplicate=False):
     # Controller
     conf_id = dev[1].dpp_configurator_add()
     dev[1].set("dpp_configurator_params",
@@ -5892,9 +5896,6 @@ def run_dpp_controller_relay(dev, apdev, params, chirp=False, discover=False,
     dev[0].wait_connected()
     relay.wait_sta()
 
-    time.sleep(0.5)
-    wt.close()
-
 def test_dpp_controller_init_through_relay(dev, apdev, params):
     """DPP Controller initiating through Relay"""
     try:
@@ -5925,8 +5926,12 @@ def run_dpp_controller_init_through_relay(dev, apdev, params, dynamic=False,
     check_dpp_capab(dev[1], min_ver=2)
     cap_lo = os.path.join(params['prefix'], ".lo.pcap")
 
-    wt = WlantestCapture('lo', cap_lo)
+    with WlantestCapture('lo', cap_lo):
+        run_dpp_controller_init_through_relay2(dev, apdev, params, dynamic,
+                                               add)
 
+def run_dpp_controller_init_through_relay2(dev, apdev, params, dynamic=False,
+                                           add=False):
     # Controller
     conf_id = dev[1].dpp_configurator_add()
     dev[1].set("dpp_configurator_params",
@@ -6004,9 +6009,6 @@ def run_dpp_controller_init_through_relay(dev, apdev, params, dynamic=False,
     if add:
         relay.request("DPP_RELAY_REMOVE_CONTROLLER 127.0.0.1")
 
-    time.sleep(0.5)
-    wt.close()
-
 class MyTCPServer(TCPServer):
     def __init__(self, addr, handler):
         self.allow_reuse_address = True
@@ -6098,9 +6100,10 @@ def run_dpp_tcp(dev0, dev1, cap_lo, port=None, mutual=False):
     check_dpp_capab(dev0)
     check_dpp_capab(dev1)
 
-    wt = WlantestCapture('lo', cap_lo)
-    time.sleep(1)
+    with WlantestCapture('lo', cap_lo):
+        run_dpp_tcp2(dev0, dev1, cap_lo, port, mutual)
 
+def run_dpp_tcp2(dev0, dev1, cap_lo, port=None, mutual=False):
     # Controller
     conf_id = dev1.dpp_configurator_add()
     dev1.set("dpp_configurator_params",
@@ -6154,8 +6157,6 @@ def run_dpp_tcp(dev0, dev1, cap_lo, port=None, mutual=False):
     wait_auth_success(dev1, dev0, configurator=dev1, enrollee=dev0,
                       allow_enrollee_failure=True,
                       allow_configurator_failure=True)
-    time.sleep(0.5)
-    wt.close()
 
 def test_dpp_tcp_conf_init(dev, apdev, params):
     """DPP over TCP (Configurator initiates)"""
@@ -6175,9 +6176,10 @@ def run_dpp_tcp_conf_init(dev0, dev1, cap_lo, port=None, conf="sta-dpp"):
     check_dpp_capab(dev0, min_ver=2)
     check_dpp_capab(dev1, min_ver=2)
 
-    wt = WlantestCapture('lo', cap_lo)
-    time.sleep(1)
+    with WlantestCapture('lo', cap_lo):
+        run_dpp_tcp_conf_init2(dev0, dev1, cap_lo, port, conf)
 
+def run_dpp_tcp_conf_init2(dev0, dev1, cap_lo, port=None, conf="sta-dpp"):
     id_c = dev1.dpp_bootstrap_gen()
     uri_c = dev1.request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
     res = dev1.request("DPP_BOOTSTRAP_INFO %d" % id_c)
@@ -6194,8 +6196,6 @@ def run_dpp_tcp_conf_init(dev0, dev1, cap_lo, port=None, conf="sta-dpp"):
     wait_auth_success(dev1, dev0, configurator=dev0, enrollee=dev1,
                       allow_enrollee_failure=True,
                       allow_configurator_failure=True)
-    time.sleep(0.5)
-    wt.close()
 
 def test_dpp_tcp_controller_management_hostapd(dev, apdev, params):
     """DPP Controller management in hostapd"""
@@ -7570,9 +7570,10 @@ def run_dpp_enterprise_tcp(dev, apdev, params):
 
     cap_lo = params['prefix'] + ".lo.pcap"
 
-    wt = WlantestCapture('lo', cap_lo)
-    time.sleep(1)
+    with WlantestCapture('lo', cap_lo) as wt:
+        _run_dpp_enterprise_tcp(dev, apdev, params, wt)
 
+def _run_dpp_enterprise_tcp(dev, apdev, params, wt):
     # Controller
     conf_id = dev[1].dpp_configurator_add()
     csrattrs = "MAsGCSqGSIb3DQEJBw=="
@@ -7641,9 +7642,6 @@ def run_dpp_enterprise_tcp_end(params, dev, wt):
     if "DPP-CONF-RECEIVED" not in ev:
         raise Exception("DPP configuration did not succeed (Enrollee)")
 
-    time.sleep(0.5)
-    wt.close()
-
 def test_dpp_enterprise_tcp2(dev, apdev, params):
     """DPP over TCP for enterprise provisioning (Controller initiating)"""
     if not openssl_imported:
@@ -7660,21 +7658,11 @@ def run_dpp_enterprise_tcp2(dev, apdev, params):
     check_dpp_capab(dev[1])
 
     cap_lo = params['prefix'] + ".lo.pcap"
-    cert_file = params['prefix'] + ".cert.pem"
-    pkcs7_file = params['prefix'] + ".pkcs7.der"
-
-    with open("auth_serv/ec-ca.pem", "rb") as f:
-        res = f.read()
-        cacert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
-                                                 res)
-
-    with open("auth_serv/ec-ca.key", "rb") as f:
-        res = f.read()
-        cakey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, res)
 
-    wt = WlantestCapture('lo', cap_lo)
-    time.sleep(1)
+    with WlantestCapture('lo', cap_lo) as wt:
+        _run_dpp_enterprise_tcp2(dev, apdev, params, wt)
 
+def _run_dpp_enterprise_tcp2(dev, apdev, params, wt):
     # Client/Enrollee/Responder
     id_e = dev[0].dpp_bootstrap_gen()
     uri_e = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id_e)
index 0fde418ba0472277dfb52c20c6a711eeb366a7de..276ec64dad161926fc2931c2d8964fc4e48a4cd0 100644 (file)
@@ -59,9 +59,10 @@ def run_dpp_tcp_pkex(dev0, dev1, cap_lo, sae=False, status=False):
     check_dpp_capab(dev0, min_ver=3)
     check_dpp_capab(dev1, min_ver=3)
 
-    wt = WlantestCapture('lo', cap_lo)
-    time.sleep(1)
+    with WlantestCapture('lo', cap_lo):
+        run_dpp_tcp_pkex2(dev0, dev1, cap_lo, sae, status)
 
+def run_dpp_tcp_pkex2(dev0, dev1, cap_lo, sae=False, status=False):
     # Controller
     if sae:
         ssid = binascii.hexlify("sae".encode()).decode()
@@ -99,9 +100,6 @@ def run_dpp_tcp_pkex(dev0, dev1, cap_lo, sae=False, status=False):
         if 'wait_conn_status' not in res or not res['wait_conn_status']:
             raise Exception("wait_conn_status not reported")
 
-    time.sleep(0.5)
-    wt.close()
-
 def test_dpp_tcp_pkex(dev, apdev, params):
     """DPP/PKEXv2 over TCP"""
     prefix = "dpp_tcp_pkex"
@@ -221,8 +219,10 @@ def run_dpp_controller_relay_pkex(dev, apdev, params):
     prefix = "dpp_controller_relay_pkex"
     cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
 
-    wt = WlantestCapture('lo', cap_lo)
+    with WlantestCapture('lo', cap_lo):
+        run_dpp_controller_relay_pkex2(dev, apdev, params)
 
+def run_dpp_controller_relay_pkex2(dev, apdev, params):
     # Controller
     conf_id = dev[1].dpp_configurator_add()
     dev[1].set("dpp_configurator_params",
@@ -277,9 +277,6 @@ def run_dpp_controller_relay_pkex(dev, apdev, params):
     dev[0].wait_connected()
     dev[0].dump_monitor()
 
-    time.sleep(0.5)
-    wt.close()
-
 def dpp_pb_ap(apdev):
     params = {"ssid": "sae",
               "dpp_configurator_connectivity": "1",
index 16765d27a9de3a227434d1a092d0d5e13f723094..d15595b2beab1c9f485ee25a4ca3be527b83fdca 100644 (file)
@@ -259,12 +259,24 @@ class WlantestCapture:
         self.cmd = subprocess.Popen(args,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
+        time.sleep(1)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        time.sleep(0.5)
+        self.close()
+        time.sleep(0.5)
 
     def __del__(self):
         if self.cmd:
+            print("WlantestCapture.__del__ needed to run close()")
             self.close()
 
     def close(self):
+        if not self.cmd:
+            return
         logger.debug("wlantest[%s] stopping" % self.ifname)
         self.cmd.terminate()
         res = self.cmd.communicate()