#! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
+"""
+This module provides functional testing for the net/rds component.
+"""
import argparse
import ctypes
# Allow utils module to be imported from different directory
this_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(this_dir, "../"))
-from lib.py.utils import ip
+# pylint: disable-next=wrong-import-position,import-error,no-name-in-module
+from lib.py.utils import ip # noqa: E402
libc = ctypes.cdll.LoadLibrary('libc.so.6')
setns = libc.setns
for net in [NET0, NET1]:
pcap = logdir+'/'+net+'.pcap'
fd, pcap_tmp = tempfile.mkstemp(suffix=".pcap", prefix=f"{net}-", dir="/tmp")
+ # pylint: disable-next=consider-using-with
p = subprocess.Popen(
['ip', 'netns', 'exec', net,
'/usr/sbin/tcpdump', '-i', 'any', '-w', pcap_tmp])
send_hashes.setdefault((sender.fileno(), receiver.fileno()),
hashlib.sha256()).update(f'<{send_data}>'.encode('utf-8'))
nr_send = nr_send + 1
- except BlockingIOError as e:
+ except BlockingIOError:
break
except OSError as e:
if e.errno in [errno.ENOBUFS, errno.ECONNRESET, errno.EPIPE]:
receiver.fileno()), hashlib.sha256()).update(
f'<{recv_data}>'.encode('utf-8'))
nr_recv = nr_recv + 1
- except BlockingIOError as e:
+ except BlockingIOError:
break
# exercise net/rds/tcp.c:rds_tcp_sysctl_reset()