#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
+"""
+YNL cli tool
+"""
+
import argparse
import json
import os
import sys
import textwrap
+# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily, Netlink, NlError, SpecFamily
relative_schema_dir='../../../../Documentation/netlink'
def schema_dir():
+ """
+ Return the effective schema directory, preferring in-tree before
+ system schema directory.
+ """
script_dir = os.path.dirname(os.path.abspath(__file__))
schema_dir = os.path.abspath(f"{script_dir}/{relative_schema_dir}")
if not os.path.isdir(schema_dir):
return schema_dir
def spec_dir():
+ """
+ Return the effective spec directory, relative to the effective
+ schema directory.
+ """
spec_dir = schema_dir() + '/specs'
if not os.path.isdir(spec_dir):
raise Exception(f"Spec directory {spec_dir} does not exist")
class YnlEncoder(json.JSONEncoder):
+ """A custom encoder for emitting JSON with ynl-specific instance types"""
def default(self, obj):
if isinstance(obj, bytes):
return bytes.hex(obj)
print_attr_list(ynl, mode_spec['attributes'], attr_set)
+# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def main():
+ """YNL cli tool"""
+
description = """
YNL CLI utility - a general purpose netlink utility that uses YAML
specs to drive protocol encoding and decoding.
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
+#
+# pylint: disable=missing-class-docstring, missing-function-docstring
+# pylint: disable=too-many-branches, too-many-locals, too-many-instance-attributes
+# pylint: disable=too-many-lines
+
+"""
+YAML Netlink Library
+
+An implementation of the genetlink and raw netlink protocols.
+"""
from collections import namedtuple
from enum import Enum
#
+# pylint: disable=too-few-public-methods
class Netlink:
# Netlink socket
SOL_NETLINK = 270
return msg
+# pylint: disable=too-few-public-methods
class NlMsgs:
def __init__(self, data):
self.msgs = []
return struct.pack("I", len(msg) + 4) + msg
+# pylint: disable=too-many-nested-blocks
def _genl_load_families():
with socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, Netlink.NETLINK_GENERIC) as sock:
sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_CAP_ACK, 1)
return super().msghdr_size() + 4
+# pylint: disable=too-few-public-methods
class SpaceAttrs:
SpecValuesPair = namedtuple('SpecValuesPair', ['spec', 'values'])
return self._from_string(value, attr_spec)
raise e
+ # pylint: disable=too-many-statements
def _add_attr(self, space, name, value, search_attrs):
try:
attr = self.attr_sets[space][name]
raise Exception(f"Unknown attribute-set '{msg_format.attr_set}' when decoding '{attr_spec.name}'")
return decoded
+ # pylint: disable=too-many-statements
def _decode(self, attrs, space, outer_attrs = None):
rsp = dict()
if space:
return rsp
+ # pylint: disable=too-many-arguments, too-many-positional-arguments
def _decode_extack_path(self, attrs, attr_set, offset, target, search_attrs):
for attr in attrs:
try:
msg = _genl_msg_finalize(msg)
return msg
+ # pylint: disable=too-many-statements
def _ops(self, ops):
reqs_by_seq = {}
req_seq = random.randint(1024, 65535)