]>
git.ipfire.org Git - thirdparty/lldpd.git/blob - tests/integration/fixtures/network.py
7 from .namespaces
import Namespace
11 """Turn an int into a MAC address."""
12 return ":".join(('{:02x}',)*6).format(
13 *struct
.unpack('BBBBBB', c
.to_bytes(6, byteorder
='big')))
16 class LinksFactory(object):
17 """A factory for veth pair of interfaces and other L2 stuff.
19 Each veth interfaces will get named ethX with X strictly
20 increasing at each call.
25 # We create all those links in a dedicated namespace to avoid
26 # conflict with other namespaces.
27 self
.ns
= Namespace('net')
30 def __call__(self
, *args
, **kwargs
):
31 return self
.veth(*args
, **kwargs
)
33 def veth(self
, ns1
, ns2
, sleep
=0, mtu
=1500):
34 """Create a veth pair between two namespaces."""
36 # First, create a link
37 first
= 'eth{}'.format(self
.count
)
38 second
= 'eth{}'.format(self
.count
+ 1)
39 ipr
= pyroute2
.IPRoute()
44 idx
= [ipr
.link_lookup(ifname
=x
)[0]
45 for x
in (first
, second
)]
47 # Set an easy to remember MAC address
48 ipr
.link('set', index
=idx
[0],
49 address
=int_to_mac(self
.count
+ 1))
50 ipr
.link('set', index
=idx
[1],
51 address
=int_to_mac(self
.count
+ 2))
54 ipr
.link('set', index
=idx
[0], mtu
=mtu
)
55 ipr
.link('set', index
=idx
[1], mtu
=mtu
)
57 # Then, move each to the target namespace
58 ipr
.link('set', index
=idx
[0], net_ns_fd
=ns1
.fd('net'))
59 ipr
.link('set', index
=idx
[1], net_ns_fd
=ns2
.fd('net'))
63 ipr
= pyroute2
.IPRoute()
64 ipr
.link('set', index
=idx
[0], state
='up')
67 ipr
= pyroute2
.IPRoute()
68 ipr
.link('set', index
=idx
[1], state
='up')
72 def bridge(self
, name
, *ifaces
):
73 """Create a bridge."""
74 ipr
= pyroute2
.IPRoute()
79 idx
= ipr
.link_lookup(ifname
=name
)[0]
82 port
= ipr
.link_lookup(ifname
=iface
)[0]
83 ipr
.link('set', index
=port
, master
=idx
)
85 ipr
.link('set', index
=idx
, state
='up')
88 def _bond_or_team(self
, kind
, name
, *ifaces
):
89 """Create a bond or a team."""
90 ipr
= pyroute2
.RawIPRoute()
95 idx
= ipr
.link_lookup(ifname
=name
)[0]
98 slave
= ipr
.link_lookup(ifname
=iface
)[0]
99 ipr
.link('set', index
=slave
, state
='down')
100 ipr
.link('set', index
=slave
, master
=idx
)
102 ipr
.link('set', index
=idx
, state
='up')
105 def team(self
, name
, *ifaces
):
107 # Unfortunately, pyroute2 will try to run teamd too. This
109 return self
._bond
_or
_team
("team", name
, *ifaces
)
111 def bond(self
, name
, *ifaces
):
113 return self
._bond
_or
_team
("bond", name
, *ifaces
)
115 def dummy(self
, name
):
116 """Create a dummy interface."""
117 ipr
= pyroute2
.IPRoute()
118 ipr
.link('add', ifname
=name
, kind
='dummy')
119 idx
= ipr
.link_lookup(ifname
=name
)[0]
120 ipr
.link('set', index
=idx
, state
='up')
123 def vlan(self
, name
, id, iface
):
125 ipr
= pyroute2
.IPRoute()
126 idx
= ipr
.link_lookup(ifname
=iface
)[0]
132 idx
= ipr
.link_lookup(ifname
=name
)[0]
133 ipr
.link('set', index
=idx
, state
='up')
136 def bridge_vlan(self
, iface
, vid
, tagged
=True, pvid
=False, remove
=False):
137 ipr
= pyroute2
.IPRoute()
138 idx
= ipr
.link_lookup(ifname
=iface
)[0]
141 flags
.append('untagged')
145 ipr
.vlan_filter('del', index
=idx
, vlan_info
={"vid": 1})
146 ipr
.vlan_filter('add' if not remove
else 'del', index
=idx
,
147 vlan_info
={'vid': vid
,
151 ipr
= pyroute2
.IPRoute()
152 idx
= ipr
.link_lookup(ifname
=name
)[0]
153 ipr
.link('set', index
=idx
, state
='up')
155 def down(self
, name
):
156 ipr
= pyroute2
.IPRoute()
157 idx
= ipr
.link_lookup(ifname
=name
)[0]
158 ipr
.link('set', index
=idx
, state
='down')
160 def remove(self
, name
):
161 ipr
= pyroute2
.IPRoute()
162 idx
= ipr
.link_lookup(ifname
=name
)[0]
163 ipr
.link('del', index
=idx
)
165 def unbridge(self
, bridgename
, name
):
166 ipr
= pyroute2
.IPRoute()
167 idx
= ipr
.link_lookup(ifname
=name
)[0]
168 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
169 ifr
= struct
.pack("16si", b
"br42", idx
)
171 0x89a3, # SIOCBRDELIF
178 return LinksFactory()