]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: Replace tcpdump with wlantest
authorJouni Malinen <j@w1.fi>
Fri, 27 Dec 2019 21:17:43 +0000 (23:17 +0200)
committerJouni Malinen <j@w1.fi>
Fri, 27 Dec 2019 21:43:09 +0000 (23:43 +0200)
This removes dependency on tcpdump by using an already included test
tool for capturing frames with Ethernet headers. There were some issues
in getting tcpdump working on Ubuntu 19.10, so this seems to be a clean
way of addressing that.

Signed-off-by: Jouni Malinen <j@w1.fi>
tests/hwsim/test_ap_hs20.py
tests/hwsim/test_ap_open.py
tests/hwsim/test_ap_psk.py
tests/hwsim/test_dpp.py
tests/hwsim/test_macsec.py
tests/hwsim/wlantest.py

index 1c4c0fa33dc8661af8bdc5610330bf7888491f00..c33e0dcae09ddf7db8c99d10192c1f93ff4a94c1 100644 (file)
@@ -22,6 +22,7 @@ import hwsim_utils
 from tshark import run_tshark
 from wlantest import Wlantest
 from wpasupplicant import WpaSupplicant
+from wlantest import WlantestCapture
 from test_ap_eap import check_eap_capa, check_domain_match_full
 from test_gas import gas_rx, parse_gas, action_response, anqp_initial_resp, send_gas_resp, ACTION_CATEG_PUBLIC, GAS_INITIAL_RESPONSE
 
@@ -4671,18 +4672,10 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False):
 
     time.sleep(0.5)
     cmd = {}
-    cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0',
-                               '-w', cap_br, '-s', '2000'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname,
-                               '-w', cap_dev0, '-s', '2000'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname,
-                               '-w', cap_dev1, '-s', '2000'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname,
-                               '-w', cap_dev2, '-s', '2000'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[0] = WlantestCapture('ap-br0', cap_br)
+    cmd[1] = WlantestCapture(dev[0].ifname, cap_dev0)
+    cmd[2] = WlantestCapture(dev[1].ifname, cap_dev1)
+    cmd[3] = WlantestCapture(dev[2].ifname, cap_dev2)
 
     dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
     dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
@@ -4875,7 +4868,7 @@ def _test_proxyarp_open(dev, apdev, params, ebtables=False):
     dev[1].request("DISCONNECT")
     time.sleep(1.5)
     for i in range(len(cmd)):
-        cmd[i].terminate()
+        cmd[i].close()
     time.sleep(0.1)
     macs = get_bridge_macs("ap-br0")
     logger.info("After disconnect (showmacs): " + str(macs))
@@ -5019,18 +5012,10 @@ def _test_proxyarp_open_ipv6(dev, apdev, params, ebtables=False):
 
     time.sleep(0.5)
     cmd = {}
-    cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'ap-br0',
-                               '-w', cap_br, '-s', '2000'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[0].ifname,
-                               '-w', cap_dev0, '-s', '2000'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[1].ifname,
-                               '-w', cap_dev1, '-s', '2000'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', dev[2].ifname,
-                               '-w', cap_dev2, '-s', '2000'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[0] = WlantestCapture('ap-br0', cap_br)
+    cmd[1] = WlantestCapture(dev[0].ifname, cap_dev0)
+    cmd[2] = WlantestCapture(dev[1].ifname, cap_dev1)
+    cmd[3] = WlantestCapture(dev[2].ifname, cap_dev2)
 
     dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
     dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
@@ -5134,7 +5119,7 @@ def _test_proxyarp_open_ipv6(dev, apdev, params, ebtables=False):
     dev[1].request("DISCONNECT")
     time.sleep(0.5)
     for i in range(len(cmd)):
-        cmd[i].terminate()
+        cmd[i].close()
     macs = get_bridge_macs("ap-br0")
     logger.info("After disconnect (showmacs): " + str(macs))
     matches = get_permanent_neighbors("ap-br0")
index dd479d833f80406a2f21469c30f8c97d22601dcb..0d83438c4583ebdc270ebbd7b258c1353652321c 100644 (file)
@@ -17,6 +17,7 @@ import hwsim_utils
 from tshark import run_tshark
 from utils import *
 from wpasupplicant import WpaSupplicant
+from wlantest import WlantestCapture
 from test_ap_ht import set_world_reg
 
 @remote_compatible
@@ -911,10 +912,7 @@ def test_ap_open_layer_2_update(dev, apdev, params):
     cap = os.path.join(params['logdir'], prefix + "." + ifname + ".pcap")
 
     hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
-    capture = subprocess.Popen(['tcpdump', '-p', '-U', '-i', ifname,
-                                '-w', cap, '-s', '2000'],
-                               stdout=subprocess.PIPE,
-                               stderr=subprocess.PIPE)
+    wt = WlantestCapture(ifname, cap)
     time.sleep(1)
 
     dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
@@ -923,11 +921,7 @@ def test_ap_open_layer_2_update(dev, apdev, params):
     time.sleep(1)
     hwsim_utils.test_connectivity(dev[0], hapd)
     time.sleep(0.5)
-    capture.terminate()
-    res = capture.communicate()
-    logger.info("tcpdump stdout: " + res[0].decode())
-    logger.info("tcpdump stderr: " + res[1].decode())
-    time.sleep(0.5)
+    wt.close()
 
     # Check for Layer 2 Update frame and unexpected frames from the station
     # that did not fully complete authentication.
index d32b11cbf64c23d91d7fdcf860cf3e4b3e767c33..472264c69011ff3d137baadcfd07ff291b8a7421 100644 (file)
@@ -23,6 +23,7 @@ from utils import HwsimSkip, fail_test, skip_with_fips, start_monitor, stop_moni
 import hwsim_utils
 from wpasupplicant import WpaSupplicant
 from tshark import run_tshark
+from wlantest import WlantestCapture
 
 def check_mib(dev, vals):
     mib = dev.get_mib()
@@ -3219,10 +3220,7 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params):
     params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
     params["wpa_key_mgmt"] = "WPA-PSK"
     hapd = hostapd.add_ap(apdev[0], params)
-    capture = subprocess.Popen(['tcpdump', '-p', '-U', '-i', ifname,
-                                '-w', cap, '-s', '2000'],
-                               stdout=subprocess.PIPE,
-                               stderr=subprocess.PIPE)
+    wt = WlantestCapture(ifname, cap)
     time.sleep(1)
 
     bssid = hapd.own_addr().replace(':', '')
@@ -3260,10 +3258,7 @@ def test_ap_wpa2_psk_inject_assoc(dev, apdev, params):
     time.sleep(1)
     hwsim_utils.test_connectivity(dev[0], hapd)
     time.sleep(0.5)
-    capture.terminate()
-    res = capture.communicate()
-    logger.info("tcpdump stdout: " + res[0].decode())
-    logger.info("tcpdump stderr: " + res[1].decode())
+    wt.close()
     time.sleep(0.5)
 
     # Check for Layer 2 Update frame and unexpected frames from the station
index 2e87c156451cc111b2e726f8c6c9ec86ea8c07f7..7527b6c145194beda83f4a3067f78e0d45f45750 100644 (file)
@@ -21,6 +21,7 @@ import hwsim_utils
 from hwsim import HWSimRadio
 from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
 from wpasupplicant import WpaSupplicant
+from wlantest import WlantestCapture
 
 try:
     import OpenSSL
@@ -4480,9 +4481,7 @@ def run_dpp_controller_relay(dev, apdev, params):
     prefix = "dpp_controller_relay"
     cap_lo = os.path.join(params['logdir'], prefix + ".lo.pcap")
 
-    cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
-                            '-w', cap_lo, '-s', '2000'],
-                           stderr=open('/dev/null', 'w'))
+    wt = WlantestCapture('lo', cap_lo)
 
     # Controller
     conf_id = dev[1].dpp_configurator_add()
@@ -4526,7 +4525,7 @@ def run_dpp_controller_relay(dev, apdev, params):
     dev[0].wait_connected()
 
     time.sleep(0.5)
-    cmd.terminate()
+    wt.close()
 
 def test_dpp_tcp(dev, apdev, params):
     """DPP over TCP"""
@@ -4550,9 +4549,7 @@ def run_dpp_tcp(dev, apdev, cap_lo, port=None):
     check_dpp_capab(dev[0])
     check_dpp_capab(dev[1])
 
