RED = ''
ENDC = ''
+
def print_msg(reason, filename=None, lineno=None, color=None, errstr=None):
'''
Prints a message with nice colors, indicating file and line number.
'''
if filename and lineno:
- print (filename + ": " + color + "ERROR:" +
- Colors.ENDC + " line %d: %s" % (lineno + 1, reason))
+ print filename + ": " + color + "ERROR:" + Colors.ENDC + \
+ " line %d: %s" % (lineno + 1, reason)
else:
- print (color + "ERROR:" + Colors.ENDC + " %s" % (reason))
+ print color + "ERROR:" + Colors.ENDC + " %s" % reason
+
def print_error(reason, filename=None, lineno=None):
print_msg(reason, filename, lineno, Colors.RED, "ERROR:")
+
def print_warning(reason, filename=None, lineno=None):
print_msg(reason, filename, lineno, Colors.YELLOW, "WARNING:")
def print_differences_warning(filename, lineno, rule1, rule2, cmd):
reason = "'" + rule1 + "' mismatches '" + rule2 + "'"
print filename + ": " + Colors.YELLOW + "WARNING: " + Colors.ENDC + \
- "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
+ "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
def print_differences_error(filename, lineno, cmd):
reason = "Listing is broken."
- print filename + ": " + Colors.RED + "ERROR: " + Colors.ENDC + \
- "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
+ print filename + ": " + Colors.RED + "ERROR: " + Colors.ENDC + "line: " + \
+ str(lineno + 1) + ": '" + cmd + "': " + reason
def table_exist(table, filename, lineno):
'''
Adds a table.
'''
- ## We check if table exists.
+ # We check if table exists.
if table_exist(table, filename, lineno):
reason = "Table " + table[1] + " already exists"
print_error(reason, filename, lineno)
table_list.append(table)
- ## We add a new table
+ # We add a new table
cmd = NFT_BIN + " add table " + table[0] + " " + table[1]
ret = execute_cmd(cmd, filename, lineno)
table_list.remove(table)
return -1
- ## We check if table was added correctly.
+ # We check if table was added correctly.
if not table_exist(table, filename, lineno):
table_list.remove(table)
reason = "I have just added the table " + table[1] + \
- " but it does not exist. Giving up!"
+ " but it does not exist. Giving up!"
print_error(reason, filename, lineno)
return -1
table_info = " " + table[0] + " " + table[1] + " "
if not table_exist(table, filename, lineno):
- reason = "Table " + table[1] + \
- " does not exist but I added it before."
+ reason = "Table " + table[1] + " does not exist but I added it before."
print_error(reason, filename, lineno)
return -1
cmd = NFT_BIN + " delete table" + table_info
ret = execute_cmd(cmd, filename, lineno)
if ret != 0:
- reason = cmd + ": " \
- "I cannot delete table '" + table[1] + "'. Giving up! "
+ reason = cmd + ": " + "I cannot delete table '" + table[1] + \
+ "'. Giving up! "
print_error(reason, filename, lineno)
return -1
if table_exist(table, filename, lineno):
reason = "I have just deleted the table " + table[1] + \
- " but the table still exists."
+ " but the table still exists."
print_error(reason, filename, lineno)
return -1
'''
Checks a chain
'''
-
table_info = " " + table[0] + " " + table[1] + " "
cmd = NFT_BIN + " list -nnn chain" + table_info + chain
ret = execute_cmd(cmd, filename, lineno)
'''
Adds a chain
'''
-
table_info = " " + table[0] + " " + table[1] + " "
if chain_exist(chain, table, filename, lineno):
- reason = "This chain '" + chain + "' exists in " + table[1] + "." + \
- "I cannot create two chains with same name."
+ reason = "This chain '" + chain + "' exists in " + table[1] + \
+ ". I cannot create two chains with same name."
print_error(reason, filename, lineno)
return -1
if chain_type:
- cmd = NFT_BIN + " add chain" + table_info + chain + "\{ " + chain_type + "\; \}"
+ cmd = NFT_BIN + " add chain" + table_info + chain + "\{ " + \
+ chain_type + "\; \}"
else:
cmd = NFT_BIN + " add chain" + table_info + chain
print_error(reason, filename, lineno)
return -1
- if not chain in chain_list:
+ if chain not in chain_list:
chain_list.append(chain)
if not chain_exist(chain, table, filename, lineno):
reason = "I have added the chain '" + chain + \
- "' but it does not exist in " + table[1]
+ "' but it does not exist in " + table[1]
print_error(reason, filename, lineno)
return -1
return 0
-def chain_delete(chain, table, filename=None, lineno=None):
+def chain_delete(chain, table, filename=None, lineno=None):
'''
Flushes and deletes a chain.
'''
-
table_info = " " + table[0] + " " + table[1] + " "
if not chain_exist(chain, table, filename, lineno):
reason = "The chain " + chain + " does not exists in " + table[1] + \
- ". I cannot delete it."
+ ". I cannot delete it."
print_error(reason, filename, lineno)
return -1
if chain_exist(chain, table, filename, lineno):
reason = "The chain " + chain + " exists in " + table[1] + \
- ". I cannot delete this chain"
+ ". I cannot delete this chain"
print_error(reason, filename, lineno)
return -1
'''
Adds a set.
'''
-
if not table_list:
reason = "Missing table to add rule"
print_error(reason, filename, lineno)
for table in table_list:
if set_exist(set_info[0], table, filename, lineno):
reason = "This set " + set_info + " exists in " + table[1] + \
- ". I cannot add it again"
+ ". I cannot add it again"
print_error(reason, filename, lineno)
return -1
ret = execute_cmd(cmd, filename, lineno)
if (ret == 0 and set_info[2].rstrip() == "fail") or \
- (ret != 0 and set_info[2].rstrip() == "ok"):
- reason = cmd + ": " + "I cannot add the set " + set_info[0]
- print_error(reason, filename, lineno)
- return -1
+ (ret != 0 and set_info[2].rstrip() == "ok"):
+ reason = cmd + ": " + "I cannot add the set " + set_info[0]
+ print_error(reason, filename, lineno)
+ return -1
if not set_exist(set_info[0], table, filename, lineno):
reason = "I have just added the set " + set_info[0] + \
- " to the table " + table[1] + " but it does not exist"
+ " to the table " + table[1] + " but it does not exist"
print_error(reason, filename, lineno)
return -1
'''
Adds elements to the set.
'''
-
if not table_list:
reason = "Missing table to add rules"
print_error(reason, filename, lineno)
for table in table_list:
# Check if set exists.
if (not set_exist(set_name, table, filename, lineno) or
- not set_name in set_all) and state == "ok":
+ set_name not in set_all) and state == "ok":
reason = "I cannot add an element to the set " + set_name + \
- " since it does not exist."
+ " since it does not exist."
print_error(reason, filename, lineno)
return -1
ret = execute_cmd(cmd, filename, lineno)
if (state == "fail" and ret == 0) or (state == "ok" and ret != 0):
- test_state = "This rule should have failed."
- reason = cmd + ": " + test_state
- print_error(reason, filename, lineno)
- return -1
+ test_state = "This rule should have failed."
+ reason = cmd + ": " + test_state
+ print_error(reason, filename, lineno)
+ return -1
# Add element into a all_set.
- if (ret == 0 and state == "ok"):
+ if ret == 0 and state == "ok":
for e in set_element:
set_all[set_name].add(e)
ret = execute_cmd(cmd, filename, lineno)
if ret != 0:
reason = "I cannot delete an element" + element + \
- " from the set '" + set_name
+ " from the set '" + set_name
print_error(reason, filename, lineno)
return -1
'''
Deletes set and its content.
'''
-
for set_name in all_set.keys():
# Check if exists the set
if not set_exist(set_name, table, filename, lineno):
reason = "The set " + set_name + \
- " does not exist, I cannot delete it"
+ " does not exist, I cannot delete it"
print_error(reason, filename, lineno)
return -1
end1 = rule1.find("}")
end2 = rule2.find("}")
- if ((pos1 != -1) and (pos2 != -1) and (end1 != -1) and (end2 != -1)):
+ if (pos1 != -1) and (pos2 != -1) and (end1 != -1) and (end2 != -1):
list1 = (rule1[pos1 + 1:end1].replace(" ", "")).split(",")
list2 = (rule2[pos2 + 1:end2].replace(" ", "")).split(",")
list1.sort()
list2.sort()
- if (cmp(list1, list2) == 0):
+ if cmp(list1, list2) == 0:
ret = 0
return ret
set = set.split(";")[2].strip() + "}"
return set
else:
- rule = pre_rule.split(";")[2].replace("\t", "").replace("\n", "").strip()
+ rule = pre_rule.split(";")[2].replace("\t", "").replace("\n", "").\
+ strip()
if len(rule) < 0:
return ""
return rule
+
def payload_check_elems_to_set(elems):
newset = set()
return newset
-def payload_check_set_elems(want, got):
+def payload_check_set_elems(want, got):
if want.find('element') < 0 or want.find('[end]') < 0:
return 0
return set_want == set_got
-def payload_check(payload_buffer, file, cmd):
+def payload_check(payload_buffer, file, cmd):
file.seek(0, 0)
i = 0
if payload_check_set_elems(want_line, line):
continue
- print_differences_warning(file.name, lineno, want_line.strip(), line.strip(), cmd);
+ print_differences_warning(file.name, lineno, want_line.strip(),
+ line.strip(), cmd)
return 0
return i > 0
+
def rule_add(rule, table_list, chain_list, filename, lineno,
force_all_family_option, filename_path):
'''
for table in table_list:
try:
payload_log = open("%s.payload.%s" % (filename_path, table[0]))
- except (IOError):
+ except IOError:
payload_log = open("%s.payload" % filename_path)
if rule[1].strip() == "ok":
try:
payload_expected.index(rule[0])
- except (ValueError):
+ except ValueError:
payload_expected = payload_find_expected(payload_log, rule[0])
- if payload_expected == []:
- print_error("did not find payload information for rule '%s'" % rule[0], payload_log.name, 1)
+ if not payload_expected:
+ print_error("did not find payload information for "
+ "rule '%s'" % rule[0], payload_log.name, 1)
for chain in chain_list:
unit_tests += 1
table_flush(table, filename, lineno)
table_info = " " + table[0] + " " + table[1] + " "
- payload_log = os.tmpfile();
+ payload_log = os.tmpfile()
- cmd = NFT_BIN + " add rule --debug=netlink" + table_info + chain + " " + rule[0]
+ cmd = NFT_BIN + " add rule --debug=netlink" + table_info + chain + \
+ " " + rule[0]
ret = execute_cmd(cmd, filename, lineno, payload_log)
state = rule[1].rstrip()
if not force_all_family_option:
return [ret, warning, error, unit_tests]
- if (state == "fail" and ret != 0):
+ if state == "fail" and ret != 0:
ret = 0
continue
if ret == 0:
- # Check for matching payload
- if state == "ok" and not payload_check(payload_expected, payload_log, cmd):
+ # Check for matching payload
+ if state == "ok" and not payload_check(payload_expected,
+ payload_log, cmd):
error += 1
gotf = open("%s.payload.got" % filename_path, 'a')
payload_log.seek(0, 0)
break
gotf.write(line)
gotf.close()
- print_warning("Wrote payload for rule %s" % rule[0], gotf.name, 1)
-
- # Check output of nft
- process = subprocess.Popen([NFT_BIN, '-nnn', 'list', 'table'] + table,
- shell=False, stdout=subprocess.PIPE,
+ print_warning("Wrote payload for rule %s" % rule[0],
+ gotf.name, 1)
+
+ # Check output of nft
+ process = subprocess.Popen([NFT_BIN, '-nnn', 'list', 'table'] +
+ table,
+ shell=False,
+ stdout=subprocess.PIPE,
preexec_fn=preexec)
pre_output = process.communicate()
output = pre_output[0].split(";")
return [ret, warning, error, unit_tests]
else:
rule_output = output_clean(pre_output, chain)
- if (len(rule) == 3):
+ if len(rule) == 3:
teoric_exit = rule[2]
else:
teoric_exit = rule[0]
- if (rule_output.rstrip() != teoric_exit.rstrip()):
- if (rule[0].find("{") != -1): # anonymous sets
- if (set_check_element(teoric_exit, rule_output) != 0):
+ if rule_output.rstrip() != teoric_exit.rstrip():
+ if rule[0].find("{") != -1: # anonymous sets
+ if set_check_element(teoric_exit, rule_output) != 0:
warning += 1
print_differences_warning(filename, lineno,
rule[0], rule_output,
else:
if len(rule_output) <= 0:
error += 1
- print_differences_error(filename, lineno,
- cmd)
+ print_differences_error(filename, lineno, cmd)
if not force_all_family_option:
return [ret, warning, error, unit_tests]
warning += 1
print_differences_warning(filename, lineno,
- teoric_exit.rstrip(), rule_output,
- cmd)
+ teoric_exit.rstrip(),
+ rule_output, cmd)
if not force_all_family_option:
return [ret, warning, error, unit_tests]
signal_received = 1
-def execute_cmd(cmd, filename, lineno, stdout_log = False):
+def execute_cmd(cmd, filename, lineno, stdout_log=False):
'''
Executes a command, checks for segfaults and returns the command exit
code.
def print_result(filename, tests, warning, error):
- return str(filename) + ": " + str(tests) + " unit tests, " + \
- str(error) + " error, " + str(warning) + " warning"
+ return str(filename) + ": " + str(tests) + " unit tests, " + str(error) + \
+ " error, " + str(warning) + " warning"
def print_result_all(filename, tests, warning, error, unit_tests):
- return str(filename) + ": " + str(tests) + " unit tests, " +\
- str(unit_tests) + " total test executed, " + \
- str(error) + " error, " + \
- str(warning) + " warning"
+ return str(filename) + ": " + str(tests) + " unit tests, " + \
+ str(unit_tests) + " total test executed, " + str(error) + \
+ " error, " + str(warning) + " warning"
def table_process(table_line, filename, lineno):
+ table_info = []
if ";" in table_line:
table_info = table_line.split(";")
else:
for table in table_list:
if len(chain_line) > 1:
chain_type = chain_line[1]
- ret = chain_create(chain_name, chain_type, chain_list, table,
- filename, lineno)
+ ret = chain_create(chain_name, chain_type, chain_list, table, filename,
+ lineno)
if ret != 0:
return -1
- return ret
+ return 0
def set_process(set_line, filename, lineno):
return set_add_elements(set_element, set_name, all_set, rule_state,
table_list, filename, lineno)
+
def payload_find_expected(payload_log, rule):
'''
- Find the netlink payload that should be generated by given rule in payload_log
+ Find the netlink payload that should be generated by given rule in
+ payload_log
:param payload_log: open file handle of the payload data
:param rule: nft rule we are going to add
payload_log.seek(0, 0)
return payload_buffer
+
def run_test_file(filename, force_all_family_option, specific_file):
'''
Runs a test file
if line[0] == '*': # Table
table_line = line.rstrip()[1:]
ret = table_process(table_line, filename, lineno)
- if (ret != 0):
+ if ret != 0:
break
continue
# Rule
rule = line.split(';') # rule[1] Ok or FAIL
- if len(rule) == 1 or len(rule) > 3 or rule[1].rstrip() not in {"ok", "fail"}:
+ if len(rule) == 1 or len(rule) > 3 or rule[1].rstrip() \
+ not in {"ok", "fail"}:
reason = "Skipping malformed rule test. (" + line.rstrip('\n') + ")"
print_warning(reason, filename, lineno)
continue
for table in table_list:
# We delete chains
for chain in chain_list:
- ret = chain_delete(chain, table, filename, lineno)
- if ret != 0:
+ chain_delete(chain, table, filename, lineno)
# We delete sets.
if all_set:
else:
print print_result(filename, tests, total_warning, total_error)
else:
- if (tests == passed and tests > 0):
+ if tests == passed and tests > 0:
print filename + ": " + Colors.GREEN + "OK" + Colors.ENDC
f.close()
def main():
- parser = argparse.ArgumentParser(description='Run nft tests',
- version='1.0')
+ parser = argparse.ArgumentParser(description='Run nft tests', version='1.0')
- parser.add_argument('filename', nargs='?',
- metavar='path/to/file.t',
+ parser.add_argument('filename', nargs='?', metavar='path/to/file.t',
help='Run only this test')
- parser.add_argument('-d', '--debug', action='store_true',
- dest='debug',
+ parser.add_argument('-d', '--debug', action='store_true', dest='debug',
help='enable debugging mode')
parser.add_argument('-e', '--need-fix', action='store_true',
- dest='need_fix_line',
- help='run rules that need a fix')
+ dest='need_fix_line', help='run rules that need a fix')
parser.add_argument('-f', '--force-family', action='store_true',
dest='force_all_family',
else:
if not specific_file:
if force_all_family_option:
- print ("%d test files, %d files passed, %d unit tests, %d total executed, %d error, %d warning" %
- (test_files, files_ok, tests, run_total, errors, warnings))
+ print "%d test files, %d files passed, %d unit tests, " \
+ "%d total executed, %d error, %d warning" \
+ % (test_files, files_ok, tests, run_total, errors,
+ warnings)
else:
- print ("%d test files, %d files passed, %d unit tests, %d error, %d warning" %
- (test_files, files_ok, tests, errors, warnings))
+ print "%d test files, %d files passed, %d unit tests, " \
+ "%d error, %d warning" \
+ % (test_files, files_ok, tests, errors, warnings)
+
if __name__ == '__main__':
main()