import pyroute2
import pytest
import signal
+import multiprocessing
# All allowed namespace types
NAMESPACE_FLAGS = dict(mnt=0x00020000,
raise OSError(e, os.strerror(e))
+def mount_tmpfs(target, private=False):
+ flags = [0]
+ if private:
+ flags.append(1 << 18) # MS_PRIVATE
+ flags.append(1 << 19) # MS_SLAVE
+ for fl in flags:
+ ret = libc.mount(b"none",
+ target.encode('ascii'),
+ b"tmpfs",
+ fl,
+ None)
+ if ret == -1:
+ e = ctypes.get_errno()
+ raise OSError(e, os.strerror(e))
+
+
+def _mount_proc(target):
+ flags = [2 | 4 | 8] # MS_NOSUID | MS_NODEV | MS_NOEXEC
+ flags.append(1 << 18) # MS_PRIVATE
+ flags.append(1 << 19) # MS_SLAVE
+ for fl in flags:
+ ret = libc.mount(b"proc",
+ target.encode('ascii'),
+ b"proc",
+ fl,
+ None)
+ if ret == -1:
+ e = ctypes.get_errno()
+ raise OSError(e, os.strerror(e))
+
+
+def mount_proc(target="/proc"):
+ # We need to be sure /proc is correct. We do that in another
+ # process as this doesn't play well with setns().
+ if not os.path.isdir(target):
+ os.mkdir(target)
+ p = multiprocessing.Process(target=_mount_proc, args=(target,))
+ p.start()
+ p.join()
+
+
class Namespace(object):
"""Combine several namespaces into one.
# MS_REC | MS_PRIVATE
16384 | (1 << 18),
None)
- mount_sys()
while True:
try:
"""
- def __init__(self):
+ def __init__(self, tmpdir):
self.namespaces = {}
+ self.tmpdir = tmpdir
def __call__(self, ns):
"""Return a namespace. Create it if it doesn't exist."""
if ns in self.namespaces:
return self.namespaces[ns]
+
self.namespaces[ns] = Namespace('ipc', 'net', 'mnt', 'uts')
+ with self.namespaces[ns]:
+ mount_proc()
+ mount_sys()
+ # Also setup the "namespace-dependant" directory
+ self.tmpdir.join("ns").ensure(dir=True)
+ mount_tmpfs(str(self.tmpdir.join("ns")), private=True)
+
return self.namespaces[ns]
@pytest.fixture
-def namespaces():
- return NamespaceFactory()
+def namespaces(tmpdir):
+ return NamespaceFactory(tmpdir)
import ctypes
from collections import namedtuple
+from .namespaces import mount_proc, mount_tmpfs
+
libc = ctypes.CDLL('libc.so.6', use_errno=True)
raise OSError(e, os.strerror(e))
-def mount_tmpfs(target, private=False):
- flags = [0]
- if private:
- flags.append(1 << 18) # MS_PRIVATE
- flags.append(1 << 19) # MS_SLAVE
- for fl in flags:
- ret = libc.mount(b"none",
- target.encode('ascii'),
- b"tmpfs",
- fl,
- None)
- if ret == -1:
- e = ctypes.get_errno()
- raise OSError(e, os.strerror(e))
-
-
-def _mount_proc(target):
- flags = [2 | 4 | 8] # MS_NOSUID | MS_NODEV | MS_NOEXEC
- flags.append(1 << 18) # MS_PRIVATE
- flags.append(1 << 19) # MS_SLAVE
- for fl in flags:
- ret = libc.mount(b"proc",
- target.encode('ascii'),
- b"proc",
- fl,
- None)
- if ret == -1:
- e = ctypes.get_errno()
- raise OSError(e, os.strerror(e))
-
-
-def mount_proc(target="/proc"):
- # We need to be sure /proc is correct. We do that in another
- # process as this doesn't play well with setns().
- if not os.path.isdir(target):
- os.mkdir(target)
- p = multiprocessing.Process(target=_mount_proc, args=(target,))
- p.start()
- p.join()
-
-
def most_recent(*args):
"""Return the most recent files matching one of the provided glob
expression."""
# Setup privsep. While not enforced, we assume we are running in a
# throwaway mount namespace.
tmpdir = self.tmpdir
- mount_proc()
if self.config.lldpd.privsep.enabled:
# Chroot
chroot = self.config.lldpd.privsep.chroot
_replace_file(tmpdir, "/etc/passwd", passwd)
_replace_file(tmpdir, "/etc/group", fgroup)
- # Also setup the "namespace-dependant" directory
- tmpdir.join("ns").ensure(dir=True)
- mount_tmpfs(str(tmpdir.join("ns")), private=True)
-
# We also need a proper /etc/os-release
_replace_file(tmpdir, "/etc/os-release",
"""PRETTY_NAME="Spectacular GNU/Linux 2016"
return run
+@pytest.fixture()
+def snmpd(request, tmpdir):
+ """Execute ``snmpd``."""
+ count = [0]
+
+ def run(*args):
+ conffile = tmpdir.join("ns", "snmpd.conf")
+ pidfile = tmpdir.join("ns", "snmpd.pid")
+ with conffile.open("w") as f:
+ f.write("""
+rocommunity public
+rwcommunity private
+master agentx
+trap2sink 127.0.0.1
+""")
+ sargs = ("-I",
+ "snmp_mib,sysORTable"
+ ",usmConf,usmStats,usmUser"
+ ",vacm_conf,vacm_context,vacm_vars",
+ "-Lf", "/dev/null",
+ "-p", str(pidfile),
+ "-C", "-c", str(conffile))
+ try:
+ p = subprocess.Popen(("snmpd",) + sargs + args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ except OSError as e:
+ if e.errno == os.errno.ENOENT:
+ pytest.skip("snmpd not present")
+ return
+ raise e
+ stdout, stderr = p.communicate(timeout=5)
+ result = namedtuple('ProcessResult',
+ ['returncode', 'stdout', 'stderr'])(
+ p.returncode, stdout, stderr)
+ request.node.add_report_section(
+ 'run', 'snmpd output {}'.format(count[0]),
+ format_process_output("snmpd", sargs, result))
+ count[0] += 1
+ time.sleep(1)
+
+ def kill():
+ try:
+ with pidfile.open("r") as p:
+ os.kill(int(p.read()))
+ except:
+ pass
+ request.addfinalizer(kill)
+
+ return run
+
+
+@pytest.fixture()
+def snmpwalk():
+ def run(*args):
+ try:
+ p = subprocess.Popen(("env", "MIBDIRS=",
+ "snmpwalk",
+ "-v2c", "-c", "private",
+ "-Ob", "-Oe", "-On",
+ "localhost") + args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ except OSError as e:
+ if e.errno == os.errno.ENOENT:
+ pytest.skip("snmpwalk not present")
+ return
+ raise e
+ stdout, stderr = p.communicate(timeout=30)
+ result = namedtuple('ProcessResult',
+ ['returncode', 'stdout', 'stderr'])(
+ p.returncode, stdout, stderr)
+ # When keyvalue is requested, return a formatted result
+ assert result.returncode == 0
+ out = {}
+ for k, v in [l.split(' = ', 2)
+ for l in result.stdout.decode('ascii').split("\n")
+ if ' = ' in l]:
+ out[k] = v
+ return out
+ return run
+
+
def pytest_runtest_makereport(item, call):
"""Collect outputs written to tmpdir and put them in report."""
# Only do that after tests are run, but not on teardown (too late)