# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
-from lib import YnlFamily, Netlink, NlError, SpecFamily
+from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException
SYS_SCHEMA_DIR='/usr/share/ynl'
RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'
if not os.path.isdir(schema_dir_):
schema_dir_ = SYS_SCHEMA_DIR
if not os.path.isdir(schema_dir_):
- raise Exception(f"Schema directory {schema_dir_} does not exist")
+ raise YnlException(f"Schema directory {schema_dir_} does not exist")
return schema_dir_
def spec_dir():
"""
spec_dir_ = schema_dir() + '/specs'
if not os.path.isdir(spec_dir_):
- raise Exception(f"Spec directory {spec_dir_} does not exist")
+ raise YnlException(f"Spec directory {spec_dir_} does not exist")
return spec_dir_
else:
spec = args.spec
if not os.path.isfile(spec):
- raise Exception(f"Spec file {spec} does not exist")
+ raise YnlException(f"Spec file {spec} does not exist")
if args.validate:
try:
SpecFamily(spec, args.schema)
- except Exception as error:
+ except SpecException as error:
print(error)
sys.exit(1)
return
#
+class YnlException(Exception):
+ pass
+
+
# pylint: disable=too-few-public-methods
class Netlink:
# Netlink socket
def as_auto_scalar(self, attr_type, byte_order=None):
if len(self.raw) != 4 and len(self.raw) != 8:
- raise Exception(f"Auto-scalar len payload be 4 or 8 bytes, got {len(self.raw)}")
+ raise YnlException(f"Auto-scalar len payload be 4 or 8 bytes, got {len(self.raw)}")
real_type = attr_type[0] + str(len(self.raw) * 8)
format_ = self.get_format(real_type, byte_order)
return format_.unpack(self.raw)[0]
def get_mcast_id(self, mcast_name, mcast_groups):
if mcast_name not in mcast_groups:
- raise Exception(f'Multicast group "{mcast_name}" not present in the spec')
+ raise YnlException(f'Multicast group "{mcast_name}" not present in the spec')
return mcast_groups[mcast_name].value
def msghdr_size(self):
def get_mcast_id(self, mcast_name, mcast_groups):
if mcast_name not in self.genl_family['mcast']:
- raise Exception(f'Multicast group "{mcast_name}" not present in the family')
+ raise YnlException(f'Multicast group "{mcast_name}" not present in the family')
return self.genl_family['mcast'][mcast_name]
def msghdr_size(self):
if name in scope.values:
return scope.values[name]
spec_name = scope.spec.yaml['name']
- raise Exception(
+ raise YnlException(
f"No value for '{name}' in attribute space '{spec_name}'")
- raise Exception(f"Attribute '{name}' not defined in any attribute-set")
+ raise YnlException(f"Attribute '{name}' not defined in any attribute-set")
#
self.yaml['protonum'])
else:
self.nlproto = GenlProtocol(self.yaml['name'])
- except KeyError:
- raise Exception(f"Family '{self.yaml['name']}' not supported by the kernel")
+ except KeyError as err:
+ raise YnlException(f"Family '{self.yaml['name']}' not supported by the kernel") from err
self._recv_dbg = False
# Note that netlink will use conservative (min) message size for
def _add_attr(self, space, name, value, search_attrs):
try:
attr = self.attr_sets[space][name]
- except KeyError:
- raise Exception(f"Space '{space}' has no attribute '{name}'")
+ except KeyError as err:
+ raise YnlException(f"Space '{space}' has no attribute '{name}'") from err
nl_type = attr.value
if attr.is_multi and isinstance(value, list):
format_ = NlAttr.get_format(attr.sub_type)
attr_payload = b''.join([format_.pack(x) for x in value])
else:
- raise Exception(f'Unknown type for binary attribute, value: {value}')
+ raise YnlException(f'Unknown type for binary attribute, value: {value}')
elif attr['type'] in NlAttr.type_formats or attr.is_auto_scalar:
scalar = self._get_scalar(attr, value)
if attr.is_auto_scalar:
attr_payload += self._add_attr(msg_format.attr_set,
subname, subvalue, sub_attrs)
else:
- raise Exception(f"Unknown attribute-set '{msg_format.attr_set}'")
+ raise YnlException(f"Unknown attribute-set '{msg_format.attr_set}'")
else:
- raise Exception(f'Unknown type at {space} {name} {value} {attr["type"]}')
+ raise YnlException(f'Unknown type at {space} {name} {value} {attr["type"]}')
return self._add_attr_raw(nl_type, attr_payload)
subattr = self._formatted_string(subattr, attr_spec.display_hint)
decoded.append(subattr)
else:
- raise Exception(f'Unknown {attr_spec["sub-type"]} with name {attr_spec["name"]}')
+ raise YnlException(f'Unknown {attr_spec["sub-type"]} with name {attr_spec["name"]}')
return decoded
def _decode_nest_type_value(self, attr, attr_spec):
def _resolve_selector(self, attr_spec, search_attrs):
sub_msg = attr_spec.sub_message
if sub_msg not in self.sub_msgs:
- raise Exception(f"No sub-message spec named {sub_msg} for {attr_spec.name}")
+ raise YnlException(f"No sub-message spec named {sub_msg} for {attr_spec.name}")
sub_msg_spec = self.sub_msgs[sub_msg]
selector = attr_spec.selector
value = search_attrs.lookup(selector)
if value not in sub_msg_spec.formats:
- raise Exception(f"No message format for '{value}' in sub-message spec '{sub_msg}'")
+ raise YnlException(f"No message format for '{value}' in sub-message spec '{sub_msg}'")
spec = sub_msg_spec.formats[value]
return spec, value
subdict = self._decode(NlAttrs(attr.raw, offset), msg_format.attr_set)
decoded.update(subdict)
else:
- raise Exception(f"Unknown attribute-set '{msg_format.attr_set}' when decoding '{attr_spec.name}'")
+ raise YnlException(f"Unknown attribute-set '{msg_format.attr_set}' "
+ f"when decoding '{attr_spec.name}'")
return decoded
# pylint: disable=too-many-statements
for attr in attrs:
try:
attr_spec = attr_space.attrs_by_val[attr.type]
- except (KeyError, UnboundLocalError):
+ except (KeyError, UnboundLocalError) as err:
if not self.process_unknown:
- raise Exception(f"Space '{space}' has no attribute with value '{attr.type}'")
+ raise YnlException(f"Space '{space}' has no attribute "
+ f"with value '{attr.type}'") from err
attr_name = f"UnknownAttr({attr.type})"
self._rsp_add(rsp, attr_name, None, self._decode_unknown(attr))
continue
decoded = self._decode_nest_type_value(attr, attr_spec)
else:
if not self.process_unknown:
- raise Exception(f'Unknown {attr_spec["type"]} with name {attr_spec["name"]}')
+ raise YnlException(f'Unknown {attr_spec["type"]} '
+ f'with name {attr_spec["name"]}')
decoded = self._decode_unknown(attr)
self._rsp_add(rsp, attr_spec["name"], attr_spec.is_multi, decoded)
for attr in attrs:
try:
attr_spec = attr_set.attrs_by_val[attr.type]
- except KeyError:
- raise Exception(f"Space '{attr_set.name}' has no attribute with value '{attr.type}'")
+ except KeyError as err:
+ raise YnlException(
+ f"Space '{attr_set.name}' has no attribute with value '{attr.type}'") from err
if offset > target:
break
if offset == target:
elif attr_spec['type'] == 'sub-message':
msg_format, value = self._resolve_selector(attr_spec, search_attrs)
if msg_format is None:
- raise Exception(f"Can't resolve sub-message of {attr_spec['name']} for extack")
+ raise YnlException(f"Can't resolve sub-message of "
+ f"{attr_spec['name']} for extack")
sub_attrs = self.attr_sets[msg_format.attr_set]
pathname += f"({value})"
else:
- raise Exception(f"Can't dive into {attr.type} ({attr_spec['name']}) for extack")
+ raise YnlException(f"Can't dive into {attr.type} ({attr_spec['name']}) for extack")
offset += 4
subpath = self._decode_extack_path(NlAttrs(attr.raw), sub_attrs,
offset, target, search_attrs)
mac_bytes = [int(x, 16) for x in string.split(':')]
else:
if len(string) % 2 != 0:
- raise Exception(f"Invalid MAC address format: {string}")
+ raise YnlException(f"Invalid MAC address format: {string}")
mac_bytes = [int(string[i:i+2], 16) for i in range(0, len(string), 2)]
raw = bytes(mac_bytes)
else:
- raise Exception(f"Display hint '{attr_spec.display_hint}' not implemented"
+ raise YnlException(f"Display hint '{attr_spec.display_hint}' not implemented"
f" when parsing '{attr_spec['name']}'")
return raw