import signal
TERMINAL_PATH = os.getcwd()
+NFT_BIN = TERMINAL_PATH + "/src/nft"
TESTS_PATH = os.path.dirname(os.path.abspath(__file__))
TESTS_DIRECTORY = ["any", "arp", "bridge", "inet", "ip", "ip6"]
LOGFILE = "/tmp/nftables-test.log"
'''
Exists a table.
'''
- cmd = "nft list -nnn table " + table[0] + " " + table[1]
+ cmd = NFT_BIN + " list -nnn table " + table[0] + " " + table[1]
ret = execute_cmd(cmd, filename, lineno)
return True if (ret == 0) else False
'''
Flush a table.
'''
- cmd = "nft flush table " + str(table[0]) + " " + str(table[1])
+ cmd = NFT_BIN + " flush table " + str(table[0]) + " " + str(table[1])
ret = execute_cmd(cmd, filename, lineno)
return cmd
table_list.append(table)
## We add a new table
- cmd = "nft add table " + table[0] + " " + table[1]
+ cmd = NFT_BIN + " add table " + table[0] + " " + table[1]
ret = execute_cmd(cmd, filename, lineno)
if ret != 0:
print_error(reason, filename, lineno)
return -1
- cmd = "nft delete table" + table_info
+ cmd = NFT_BIN + " delete table" + table_info
ret = execute_cmd(cmd, filename, lineno)
if ret != 0:
reason = cmd + ": " \
'''
table_info = " " + table[0] + " " + table[1] + " "
- cmd = "nft list -nnn chain" + table_info + chain
+ cmd = NFT_BIN + " list -nnn chain" + table_info + chain
ret = execute_cmd(cmd, filename, lineno)
return True if (ret == 0) else False
return -1
if chain_type:
- cmd = "nft add chain" + table_info + chain + "\{ " + chain_type + "\; \}"
+ cmd = NFT_BIN + " add chain" + table_info + chain + "\{ " + chain_type + "\; \}"
else:
- cmd = "nft add chain" + table_info + chain
+ cmd = NFT_BIN + " add chain" + table_info + chain
ret = execute_cmd(cmd, filename, lineno)
if ret != 0:
print_error(reason, filename, lineno)
return -1
- cmd = "nft flush chain" + table_info + chain
+ cmd = NFT_BIN + " flush chain" + table_info + chain
ret = execute_cmd(cmd, filename, lineno)
if ret != 0:
reason = "I cannot flush this chain " + chain
print_error(reason, filename, lineno)
return -1
- cmd = "nft delete chain" + table_info + chain
+ cmd = NFT_BIN + " delete chain" + table_info + chain
ret = execute_cmd(cmd, filename, lineno)
if ret != 0:
reason = cmd + "I cannot delete this chain. DD"
table_info = " " + table[0] + " " + table[1] + " "
set_text = " " + set_info[0] + " { type " + set_info[1] + " \;}"
- cmd = "nft add set" + table_info + set_text
+ cmd = NFT_BIN + " add set" + table_info + set_text
ret = execute_cmd(cmd, filename, lineno)
if (ret == 0 and set_info[2].rstrip() == "fail") or \
element = element + ", " + e
set_text = set_name + " { " + element + " }"
- cmd = "nft add element" + table_info + set_text
+ cmd = NFT_BIN + " add element" + table_info + set_text
ret = execute_cmd(cmd, filename, lineno)
if (state == "fail" and ret == 0) or (state == "ok" and ret != 0):
for element in set_element:
set_text = set_name + " {" + element + "}"
- cmd = "nft delete element" + table_info + set_text
+ cmd = NFT_BIN + " delete element" + table_info + set_text
ret = execute_cmd(cmd, filename, lineno)
if ret != 0:
reason = "I cannot delete an element" + element + \
# We delete the set.
table_info = " " + table[0] + " " + table[1] + " "
- cmd = "nft delete set " + table_info + " " + set_name
+ cmd = NFT_BIN + " delete set " + table_info + " " + set_name
ret = execute_cmd(cmd, filename, lineno)
# Check if the set still exists after I deleted it.
Check if the set exists.
'''
table_info = " " + table[0] + " " + table[1] + " "
- cmd = "nft list -nnn set" + table_info + set_name
+ cmd = NFT_BIN + " list -nnn set" + table_info + set_name
ret = execute_cmd(cmd, filename, lineno)
return True if (ret == 0) else False
unit_tests += 1
table_flush(table, filename, lineno)
table_info = " " + table[0] + " " + table[1] + " "
- cmd = "nft add rule" + table_info + chain + " " + rule[0]
+ cmd = NFT_BIN + " add rule" + table_info + chain + " " + rule[0]
payload_log = os.tmpfile();
- cmd = "nft add rule --debug=netlink" + table_info + chain + " " + rule[0]
+ cmd = NFT_BIN + " add rule --debug=netlink" + table_info + chain + " " + rule[0]
ret = execute_cmd(cmd, filename, lineno, payload_log)
state = rule[1].rstrip()
print_warning("Wrote payload for rule %s" % rule[0], gotf.name, 1)
# Check output of nft
- process = subprocess.Popen(['nft', '-nnn', 'list', 'table'] + table,
+ process = subprocess.Popen([NFT_BIN, '-nnn', 'list', 'table'] + table,
shell=False, stdout=subprocess.PIPE,
preexec_fn=preexec)
pre_output = process.communicate()