]>
git.ipfire.org Git - thirdparty/lldpd.git/blob - tests/integration/fixtures/namespaces.py
9 # All allowed namespace types
10 NAMESPACE_FLAGS
= dict(mnt
=0x00020000,
18 libc
= ctypes
.CDLL('libc.so.6', use_errno
=True)
21 @contextlib.contextmanager
23 """Restore the current directory on exit."""
31 class Namespace(object):
32 """Combine several namespaces into one.
34 This gets a list of namespace types to create and combine into one. The
35 combined namespace can be used as a context manager to enter all the
36 created namespaces and exit them at the end.
39 def __init__(self
, *namespaces
):
40 self
.namespaces
= namespaces
42 assert ns
in NAMESPACE_FLAGS
44 # Get a pipe to signal the future child to exit
47 # First, create a child in the given namespaces
48 child
= ctypes
.CFUNCTYPE(ctypes
.c_int
)(self
.child
)
49 child_stack
= ctypes
.create_string_buffer(STACKSIZE
)
50 child_stack_pointer
= ctypes
.c_void_p(
51 ctypes
.cast(child_stack
,
52 ctypes
.c_void_p
).value
+ STACKSIZE
)
53 flags
= signal
.SIGCHLD
55 flags |
= NAMESPACE_FLAGS
[ns
]
56 pid
= libc
.clone(child
, child_stack_pointer
, flags
)
58 e
= ctypes
.get_errno()
59 raise OSError(e
, os
.strerror(e
))
61 # If a user namespace, map UID 0 to the current one
62 if 'user' in namespaces
:
63 uid_map
= '0 {} 1'.format(os
.getuid())
64 gid_map
= '0 {} 1'.format(os
.getgid())
65 with
open('/proc/{}/uid_map'.format(pid
), 'w') as f
:
67 with
open('/proc/{}/setgroups'.format(pid
), 'w') as f
:
69 with
open('/proc/{}/gid_map'.format(pid
), 'w') as f
:
72 # Retrieve a file descriptor to this new namespace
73 self
.next
= [os
.open('/proc/{}/ns/{}'.format(pid
, x
),
74 os
.O_RDONLY
) for x
in namespaces
]
76 # Keep a file descriptor to our old namespaces
77 self
.previous
= [os
.open('/proc/self/ns/{}'.format(x
),
78 os
.O_RDONLY
) for x
in namespaces
]
80 # Tell the child all is done and let it die
81 os
.close(self
.pipe
[0])
82 if 'pid' not in namespaces
:
83 os
.close(self
.pipe
[1])
89 Just be here until our parent extracts the file descriptor from
93 os
.close(self
.pipe
[1])
95 # For a network namespace, enable lo
96 if 'net' in self
.namespaces
:
97 ipr
= pyroute2
.IPRoute()
98 lo
= ipr
.link_lookup(ifname
='lo')[0]
99 ipr
.link('set', index
=lo
, state
='up')
100 # For a mount namespace, make it private
101 if 'mnt' in self
.namespaces
:
102 libc
.mount(b
"none", b
"/", None,
103 # MS_REC | MS_PRIVATE
109 os
.read(self
.pipe
[0], 1)
111 if e
.errno
in [errno
.EAGAIN
, errno
.EINTR
]:
def fd(self, namespace):
    """Look up the descriptor opened on one of our namespaces.

    `namespace` is a namespace type name and must be one of the
    types stored in `self.namespaces`; the descriptor found at the
    same position in `self.next` is returned.
    """
    assert namespace in self.namespaces
    # `self.next` is built in the same order as `self.namespaces`,
    # so the position of the type name is also the position of its fd.
    position = self.namespaces.index(namespace)
    return self.next[position]
123 with
keep_directory():
125 if libc
.setns(n
, 0) == -1:
126 ns
= self
.namespaces
[self
.next
.index(n
)] # NOQA
127 e
= ctypes
.get_errno()
128 raise OSError(e
, os
.strerror(e
))
130 def __exit__(self
, *exc
):
131 with
keep_directory():
133 for p
in reversed(self
.previous
):
134 if libc
.setns(p
, 0) == -1 and err
is None:
135 ns
= self
.namespaces
[self
.previous
.index(p
)] # NOQA
136 e
= ctypes
.get_errno()
137 err
= OSError(e
, os
.strerror(e
))
142 return 'Namespace({})'.format(", ".join(self
.namespaces
))
145 class NamespaceFactory(object):
146 """Dynamically create namespaces as they are requested.
148 Those namespaces are namespaces for IPC, net, mount and UTS. PID
149 is a bit special as we have to keep a process for that. We don't
150 do that to ensure that everything is cleaned
151 automatically. Therefore, the child process is killed as soon as
152 we got a file descriptor to the namespace. We don't use a user
153 namespace either because we are unlikely to be able to exit it.
def __call__(self, ns):
    """Hand back the namespace stored under `ns`, building it on first use."""
    if ns not in self.namespaces:
        # First request for this key: create the combined namespace
        # and remember it so later calls reuse the same object.
        self.namespaces[ns] = Namespace('ipc', 'net', 'mnt', 'uts')
    return self.namespaces[ns]
170 return NamespaceFactory()