import argparse
import signal
import json
+import traceback
TESTS_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(TESTS_PATH, '../../py/'))
else:
return json.dumps(json_obj, sort_keys = True)
+def json_validate(json_string):
+ json_obj = json.loads(json_string)
+ try:
+ nftables.json_validate(json_obj)
+ except Exception:
+ print_error("schema validation failed for input '%s'" % json_string)
+ print_error(traceback.format_exc())
def rule_add(rule, filename, lineno, force_all_family_option, filename_path):
'''
"expr": json.loads(json_input),
}}}]})
+ if enable_json_schema:
+ json_validate(cmd)
+
json_old = nftables.set_json_output(True)
ret = execute_cmd(cmd, filename, lineno, payload_log, debug="netlink")
nftables.set_json_output(json_old)
nftables.set_numeric_proto_output(numeric_proto_old)
nftables.set_stateless_output(stateless_old)
+ if enable_json_schema:
+ json_validate(json_output)
+
json_output = json.loads(json_output)
for item in json_output["nftables"]:
if "rule" in item:
dest='enable_json',
help='test JSON functionality as well')
+ parser.add_argument('-s', '--schema', action='store_true',
+ dest='enable_schema',
+ help='verify json input/output against schema')
+
args = parser.parse_args()
- global debug_option, need_fix_option, enable_json_option
+ global debug_option, need_fix_option, enable_json_option, enable_json_schema
debug_option = args.debug
need_fix_option = args.need_fix_line
force_all_family_option = args.force_all_family
enable_json_option = args.enable_json
+ enable_json_schema = args.enable_schema
specific_file = False
signal.signal(signal.SIGINT, signal_handler)
"You need to build the project."
return
+ if args.enable_schema and not args.enable_json:
+ print_error("Option --schema requires option --json")
+ return
+
global nftables
nftables = Nftables(sofile = 'src/.libs/libnftables.so')