prev_entry = None
value_start = self.yaml.get('value-start', 0)
- self.entries = dict()
- self.entries_by_val = dict()
+ self.entries = {}
+ self.entries_by_val = {}
for entry in self.yaml['entries']:
e = self.new_entry(entry, prev_entry, value_start)
self.entries[e.name] = e
stream.seek(0)
spec = pyyaml.safe_load(stream)
+ self.fixed_header = None
self._resolution_list = []
super().__init__(self, spec)
self.msgs[op.name] = op
def find_operation(self, name):
- """
- For a given operation name, find and return operation spec.
- """
- for op in self.yaml['operations']['list']:
- if name == op['name']:
- return op
- return None
+ """
+ For a given operation name, find and return operation spec.
+ """
+ for op in self.yaml['operations']['list']:
+ if name == op['name']:
+ return op
+ return None
def resolve(self):
self.resolve_up(super())
self.extack = None
if self.nl_flags & Netlink.NLM_F_ACK_TLVS and extack_off:
- self.extack = dict()
+ self.extack = {}
extack_attrs = NlAttrs(self.raw[extack_off:])
for extack in extack_attrs:
if extack.type == Netlink.NLMSGERR_ATTR_MSG:
return self.nl_type
def __repr__(self):
- msg = f"nl_len = {self.nl_len} ({len(self.raw)}) nl_flags = 0x{self.nl_flags:x} nl_type = {self.nl_type}"
+ msg = (f"nl_len = {self.nl_len} ({len(self.raw)}) "
+ f"nl_flags = 0x{self.nl_flags:x} nl_type = {self.nl_type}")
if self.error:
msg += '\n\terror: ' + str(self.error)
if self.extack:
return
gm = GenlMsg(nl_msg)
- fam = dict()
+ fam = {}
for attr in NlAttrs(gm.raw):
if attr.type == Netlink.CTRL_ATTR_FAMILY_ID:
fam['id'] = attr.as_scalar('u16')
elif attr.type == Netlink.CTRL_ATTR_MAXATTR:
fam['maxattr'] = attr.as_scalar('u32')
elif attr.type == Netlink.CTRL_ATTR_MCAST_GROUPS:
- fam['mcast'] = dict()
+ fam['mcast'] = {}
for entry in NlAttrs(attr.raw):
mcast_name = None
mcast_id = None
self.nl = nl_msg
self.genl_cmd, self.genl_version, _ = struct.unpack_from("BBH", nl_msg.raw, 0)
self.raw = nl_msg.raw[4:]
+ self.raw_attrs = []
def cmd(self):
return self.genl_cmd
for single_value in value:
scalar += enum.entries[single_value].user_value(as_flags = True)
return scalar
- else:
- return enum.entries[value].user_value()
+ return enum.entries[value].user_value()
def _get_scalar(self, attr_spec, value):
try:
def _decode_unknown(self, attr):
if attr.is_nest:
return self._decode(NlAttrs(attr.raw), None)
- else:
- return attr.as_bin()
+ return attr.as_bin()
def _rsp_add(self, rsp, name, is_multi, decoded):
if is_multi is None:
# pylint: disable=too-many-statements
def _decode(self, attrs, space, outer_attrs = None):
- rsp = dict()
+ rsp = {}
+ search_attrs = {}
if space:
attr_space = self.attr_sets[space]
search_attrs = SpaceAttrs(attr_space, rsp, outer_attrs)
try:
if attr_spec["type"] == 'nest':
- subdict = self._decode(NlAttrs(attr.raw), attr_spec['nested-attributes'], search_attrs)
+ subdict = self._decode(NlAttrs(attr.raw),
+ attr_spec['nested-attributes'],
+ search_attrs)
decoded = subdict
elif attr_spec["type"] == 'string':
decoded = attr.as_strz()
format_ = NlAttr.get_format(m.type, m.byte_order)
size += format_.size
return size
- else:
- return 0
+ return 0
def _decode_struct(self, data, name):
members = self.consts[name].members
- attrs = dict()
+ attrs = {}
offset = 0
for m in members:
value = None
elif m.type == 'binary':
if m.struct:
if value is None:
- value = dict()
+ value = {}
attr_payload += self._encode_struct(m.struct, value)
else:
if value is None:
return raw
def handle_ntf(self, decoded):
- msg = dict()
+ msg = {}
if self.include_raw:
msg['raw'] = decoded
op = self.rsp_by_value[decoded.cmd()]
if decoded.cmd() in self.async_msg_ids:
self.handle_ntf(decoded)
continue
- else:
- print('Unexpected message: ' + repr(decoded))
- continue
+ print('Unexpected message: ' + repr(decoded))
+ continue
rsp_msg = self._decode(decoded.raw_attrs, op.attr_set.name)
if op.fixed_header: