]>
git.ipfire.org Git - thirdparty/lldpd.git/blob - tests/integration/fixtures/namespaces.py
10 # All allowed namespace types
11 NAMESPACE_FLAGS
= dict(mnt
=0x00020000,
19 libc
= ctypes
.CDLL('libc.so.6', use_errno
=True)
22 @contextlib.contextmanager
24 """Restore the current directory on exit."""
32 def mount_sys(target
="/sys"):
33 flags
= [2 |
4 |
8] # MS_NOSUID | MS_NODEV | MS_NOEXEC
34 flags
.append(1 << 18) # MS_PRIVATE
35 flags
.append(1 << 19) # MS_SLAVE
37 ret
= libc
.mount(b
"none",
38 target
.encode('ascii'),
43 e
= ctypes
.get_errno()
44 raise OSError(e
, os
.strerror(e
))
47 def mount_tmpfs(target
, private
=False):
50 flags
.append(1 << 18) # MS_PRIVATE
51 flags
.append(1 << 19) # MS_SLAVE
53 ret
= libc
.mount(b
"none",
54 target
.encode('ascii'),
59 e
= ctypes
.get_errno()
60 raise OSError(e
, os
.strerror(e
))
63 def _mount_proc(target
):
64 flags
= [2 |
4 |
8] # MS_NOSUID | MS_NODEV | MS_NOEXEC
65 flags
.append(1 << 18) # MS_PRIVATE
66 flags
.append(1 << 19) # MS_SLAVE
68 ret
= libc
.mount(b
"proc",
69 target
.encode('ascii'),
74 e
= ctypes
.get_errno()
75 raise OSError(e
, os
.strerror(e
))
78 def mount_proc(target
="/proc"):
79 # We need to be sure /proc is correct. We do that in another
80 # process as this doesn't play well with setns().
81 if not os
.path
.isdir(target
):
83 p
= multiprocessing
.Process(target
=_mount_proc
, args
=(target
,))
88 class Namespace(object):
89 """Combine several namespaces into one.
91 This gets a list of namespace types to create and combine into one. The
92 combined namespace can be used as a context manager to enter all the
93 created namespaces and exit them at the end.
96 def __init__(self
, *namespaces
):
98 self
.namespaces
= namespaces
100 assert ns
in NAMESPACE_FLAGS
102 # Get a pipe to signal the future child to exit
103 self
.pipe
= os
.pipe()
105 # First, create a child in the given namespaces
106 child
= ctypes
.CFUNCTYPE(ctypes
.c_int
)(self
.child
)
107 child_stack
= ctypes
.create_string_buffer(STACKSIZE
)
108 child_stack_pointer
= ctypes
.c_void_p(
109 ctypes
.cast(child_stack
,
110 ctypes
.c_void_p
).value
+ STACKSIZE
)
111 flags
= signal
.SIGCHLD
112 for ns
in namespaces
:
113 flags |
= NAMESPACE_FLAGS
[ns
]
114 pid
= libc
.clone(child
, child_stack_pointer
, flags
)
116 e
= ctypes
.get_errno()
117 raise OSError(e
, os
.strerror(e
))
119 # If a user namespace, map UID 0 to the current one
120 if 'user' in namespaces
:
121 uid_map
= '0 {} 1'.format(os
.getuid())
122 gid_map
= '0 {} 1'.format(os
.getgid())
123 with
open('/proc/{}/uid_map'.format(pid
), 'w') as f
:
125 with
open('/proc/{}/setgroups'.format(pid
), 'w') as f
:
127 with
open('/proc/{}/gid_map'.format(pid
), 'w') as f
:
130 # Retrieve a file descriptor to this new namespace
131 self
.next
= [os
.open('/proc/{}/ns/{}'.format(pid
, x
),
132 os
.O_RDONLY
) for x
in namespaces
]
134 # Keep a file descriptor to our old namespaces
135 self
.previous
= [os
.open('/proc/self/ns/{}'.format(x
),
136 os
.O_RDONLY
) for x
in namespaces
]
138 # Tell the child all is done and let it die
139 os
.close(self
.pipe
[0])
140 if 'pid' not in namespaces
:
141 os
.close(self
.pipe
[1])
148 for fd
in self
.previous
:
150 if self
.pipe
is not None:
151 os
.close(self
.pipe
[1])
156 Just be here until our parent extracts the file descriptor from
160 os
.close(self
.pipe
[1])
162 # For a network namespace, enable lo
163 if 'net' in self
.namespaces
:
164 ipr
= pyroute2
.IPRoute()
165 lo
= ipr
.link_lookup(ifname
='lo')[0]
166 ipr
.link('set', index
=lo
, state
='up')
167 # For a mount namespace, make it private
168 if 'mnt' in self
.namespaces
:
169 libc
.mount(b
"none", b
"/", None,
170 # MS_REC | MS_PRIVATE
176 os
.read(self
.pipe
[0], 1)
178 if e
.errno
in [errno
.EAGAIN
, errno
.EINTR
]:
def fd(self, namespace):
    """Look up the file descriptor stored for one of our namespaces.

    `namespace` has to be one of the names kept in `self.namespaces`;
    the entry of `self.next` found at the same position is returned.
    """
    assert namespace in self.namespaces
    # `self.next` is ordered like `self.namespaces`, so share the index.
    position = self.namespaces.index(namespace)
    return self.next[position]
190 with
keep_directory():
192 if libc
.setns(n
, 0) == -1:
193 ns
= self
.namespaces
[self
.next
.index(n
)] # NOQA
194 e
= ctypes
.get_errno()
195 raise OSError(e
, os
.strerror(e
))
197 def __exit__(self
, *exc
):
198 with
keep_directory():
200 for p
in reversed(self
.previous
):
201 if libc
.setns(p
, 0) == -1 and err
is None:
202 ns
= self
.namespaces
[self
.previous
.index(p
)] # NOQA
203 e
= ctypes
.get_errno()
204 err
= OSError(e
, os
.strerror(e
))
209 return 'Namespace({})'.format(", ".join(self
.namespaces
))
212 class NamespaceFactory(object):
213 """Dynamically create namespaces as they are requested.
215 Those namespaces are namespaces for IPC, net, mount and UTS. PID
216 is a bit special as we have to keep a process for that. We don't
217 do that to ensure that everything is cleaned
218 automatically. Therefore, the child process is killed as soon as
219 we got a file descriptor to the namespace. We don't use a user
220 namespace either because we are unlikely to be able to exit it.
224 def __init__(self
, tmpdir
):
228 def __call__(self
, ns
):
229 """Return a namespace. Create it if it doesn't exist."""
230 if ns
in self
.namespaces
:
231 return self
.namespaces
[ns
]
233 self
.namespaces
[ns
] = Namespace('ipc', 'net', 'mnt', 'uts')
234 with self
.namespaces
[ns
]:
237 # Also set up the "namespace-dependent" directory
238 self
.tmpdir
.join("ns").ensure(dir=True)
239 mount_tmpfs(str(self
.tmpdir
.join("ns")), private
=True)
241 return self
.namespaces
[ns
]
def namespaces(tmpdir):
    """Build a `NamespaceFactory` bound to the given temporary directory."""
    factory = NamespaceFactory(tmpdir)
    return factory