# For a network namespace, enable lo
if 'net' in self.namespaces:
- ipr = pyroute2.IPRoute()
- lo = ipr.link_lookup(ifname='lo')[0]
- ipr.link('set', index=lo, state='up')
+ with pyroute2.IPRoute() as ipr:
+ lo = ipr.link_lookup(ifname='lo')[0]
+ ipr.link('set', index=lo, state='up')
# For a mount namespace, make it private
if 'mnt' in self.namespaces:
libc.mount(b"none", b"/", None,
# First, create a link
first = 'eth{}'.format(self.count)
second = 'eth{}'.format(self.count + 1)
- ipr = pyroute2.IPRoute()
- ipr.link('add',
- ifname=first,
- peer=second,
- kind='veth')
- idx = [ipr.link_lookup(ifname=x)[0]
- for x in (first, second)]
-
- # Set an easy to remember MAC address
- ipr.link('set', index=idx[0],
- address=int_to_mac(self.count + 1))
- ipr.link('set', index=idx[1],
- address=int_to_mac(self.count + 2))
-
- # Set MTU
- ipr.link('set', index=idx[0], mtu=mtu)
- ipr.link('set', index=idx[1], mtu=mtu)
-
- # Then, move each to the target namespace
- ipr.link('set', index=idx[0], net_ns_fd=ns1.fd('net'))
- ipr.link('set', index=idx[1], net_ns_fd=ns2.fd('net'))
+ with pyroute2.IPRoute() as ipr:
+ ipr.link('add',
+ ifname=first,
+ peer=second,
+ kind='veth')
+ idx = [ipr.link_lookup(ifname=x)[0]
+ for x in (first, second)]
+
+ # Set an easy to remember MAC address
+ ipr.link('set', index=idx[0],
+ address=int_to_mac(self.count + 1))
+ ipr.link('set', index=idx[1],
+ address=int_to_mac(self.count + 2))
+
+ # Set MTU
+ ipr.link('set', index=idx[0], mtu=mtu)
+ ipr.link('set', index=idx[1], mtu=mtu)
+
+ # Then, move each to the target namespace
+ ipr.link('set', index=idx[0], net_ns_fd=ns1.fd('net'))
+ ipr.link('set', index=idx[1], net_ns_fd=ns2.fd('net'))
# And put them up
with ns1:
- ipr = pyroute2.IPRoute()
- ipr.link('set', index=idx[0], state='up')
+ with pyroute2.IPRoute() as ipr:
+ ipr.link('set', index=idx[0], state='up')
time.sleep(sleep)
with ns2:
- ipr = pyroute2.IPRoute()
- ipr.link('set', index=idx[1], state='up')
+ with pyroute2.IPRoute() as ipr:
+ ipr.link('set', index=idx[1], state='up')
self.count += 2
def bridge(self, name, *ifaces, filtering=False):
"""Create a bridge."""
- ipr = pyroute2.IPRoute()
- # Create the bridge
- ipr.link('add',
- ifname=name,
- kind='bridge',
- br_vlan_filtering=filtering)
- idx = ipr.link_lookup(ifname=name)[0]
- # Attach interfaces
- for iface in ifaces:
- port = ipr.link_lookup(ifname=iface)[0]
- ipr.link('set', index=port, master=idx)
- # Put the bridge up
- ipr.link('set', index=idx, state='up')
- return idx
+ with pyroute2.IPRoute() as ipr:
+ # Create the bridge
+ ipr.link('add',
+ ifname=name,
+ kind='bridge',
+ br_vlan_filtering=filtering)
+ idx = ipr.link_lookup(ifname=name)[0]
+ # Attach interfaces
+ for iface in ifaces:
+ port = ipr.link_lookup(ifname=iface)[0]
+ ipr.link('set', index=port, master=idx)
+ # Put the bridge up
+ ipr.link('set', index=idx, state='up')
+ return idx
def _bond_or_team(self, kind, name, *ifaces):
"""Create a bond or a team."""
- ipr = pyroute2.RawIPRoute()
- # Create the bond
- ipr.link('add',
- ifname=name,
- kind=kind)
- idx = ipr.link_lookup(ifname=name)[0]
- # Attach interfaces
- for iface in ifaces:
- slave = ipr.link_lookup(ifname=iface)[0]
- ipr.link('set', index=slave, state='down')
- ipr.link('set', index=slave, master=idx)
- # Put the bond up
- ipr.link('set', index=idx, state='up')
- return idx
+ with pyroute2.RawIPRoute() as ipr:
+ # Create the bond
+ ipr.link('add',
+ ifname=name,
+ kind=kind)
+ idx = ipr.link_lookup(ifname=name)[0]
+ # Attach interfaces
+ for iface in ifaces:
+ slave = ipr.link_lookup(ifname=iface)[0]
+ ipr.link('set', index=slave, state='down')
+ ipr.link('set', index=slave, master=idx)
+ # Put the bond up
+ ipr.link('set', index=idx, state='up')
+ return idx
def team(self, name, *ifaces):
"""Create a team."""
def dummy(self, name):
"""Create a dummy interface."""
- ipr = pyroute2.IPRoute()
- ipr.link('add', ifname=name, kind='dummy')
- idx = ipr.link_lookup(ifname=name)[0]
- ipr.link('set', index=idx, state='up')
- return idx
+ with pyroute2.IPRoute() as ipr:
+ ipr.link('add', ifname=name, kind='dummy')
+ idx = ipr.link_lookup(ifname=name)[0]
+ ipr.link('set', index=idx, state='up')
+ return idx
def vlan(self, name, id, iface):
"""Create a VLAN."""
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname=iface)[0]
- ipr.link('add',
- ifname=name,
- kind='vlan',
- vlan_id=id,
- link=idx)
- idx = ipr.link_lookup(ifname=name)[0]
- ipr.link('set', index=idx, state='up')
- return idx
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname=iface)[0]
+ ipr.link('add',
+ ifname=name,
+ kind='vlan',
+ vlan_id=id,
+ link=idx)
+ idx = ipr.link_lookup(ifname=name)[0]
+ ipr.link('set', index=idx, state='up')
+ return idx
def bridge_vlan(self, iface, vid, tagged=True, pvid=False, remove=False):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname=iface)[0]
- flags = []
- if not tagged:
- flags.append('untagged')
- if pvid:
- flags.append('pvid')
- if not remove:
- ipr.vlan_filter('del', index=idx, vlan_info={"vid": 1})
- ipr.vlan_filter('add' if not remove else 'del', index=idx,
- vlan_info={'vid': vid,
- 'flags': flags})
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname=iface)[0]
+ flags = []
+ if not tagged:
+ flags.append('untagged')
+ if pvid:
+ flags.append('pvid')
+ if not remove:
+ ipr.vlan_filter('del', index=idx, vlan_info={"vid": 1})
+ ipr.vlan_filter('add' if not remove else 'del', index=idx,
+ vlan_info={'vid': vid,
+ 'flags': flags})
def up(self, name):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname=name)[0]
- ipr.link('set', index=idx, state='up')
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname=name)[0]
+ ipr.link('set', index=idx, state='up')
def down(self, name):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname=name)[0]
- ipr.link('set', index=idx, state='down')
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname=name)[0]
+ ipr.link('set', index=idx, state='down')
def remove(self, name):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname=name)[0]
- ipr.link('del', index=idx)
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname=name)[0]
+ ipr.link('del', index=idx)
def unbridge(self, bridgename, name):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname=name)[0]
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
- ifr = struct.pack("16si", b"br42", idx)
- fcntl.ioctl(s,
- 0x89a3, # SIOCBRDELIF
- ifr)
- s.close()
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname=name)[0]
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+ ifr = struct.pack("16si", b"br42", idx)
+ fcntl.ioctl(s,
+ 0x89a3, # SIOCBRDELIF
+ ifr)
+ s.close()
@pytest.fixture
def test_forced_known_management_address(lldpd1, lldpd, lldpcli, namespaces):
with namespaces(2):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname="eth1")[0]
- ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname="eth1")[0]
+ ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
lldpd("-m", "192.168.14.2")
with namespaces(1):
out = lldpcli("-f", "keyvalue", "show", "neighbors")
def test_management_address(lldpd1, lldpd, lldpcli, links, namespaces):
with namespaces(2):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname="eth1")[0]
- ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
- ipr.addr('add', index=idx, address="172.25.21.47", mask=24)
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname="eth1")[0]
+ ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
+ ipr.addr('add', index=idx, address="172.25.21.47", mask=24)
lldpd("-m", "172.25.*")
with namespaces(1):
out = lldpcli("-f", "keyvalue", "show", "neighbors")
def test_management_interface(lldpd1, lldpd, lldpcli, links, namespaces):
links(namespaces(1), namespaces(2), 4)
with namespaces(2):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname="eth1")[0]
- ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
- idx = ipr.link_lookup(ifname="eth3")[0]
- ipr.addr('add', index=idx, address="172.25.21.47", mask=24)
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname="eth1")[0]
+ ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
+ idx = ipr.link_lookup(ifname="eth3")[0]
+ ipr.addr('add', index=idx, address="172.25.21.47", mask=24)
lldpd("-m", "eth3")
with namespaces(1):
out = lldpcli("-f", "keyvalue", "show", "neighbors")
def test_change_management_address(lldpd1, lldpd, lldpcli, links, namespaces):
with namespaces(2):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname="eth1")[0]
- ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname="eth1")[0]
+ ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
lldpd("-m", "192.168.*")
# We need a short TX interval as updating the IP address
# doesn't trigger a resend.
assert out["lldp.eth0.chassis.mgmt-ip"] == "192.168.14.2"
assert out["lldp.eth0.chassis.mgmt-iface"] == "2"
with namespaces(2):
- ipr.addr('del', index=idx, address="192.168.14.2", mask=24)
- ipr.addr('add', index=idx, address="192.168.14.5", mask=24)
+ with pyroute2.IPRoute() as ipr:
+ ipr.addr('del', index=idx, address="192.168.14.2", mask=24)
+ ipr.addr('add', index=idx, address="192.168.14.5", mask=24)
time.sleep(5)
with namespaces(1):
out = lldpcli("-f", "keyvalue", "show", "neighbors")
def test_portid_subtype_with_alias(lldpd1, lldpd, lldpcli, links, namespaces):
with namespaces(2):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname="eth1")[0]
- ipr.link('set', index=idx, ifalias="alias of eth1")
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname="eth1")[0]
+ ipr.link('set', index=idx, ifalias="alias of eth1")
lldpd()
with namespaces(1):
out = lldpcli("-f", "keyvalue", "show", "neighbors")
def test_portid_subtype_macaddress(lldpd1, lldpd, lldpcli, links, namespaces):
with namespaces(2):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname="eth1")[0]
- ipr.link('set', index=idx, ifalias="alias of eth1")
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname="eth1")[0]
+ ipr.link('set', index=idx, ifalias="alias of eth1")
lldpd()
lldpcli("configure", "lldp", "portidsubtype", "macaddress")
time.sleep(3)
def test_portid_subtype_local_with_alias(lldpd1, lldpd, lldpcli, namespaces):
with namespaces(2):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname="eth1")[0]
- ipr.link('set', index=idx, ifalias="alias of eth1")
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname="eth1")[0]
+ ipr.link('set', index=idx, ifalias="alias of eth1")
lldpd()
lldpcli("configure", "lldp", "portidsubtype", "local", "localname")
time.sleep(3)
# unconfigure VLAN TX
lldpcli("unconfigure", "ports", "eth0", "lldp", "vlan-tx")
out = lldpcli("-f", "keyvalue", "show", "interfaces", "ports", "eth0")
- assert not "lldp.eth0.port.vlanTX.id" in out
- assert not "lldp.eth0.port.vlanTX.prio" in out
- assert not "lldp.eth0.port.vlanTX.dei" in out
+ assert "lldp.eth0.port.vlanTX.id" not in out
+ assert "lldp.eth0.port.vlanTX.prio" not in out
+ assert "lldp.eth0.port.vlanTX.dei" not in out
def test_set_interface_alias(lldpd1, lldpd, lldpcli, namespaces):
with namespaces(2):
lldpd()
with namespaces(1):
- ipr = pyroute2.IPRoute()
- link = ipr.link('get', ifname='eth0')[0]
- assert link.get_attr('IFLA_IFALIAS') == 'lldpd: connected to ns-2.example.com'
+ with pyroute2.IPRoute() as ipr:
+ link = ipr.link('get', ifname='eth0')[0]
+ assert link.get_attr('IFLA_IFALIAS') == 'lldpd: connected to ns-2.example.com'
def test_lldpdu_shutdown(lldpd, lldpcli, namespaces, links):
if when == 'after':
lldpd()
idx = links.bond('bond42', 'eth3', 'eth1')
- ipr = pyroute2.IPRoute()
- # The bond has the MAC of eth3
- assert ipr.get_links(idx)[0].get_attr('IFLA_ADDRESS') == \
- "00:00:00:00:00:04"
+ with pyroute2.IPRoute() as ipr:
+ # The bond has the MAC of eth3
+ assert ipr.get_links(idx)[0].get_attr('IFLA_ADDRESS') == \
+ "00:00:00:00:00:04"
if when == 'before':
lldpd()
else:
with namespaces(1):
lldpd("-s", "-ll", "-r")
with namespaces(2):
- ipr = pyroute2.IPRoute()
- idx = ipr.link_lookup(ifname="eth1")[0]
- ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
+ with pyroute2.IPRoute() as ipr:
+ idx = ipr.link_lookup(ifname="eth1")[0]
+ ipr.addr('add', index=idx, address="192.168.14.2", mask=24)
lldpd("-ss", "-ll")
with namespaces(1):
out = lldpcli("-f", "keyvalue", "show", "neighbors", "details")