from tshark import run_tshark
from wlantest import Wlantest
from wpasupplicant import WpaSupplicant
+from wlantest import WlantestCapture
from test_ap_eap import check_eap_capa, check_domain_match_full
from test_gas import gas_rx, parse_gas, action_response, anqp_initial_resp, send_gas_resp, ACTION_CATEG_PUBLIC, GAS_INITIAL_RESPONSE
time.sleep(0.5)
cmd = {}
- cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0',
- '-w', cap_br, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
- cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname,
- '-w', cap_dev0, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
- cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname,
- '-w', cap_dev1, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
- cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname,
- '-w', cap_dev2, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
+ cmd[0] = WlantestCapture('ap-br0', cap_br)
+ cmd[1] = WlantestCapture(dev[0].ifname, cap_dev0)
+ cmd[2] = WlantestCapture(dev[1].ifname, cap_dev1)
+ cmd[3] = WlantestCapture(dev[2].ifname, cap_dev2)
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].request("DISCONNECT")
time.sleep(1.5)
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
time.sleep(0.1)
macs = get_bridge_macs("ap-br0")
logger.info("After disconnect (showmacs): " + str(macs))
time.sleep(0.5)
cmd = {}
- cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0',
- '-w', cap_br, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
- cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname,
- '-w', cap_dev0, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
- cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname,
- '-w', cap_dev1, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
- cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname,
- '-w', cap_dev2, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
+ cmd[0] = WlantestCapture('ap-br0', cap_br)
+ cmd[1] = WlantestCapture(dev[0].ifname, cap_dev0)
+ cmd[2] = WlantestCapture(dev[1].ifname, cap_dev1)
+ cmd[3] = WlantestCapture(dev[2].ifname, cap_dev2)
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
dev[1].request("DISCONNECT")
time.sleep(0.5)
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
macs = get_bridge_macs("ap-br0")
logger.info("After disconnect (showmacs): " + str(macs))
matches = get_permanent_neighbors("ap-br0")
from tshark import run_tshark
from utils import *
from wpasupplicant import WpaSupplicant
+from wlantest import WlantestCapture
from test_ap_ht import set_world_reg
@remote_compatible
cap = os.path.join(params['logdir'], prefix + "." + ifname + ".pcap")
hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
- capture = subprocess.Popen(['tcpdump', '-p', '-U', '-i', ifname,
- '-w', cap, '-s', '2000'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ wt = WlantestCapture(ifname, cap)
time.sleep(1)
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
time.sleep(1)
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(0.5)
- capture.terminate()
- res = capture.communicate()
- logger.info("tcpdump stdout: " + res[0].decode())
- logger.info("tcpdump stderr: " + res[1].decode())
- time.sleep(0.5)
+ wt.close()
# Check for Layer 2 Update frame and unexpected frames from the station
# that did not fully complete authentication.
import hwsim_utils
from wpasupplicant import WpaSupplicant
from tshark import run_tshark
+from wlantest import WlantestCapture
def check_mib(dev, vals):
mib = dev.get_mib()
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK"
hapd = hostapd.add_ap(apdev[0], params)
- capture = subprocess.Popen(['tcpdump', '-p', '-U', '-i', ifname,
- '-w', cap, '-s', '2000'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ wt = WlantestCapture(ifname, cap)
time.sleep(1)
bssid = hapd.own_addr().replace(':', '')
time.sleep(1)
hwsim_utils.test_connectivity(dev[0], hapd)
time.sleep(0.5)
- capture.terminate()
- res = capture.communicate()
- logger.info("tcpdump stdout: " + res[0].decode())
- logger.info("tcpdump stderr: " + res[1].decode())
+ wt.close()
time.sleep(0.5)
# Check for Layer 2 Update frame and unexpected frames from the station
from hwsim import HWSimRadio
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
from wpasupplicant import WpaSupplicant
+from wlantest import WlantestCapture
try:
import OpenSSL
prefix = "dpp_controller_relay"
cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
- cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
- '-w', cap_lo, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
+ wt = WlantestCapture('lo', cap_lo)
# Controller
conf_id = dev[1].dpp_configurator_add()
dev[0].wait_connected()
time.sleep(0.5)
- cmd.terminate()
+ wt.close()
def test_dpp_tcp(dev, apdev, params):
"""DPP over TCP"""
check_dpp_capab(dev[0])
check_dpp_capab(dev[1])
- cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
- '-w', cap_lo, '-s', '2000'],
- stderr=open('/dev/null', 'w'))
+ wt = WlantestCapture('lo', cap_lo)
time.sleep(1)
# Controller
allow_enrollee_failure=True,
allow_configurator_failure=True)
time.sleep(0.5)
- cmd.terminate()
+ wt.close()
def test_dpp_tcp_controller_start_failure(dev, apdev, params):
"""DPP Controller startup failure"""
from wpasupplicant import WpaSupplicant
import hwsim_utils
from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
+from wlantest import WlantestCapture
def cleanup_macsec():
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
cmd = {}
- cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
- '-w', cap_veth0, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
- cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
- '-w', cap_veth1, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
+ cmd[0] = WlantestCapture('veth0', cap_veth0)
+ cmd[1] = WlantestCapture('veth1', cap_veth1)
wpa = add_wpas_interfaces()
wpas0 = wpa[0]
if expect_failure:
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
return
- cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
- '-w', cap_macsec0, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
- cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
- '-w', cap_macsec1, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
+ cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
+ cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
time.sleep(0.5)
mi0 = wpas0.get_status_field("mi")
time.sleep(1)
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
def cleanup_macsec_br(count):
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
"up"])
cmd = {}
- cmd[0] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
- 'tcpdump', '-p', '-U', '-i', 'veth0',
- '-w', cap_veth0, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
- cmd[1] = subprocess.Popen(['ip', 'netns', 'exec', 'ns1',
- 'tcpdump', '-p', '-U', '-i', 'veth1',
- '-w', cap_veth1, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
+ cmd[0] = WlantestCapture('veth0', cap_veth0, netns='ns0')
+ cmd[1] = WlantestCapture('veth1', cap_veth1, netns='ns1')
write_conf(conffile + '0')
write_conf(conffile + '1', mka_priority=100)
break
time.sleep(1)
- cmd[2] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
- 'tcpdump', '-p', '-U', '-i', macsec_ifname0,
- '-w', cap_macsec0, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
- cmd[3] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
- 'tcpdump', '-p', '-U', '-i', macsec_ifname1,
- '-w', cap_macsec1, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
+ cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0, netns='ns0')
+ cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1, netns='ns0')
time.sleep(0.5)
logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
time.sleep(1)
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
def test_macsec_psk_fail_cp(dev, apdev):
"""MACsec PSK local failures in CP state machine"""
subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
cmd = {}
- cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
- '-w', cap_veth0, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
- cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
- '-w', cap_veth1, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
+ cmd[0] = WlantestCapture('veth0', cap_veth0)
+ cmd[1] = WlantestCapture('veth1', cap_veth1)
wpa = add_wpas_interfaces(count=1)
wpas0 = wpa[0]
if expect_failure:
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
return
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
- cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
- '-w', cap_macsec0, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
- cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
- '-w', cap_macsec1, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
+ cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
+ cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
time.sleep(0.5)
logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
time.sleep(1)
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
def test_macsec_hostapd_eap(dev, apdev, params):
"""MACsec EAP with hostapd"""
subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
cmd = {}
- cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
- '-w', cap_veth0, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
- cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
- '-w', cap_veth1, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
+ cmd[0] = WlantestCapture('veth0', cap_veth0)
+ cmd[1] = WlantestCapture('veth1', cap_veth1)
wpa = add_wpas_interfaces(count=1)
wpas0 = wpa[0]
if expect_failure:
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
return
macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
- cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
- '-w', cap_macsec0, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
- cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
- '-w', cap_macsec1, '-s', '2000',
- '--immediate-mode'],
- stderr=open('/dev/null', 'w'))
+ cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
+ cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
time.sleep(0.5)
logger.info("wpas0 MIB:\n" + wpas0.request("MIB"))
time.sleep(1)
for i in range(len(cmd)):
- cmd[i].terminate()
+ cmd[i].close()
# Python class for controlling wlantest
-# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
+# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
tx[tid] = self.get_tx_tid(bssid, addr, tid)
rx[tid] = self.get_rx_tid(bssid, addr, tid)
return [tx, rx]
+
+class WlantestCapture:
+ def __init__(self, ifname, output, netns=None):
+ self.cmd = None
+ self.ifname = ifname
+ if os.path.isfile('../../wlantest/wlantest'):
+ bin = '../../wlantest/wlantest'
+ else:
+ bin = 'wlantest'
+ logger.debug("wlantest[%s] starting" % ifname)
+ args = [bin, '-e', '-i', ifname, '-w', output]
+ if netns:
+ args = ['ip', 'netns', 'exec', netns] + args
+ self.cmd = subprocess.Popen(args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ def __del__(self):
+ if self.cmd:
+ self.close()
+
+ def close(self):
+ logger.debug("wlantest[%s] stopping" % self.ifname)
+ self.cmd.terminate()
+ res = self.cmd.communicate()
+ if len(res[0]) > 0:
+ logger.debug("wlantest[%s] stdout: %s" % (self.ifname,
+ res[0].decode().strip()))
+ if len(res[1]) > 0:
+ logger.debug("wlantest[%s] stderr: %s" % (self.ifname,
+ res[1].decode().strip()))
+ self.cmd = None