From: Vincent Bernat Date: Sat, 16 Jun 2018 15:30:06 +0000 (+0200) Subject: tests: add SNMP-related tests X-Git-Tag: 1.0.2~16^2~1 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=987454994be604a3b8c27e58d68ff7d88fcf24de;p=thirdparty%2Flldpd.git tests: add SNMP-related tests --- diff --git a/tests/ci/install.sh b/tests/ci/install.sh index b4e2a97f..b19bd35d 100755 --- a/tests/ci/install.sh +++ b/tests/ci/install.sh @@ -20,7 +20,8 @@ case "$(uname -s)" in libsnmp-dev libxml2-dev \ libevent-dev libreadline-dev libbsd-dev \ check libc6-dbg libevent-dbg libseccomp-dev \ - libpcap-dev libcap-dev + libpcap-dev libcap-dev \ + snmpd snmp [ $CC != gcc ] || \ sudo apt-get -qqy install gcc-5 # For integration tests diff --git a/tests/integration/fixtures/namespaces.py b/tests/integration/fixtures/namespaces.py index 7548f2ed..a5d62ba3 100644 --- a/tests/integration/fixtures/namespaces.py +++ b/tests/integration/fixtures/namespaces.py @@ -5,6 +5,7 @@ import os import pyroute2 import pytest import signal +import multiprocessing # All allowed namespace types NAMESPACE_FLAGS = dict(mnt=0x00020000, @@ -43,6 +44,47 @@ def mount_sys(target="/sys"): raise OSError(e, os.strerror(e)) +def mount_tmpfs(target, private=False): + flags = [0] + if private: + flags.append(1 << 18) # MS_PRIVATE + flags.append(1 << 19) # MS_SLAVE + for fl in flags: + ret = libc.mount(b"none", + target.encode('ascii'), + b"tmpfs", + fl, + None) + if ret == -1: + e = ctypes.get_errno() + raise OSError(e, os.strerror(e)) + + +def _mount_proc(target): + flags = [2 | 4 | 8] # MS_NOSUID | MS_NODEV | MS_NOEXEC + flags.append(1 << 18) # MS_PRIVATE + flags.append(1 << 19) # MS_SLAVE + for fl in flags: + ret = libc.mount(b"proc", + target.encode('ascii'), + b"proc", + fl, + None) + if ret == -1: + e = ctypes.get_errno() + raise OSError(e, os.strerror(e)) + + +def mount_proc(target="/proc"): + # We need to be sure /proc is correct. We do that in another + # process as this doesn't play well with setns(). + if not os.path.isdir(target): + os.mkdir(target) + p = multiprocessing.Process(target=_mount_proc, args=(target,)) + p.start() + p.join() + + class Namespace(object): """Combine several namespaces into one. @@ -127,7 +169,6 @@ class Namespace(object): # MS_REC | MS_PRIVATE 16384 | (1 << 18), None) - mount_sys() while True: try: @@ -179,17 +220,26 @@ class NamespaceFactory(object): """ - def __init__(self): + def __init__(self, tmpdir): self.namespaces = {} + self.tmpdir = tmpdir def __call__(self, ns): """Return a namespace. Create it if it doesn't exist.""" if ns in self.namespaces: return self.namespaces[ns] + self.namespaces[ns] = Namespace('ipc', 'net', 'mnt', 'uts') + with self.namespaces[ns]: + mount_proc() + mount_sys() + # Also setup the "namespace-dependant" directory + self.tmpdir.join("ns").ensure(dir=True) + mount_tmpfs(str(self.tmpdir.join("ns")), private=True) + return self.namespaces[ns] @pytest.fixture -def namespaces(): - return NamespaceFactory() +def namespaces(tmpdir): + return NamespaceFactory(tmpdir) diff --git a/tests/integration/fixtures/programs.py b/tests/integration/fixtures/programs.py index 63f7e1a2..a5de4836 100644 --- a/tests/integration/fixtures/programs.py +++ b/tests/integration/fixtures/programs.py @@ -13,6 +13,8 @@ import platform import ctypes from collections import namedtuple +from .namespaces import mount_proc, mount_tmpfs + libc = ctypes.CDLL('libc.so.6', use_errno=True) @@ -27,47 +29,6 @@ def mount_bind(source, target): raise OSError(e, os.strerror(e)) -def mount_tmpfs(target, private=False): - flags = [0] - if private: - flags.append(1 << 18) # MS_PRIVATE - flags.append(1 << 19) # MS_SLAVE - for fl in flags: - ret = libc.mount(b"none", - target.encode('ascii'), - b"tmpfs", - fl, - None) - if ret == -1: - e = ctypes.get_errno() - raise OSError(e, os.strerror(e)) - - -def _mount_proc(target): - flags = [2 | 4 | 8] # MS_NOSUID | MS_NODEV | MS_NOEXEC - flags.append(1 << 18) # MS_PRIVATE - flags.append(1 << 19) # MS_SLAVE - for fl in flags: - ret = libc.mount(b"proc", - target.encode('ascii'), - b"proc", - fl, - None) - if ret == -1: - e = ctypes.get_errno() - raise OSError(e, os.strerror(e)) - - -def mount_proc(target="/proc"): - # We need to be sure /proc is correct. We do that in another - # process as this doesn't play well with setns(). - if not os.path.isdir(target): - os.mkdir(target) - p = multiprocessing.Process(target=_mount_proc, args=(target,)) - p.start() - p.join() - - def most_recent(*args): """Return the most recent files matching one of the provided glob expression.""" @@ -188,7 +149,6 @@ class LldpdFactory(object): # Setup privsep. While not enforced, we assume we are running in a # throwaway mount namespace. tmpdir = self.tmpdir - mount_proc() if self.config.lldpd.privsep.enabled: # Chroot chroot = self.config.lldpd.privsep.chroot @@ -222,10 +182,6 @@ class LldpdFactory(object): _replace_file(tmpdir, "/etc/passwd", passwd) _replace_file(tmpdir, "/etc/group", fgroup) - # Also setup the "namespace-dependant" directory - tmpdir.join("ns").ensure(dir=True) - mount_tmpfs(str(tmpdir.join("ns")), private=True) - # We also need a proper /etc/os-release _replace_file(tmpdir, "/etc/os-release", """PRETTY_NAME="Spectacular GNU/Linux 2016" @@ -322,6 +278,89 @@ def lldpcli(request, tmpdir): return run +@pytest.fixture() +def snmpd(request, tmpdir): + """Execute ``snmpd``.""" + count = [0] + + def run(*args): + conffile = tmpdir.join("ns", "snmpd.conf") + pidfile = tmpdir.join("ns", "snmpd.pid") + with conffile.open("w") as f: + f.write(""" +rocommunity public +rwcommunity private +master agentx +trap2sink 127.0.0.1 +""") + sargs = ("-I", + "snmp_mib,sysORTable" + ",usmConf,usmStats,usmUser" + ",vacm_conf,vacm_context,vacm_vars", + "-Lf", "/dev/null", + "-p", str(pidfile), + "-C", "-c", str(conffile)) + try: + p = subprocess.Popen(("snmpd",) + sargs + args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except OSError as e: + if e.errno == os.errno.ENOENT: + pytest.skip("snmpd not present") + return + raise e + stdout, stderr = p.communicate(timeout=5) + result = namedtuple('ProcessResult', + ['returncode', 'stdout', 'stderr'])( + p.returncode, stdout, stderr) + request.node.add_report_section( + 'run', 'snmpd output {}'.format(count[0]), + format_process_output("snmpd", sargs, result)) + count[0] += 1 + time.sleep(1) + + def kill(): + try: + with pidfile.open("r") as p: + os.kill(int(p.read())) + except: + pass + request.addfinalizer(kill) + + return run + + +@pytest.fixture() +def snmpwalk(): + def run(*args): + try: + p = subprocess.Popen(("env", "MIBDIRS=", + "snmpwalk", + "-v2c", "-c", "private", + "-Ob", "-Oe", "-On", + "localhost") + args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except OSError as e: + if e.errno == os.errno.ENOENT: + pytest.skip("snmpwalk not present") + return + raise e + stdout, stderr = p.communicate(timeout=30) + result = namedtuple('ProcessResult', + ['returncode', 'stdout', 'stderr'])( + p.returncode, stdout, stderr) + # When keyvalue is requested, return a formatted result + assert result.returncode == 0 + out = {} + for k, v in [l.split(' = ', 2) + for l in result.stdout.decode('ascii').split("\n") + if ' = ' in l]: + out[k] = v + return out + return run + + def pytest_runtest_makereport(item, call): """Collect outputs written to tmpdir and put them in report.""" # Only do that after tests are run, but not on teardown (too late) diff --git a/tests/integration/test_snmp.py b/tests/integration/test_snmp.py new file mode 100644 index 00000000..2a3e85da --- /dev/null +++ b/tests/integration/test_snmp.py @@ -0,0 +1,29 @@ +import pytest +import time + +pytestmark = pytest.mark.skipif(not pytest.config.lldpd.snmp, + reason="no SNMP support") + + +def test_snmp_register(snmpd, snmpwalk, lldpd, namespaces): + with namespaces(1): + snmpd() + lldpd("-x") + out = snmpwalk(".1.3.6.1.2.1.1.9.1.3") + assert 'STRING: "lldpMIB implementation by lldpd"' in out.values() + + +def test_snmp_one_neighbor(snmpd, snmpwalk, lldpd, namespaces): + with namespaces(1): + snmpd() + lldpd("-x") + with namespaces(2): + lldpd() + with namespaces(1): + out = snmpwalk(".1.0.8802.1.1.2.1") + assert out['.1.0.8802.1.1.2.1.2.1.0'].startswith( + "Timeticks: ") + assert out['.1.0.8802.1.1.2.1.3.2.0'] == 'STRING: "ns-1.example.com"' + assert out['.1.0.8802.1.1.2.1.3.3.0'] == 'STRING: "ns-1.example.com"' + assert out['.1.0.8802.1.1.2.1.3.4.0'].startswith( + 'STRING: "Spectacular GNU/Linux 2016 Linux')