sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily, Netlink, NlError, SpecFamily
-sys_schema_dir='/usr/share/ynl'
-relative_schema_dir='../../../../Documentation/netlink'
+SYS_SCHEMA_DIR='/usr/share/ynl'
+RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'
def schema_dir():
"""
system schema directory.
"""
script_dir = os.path.dirname(os.path.abspath(__file__))
- schema_dir = os.path.abspath(f"{script_dir}/{relative_schema_dir}")
- if not os.path.isdir(schema_dir):
- schema_dir = sys_schema_dir
- if not os.path.isdir(schema_dir):
- raise Exception(f"Schema directory {schema_dir} does not exist")
- return schema_dir
+ schema_dir_ = os.path.abspath(f"{script_dir}/{RELATIVE_SCHEMA_DIR}")
+ if not os.path.isdir(schema_dir_):
+ schema_dir_ = SYS_SCHEMA_DIR
+ if not os.path.isdir(schema_dir_):
+ raise Exception(f"Schema directory {schema_dir_} does not exist")
+ return schema_dir_
def spec_dir():
"""
Return the effective spec directory, relative to the effective
schema directory.
"""
- spec_dir = schema_dir() + '/specs'
- if not os.path.isdir(spec_dir):
- raise Exception(f"Spec directory {spec_dir} does not exist")
- return spec_dir
+ spec_dir_ = schema_dir() + '/specs'
+ if not os.path.isdir(spec_dir_):
+ raise Exception(f"Spec directory {spec_dir_} does not exist")
+ return spec_dir_
class YnlEncoder(json.JSONEncoder):
"""A custom encoder for emitting JSON with ynl-specific instance types"""
- def default(self, obj):
- if isinstance(obj, bytes):
- return bytes.hex(obj)
- if isinstance(obj, set):
- return list(obj)
- return json.JSONEncoder.default(self, obj)
+ def default(self, o):
+ if isinstance(o, bytes):
+ return bytes.hex(o)
+ if isinstance(o, set):
+ return list(o)
+ return json.JSONEncoder.default(self, o)
def print_attr_list(ynl, attr_names, attr_set, indent=2):
SpecFamily(spec, args.schema)
except Exception as error:
print(error)
- exit(1)
+ sys.exit(1)
return
if args.family: # set behaviour when using installed specs
- if args.schema is None and spec.startswith(sys_schema_dir):
+ if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
args.schema = '' # disable schema validation when installed
if args.process_unknown is None:
args.process_unknown = True
op = ynl.msgs.get(args.list_attrs)
if not op:
print(f'Operation {args.list_attrs} not found')
- exit(1)
+ sys.exit(1)
print(f'Operation: {op.name}')
print(op.yaml['doc'])
output(msg)
except NlError as e:
print(e)
- exit(1)
+ sys.exit(1)
except KeyboardInterrupt:
pass
except BrokenPipeError:
import collections
import importlib
import os
-import yaml
+import yaml as pyyaml
# To be loaded dynamically as needed
self.formats = collections.OrderedDict()
for elem in self.yaml['formats']:
- format = self.new_format(family, elem)
- self.formats[format.value] = format
+ msg_format = self.new_format(family, elem)
+ self.formats[msg_format.value] = msg_format
- def new_format(self, family, format):
- return SpecSubMessageFormat(family, format)
+ def new_format(self, family, msg_format):
+ return SpecSubMessageFormat(family, msg_format)
class SpecSubMessageFormat(SpecElement):
kernel_family dict of kernel family attributes
"""
def __init__(self, spec_path, schema_path=None, exclude_ops=None):
- with open(spec_path, "r") as stream:
+ with open(spec_path, "r", encoding='utf-8') as stream:
prefix = '# SPDX-License-Identifier: '
first = stream.readline().strip()
if not first.startswith(prefix):
self.license = first[len(prefix):]
stream.seek(0)
- spec = yaml.safe_load(stream)
+ spec = pyyaml.safe_load(stream)
self._resolution_list = []
if schema_path:
global jsonschema
- with open(schema_path, "r") as stream:
- schema = yaml.safe_load(stream)
+ with open(schema_path, "r", encoding='utf-8') as stream:
+ schema = pyyaml.safe_load(stream)
if jsonschema is None:
jsonschema = importlib.import_module("jsonschema")
@classmethod
def get_format(cls, attr_type, byte_order=None):
- format = cls.type_formats[attr_type]
+ format_ = cls.type_formats[attr_type]
if byte_order:
- return format.big if byte_order == "big-endian" \
- else format.little
- return format.native
+ return format_.big if byte_order == "big-endian" \
+ else format_.little
+ return format_.native
def as_scalar(self, attr_type, byte_order=None):
- format = self.get_format(attr_type, byte_order)
- return format.unpack(self.raw)[0]
+ format_ = self.get_format(attr_type, byte_order)
+ return format_.unpack(self.raw)[0]
def as_auto_scalar(self, attr_type, byte_order=None):
if len(self.raw) != 4 and len(self.raw) != 8:
raise Exception(f"Auto-scalar len payload be 4 or 8 bytes, got {len(self.raw)}")
real_type = attr_type[0] + str(len(self.raw) * 8)
- format = self.get_format(real_type, byte_order)
- return format.unpack(self.raw)[0]
+ format_ = self.get_format(real_type, byte_order)
+ return format_.unpack(self.raw)[0]
def as_strz(self):
return self.raw.decode('ascii')[:-1]
def as_bin(self):
return self.raw
- def as_c_array(self, type):
- format = self.get_format(type)
- return [ x[0] for x in format.iter_unpack(self.raw) ]
+ def as_c_array(self, c_type):
+ format_ = self.get_format(c_type)
+ return [ x[0] for x in format_.iter_unpack(self.raw) ]
def __repr__(self):
return f"[type:{self.type} len:{self._len}] {self.raw}"
policy = {}
for attr in NlAttrs(raw):
if attr.type == Netlink.NL_POLICY_TYPE_ATTR_TYPE:
- type = attr.as_scalar('u32')
- policy['type'] = Netlink.AttrType(type).name
+ type_ = attr.as_scalar('u32')
+ policy['type'] = Netlink.AttrType(type_).name
elif attr.type == Netlink.NL_POLICY_TYPE_ATTR_MIN_VALUE_S:
policy['min-value'] = attr.as_scalar('s64')
elif attr.type == Netlink.NL_POLICY_TYPE_ATTR_MAX_VALUE_S:
elif isinstance(value, dict) and attr.struct_name:
attr_payload = self._encode_struct(attr.struct_name, value)
elif isinstance(value, list) and attr.sub_type in NlAttr.type_formats:
- format = NlAttr.get_format(attr.sub_type)
- attr_payload = b''.join([format.pack(x) for x in value])
+ format_ = NlAttr.get_format(attr.sub_type)
+ attr_payload = b''.join([format_.pack(x) for x in value])
else:
raise Exception(f'Unknown type for binary attribute, value: {value}')
elif attr['type'] in NlAttr.type_formats or attr.is_auto_scalar:
attr_type = attr["type"][0] + ('32' if scalar.bit_length() <= 32 else '64')
else:
attr_type = attr["type"]
- format = NlAttr.get_format(attr_type, attr.byte_order)
- attr_payload = format.pack(scalar)
+ format_ = NlAttr.get_format(attr_type, attr.byte_order)
+ attr_payload = format_.pack(scalar)
elif attr['type'] in "bitfield32":
scalar_value = self._get_scalar(attr, value["value"])
scalar_selector = self._get_scalar(attr, value["selector"])
else:
size += m.len
else:
- format = NlAttr.get_format(m.type, m.byte_order)
- size += format.size
+ format_ = NlAttr.get_format(m.type, m.byte_order)
+ size += format_.size
return size
else:
return 0
offset += m.len
elif m.type == 'binary':
if m.struct:
- len = self._struct_size(m.struct)
- value = self._decode_struct(data[offset : offset + len],
+ len_ = self._struct_size(m.struct)
+ value = self._decode_struct(data[offset : offset + len_],
m.struct)
- offset += len
+ offset += len_
else:
value = data[offset : offset + m.len]
offset += m.len
else:
- format = NlAttr.get_format(m.type, m.byte_order)
- [ value ] = format.unpack_from(data, offset)
- offset += format.size
+ format_ = NlAttr.get_format(m.type, m.byte_order)
+ [ value ] = format_.unpack_from(data, offset)
+ offset += format_.size
if value is not None:
if m.enum:
value = self._decode_enum(value, m)
else:
if value is None:
value = 0
- format = NlAttr.get_format(m.type, m.byte_order)
- attr_payload += format.pack(value)
+ format_ = NlAttr.get_format(m.type, m.byte_order)
+ attr_payload += format_.pack(value)
return attr_payload
def _formatted_string(self, raw, display_hint):