import logging
logger = logging.getLogger()
-def test_connectivity(ifname1, ifname2):
+def test_connectivity(ifname1, ifname2, dscp=None, tos=None):
if os.path.isfile("../../mac80211_hwsim/tools/hwsim_test"):
hwsim_test = "../../mac80211_hwsim/tools/hwsim_test"
else:
hwsim_test,
ifname1,
ifname2]
+ if dscp:
+ cmd.append('-D')
+ cmd.append(str(dscp))
+ elif tos:
+ cmd.append('-t')
+ cmd.append(str(tos))
try:
s = subprocess.check_output(cmd)
logger.debug(s)
logger.info(e.output)
raise
-def test_connectivity_p2p(dev1, dev2):
+def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname
ifname2 = dev2.group_ifname if dev2.group_ifname else dev2.ifname
- test_connectivity(ifname1, ifname2)
+ test_connectivity(ifname1, ifname2, dscp, tos)
-def test_connectivity_p2p_sta(dev1, dev2):
+def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname
ifname2 = dev2.ifname
- test_connectivity(ifname1, ifname2)
+ test_connectivity(ifname1, ifname2, dscp, tos)
-def test_connectivity_sta(dev1, dev2):
+def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
ifname1 = dev1.ifname
ifname2 = dev2.ifname
- test_connectivity(ifname1, ifname2)
+ test_connectivity(ifname1, ifname2, dscp, tos)
import hwsim_utils
import hostapd
+from wlantest import Wlantest
+
+def check_qos_map(ap, dev, dscp, tid):
+ bssid = ap['bssid']
+ sta = dev.p2p_interface_addr()
+ wt = Wlantest()
+ wt.clear_sta_counters(bssid, sta)
+ hwsim_utils.test_connectivity(dev.ifname, ap['ifname'], dscp=dscp)
+ [ tx, rx ] = wt.get_tid_counters(bssid, sta)
+ if tx[tid] == 0:
+ logger.info("Expected TX DSCP " + str(dscp) + " with TID " + str(tid) + " but counters: " + str(tx))
+ raise Exception("No STA->AP data frame using the expected TID")
+ if rx[tid] == 0:
+ logger.info("Expected RX DSCP " + str(dscp) + " with TID " + str(tid) + " but counters: " + str(rx))
+ raise Exception("No AP->STA data frame using the expected TID")
def test_ap_qosmap(dev, apdev):
"""QoS mapping"""
return "skip"
ssid = "test-qosmap"
params = { "ssid": ssid }
- params['qos_map_set'] = '53,2,22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,255,255'
+ params['qos_map_set'] = '53,2,22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55'
hostapd.add_ap(apdev[0]['ifname'], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
- hwsim_utils.test_connectivity(dev[0].ifname, apdev[0]['ifname'])
+ check_qos_map(apdev[0], dev[0], 53, 2)
+ check_qos_map(apdev[0], dev[0], 22, 6)
+ check_qos_map(apdev[0], dev[0], 8, 0)
+ check_qos_map(apdev[0], dev[0], 15, 0)
+ check_qos_map(apdev[0], dev[0], 0, 1)
+ check_qos_map(apdev[0], dev[0], 7, 1)
+ check_qos_map(apdev[0], dev[0], 16, 3)
+ check_qos_map(apdev[0], dev[0], 31, 3)
+ check_qos_map(apdev[0], dev[0], 32, 4)
+ check_qos_map(apdev[0], dev[0], 39, 4)
+ check_qos_map(apdev[0], dev[0], 40, 6)
+ check_qos_map(apdev[0], dev[0], 47, 6)
+ check_qos_map(apdev[0], dev[0], 48, 7)
+ check_qos_map(apdev[0], dev[0], 55, 7)
hapd = hostapd.Hostapd(apdev[0]['ifname'])
- hapd.request("SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,255,255")
+ hapd.request("SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55")
hapd.request("SEND_QOS_MAP_CONF " + dev[0].get_status_field("address"))
- hwsim_utils.test_connectivity(dev[0].ifname, apdev[0]['ifname'])
+ check_qos_map(apdev[0], dev[0], 53, 7)
+ check_qos_map(apdev[0], dev[0], 22, 6)
+ check_qos_map(apdev[0], dev[0], 48, 7)
+ check_qos_map(apdev[0], dev[0], 55, 7)
+ check_qos_map(apdev[0], dev[0], 56, 56 >> 3)
+ check_qos_map(apdev[0], dev[0], 63, 63 >> 3)
+
+def test_ap_qosmap_default(dev, apdev):
+ """QoS mapping with default values"""
+ ssid = "test-qosmap-default"
+ params = { "ssid": ssid }
+ hostapd.add_ap(apdev[0]['ifname'], params)
+ dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
+ for dscp in [ 0, 7, 8, 15, 16, 23, 24, 31, 32, 39, 40, 47, 48, 55, 56, 63]:
+ check_qos_map(apdev[0], dev[0], dscp, dscp >> 3)
raise Exception("wlantest_cli command failed")
return int(res)
+ def clear_sta_counters(self, bssid, addr):
+ res = subprocess.check_output([self.wlantest_cli, "clear_sta_counters",
+ bssid, addr]);
+ if "FAIL" in res:
+ raise Exception("wlantest_cli command failed")
+
def tdls_clear(self, bssid, addr1, addr2):
res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters",
bssid, addr1, addr2]);
res = self.info_sta("key_mgmt", bssid, addr)
if key_mgmt not in res:
raise Exception("Unexpected STA key_mgmt")
+
+ def get_tx_tid(self, bssid, addr, tid):
+ res = subprocess.check_output([self.wlantest_cli, "get_tx_tid",
+ bssid, addr, str(tid)]);
+ if "FAIL" in res:
+ raise Exception("wlantest_cli command failed")
+ return int(res)
+
+ def get_rx_tid(self, bssid, addr, tid):
+ res = subprocess.check_output([self.wlantest_cli, "get_rx_tid",
+ bssid, addr, str(tid)]);
+ if "FAIL" in res:
+ raise Exception("wlantest_cli command failed")
+ return int(res)
+
+ def get_tid_counters(self, bssid, addr):
+ tx = {}
+ rx = {}
+ for tid in range(0, 17):
+ tx[tid] = self.get_tx_tid(bssid, addr, tid)
+ rx[tid] = self.get_rx_tid(bssid, addr, tid)
+ return [ tx, rx ]