]>
git.ipfire.org Git - thirdparty/lldpd.git/blob - tests/integration/fixtures/namespaces.py
10 # All allowed namespace types
11 NAMESPACE_FLAGS
= dict(
19 STACKSIZE
= 1024 * 1024
21 libc
= ctypes
.CDLL("libc.so.6", use_errno
=True)
24 @contextlib.contextmanager
26 """Restore the current directory on exit."""
34 def mount_sys(target
="/sys"):
35 flags
= [2 |
4 |
8] # MS_NOSUID | MS_NODEV | MS_NOEXEC
36 flags
.append(1 << 18) # MS_PRIVATE
37 flags
.append(1 << 19) # MS_SLAVE
39 ret
= libc
.mount(b
"none", target
.encode("ascii"), b
"sysfs", fl
, None)
41 e
= ctypes
.get_errno()
42 raise OSError(e
, os
.strerror(e
))
45 def mount_tmpfs(target
, private
=False):
48 flags
.append(1 << 18) # MS_PRIVATE
49 flags
.append(1 << 19) # MS_SLAVE
51 ret
= libc
.mount(b
"none", target
.encode("ascii"), b
"tmpfs", fl
, None)
53 e
= ctypes
.get_errno()
54 raise OSError(e
, os
.strerror(e
))
57 def _mount_proc(target
):
58 flags
= [2 |
4 |
8] # MS_NOSUID | MS_NODEV | MS_NOEXEC
59 flags
.append(1 << 18) # MS_PRIVATE
60 flags
.append(1 << 19) # MS_SLAVE
62 ret
= libc
.mount(b
"proc", target
.encode("ascii"), b
"proc", fl
, None)
64 e
= ctypes
.get_errno()
65 raise OSError(e
, os
.strerror(e
))
68 def mount_proc(target
="/proc"):
69 # We need to be sure /proc is correct. We do that in another
70 # process as this doesn't play well with setns().
71 if not os
.path
.isdir(target
):
73 p
= multiprocessing
.Process(target
=_mount_proc
, args
=(target
,))
78 class Namespace(object):
79 """Combine several namespaces into one.
81 This gets a list of namespace types to create and combine into one. The
82 combined namespace can be used as a context manager to enter all the
83 created namespaces and exit them at the end.
86 def __init__(self
, *namespaces
):
88 self
.namespaces
= namespaces
90 assert ns
in NAMESPACE_FLAGS
92 # Get a pipe to signal the future child to exit
95 # First, create a child in the given namespaces
96 child
= ctypes
.CFUNCTYPE(ctypes
.c_int
)(self
.child
)
97 child_stack
= ctypes
.create_string_buffer(STACKSIZE
)
98 child_stack_pointer
= ctypes
.c_void_p(
99 ctypes
.cast(child_stack
, ctypes
.c_void_p
).value
+ STACKSIZE
101 flags
= signal
.SIGCHLD
102 for ns
in namespaces
:
103 flags |
= NAMESPACE_FLAGS
[ns
]
104 pid
= libc
.clone(child
, child_stack_pointer
, flags
)
106 e
= ctypes
.get_errno()
107 raise OSError(e
, os
.strerror(e
))
109 # If a user namespace, map UID 0 to the current one
110 if "user" in namespaces
:
111 uid_map
= "0 {} 1".format(os
.getuid())
112 gid_map
= "0 {} 1".format(os
.getgid())
113 with
open("/proc/{}/uid_map".format(pid
), "w") as f
:
115 with
open("/proc/{}/setgroups".format(pid
), "w") as f
:
117 with
open("/proc/{}/gid_map".format(pid
), "w") as f
:
120 # Retrieve a file descriptor to this new namespace
122 os
.open("/proc/{}/ns/{}".format(pid
, x
), os
.O_RDONLY
) for x
in namespaces
125 # Keep a file descriptor to our old namespaces
127 os
.open("/proc/self/ns/{}".format(x
), os
.O_RDONLY
) for x
in namespaces
130 # Tell the child all is done and let it die
131 os
.close(self
.pipe
[0])
132 if "pid" not in namespaces
:
133 os
.close(self
.pipe
[1])
140 for fd
in self
.previous
:
142 if self
.pipe
is not None:
143 os
.close(self
.pipe
[1])
148 Just be here until our parent extract the file descriptor from
152 os
.close(self
.pipe
[1])
154 # For a network namespace, enable lo
155 if "net" in self
.namespaces
:
156 with pyroute2
.IPRoute() as ipr
:
157 lo
= ipr
.link_lookup(ifname
="lo")[0]
158 ipr
.link("set", index
=lo
, state
="up")
159 # For a mount namespace, make it private
160 if "mnt" in self
.namespaces
:
165 # MS_REC | MS_PRIVATE
172 os
.read(self
.pipe
[0], 1)
174 if e
.errno
in [errno
.EAGAIN
, errno
.EINTR
]:
180 def fd(self
, namespace
):
181 """Return the file descriptor associated to a namespace"""
182 assert namespace
in self
.namespaces
183 return self
.next
[self
.namespaces
.index(namespace
)]
186 with
keep_directory():
188 if libc
.setns(n
, 0) == -1:
189 ns
= self
.namespaces
[self
.next
.index(n
)] # NOQA
190 e
= ctypes
.get_errno()
191 raise OSError(e
, os
.strerror(e
))
193 def __exit__(self
, *exc
):
194 with
keep_directory():
196 for p
in reversed(self
.previous
):
197 if libc
.setns(p
, 0) == -1 and err
is None:
198 ns
= self
.namespaces
[self
.previous
.index(p
)] # NOQA
199 e
= ctypes
.get_errno()
200 err
= OSError(e
, os
.strerror(e
))
205 return "Namespace({})".format(", ".join(self
.namespaces
))
208 class NamespaceFactory(object):
209 """Dynamically create namespaces as they are created.
211 Those namespaces are namespaces for IPC, net, mount and UTS. PID
212 is a bit special as we have to keep a process for that. We don't
213 do that to ensure that everything is cleaned
214 automatically. Therefore, the child process is killed as soon as
215 we got a file descriptor to the namespace. We don't use a user
216 namespace either because we are unlikely to be able to exit it.
220 def __init__(self
, tmpdir
):
224 def __call__(self
, ns
):
225 """Return a namespace. Create it if it doesn't exist."""
226 if ns
in self
.namespaces
:
227 return self
.namespaces
[ns
]
229 self
.namespaces
[ns
] = Namespace("ipc", "net", "mnt", "uts")
230 with self
.namespaces
[ns
]:
233 # Also setup the "namespace-dependant" directory
234 self
.tmpdir
.join("ns").ensure(dir=True)
235 mount_tmpfs(str(self
.tmpdir
.join("ns")), private
=True)
237 return self
.namespaces
[ns
]
241 def namespaces(tmpdir
):
242 return NamespaceFactory(tmpdir
)