#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
+"""
+Tests related to standard netdevice statistics.
+"""
+
import errno
import subprocess
import time
from lib.py import ksft_run, ksft_exit, ksft_pr
from lib.py import ksft_ge, ksft_eq, ksft_is, ksft_in, ksft_lt, ksft_true, ksft_raises
-from lib.py import KsftSkipEx, KsftXfailEx
+from lib.py import KsftSkipEx, KsftXfailEx, KsftFailEx
from lib.py import ksft_disruptive
from lib.py import EthtoolFamily, NetdevFamily, RtnlFamily, NlError
from lib.py import NetDrvEnv
def check_pause(cfg) -> None:
- global ethnl
+ """
+ Check that drivers which support Pause config also report standard
+ pause stats.
+ """
try:
ethnl.pause_get({"header": {"dev-index": cfg.ifindex}})
except NlError as e:
if e.error == errno.EOPNOTSUPP:
- raise KsftXfailEx("pause not supported by the device")
+ raise KsftXfailEx("pause not supported by the device") from e
raise
data = ethnl.pause_get({"header": {"dev-index": cfg.ifindex,
def check_fec(cfg) -> None:
- global ethnl
+ """
+ Check that drivers which support FEC config also report standard
+ FEC stats.
+ """
try:
ethnl.fec_get({"header": {"dev-index": cfg.ifindex}})
except NlError as e:
if e.error == errno.EOPNOTSUPP:
- raise KsftXfailEx("FEC not supported by the device")
+ raise KsftXfailEx("FEC not supported by the device") from e
raise
data = ethnl.fec_get({"header": {"dev-index": cfg.ifindex,
def pkt_byte_sum(cfg) -> None:
- global netfam, rtnl
+ """
+ Check that qstat and interface stats match in value.
+ """
def get_qstat(test):
- global netfam
stats = netfam.qstats_get({}, dump=True)
if stats:
for qs in stats:
if qs["ifindex"]== test.ifindex:
return qs
+ return None
qstat = get_qstat(cfg)
if qstat is None:
for _ in range(10):
rtstat = rtnl.getlink({"ifi-index": cfg.ifindex})['stats64']
if stat_cmp(rtstat, qstat) < 0:
- raise Exception("RTNL stats are lower, fetched later")
+ raise KsftFailEx("RTNL stats are lower, fetched later")
qstat = get_qstat(cfg)
if stat_cmp(rtstat, qstat) > 0:
- raise Exception("Qstats are lower, fetched later")
+ raise KsftFailEx("Qstats are lower, fetched later")
def qstat_by_ifindex(cfg) -> None:
- global netfam
- global rtnl
+ """ Qstats Netlink API tests - querying by ifindex. """
# Construct a map ifindex -> [dump, by-index, dump]
ifindexes = {}
for entry in stats:
ifindexes[entry['ifindex']] = [entry, None, None]
- for ifindex in ifindexes.keys():
+ for ifindex in ifindexes:
entry = netfam.qstats_get({"ifindex": ifindex}, dump=True)
ksft_eq(len(entry), 1)
ifindexes[entry[0]['ifindex']][1] = entry[0]
# Try to get stats for lowest unused ifindex but not 0
devs = rtnl.getlink({}, dump=True)
- all_ifindexes = set([dev["ifi-index"] for dev in devs])
+ all_ifindexes = set(dev["ifi-index"] for dev in devs)
lowest = 2
while lowest in all_ifindexes:
lowest += 1
@ksft_disruptive
def check_down(cfg) -> None:
+ """ Test statistics (interface and qstat) are not impacted by ifdown """
+
try:
qstat = netfam.qstats_get({"ifindex": cfg.ifindex}, dump=True)[0]
except NlError as e:
if e.error == errno.EOPNOTSUPP:
- raise KsftSkipEx("qstats not supported by the device")
+ raise KsftSkipEx("qstats not supported by the device") from e
raise
ip(f"link set dev {cfg.dev['ifname']} down")
defer(ip, f"link set dev {cfg.dev['ifname']} up")
qstat2 = netfam.qstats_get({"ifindex": cfg.ifindex}, dump=True)[0]
- for k, v in qstat.items():
+ for k in qstat:
ksft_ge(qstat2[k], qstat[k], comment=f"{k} went backwards on device down")
# exercise per-queue API to make sure that "device down" state
def main() -> None:
+ """ Ksft boiler plate main """
+
with NetDrvEnv(__file__, queue_count=100) as cfg:
ksft_run([check_pause, check_fec, pkt_byte_sum, qstat_by_ifindex,
check_down, procfs_hammer, procfs_downup_hammer],