from enum import Enum
from functools import partial
from io import IOBase
-from typing import Any, Dict, List, Optional, Tuple, Union, Type, cast
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
# Third-party imports
from ctypes import CDLL, get_errno
fwmark: Optional[int] = 0 # Optional firewall mark
table: Optional[int] = None # Routing table ID
-from typing import Callable
-
class LifecycleException(Exception):
pass
**kwargs
))
else:
- assert len(kwargs) == 0
+ assert len(kwargs) == 0 # noqa: B101
self._routes.append(prefix_or_route)
@config_method
'''
assert self._phase == Lifecycle.CONFIG, \
- "Repeated calls to create() on a single network object"
+ "Repeated calls to create() on a single network object" # noqa: B101
+
for name, host in self._hosts.items():
host.netns_name = f'{self.name}.{name}'
associated resources
'''
assert self._phase == Lifecycle.RUNTIME, \
- "Network not setup"
+ "Network not setup" # noqa: B101
+
destroy_network(self)
self._phase = Lifecycle.CONFIG
# Enable tracing
MTR_NETEM_TRACE = len(os.getenv('MTR_NETEM_TRACE', '')) > 0
+
+def find_ip_command() -> Optional[str]:
+ '''
+ Search for the location of the `ip` command in common directories.
+ '''
+ # List of possible locations where the `ip` command might be located
+ possible_locations = [
+ "/usr/bin/ip",
+ "/sbin/ip",
+ "/usr/sbin/ip",
+ "/bin/ip"
+ ]
+
+ # Loop through the possible locations
+ for location in possible_locations:
+ # Check if the file exists and is executable
+ if os.path.isfile(location) and os.access(location, os.X_OK):
+ return location
+
+ return None
+
def run_cmd(*args, **kargs):
'''
Execute a shell command.
with open(path, 'w') as file:
file.write(str(value))
except Exception as e:
- raise RuntimeError('Failed to configure kernel option: {str(e)}')
+ raise RuntimeError(f'Failed to configure kernel option: {str(e)}')
def set_interface_rpfiler(intf_name : str, rpfilter : Rpfilter):
'''
cmd = partial(run_cmd, check=True)
+ ip_cmd = cast(str, find_ip_command())
+
# Add host namespaces
host : Host
for host in net._hosts.values():
- cmd([ 'ip', 'netns', 'add', host.netns_name ], check=True)
+ cmd([ ip_cmd, 'netns', 'add', host.netns_name ], check=True)
link : Link
for link in net._links.values():
# Add a virtual ethernet link
cmd([
- 'ip', 'link', 'add', intfs[0].name,
+ ip_cmd, 'link', 'add', intfs[0].name,
'type', 'veth', 'peer', 'name',
intfs[1].name
])
netns_name = cast(str, host.netns_name)
# Move a end of the link pair into the host's network namespace
- cmd(['ip', 'link', 'set', intf_name, 'netns', netns_name])
+ cmd([ip_cmd, 'link', 'set', intf_name, 'netns', netns_name])
with NetNamespace(netns_name):
# Configure the reverse pass filter
# Add IP addresses to the link
for addr in intf.addresses:
- cmd(['ip', 'addr', 'add', addr, 'dev', intf_name])
+ cmd([ip_cmd, 'addr', 'add', addr, 'dev', intf_name])
# Activate the interface
- cmd(['ip', 'link', 'set', intf.name, 'up'])
+ cmd([ip_cmd, 'link', 'set', intf.name, 'up'])
for host in net._hosts.values():
# Add the host's routes
for route in host._routes:
- cmd(['ip', 'route', 'add', *route_spec(route)])
+ cmd([ip_cmd, 'route', 'add', *route_spec(route)])
# Add the host's policy-database rules
for rule in host._rules:
- cmd(['ip', 'rule', 'add', *rule_spec(rule)])
+ cmd([ip_cmd, 'rule', 'add', *rule_spec(rule)])
def destroy_network(net : Network):
'''
This will remove all network namespaces and associated resources created during network setup.
'''
+ ip_cmd = cast(str, find_ip_command())
+
host : Host
for host in net._hosts.values():
- run_cmd([ 'ip', 'netns', 'delete', host.netns_name ])
+ run_cmd([ip_cmd, 'netns', 'delete', host.netns_name])
+
+# /usr/bin/ip
+# /sbin/ip
def has_iproute() -> bool:
'''Test if the host has iproute2 ensuring `ip -V` returns 0'''
+ ip_cmd = find_ip_command()
+
+ if ip_cmd is None:
+ return False
+
try:
- ip_result = subprocess.run(['ip', '-V'], capture_output=True)
+ ip_result = subprocess.run([ip_cmd, '-V'], capture_output=True)
except:
return False