In addition, convert many of the uses to do so.
Signed-off-by: Jouni Malinen <j@w1.fi>
cap = os.path.join(params['logdir'], prefix + "." + ifname + ".pcap")
hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
- wt = WlantestCapture(ifname, cap)
- time.sleep(1)
-
- dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
- hapd.wait_sta()
- hwsim_utils.test_connectivity(dev[0], hapd)
- time.sleep(1)
- hwsim_utils.test_connectivity(dev[0], hapd)
- time.sleep(0.5)
- wt.close()
+ with WlantestCapture(ifname, cap):
+ dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
+ hapd.wait_sta()
+ hwsim_utils.test_connectivity(dev[0], hapd)
+ time.sleep(1)
+ hwsim_utils.test_connectivity(dev[0], hapd)
# Check for Layer 2 Update frame and unexpected frames from the station
# that did not fully complete authentication.
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK"
hapd = hostapd.add_ap(apdev[0], params)
- wt = WlantestCapture(ifname, cap)
- time.sleep(1)
-
- bssid = hapd.own_addr().replace(':', '')
-
- hapd.request("SET ext_mgmt_frame_handling 1")
- addr = "021122334455"
- auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
- res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth)
- if "OK" not in res:
- raise Exception("MGMT_RX_PROCESS failed")
- ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
- if ev is None:
- raise Exception("No TX status seen")
- ev = ev.replace("ok=0", "ok=1")
- cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
- if "OK" not in hapd.request(cmd):
- raise Exception("MGMT_TX_STATUS_PROCESS failed")
+ with WlantestCapture(ifname, cap):
+ bssid = hapd.own_addr().replace(':', '')
- assoc = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '000474657374' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000'
- res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
- if "OK" not in res:
- raise Exception("MGMT_RX_PROCESS failed")
- ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
- if ev is None:
- raise Exception("No TX status seen")
- ev = ev.replace("ok=0", "ok=1")
- cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
- if "OK" not in hapd.request(cmd):
- raise Exception("MGMT_TX_STATUS_PROCESS failed")
- hapd.request("SET ext_mgmt_frame_handling 0")
-
- dev[0].connect(ssid, psk="12345678", scan_freq="2412")
- hapd.wait_sta()
- hwsim_utils.test_connectivity(dev[0], hapd)
- time.sleep(1)
- hwsim_utils.test_connectivity(dev[0], hapd)
- time.sleep(0.5)
- wt.close()
+ hapd.request("SET ext_mgmt_frame_handling 1")
+ addr = "021122334455"
+ auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
+ res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth)
+ if "OK" not in res:
+ raise Exception("MGMT_RX_PROCESS failed")
+ ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
+ if ev is None:
+ raise Exception("No TX status seen")
+ ev = ev.replace("ok=0", "ok=1")
+ cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
+ if "OK" not in hapd.request(cmd):
+ raise Exception("MGMT_TX_STATUS_PROCESS failed")
+
+ assoc = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '000474657374' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000'
+ res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
+ if "OK" not in res:
+ raise Exception("MGMT_RX_PROCESS failed")
+ ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
+ if ev is None:
+ raise Exception("No TX status seen")
+ ev = ev.replace("ok=0", "ok=1")
+ cmd = "MGMT_TX_STATUS_PROCESS %s" % (" ".join(ev.split(' ')[1:4]))
+ if "OK" not in hapd.request(cmd):
+ raise Exception("MGMT_TX_STATUS_PROCESS failed")
+ hapd.request("SET ext_mgmt_frame_handling 0")
+
+ dev[0].connect(ssid, psk="12345678", scan_freq="2412")
+ hapd.wait_sta()
+ hwsim_utils.test_connectivity(dev[0], hapd)
+ time.sleep(1)
+ hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(0.5)
# Check for Layer 2 Update frame and unexpected frames from the station
check_dpp_capab(dev[1], min_ver=2)
cap_lo = params['prefix'] + ".lo.pcap"
- wt = WlantestCapture('lo', cap_lo)
+ with WlantestCapture('lo', cap_lo):
+ run_dpp_controller_relay2(dev, apdev, params, chirp, discover,
+ duplicate)
+def run_dpp_controller_relay2(dev, apdev, params, chirp=False, discover=False,
+ duplicate=False):
# Controller
conf_id = dev[1].dpp_configurator_add()
dev[1].set("dpp_configurator_params",
dev[0].wait_connected()
relay.wait_sta()
- time.sleep(0.5)
- wt.close()
-
def test_dpp_controller_init_through_relay(dev, apdev, params):
"""DPP Controller initiating through Relay"""
try:
check_dpp_capab(dev[1], min_ver=2)
cap_lo = os.path.join(params['prefix'], ".lo.pcap")
- wt = WlantestCapture('lo', cap_lo)
+ with WlantestCapture('lo', cap_lo):
+ run_dpp_controller_init_through_relay2(dev, apdev, params, dynamic,
+ add)
+def run_dpp_controller_init_through_relay2(dev, apdev, params, dynamic=False,
+ add=False):
# Controller
conf_id = dev[1].dpp_configurator_add()
dev[1].set("dpp_configurator_params",
if add:
relay.request("DPP_RELAY_REMOVE_CONTROLLER 127.0.0.1")
- time.sleep(0.5)
- wt.close()
-
class MyTCPServer(TCPServer):
def __init__(self, addr, handler):
self.allow_reuse_address = True
check_dpp_capab(dev0)
check_dpp_capab(dev1)
- wt = WlantestCapture('lo', cap_lo)
- time.sleep(1)
+ with WlantestCapture('lo', cap_lo):
+ run_dpp_tcp2(dev0, dev1, cap_lo, port, mutual)
+def run_dpp_tcp2(dev0, dev1, cap_lo, port=None, mutual=False):
# Controller
conf_id = dev1.dpp_configurator_add()
dev1.set("dpp_configurator_params",
wait_auth_success(dev1, dev0, configurator=dev1, enrollee=dev0,
allow_enrollee_failure=True,
allow_configurator_failure=True)
- time.sleep(0.5)
- wt.close()
def test_dpp_tcp_conf_init(dev, apdev, params):
"""DPP over TCP (Configurator initiates)"""
check_dpp_capab(dev0, min_ver=2)
check_dpp_capab(dev1, min_ver=2)
- wt = WlantestCapture('lo', cap_lo)
- time.sleep(1)
+ with WlantestCapture('lo', cap_lo):
+ run_dpp_tcp_conf_init2(dev0, dev1, cap_lo, port, conf)
+def run_dpp_tcp_conf_init2(dev0, dev1, cap_lo, port=None, conf="sta-dpp"):
id_c = dev1.dpp_bootstrap_gen()
uri_c = dev1.request("DPP_BOOTSTRAP_GET_URI %d" % id_c)
res = dev1.request("DPP_BOOTSTRAP_INFO %d" % id_c)
wait_auth_success(dev1, dev0, configurator=dev0, enrollee=dev1,
allow_enrollee_failure=True,
allow_configurator_failure=True)
- time.sleep(0.5)
- wt.close()
def test_dpp_tcp_controller_management_hostapd(dev, apdev, params):
"""DPP Controller management in hostapd"""
cap_lo = params['prefix'] + ".lo.pcap"
- wt = WlantestCapture('lo', cap_lo)
- time.sleep(1)
+ with WlantestCapture('lo', cap_lo) as wt:
+ _run_dpp_enterprise_tcp(dev, apdev, params, wt)
+def _run_dpp_enterprise_tcp(dev, apdev, params, wt):
# Controller
conf_id = dev[1].dpp_configurator_add()
csrattrs = "MAsGCSqGSIb3DQEJBw=="
if "DPP-CONF-RECEIVED" not in ev:
raise Exception("DPP configuration did not succeed (Enrollee)")
- time.sleep(0.5)
- wt.close()
-
def test_dpp_enterprise_tcp2(dev, apdev, params):
"""DPP over TCP for enterprise provisioning (Controller initiating)"""
if not openssl_imported:
check_dpp_capab(dev[1])
cap_lo = params['prefix'] + ".lo.pcap"
- cert_file = params['prefix'] + ".cert.pem"
- pkcs7_file = params['prefix'] + ".pkcs7.der"
-
- with open("auth_serv/ec-ca.pem", "rb") as f:
- res = f.read()
- cacert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
- res)
-
- with open("auth_serv/ec-ca.key", "rb") as f:
- res = f.read()
- cakey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, res)
- wt = WlantestCapture('lo', cap_lo)
- time.sleep(1)
+ with WlantestCapture('lo', cap_lo) as wt:
+ _run_dpp_enterprise_tcp2(dev, apdev, params, wt)
+def _run_dpp_enterprise_tcp2(dev, apdev, params, wt):
# Client/Enrollee/Responder
id_e = dev[0].dpp_bootstrap_gen()
uri_e = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id_e)
check_dpp_capab(dev0, min_ver=3)
check_dpp_capab(dev1, min_ver=3)
- wt = WlantestCapture('lo', cap_lo)
- time.sleep(1)
+ with WlantestCapture('lo', cap_lo):
+ run_dpp_tcp_pkex2(dev0, dev1, cap_lo, sae, status)
+def run_dpp_tcp_pkex2(dev0, dev1, cap_lo, sae=False, status=False):
# Controller
if sae:
ssid = binascii.hexlify("sae".encode()).decode()
if 'wait_conn_status' not in res or not res['wait_conn_status']:
raise Exception("wait_conn_status not reported")
- time.sleep(0.5)
- wt.close()
-
def test_dpp_tcp_pkex(dev, apdev, params):
"""DPP/PKEXv2 over TCP"""
prefix = "dpp_tcp_pkex"
prefix = "dpp_controller_relay_pkex"
cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
- wt = WlantestCapture('lo', cap_lo)
+ with WlantestCapture('lo', cap_lo):
+ run_dpp_controller_relay_pkex2(dev, apdev, params)
+def run_dpp_controller_relay_pkex2(dev, apdev, params):
# Controller
conf_id = dev[1].dpp_configurator_add()
dev[1].set("dpp_configurator_params",
dev[0].wait_connected()
dev[0].dump_monitor()
- time.sleep(0.5)
- wt.close()
-
def dpp_pb_ap(apdev):
params = {"ssid": "sae",
"dpp_configurator_connectivity": "1",
self.cmd = subprocess.Popen(args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
+ time.sleep(1)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ time.sleep(0.5)
+ self.close()
+ time.sleep(0.5)
def __del__(self):
if self.cmd:
+ print("WlantestCapture.__del__ needed to run close()")
self.close()
def close(self):
+ if not self.cmd:
+ return
logger.debug("wlantest[%s] stopping" % self.ifname)
self.cmd.terminate()
res = self.cmd.communicate()