-    cmd = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'lo',
-                            '-w', cap_lo, '-s', '2000'],
-                           stderr=open('/dev/null', 'w'))
+    wt = WlantestCapture('lo', cap_lo)
     time.sleep(1)
 
     # Controller
@@ -4583,7 +4580,7 @@ def run_dpp_tcp(dev, apdev, cap_lo, port=None):
                       allow_enrollee_failure=True,
                       allow_configurator_failure=True)
     time.sleep(0.5)
-    cmd.terminate()
+    wt.close()
 
 def test_dpp_tcp_controller_start_failure(dev, apdev, params):
     """DPP Controller startup failure"""
index 8ef4fec6165bbdce96a6c365b511f9d804264335..e521c6b3d337096fc0a09ca46a71a17136082599 100644 (file)
@@ -16,6 +16,7 @@ import hostapd
 from wpasupplicant import WpaSupplicant
 import hwsim_utils
 from utils import HwsimSkip, alloc_fail, fail_test, wait_fail_trigger
+from wlantest import WlantestCapture
 
 def cleanup_macsec():
     wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
@@ -275,14 +276,8 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
         subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
 
     cmd = {}
-    cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
-                               '-w', cap_veth0, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
-                               '-w', cap_veth1, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[0] = WlantestCapture('veth0', cap_veth0)
+    cmd[1] = WlantestCapture('veth1', cap_veth1)
 
     wpa = add_wpas_interfaces()
     wpas0 = wpa[0]
@@ -307,17 +302,11 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
 
     if expect_failure:
         for i in range(len(cmd)):
-            cmd[i].terminate()
+            cmd[i].close()
         return
 
-    cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
-                               '-w', cap_macsec0, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
-                               '-w', cap_macsec1, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
+    cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
     time.sleep(0.5)
 
     mi0 = wpas0.get_status_field("mi")
@@ -353,7 +342,7 @@ def run_macsec_psk(dev, apdev, params, prefix, integ_only=False, port0=None,
 
     time.sleep(1)
     for i in range(len(cmd)):
-        cmd[i].terminate()
+        cmd[i].close()
 
 def cleanup_macsec_br(count):
     wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5', monitor=False)
@@ -591,16 +580,8 @@ def run_macsec_psk_ns(dev, apdev, params):
                                "up"])
 
     cmd = {}
-    cmd[0] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
-                               'tcpdump', '-p', '-U', '-i', 'veth0',
-                               '-w', cap_veth0, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[1] = subprocess.Popen(['ip', 'netns', 'exec', 'ns1',
-                               'tcpdump', '-p', '-U', '-i', 'veth1',
-                               '-w', cap_veth1, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[0] = WlantestCapture('veth0', cap_veth0, netns='ns0')
+    cmd[1] = WlantestCapture('veth1', cap_veth1, netns='ns1')
 
     write_conf(conffile + '0')
     write_conf(conffile + '1', mka_priority=100)
@@ -661,16 +642,8 @@ def run_macsec_psk_ns(dev, apdev, params):
             break
         time.sleep(1)
 
-    cmd[2] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
-                               'tcpdump', '-p', '-U', '-i', macsec_ifname0,
-                               '-w', cap_macsec0, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[3] = subprocess.Popen(['ip', 'netns', 'exec', 'ns0',
-                               'tcpdump', '-p', '-U', '-i', macsec_ifname1,
-                               '-w', cap_macsec1, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0, netns='ns0')
+    cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1, netns='ns0')
     time.sleep(0.5)
 
     logger.info("wpas0 STATUS:\n" + wpas0.request("STATUS"))
@@ -708,7 +681,7 @@ def run_macsec_psk_ns(dev, apdev, params):
 
     time.sleep(1)
     for i in range(len(cmd)):
-        cmd[i].terminate()
+        cmd[i].close()
 
 def test_macsec_psk_fail_cp(dev, apdev):
     """MACsec PSK local failures in CP state machine"""
@@ -769,14 +742,8 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
         subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
 
     cmd = {}
-    cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
-                               '-w', cap_veth0, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
-                               '-w', cap_veth1, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[0] = WlantestCapture('veth0', cap_veth0)
+    cmd[1] = WlantestCapture('veth1', cap_veth1)
 
     wpa = add_wpas_interfaces(count=1)
     wpas0 = wpa[0]
@@ -816,20 +783,14 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
 
     if expect_failure:
         for i in range(len(cmd)):
-            cmd[i].terminate()
+            cmd[i].close()
         return
 
     macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
     macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
 
-    cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
-                               '-w', cap_macsec0, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
-                               '-w', cap_macsec1, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
+    cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
     time.sleep(0.5)
 
     logger.info("wpas0 MIB:\n" +  wpas0.request("MIB"))
@@ -843,7 +804,7 @@ def run_macsec_hostapd_psk(dev, apdev, params, prefix, integ_only=False,
 
     time.sleep(1)
     for i in range(len(cmd)):
-        cmd[i].terminate()
+        cmd[i].close()
 
 def test_macsec_hostapd_eap(dev, apdev, params):
     """MACsec EAP with hostapd"""
@@ -865,14 +826,8 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
         subprocess.check_call(["ip", "link", "set", "dev", "veth%d" % i, "up"])
 
     cmd = {}
-    cmd[0] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth0',
-                               '-w', cap_veth0, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[1] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', 'veth1',
-                               '-w', cap_veth1, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[0] = WlantestCapture('veth0', cap_veth0)
+    cmd[1] = WlantestCapture('veth1', cap_veth1)
 
     wpa = add_wpas_interfaces(count=1)
     wpas0 = wpa[0]
@@ -911,20 +866,14 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
 
     if expect_failure:
         for i in range(len(cmd)):
-            cmd[i].terminate()
+            cmd[i].close()
         return
 
     macsec_ifname0 = wpas0.get_driver_status_field("parent_ifname")
     macsec_ifname1 = hapd.get_driver_status_field("parent_ifname")
 
-    cmd[2] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname0,
-                               '-w', cap_macsec0, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
-    cmd[3] = subprocess.Popen(['tcpdump', '-p', '-U', '-i', macsec_ifname1,
-                               '-w', cap_macsec1, '-s', '2000',
-                               '--immediate-mode'],
-                              stderr=open('/dev/null', 'w'))
+    cmd[2] = WlantestCapture(macsec_ifname0, cap_macsec0)
+    cmd[3] = WlantestCapture(macsec_ifname1, cap_macsec1)
     time.sleep(0.5)
 
     logger.info("wpas0 MIB:\n" +  wpas0.request("MIB"))
@@ -938,4 +887,4 @@ def run_macsec_hostapd_eap(dev, apdev, params, prefix, integ_only=False,
 
     time.sleep(1)
     for i in range(len(cmd)):
-        cmd[i].terminate()
+        cmd[i].close()
index 5bb268f85553fa12754309210a2ab06ef463d84c..b0e817913417a47803433eb3059080da34132356 100644 (file)
@@ -1,5 +1,5 @@
 # Python class for controlling wlantest
-# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
+# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
 #
 # This software may be distributed under the terms of the BSD license.
 # See README for more details.
@@ -242,3 +242,35 @@ class Wlantest:
             tx[tid] = self.get_tx_tid(bssid, addr, tid)
             rx[tid] = self.get_rx_tid(bssid, addr, tid)
         return [tx, rx]
+
+class WlantestCapture:
+    def __init__(self, ifname, output, netns=None):
+        self.cmd = None
+        self.ifname = ifname
+        if os.path.isfile('../../wlantest/wlantest'):
+            bin = '../../wlantest/wlantest'
+        else:
+            bin = 'wlantest'
+        logger.debug("wlantest[%s] starting" % ifname)
+        args = [bin, '-e', '-i', ifname, '-w', output]
+        if netns:
+            args = ['ip', 'netns', 'exec', netns] + args
+        self.cmd = subprocess.Popen(args,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.PIPE)
+
+    def __del__(self):
+        if self.cmd:
+            self.close()
+
+    def close(self):
+        logger.debug("wlantest[%s] stopping" % self.ifname)
+        self.cmd.terminate()
+        res = self.cmd.communicate()
+        if len(res[0]) > 0:
+            logger.debug("wlantest[%s] stdout: %s" % (self.ifname,
+                                                      res[0].decode().strip()))
+        if len(res[1]) > 0:
+            logger.debug("wlantest[%s] stderr: %s" % (self.ifname,
+                                                      res[1].decode().strip()))
+        self.cmd = None