import argparse
import signal
-TERMINAL_PATH = os.getcwd()
NFT_BIN = "src/nft"
TESTS_PATH = os.path.dirname(os.path.abspath(__file__))
TESTS_DIRECTORY = ["any", "arp", "bridge", "inet", "ip", "ip6"]
"line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
-def print_differences_error(filename, lineno, output, cmd):
+def print_differences_error(filename, lineno, cmd):
reason = "Listing is broken."
print filename + ": " + Colors.RED + "ERROR: " + Colors.ENDC + \
"line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
Flush a table.
'''
cmd = NFT_BIN + " flush table " + str(table[0]) + " " + str(table[1])
- ret = execute_cmd(cmd, filename, lineno)
+ execute_cmd(cmd, filename, lineno)
return cmd
def payload_check(payload_buffer, file, cmd):
file.seek(0, 0)
-
- ret = False
i = 0
for lineno, want_line in enumerate(payload_buffer):
unit_tests += 1
table_flush(table, filename, lineno)
table_info = " " + table[0] + " " + table[1] + " "
- cmd = NFT_BIN + " add rule" + table_info + chain + " " + rule[0]
payload_log = os.tmpfile();
if len(rule_output) <= 0:
error += 1
print_differences_error(filename, lineno,
- rule_output, cmd)
+ cmd)
if not force_all_family_option:
return [ret, warning, error, unit_tests]
def cleanup_on_exit():
for table in table_list:
for chain in chain_list:
- ret = chain_delete(chain, table, "", "")
+ chain_delete(chain, table, "", "")
if all_set:
- ret = set_delete(all_set, table)
- ret = table_delete(table)
+ set_delete(all_set, table)
+ table_delete(table)
def signal_handler(signal, frame):
:param rule: nft rule we are going to add
'''
found = 0
- pos = 0
payload_buffer = []
while True:
:param filename: name of the file with the test rules
'''
-
- if specific_file:
- filename_path = os.path.join(TERMINAL_PATH, filename)
- else:
- filename_path = os.path.join(TESTS_PATH, filename)
-
+ filename_path = os.path.join(TESTS_PATH, filename)
f = open(filename_path)
tests = passed = total_unit_run = total_warning = total_error = 0
- table = ""
- total_test_passed = True
for lineno, line in enumerate(f):
if signal_received == 1:
table_line = line.rstrip()[1:]
ret = table_process(table_line, filename, lineno)
if (ret != 0):
- total_test_passed = False
break
continue
chain_line = line.rstrip()[1:].split(";")
ret = chain_process(chain_line, filename, lineno)
if ret != 0:
- total_test_passed = False
break
continue
ret = set_process(set_line, filename, lineno)
tests += 1
if ret == -1:
- total_test_passed = False
continue
passed += 1
continue
ret = set_element_process(element_line, filename, lineno)
tests += 1
if ret == -1:
- total_test_passed = False
continue
passed += 1
total_unit_run += result[3]
if ret != 0:
- total_test_passed = False
continue
if warning == 0: # All ok.
for chain in chain_list:
ret = chain_delete(chain, table, filename, lineno)
if ret != 0:
- total_test_passed = False
# We delete sets.
if all_set:
ret = set_delete(all_set, table, filename, lineno)
if ret != 0:
- total_test_passed = False
reason = "There is a problem when we delete a set"
print_error(reason, filename, lineno)
# We delete tables.
- ret = table_delete(table, filename, lineno)
-
- if ret != 0:
- total_test_passed = False
+ table_delete(table, filename, lineno)
if specific_file:
if force_all_family_option: