#!/usr/bin/env python3
#
# Test case executor
-# Copyright (c) 2013-2015, Jouni Malinen <j@w1.fi>
+# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
pass
if wpas:
wpas.close_ctrl()
+ del wpas
try:
hapd = HostapdGlobal()
reset_ok = reset_devs(dev, apdev)
wpas = None
try:
- wpas = WpaSupplicant(global_iface="/tmp/wpas-wlan5")
+ wpas = WpaSupplicant(global_iface="/tmp/wpas-wlan5",
+ monitor=False)
rename_log(args.logdir, 'log5', name, wpas)
if not args.no_reset:
wpas.remove_ifname()
pass
if wpas:
wpas.close_ctrl()
+ del wpas
for i in range(0, 3):
rename_log(args.logdir, 'log' + str(i), name, dev[i])
# Test cases for MACsec/MKA
-# Copyright (c) 2018, Jouni Malinen <j@w1.fi>
+# Copyright (c) 2018-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
def cleanup_macsec():
- wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
wpas.interface_remove("veth0")
wpas.interface_remove("veth1")
+ del wpas
subprocess.call(["ip", "link", "del", "veth0"],
stderr=open('/dev/null', 'w'))
"""MACsec PSK - MKA life time"""
try:
run_macsec_psk(dev, apdev, params, "macsec_psk_mka_life_time")
- wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
wpas.interface_remove("veth1")
+ del wpas
# Wait for live peer to be removed on veth0
time.sleep(6.1)
finally:
cmd[i].terminate()
def cleanup_macsec_br(count):
- wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
for i in range(count):
wpas.interface_remove("veth%d" % i)
subprocess.call(["ip", "link", "del", "veth%d" % i],
logger.info("Data traffic test failed - ignore for now for >= 3 device cases")
for i in range(count):
- wpa[i].dump_monitor()
+ wpa[i].close_monitor()
for i in range(count):
+ wpa[0].close_control()
del wpa[0]
def test_macsec_psk_ns(dev, apdev, params):
if "2 packets transmitted, 2 received" not in res:
raise Exception("ping did not work")
+ wpas0.close_monitor()
wpas0.request("TERMINATE")
+ wpas0.close_control()
del wpas0
+ wpas1.close_monitor()
wpas1.request("TERMINATE")
+ wpas1.close_control()
del wpas1
time.sleep(1)
self.monitor = monitor
self.hostname = hostname
self.group_ifname = None
+ self.global_mon = None
+ self.global_ctrl = None
self.gctrl_mon = None
+ self.ctrl = None
+ self.mon = None
+ self.ifname = None
self.host = remotehost.Host(hostname, ifname)
self._group_dbg = None
if ifname:
self.p2p_dev_ifname = 'p2p-dev-' + self.ifname
else:
self.p2p_dev_ifname = ifname
- else:
- self.ifname = None
self.global_iface = global_iface
if global_iface:
- self.global_mon = None
if hostname != None:
self.global_ctrl = wpaspy.Ctrl(hostname, global_port)
if self.monitor:
self.global_dbg = ""
if self.monitor:
self.global_mon.attach()
- else:
- self.global_mon = None
+
+ def __del__(self):
+ self.close_monitor()
+ self.close_control()
+
+ def close_control_ctrl(self):
+ if self.ctrl:
+ del self.ctrl
+ self.ctrl = None
+
+ def close_control_global(self):
+ if self.global_ctrl:
+ del self.global_ctrl
+ self.global_ctrl = None
+
+ def close_control(self):
+ self.close_control_ctrl()
+ self.close_control_global()
+
+ def close_monitor_mon(self):
+ if not self.mon:
+ return
+ try:
+ while self.mon.pending():
+ ev = self.mon.recv()
+ logger.debug(self.dbg + ": " + ev)
+ except:
+ pass
+ try:
+ self.mon.detach()
+ except ConnectionRefusedError:
+ pass
+ del self.mon
+ self.mon = None
+
+ def close_monitor_global(self):
+ if not self.global_mon:
+ return
+ try:
+ while self.global_mon.pending():
+ ev = self.global_mon.recv()
+ logger.debug(self.global_dbg + ": " + ev)
+ except:
+ pass
+ try:
+ self.global_mon.detach()
+ except ConnectionRefusedError:
+ pass
+ del self.global_mon
+ self.global_mon = None
+
+ def close_monitor_group(self):
+ if not self.gctrl_mon:
+ return
+ try:
+ while self.gctrl_mon.pending():
+ ev = self.gctrl_mon.recv()
+ logger.debug(self.dbg + ": " + ev)
+ except:
+ pass
+ try:
+ self.gctrl_mon.detach()
+ except:
+ pass
+ del self.gctrl_mon
+ self.gctrl_mon = None
+
+ def close_monitor(self):
+ self.close_monitor_mon()
+ self.close_monitor_global()
+ self.close_monitor_group()
def cmd_execute(self, cmd_array, shell=False):
if self.hostname is None:
def terminate(self):
if self.global_mon:
- self.global_mon.detach()
- self.global_mon = None
+ self.close_monitor_global()
self.global_ctrl.terminate()
self.global_ctrl = None
def close_ctrl(self):
- if self.global_mon:
- self.global_mon.detach()
- self.global_mon = None
- self.global_ctrl = None
+ self.close_monitor_global()
+ self.close_control_global()
self.remove_ifname()
def set_ifname(self, ifname, hostname=None, port=9877):
+ self.remove_ifname()
self.ifname = ifname
if hostname != None:
self.ctrl = wpaspy.Ctrl(hostname, port)
self.mon.attach()
def remove_ifname(self):
- if self.ifname:
- self.mon.detach()
- self.mon = None
- self.ctrl = None
- self.ifname = None
+ self.close_monitor_mon()
+ self.close_control_ctrl()
+ self.ifname = None
def get_ctrl_iface_port(self, ifname):
if self.hostname is None:
self.global_request("REMOVE_NETWORK all")
self.global_request("SET p2p_no_group_iface 1")
self.global_request("P2P_FLUSH")
- if self.gctrl_mon:
- try:
- self.gctrl_mon.detach()
- except:
- pass
- self.gctrl_mon = None
+ self.close_monitor_group()
self.group_ifname = None
self.dump_monitor()
return self.wait_event(events, timeout)
def wait_go_ending_session(self):
- if self.gctrl_mon:
- try:
- self.gctrl_mon.detach()
- except:
- pass
- self.gctrl_mon = None
+ self.close_monitor_group()
timeout = 3 if self.hostname is None else 10
ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=timeout)
if ev is None:
return (count_iface, count_global)
def remove_group(self, ifname=None):
- if self.gctrl_mon:
- try:
- self.gctrl_mon.detach()
- except:
- pass
- self.gctrl_mon = None
+ self.close_monitor_group()
if ifname is None:
ifname = self.group_ifname if self.group_ifname else self.ifname
if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname):