# Source: git.ipfire.org Git - thirdparty/lldpd.git / blob
# Commit: tests: replace integration test by py.test+namespace tests
# Path: tests/integration/fixtures/namespaces.py
1 import contextlib
2 import ctypes
3 import errno
4 import os
5 import pyroute2
6 import pytest
7 import signal
8
# All allowed namespace types, mapped to the flag that is OR-ed into
# the clone() flags to create a namespace of that type.
NAMESPACE_FLAGS = {
    'mnt': 0x00020000,   # CLONE_NEWNS
    'uts': 0x04000000,   # CLONE_NEWUTS
    'ipc': 0x08000000,   # CLONE_NEWIPC
    'user': 0x10000000,  # CLONE_NEWUSER
    'pid': 0x20000000,   # CLONE_NEWPID
    'net': 0x40000000,   # CLONE_NEWNET
}
# Size, in bytes, of the stack buffer handed to the cloned child.
STACKSIZE = 1 << 20

# Handle on the C library; use_errno=True makes the errno of failed
# calls available through ctypes.get_errno().
libc = ctypes.CDLL('libc.so.6', use_errno=True)
19
20
@contextlib.contextmanager
def keep_directory():
    """Context manager going back to the initial working directory on exit.

    The working directory is recorded as a path when the context is
    entered and restored with ``os.chdir()`` when it is left, whether
    the body completed normally or raised.
    """
    saved_cwd = os.getcwd()
    try:
        yield
    finally:
        os.chdir(saved_cwd)
29
30
class Namespace(object):
    """Combine several namespaces into one.

    This gets a list of namespace types to create and combine into one.
    The combined namespace can be used as a context manager to enter
    all the created namespaces and exit them at the end.

    A child is cloned into the new namespaces. File descriptors on
    those namespaces (``self.next``) and on the namespaces of the
    caller (``self.previous``) are kept open so that they can be
    entered and left with ``setns()``. These descriptors are released
    by :meth:`close`, which is also run when the object is garbage
    collected.
    """

    def __init__(self, *namespaces):
        """Create the requested namespaces.

        :param namespaces: namespace types, each one a key of
            ``NAMESPACE_FLAGS``.
        :raises OSError: when ``clone()`` fails. Errors raised while
            accessing ``/proc`` are propagated once the child has been
            released and the descriptors opened so far closed.
        """
        # Set every attribute read by close() first, so that cleanup
        # works even when construction stops half-way.
        self.namespaces = namespaces
        self.next = []
        self.previous = []
        self._keepalive = None
        for ns in namespaces:
            assert ns in NAMESPACE_FLAGS

        flags = signal.SIGCHLD
        for ns in namespaces:
            flags |= NAMESPACE_FLAGS[ns]

        # Get a pipe to signal the future child to exit
        self.pipe = os.pipe()

        # First, create a child in the given namespaces
        child = ctypes.CFUNCTYPE(ctypes.c_int)(self.child)
        child_stack = ctypes.create_string_buffer(STACKSIZE)
        # The pointer handed to clone() is the END of the buffer.
        child_stack_pointer = ctypes.c_void_p(
            ctypes.cast(child_stack,
                        ctypes.c_void_p).value + STACKSIZE)
        pid = libc.clone(child, child_stack_pointer, flags)
        if pid == -1:
            e = ctypes.get_errno()
            # No child was created: nobody else will close the pipe.
            os.close(self.pipe[0])
            os.close(self.pipe[1])
            raise OSError(e, os.strerror(e))

        try:
            # If a user namespace, map UID 0 to the current one
            if 'user' in namespaces:
                uid_map = '0 {} 1'.format(os.getuid())
                gid_map = '0 {} 1'.format(os.getgid())
                with open('/proc/{}/uid_map'.format(pid), 'w') as f:
                    f.write(uid_map)
                with open('/proc/{}/setgroups'.format(pid), 'w') as f:
                    f.write('deny')
                with open('/proc/{}/gid_map'.format(pid), 'w') as f:
                    f.write(gid_map)

            # Retrieve a file descriptor to this new namespace
            for ns in namespaces:
                self.next.append(
                    os.open('/proc/{}/ns/{}'.format(pid, ns),
                            os.O_RDONLY))

            # Keep a file descriptor to our old namespaces
            for ns in namespaces:
                self.previous.append(
                    os.open('/proc/self/ns/{}'.format(ns),
                            os.O_RDONLY))
        except BaseException:
            # Do not leave a blocked child, an open pipe or half-opened
            # descriptors behind: let the child see EOF, reap it and
            # drop what was opened so far.
            os.close(self.pipe[0])
            os.close(self.pipe[1])
            try:
                os.waitpid(pid, 0)
            except OSError:
                pass
            self.close()
            raise

        # Tell the child all is done and let it die
        os.close(self.pipe[0])
        if 'pid' not in namespaces:
            os.close(self.pipe[1])
            os.waitpid(pid, 0)
        else:
            # With a PID namespace the child is left running; it only
            # exits once the write end of the pipe is closed, which
            # close() takes care of.
            self._keepalive = self.pipe[1]

    def child(self):
        """Cloned child.

        Runs inside the new namespaces: bring up ``lo`` for a network
        namespace, make mounts private for a mount namespace, then
        stay here until the parent closes its end of the pipe.
        """
        os.close(self.pipe[1])

        # For a network namespace, enable lo
        if 'net' in self.namespaces:
            ipr = pyroute2.IPRoute()
            lo = ipr.link_lookup(ifname='lo')[0]
            ipr.link('set', index=lo, state='up')
        # For a mount namespace, make it private
        if 'mnt' in self.namespaces:
            libc.mount(b"none", b"/", None,
                       # MS_REC | MS_PRIVATE
                       16384 | (1 << 18),
                       None)

        # Block until the read returns (EOF once the parent closed the
        # write end) or fails for good; only retry on EAGAIN/EINTR.
        while True:
            try:
                os.read(self.pipe[0], 1)
            except OSError as e:
                if e.errno in (errno.EAGAIN, errno.EINTR):
                    continue
            break

        os._exit(0)

    def fd(self, namespace):
        """Return the file descriptor associated to a namespace.

        :param namespace: one of the types given to the constructor.
        :raises IndexError: if :meth:`close` was already called.
        """
        assert namespace in self.namespaces
        return self.next[self.namespaces.index(namespace)]

    def close(self):
        """Close every file descriptor held by this object.

        Safe to call several times. Once closed, the object can no
        longer be entered and :meth:`fd` has nothing left to return.
        """
        fds = list(getattr(self, 'next', []))
        fds.extend(getattr(self, 'previous', []))
        keepalive = getattr(self, '_keepalive', None)
        if keepalive is not None:
            fds.append(keepalive)
        # Forget the descriptors before closing them, so that a second
        # call never closes a number the process may have reused.
        self.next = []
        self.previous = []
        self._keepalive = None
        for fd in fds:
            try:
                os.close(fd)
            except OSError:
                # Best-effort cleanup: the descriptor is already gone.
                pass

    def __del__(self):
        self.close()

    def __enter__(self):
        """Enter all the created namespaces and return this object.

        :raises OSError: when ``setns()`` fails for one of them.
        """
        with keep_directory():
            # `ns` is unused; presumably kept so that a traceback
            # showing locals tells which namespace failed.
            for ns, fd in zip(self.namespaces, self.next):
                if libc.setns(fd, 0) == -1:
                    e = ctypes.get_errno()
                    raise OSError(e, os.strerror(e))
        return self

    def __exit__(self, *exc):
        """Go back to the namespaces that were current at creation.

        Every namespace is restored, in reverse order, even if one of
        them fails; the first failure is raised afterwards.
        """
        with keep_directory():
            err = None
            pairs = list(zip(self.namespaces, self.previous))
            for ns, fd in reversed(pairs):
                if libc.setns(fd, 0) == -1 and err is None:
                    e = ctypes.get_errno()
                    err = OSError(e, os.strerror(e))
            if err:
                raise err

    def __repr__(self):
        return 'Namespace({})'.format(", ".join(self.namespaces))
143
144
class NamespaceFactory(object):
    """Hand out namespaces on request, creating each one only once.

    The namespaces created are IPC, net, mount and UTS namespaces. A
    PID namespace is left out: it would require keeping a process
    around, while here the child process goes away as soon as a file
    descriptor to the namespace has been obtained, so that everything
    is cleaned automatically. A user namespace is left out as well,
    because we are unlikely to be able to exit it.

    """

    def __init__(self):
        # Namespaces created so far, indexed by the key used to
        # request them.
        self.namespaces = {}

    def __call__(self, ns):
        """Return the namespace known as `ns`, creating it when missing."""
        if ns not in self.namespaces:
            self.namespaces[ns] = Namespace('ipc', 'net', 'mnt', 'uts')
        return self.namespaces[ns]
166
167
@pytest.fixture
def namespaces():
    """Fixture providing a new, empty :class:`NamespaceFactory`."""
    factory = NamespaceFactory()
    return factory