pass
-# pylint: disable=too-few-public-methods
class NlPolicy:
"""Kernel policy for one mode (do or dump) of one operation.
- Returned by YnlFamily.get_policy(). Contains a dict of attributes
- the kernel accepts, with their validation constraints.
-
- Attributes:
- attrs: dict mapping attribute names to policy dicts, e.g.
- page-pool-stats-get do policy::
-
- {
- 'info': {'type': 'nested', 'policy': {
- 'id': {'type': 'uint', 'min-value': 1,
- 'max-value': 4294967295},
- 'ifindex': {'type': 'u32', 'min-value': 1,
- 'max-value': 2147483647},
- }},
- }
-
- Each policy dict always contains 'type' (e.g. u32, string,
- nested). Optional keys: min-value, max-value, min-length,
- max-length, mask, policy.
+ Returned by YnlFamily.get_policy(). Attributes of the policy
+ are accessible as attributes of the object. Nested policies
+ can be accessed indexing the object like a dictionary::
+
+ pol = ynl.get_policy('page-pool-stats-get', 'do')
+ pol['info'].type # 'nested'
+ pol['info']['id'].type # 'uint'
+ pol['info']['id'].min_value # 1
+
+ Each policy entry always has a 'type' attribute (e.g. u32, string,
+ nested). Optional attributes depending on the 'type': min-value,
+ max-value, min-length, max-length, mask.
+
+ Policies can form infinite nesting loops. These loops are trimmed
+ when policy is converted to a dict with pol.to_dict().
"""
- def __init__(self, attrs):
- self.attrs = attrs
+ def __init__(self, ynl, policy_idx, policy_table, attr_set, props=None):
+ self._policy_idx = policy_idx
+ self._policy_table = policy_table
+ self._ynl = ynl
+ self._props = props or {}
+ self._entries = {}
+ self._cache = {}
+ if policy_idx is not None and policy_idx in policy_table:
+ for attr_id, decoded in policy_table[policy_idx].items():
+ if attr_set and attr_id in attr_set.attrs_by_val:
+ spec = attr_set.attrs_by_val[attr_id]
+ name = spec['name']
+ else:
+ spec = None
+ name = f'attr-{attr_id}'
+ self._entries[name] = (spec, decoded)
+
+ def __getitem__(self, name):
+ """Descend into a nested policy by attribute name."""
+ if name not in self._cache:
+ spec, decoded = self._entries[name]
+ props = dict(decoded)
+ child_idx = None
+ child_set = None
+ if 'policy-idx' in props:
+ child_idx = props.pop('policy-idx')
+ if spec and 'nested-attributes' in spec.yaml:
+ child_set = self._ynl.attr_sets[spec.yaml['nested-attributes']]
+ self._cache[name] = NlPolicy(self._ynl, child_idx,
+ self._policy_table,
+ child_set, props)
+ return self._cache[name]
+
+ def __getattr__(self, name):
+ """Access this policy entry's own properties (type, min-value, etc.).
+
+ Underscores in the name are converted to dashes, so that
+ pol.min_value looks up "min-value".
+ """
+ key = name.replace('_', '-')
+ try:
+ # Hack for level-0 which we still want to have .type but we don't
+ # want type to pointlessly show up in the dict / JSON form.
+ if not self._props and name == "type":
+ return "nested"
+ return self._props[key]
+ except KeyError:
+ raise AttributeError(name)
+
+ def get(self, name, default=None):
+ """Look up a child policy entry by attribute name, with a default."""
+ try:
+ return self[name]
+ except KeyError:
+ return default
+
+ def __contains__(self, name):
+ return name in self._entries
+
+ def __len__(self):
+ return len(self._entries)
+
+ def __iter__(self):
+ return iter(self._entries)
+
+ def keys(self):
+ """Return attribute names accepted by this policy."""
+ return self._entries.keys()
+
+ def to_dict(self, seen=None):
+ """Convert to a plain dict, suitable for JSON serialization.
+
+ Nested NlPolicy objects are expanded recursively. Cyclic
+ references are trimmed (resolved to just {"type": "nested"}).
+ """
+ if seen is None:
+ seen = set()
+ result = dict(self._props)
+ if self._policy_idx is not None:
+ if self._policy_idx not in seen:
+ seen = seen | {self._policy_idx}
+ children = {}
+ for name in self:
+ children[name] = self[name].to_dict(seen)
+ if self._props:
+ result['policy'] = children
+ else:
+ result = children
+ return result
+
+ def __repr__(self):
+ return repr(self.to_dict())
class NlAttr:
def do_multi(self, ops):
return self._ops(ops)
- def _resolve_policy(self, policy_idx, policy_table, attr_set):
- attrs = {}
- if policy_idx not in policy_table:
- return attrs
- for attr_id, decoded in policy_table[policy_idx].items():
- if attr_set and attr_id in attr_set.attrs_by_val:
- spec = attr_set.attrs_by_val[attr_id]
- name = spec['name']
- else:
- spec = None
- name = f'attr-{attr_id}'
- if 'policy-idx' in decoded:
- sub_set = None
- if spec and 'nested-attributes' in spec.yaml:
- sub_set = self.attr_sets[spec.yaml['nested-attributes']]
- nested = self._resolve_policy(decoded['policy-idx'],
- policy_table, sub_set)
- del decoded['policy-idx']
- decoded['policy'] = nested
- attrs[name] = decoded
- return attrs
-
def get_policy(self, op_name, mode):
"""Query running kernel for the Netlink policy of an operation.
mode: 'do' or 'dump'
Returns:
- NlPolicy with an attrs dict mapping attribute names to
- their policy properties (type, min/max, nested, etc.),
+ NlPolicy acting as a read-only dict mapping attribute names
+ to their policy properties (type, min/max, nested, etc.),
or None if the operation has no policy for the given mode.
Empty policy usually implies that the operation rejects
all attributes.
if mode not in op_policy:
return None
policy_idx = op_policy[mode]
- attrs = self._resolve_policy(policy_idx, policy_table, op.attr_set)
- return NlPolicy(attrs)
+ return NlPolicy(self, policy_idx, policy_table, op.attr_set)
# dev-get: do has a real policy with ifindex, dump has no policy
# (only the reject-all policy with maxattr=0)
pol = ndev.get_policy('dev-get', 'do')
- ksft_in('ifindex', pol.attrs,
- comment="dev-get do policy should have ifindex")
- ksft_eq(pol.attrs.get('ifindex', {}).get('type'), 'u32')
+ ksft_in('ifindex', pol, comment="dev-get do policy should have ifindex")
+ ksft_eq(pol['ifindex'].type, 'u32')
pol_dump = ndev.get_policy('dev-get', 'dump')
- ksft_eq(len(pol_dump.attrs), 0,
- comment="dev-get should not accept any attrs")
+ ksft_eq(len(pol_dump), 0, comment="dev-get should not accept any attrs")
# napi-get: both do and dump have real policies
pol_do = ndev.get_policy('napi-get', 'do')
- ksft_ge(len(pol_do.attrs), 1)
+ ksft_ge(len(pol_do), 1)
pol_dump = ndev.get_policy('napi-get', 'dump')
- ksft_ge(len(pol_dump.attrs), 1)
+ ksft_ge(len(pol_dump), 1)
# -- ethtool (full ops) --
et = EthtoolFamily()
# strset-get (has both do and dump, full ops share policy)
pol_do = et.get_policy('strset-get', 'do')
- ksft_ge(len(pol_do.attrs), 1, comment="strset-get should have a do policy")
+ ksft_ge(len(pol_do), 1, comment="strset-get should have a do policy")
pol_dump = et.get_policy('strset-get', 'dump')
- ksft_ge(len(pol_dump.attrs), 1,
- comment="strset-get should have a dump policy")
+ ksft_ge(len(pol_dump), 1, comment="strset-get should have a dump policy")
# Same policy means same attribute names
- ksft_eq(set(pol_do.attrs.keys()), set(pol_dump.attrs.keys()))
+ ksft_eq(set(pol_do.keys()), set(pol_dump.keys()))
# linkinfo-set is do-only (SET command), no dump
pol_do = et.get_policy('linkinfo-set', 'do')
- ksft_ge(len(pol_do.attrs), 1,
- comment="linkinfo-set should have a do policy")
+ ksft_ge(len(pol_do), 1, comment="linkinfo-set should have a do policy")
pol_dump = et.get_policy('linkinfo-set', 'dump')
ksft_eq(pol_dump, None,
# dev-get do policy should have named attributes from the spec
pol = ndev.get_policy('dev-get', 'do')
- ksft_ge(len(pol.attrs), 1)
+ ksft_ge(len(pol), 1)
# All attr names should be resolved (no 'attr-N' fallbacks)
- for name in pol.attrs:
+ for name in pol:
ksft_true(not name.startswith('attr-'),
comment=f"unresolved attr name: {name}")