]> git.ipfire.org Git - thirdparty/lldpd.git/commitdiff
tests: add SNMP-related tests
authorVincent Bernat <vincent@bernat.im>
Sat, 16 Jun 2018 15:30:06 +0000 (17:30 +0200)
committerVincent Bernat <vincent@bernat.im>
Sat, 16 Jun 2018 15:56:00 +0000 (17:56 +0200)
tests/ci/install.sh
tests/integration/fixtures/namespaces.py
tests/integration/fixtures/programs.py
tests/integration/test_snmp.py [new file with mode: 0644]

index b4e2a97fb4400a9042f0ce01a9c7e978f08209aa..b19bd35d0c73588e0fe50e5dd98ba8afeb072a20 100755 (executable)
@@ -20,7 +20,8 @@ case "$(uname -s)" in
             libsnmp-dev libxml2-dev \
             libevent-dev libreadline-dev libbsd-dev \
             check libc6-dbg libevent-dbg libseccomp-dev \
-            libpcap-dev libcap-dev
+            libpcap-dev libcap-dev \
+            snmpd snmp
         [ $CC != gcc ] || \
             sudo apt-get -qqy install gcc-5
         # For integration tests
index 7548f2ed8debc77ca923f2b01ca65c6e2b78e82c..a5d62ba3717a8349c2f3d7c48dbdefdc8158bf5d 100644 (file)
@@ -5,6 +5,7 @@ import os
 import pyroute2
 import pytest
 import signal
+import multiprocessing
 
 # All allowed namespace types
 NAMESPACE_FLAGS = dict(mnt=0x00020000,
@@ -43,6 +44,47 @@ def mount_sys(target="/sys"):
             raise OSError(e, os.strerror(e))
 
 
+def mount_tmpfs(target, private=False):
+    flags = [0]
+    if private:
+        flags.append(1 << 18)   # MS_PRIVATE
+        flags.append(1 << 19)   # MS_SLAVE
+    for fl in flags:
+        ret = libc.mount(b"none",
+                         target.encode('ascii'),
+                         b"tmpfs",
+                         fl,
+                         None)
+        if ret == -1:
+            e = ctypes.get_errno()
+            raise OSError(e, os.strerror(e))
+
+
+def _mount_proc(target):
+    flags = [2 | 4 | 8] # MS_NOSUID | MS_NODEV | MS_NOEXEC
+    flags.append(1 << 18)   # MS_PRIVATE
+    flags.append(1 << 19)   # MS_SLAVE
+    for fl in flags:
+        ret = libc.mount(b"proc",
+                         target.encode('ascii'),
+                         b"proc",
+                         fl,
+                         None)
+        if ret == -1:
+            e = ctypes.get_errno()
+            raise OSError(e, os.strerror(e))
+
+
+def mount_proc(target="/proc"):
+    # We need to be sure /proc is correct. We do that in another
+    # process as this doesn't play well with setns().
+    if not os.path.isdir(target):
+        os.mkdir(target)
+    p = multiprocessing.Process(target=_mount_proc, args=(target,))
+    p.start()
+    p.join()
+
+
 class Namespace(object):
     """Combine several namespaces into one.
 
@@ -127,7 +169,6 @@ class Namespace(object):
                        # MS_REC | MS_PRIVATE
                        16384 | (1 << 18),
                        None)
-            mount_sys()
 
         while True:
             try:
@@ -179,17 +220,26 @@ class NamespaceFactory(object):
 
     """
 
-    def __init__(self):
+    def __init__(self, tmpdir):
         self.namespaces = {}
+        self.tmpdir = tmpdir
 
     def __call__(self, ns):
         """Return a namespace. Create it if it doesn't exist."""
         if ns in self.namespaces:
             return self.namespaces[ns]
+
         self.namespaces[ns] = Namespace('ipc', 'net', 'mnt', 'uts')
+        with self.namespaces[ns]:
+            mount_proc()
+            mount_sys()
+            # Also setup the "namespace-dependant" directory
+            self.tmpdir.join("ns").ensure(dir=True)
+            mount_tmpfs(str(self.tmpdir.join("ns")), private=True)
+
         return self.namespaces[ns]
 
 
 @pytest.fixture
-def namespaces():
-    return NamespaceFactory()
+def namespaces(tmpdir):
+    return NamespaceFactory(tmpdir)
index 63f7e1a268dd8fa49583d0df1574bdef4033eada..a5de4836a911d4662eb322b002c2690979553f0b 100644 (file)
@@ -13,6 +13,8 @@ import platform
 import ctypes
 from collections import namedtuple
 
+from .namespaces import mount_proc, mount_tmpfs
+
 libc = ctypes.CDLL('libc.so.6', use_errno=True)
 
 
@@ -27,47 +29,6 @@ def mount_bind(source, target):
         raise OSError(e, os.strerror(e))
 
 
-def mount_tmpfs(target, private=False):
-    flags = [0]
-    if private:
-        flags.append(1 << 18)   # MS_PRIVATE
-        flags.append(1 << 19)   # MS_SLAVE
-    for fl in flags:
-        ret = libc.mount(b"none",
-                         target.encode('ascii'),
-                         b"tmpfs",
-                         fl,
-                         None)
-        if ret == -1:
-            e = ctypes.get_errno()
-            raise OSError(e, os.strerror(e))
-
-
-def _mount_proc(target):
-    flags = [2 | 4 | 8] # MS_NOSUID | MS_NODEV | MS_NOEXEC
-    flags.append(1 << 18)   # MS_PRIVATE
-    flags.append(1 << 19)   # MS_SLAVE
-    for fl in flags:
-        ret = libc.mount(b"proc",
-                         target.encode('ascii'),
-                         b"proc",
-                         fl,
-                         None)
-        if ret == -1:
-            e = ctypes.get_errno()
-            raise OSError(e, os.strerror(e))
-
-
-def mount_proc(target="/proc"):
-    # We need to be sure /proc is correct. We do that in another
-    # process as this doesn't play well with setns().
-    if not os.path.isdir(target):
-        os.mkdir(target)
-    p = multiprocessing.Process(target=_mount_proc, args=(target,))
-    p.start()
-    p.join()
-
-
 def most_recent(*args):
     """Return the most recent files matching one of the provided glob
     expression."""
@@ -188,7 +149,6 @@ class LldpdFactory(object):
         # Setup privsep. While not enforced, we assume we are running in a
         # throwaway mount namespace.
         tmpdir = self.tmpdir
-        mount_proc()
         if self.config.lldpd.privsep.enabled:
             # Chroot
             chroot = self.config.lldpd.privsep.chroot
@@ -222,10 +182,6 @@ class LldpdFactory(object):
                 _replace_file(tmpdir, "/etc/passwd", passwd)
                 _replace_file(tmpdir, "/etc/group", fgroup)
 
-        # Also setup the "namespace-dependant" directory
-        tmpdir.join("ns").ensure(dir=True)
-        mount_tmpfs(str(tmpdir.join("ns")), private=True)
-
         # We also need a proper /etc/os-release
         _replace_file(tmpdir, "/etc/os-release",
                       """PRETTY_NAME="Spectacular GNU/Linux 2016"
@@ -322,6 +278,89 @@ def lldpcli(request, tmpdir):
     return run
 
 
+@pytest.fixture()
+def snmpd(request, tmpdir):
+    """Execute ``snmpd``."""
+    count = [0]
+
+    def run(*args):
+        conffile = tmpdir.join("ns", "snmpd.conf")
+        pidfile = tmpdir.join("ns", "snmpd.pid")
+        with conffile.open("w") as f:
+            f.write("""
+rocommunity public
+rwcommunity private
+master agentx
+trap2sink 127.0.0.1
+""")
+        sargs = ("-I",
+                 "snmp_mib,sysORTable"
+                 ",usmConf,usmStats,usmUser"
+                 ",vacm_conf,vacm_context,vacm_vars",
+                 "-Lf", "/dev/null",
+                 "-p", str(pidfile),
+                 "-C", "-c", str(conffile))
+        try:
+            p = subprocess.Popen(("snmpd",) + sargs + args,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE)
+        except OSError as e:
+            if e.errno == os.errno.ENOENT:
+                pytest.skip("snmpd not present")
+                return
+            raise e
+        stdout, stderr = p.communicate(timeout=5)
+        result = namedtuple('ProcessResult',
+                            ['returncode', 'stdout', 'stderr'])(
+                                p.returncode, stdout, stderr)
+        request.node.add_report_section(
+            'run', 'snmpd output {}'.format(count[0]),
+            format_process_output("snmpd", sargs, result))
+        count[0] += 1
+        time.sleep(1)
+
+        def kill():
+            try:
+                with pidfile.open("r") as p:
+                    os.kill(int(p.read()))
+            except:
+                pass
+        request.addfinalizer(kill)
+
+    return run
+
+
+@pytest.fixture()
+def snmpwalk():
+    def run(*args):
+        try:
+            p = subprocess.Popen(("env", "MIBDIRS=",
+                                  "snmpwalk",
+                                  "-v2c", "-c", "private",
+                                  "-Ob", "-Oe", "-On",
+                                  "localhost") + args,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE)
+        except OSError as e:
+            if e.errno == os.errno.ENOENT:
+                pytest.skip("snmpwalk not present")
+                return
+            raise e
+        stdout, stderr = p.communicate(timeout=30)
+        result = namedtuple('ProcessResult',
+                            ['returncode', 'stdout', 'stderr'])(
+                                p.returncode, stdout, stderr)
+        # When keyvalue is requested, return a formatted result
+        assert result.returncode == 0
+        out = {}
+        for k, v in [l.split(' = ', 2)
+                     for l in result.stdout.decode('ascii').split("\n")
+                     if ' = ' in l]:
+            out[k] = v
+        return out
+    return run
+
+
 def pytest_runtest_makereport(item, call):
     """Collect outputs written to tmpdir and put them in report."""
     # Only do that after tests are run, but not on teardown (too late)
diff --git a/tests/integration/test_snmp.py b/tests/integration/test_snmp.py
new file mode 100644 (file)
index 0000000..2a3e85d
--- /dev/null
@@ -0,0 +1,29 @@
+import pytest
+import time
+
+pytestmark = pytest.mark.skipif(not pytest.config.lldpd.snmp,
+                                reason="no SNMP support")
+
+
+def test_snmp_register(snmpd, snmpwalk, lldpd, namespaces):
+    with namespaces(1):
+        snmpd()
+        lldpd("-x")
+        out = snmpwalk(".1.3.6.1.2.1.1.9.1.3")
+        assert 'STRING: "lldpMIB implementation by lldpd"' in out.values()
+
+
+def test_snmp_one_neighbor(snmpd, snmpwalk, lldpd, namespaces):
+    with namespaces(1):
+        snmpd()
+        lldpd("-x")
+    with namespaces(2):
+        lldpd()
+    with namespaces(1):
+        out = snmpwalk(".1.0.8802.1.1.2.1")
+        assert out['.1.0.8802.1.1.2.1.2.1.0'].startswith(
+            "Timeticks: ")
+        assert out['.1.0.8802.1.1.2.1.3.2.0'] == 'STRING: "ns-1.example.com"'
+        assert out['.1.0.8802.1.1.2.1.3.3.0'] == 'STRING: "ns-1.example.com"'
+        assert out['.1.0.8802.1.1.2.1.3.4.0'].startswith(
+            'STRING: "Spectacular GNU/Linux 2016 Linux')