]>
git.ipfire.org Git - thirdparty/iptables.git/blob - iptables-test.py
# (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This software has been sponsored by Sophos Astaro <http://www.sophos.com>
13 from __future__
import print_function
# Tool names; run_test() compares the invoked tool against these to pick
# the matching *-save command.
IP6TABLES = "ip6tables"
ARPTABLES = "arptables"

# *-save commands whose output run_test() searches for the saved rule.
IPTABLES_SAVE = "iptables-save"
IP6TABLES_SAVE = "ip6tables-save"
ARPTABLES_SAVE = "arptables-save"
EBTABLES_SAVE = "ebtables-save"
#IPTABLES_SAVE = ['xtables-save','-4']
#IP6TABLES_SAVE = ['xtables-save','-6']

# Directory listed for test files and exported as XTABLES_LIBDIR.
EXTENSIONS_PATH = "extensions"
# Log file opened for writing by the test run.
LOGFILE = "/tmp/iptables-test.log"

# Whether stdout/stderr are terminals; handed to maybe_colored() as its
# isatty argument to decide whether escape sequences are emitted.
STDOUT_IS_TTY = sys.stdout.isatty()
STDERR_IS_TTY = sys.stderr.isatty()
38 def maybe_colored(color
, text
, isatty
):
39 terminal_sequences
= {
45 terminal_sequences
[color
] + text
+ '\033[0m' if isatty
else text
def print_error(reason, filename=None, lineno=None):
    '''
    Prints an error with nice colors, indicating file and line number.

    The message is written to stderr as
    "<filename>: ERROR: line <lineno> (<reason>)", with "ERROR" passed
    through maybe_colored() using STDERR_IS_TTY.

    :param reason: text describing what went wrong
    :param filename: name of the file being tested
    :param lineno: line number being tested

    Note: although filename and lineno default to None, the message is
    built with string concatenation and "%d", so calling this with either
    left as None raises TypeError. Callers must supply both.
    '''
    print(filename + ": " + maybe_colored('red', "ERROR", STDERR_IS_TTY) +
          ": line %d (%s)" % (lineno, reason), file=sys.stderr)
57 def delete_rule(iptables
, rule
, filename
, lineno
, netns
= None):
59 Removes an iptables rule
61 cmd
= iptables
+ " -D " + rule
62 ret
= execute_cmd(cmd
, filename
, lineno
, netns
)
64 reason
= "cannot delete: " + iptables
+ " -I " + rule
65 print_error(reason
, filename
, lineno
)
71 def run_test(iptables
, rule
, rule_save
, res
, filename
, lineno
, netns
):
73 Executes an unit test. Returns the output of delete_rule().
76 :param iptables: string with the iptables command to execute
77 :param rule: string with iptables arguments for the rule to test
78 :param rule_save: string to find the rule in the output of iptables-save
79 :param res: expected result of the rule. Valid values: "OK", "FAIL"
80 :param filename: name of the file tested (used for print_error purposes)
81 :param lineno: line number being tested (used for print_error purposes)
82 :param netns: network namespace to call commands in (or None)
86 cmd
= iptables
+ " -A " + rule
87 ret
= execute_cmd(cmd
, filename
, lineno
, netns
)
94 reason
= "cannot load: " + cmd
95 print_error(reason
, filename
, lineno
)
98 # do not report this error
102 reason
= "should fail: " + cmd
103 print_error(reason
, filename
, lineno
)
104 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
108 tokens
= iptables
.split(" ")
110 if tokens
[1] == '-4':
111 command
= IPTABLES_SAVE
112 elif tokens
[1] == '-6':
113 command
= IP6TABLES_SAVE
114 elif len(tokens
) == 1:
115 if tokens
[0] == IPTABLES
:
116 command
= IPTABLES_SAVE
117 elif tokens
[0] == IP6TABLES
:
118 command
= IP6TABLES_SAVE
119 elif tokens
[0] == ARPTABLES
:
120 command
= ARPTABLES_SAVE
121 elif tokens
[0] == EBTABLES
:
122 command
= EBTABLES_SAVE
124 command
= EXECUTABLE
+ " " + command
127 command
= "ip netns exec " + netns
+ " " + command
130 proc
= subprocess
.Popen(command
, shell
=True,
131 stdin
=subprocess
.PIPE
,
132 stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
)
133 out
, err
= proc
.communicate()
136 # check for segfaults
138 if proc
.returncode
== -11:
139 reason
= "iptables-save segfaults: " + cmd
140 print_error(reason
, filename
, lineno
)
141 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
145 matching
= out
.find(rule_save
.encode('utf-8'))
148 reason
= "cannot find: " + iptables
+ " -I " + rule
149 print_error(reason
, filename
, lineno
)
150 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
153 # do not report this error
157 reason
= "should not match: " + cmd
158 print_error(reason
, filename
, lineno
)
159 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
162 # Test "ip netns del NETNS" path with rules in place
166 return delete_rule(iptables
, rule
, filename
, lineno
)
168 def execute_cmd(cmd
, filename
, lineno
= 0, netns
= None):
170 Executes a command, checking for segfaults and returning the command exit
173 :param cmd: string with the command to be executed
174 :param filename: name of the file tested (used for print_error purposes)
175 :param lineno: line number being tested (used for print_error purposes)
176 :param netns: network namespace to run command in
179 if cmd
.startswith('iptables ') or cmd
.startswith('ip6tables ') or cmd
.startswith('ebtables ') or cmd
.startswith('arptables '):
180 cmd
= EXECUTABLE
+ " " + cmd
183 cmd
= "ip netns exec " + netns
+ " " + cmd
185 print("command: {}".format(cmd
), file=log_file
)
186 ret
= subprocess
.call(cmd
, shell
=True, universal_newlines
=True,
187 stderr
=subprocess
.STDOUT
, stdout
=log_file
)
190 # generic check for segfaults
192 reason
= "command segfaults: " + cmd
193 print_error(reason
, filename
, lineno
)
197 def variant_res(res
, variant
, alt_res
=None):
199 Adjust expected result with given variant
201 If expected result is scoped to a variant, the other one yields a different
202 result. Therefore map @res to itself if given variant is current, use the
203 alternate result, @alt_res, if specified, invert @res otherwise.
205 :param res: expected result from test spec ("OK", "FAIL" or "NOMATCH")
206 :param variant: variant @res is scoped to by test spec ("NFT" or "LEGACY")
207 :param alt_res: optional expected result for the alternate variant.
209 variant_executable
= {
210 "NFT": "xtables-nft-multi",
211 "LEGACY": "xtables-legacy-multi"
219 if variant_executable
[variant
] == EXECUTABLE
:
221 if alt_res
is not None:
223 return res_inverse
[res
]
225 def fast_run_possible(filename
):
227 Keep things simple, run only for simple test files:
228 - no external commands
230 - no variant-specific results
234 for line
in open(filename
):
235 if line
[0] in ["#", ":"] or len(line
.strip()) == 0:
238 if table
or rulecount
> 0:
240 table
= line
.rstrip()[1:]
241 if line
[0] in ["@", "%"]:
243 if len(line
.split(";")) > 3:
249 def run_test_file_fast(iptables
, filename
, netns
):
251 Run a test file, but fast
253 :param filename: name of the file with the test rules
254 :param netns: network namespace to perform test run in
264 for lineno
, line
in enumerate(f
):
265 if line
[0] == "#" or len(line
.strip()) == 0:
269 table
= line
.rstrip()[1:]
273 chain_array
= line
.rstrip()[1:].split(",")
276 if len(chain_array
) == 0:
281 for chain
in chain_array
:
282 item
= line
.split(";")
283 rule
= chain
+ " " + item
[0]
286 rule_save
= chain
+ " " + item
[0]
288 rule_save
= chain
+ " " + item
[1]
290 res
= item
[2].rstrip()
292 rule
= chain
+ " -t " + table
+ " " + item
[0]
293 ret
= run_test(iptables
, rule
, rule_save
,
294 res
, filename
, lineno
+ 1, netns
)
300 if not chain
in rules
.keys():
302 rules
[chain
].append((rule
, rule_save
))
304 restore_data
= ["*" + table
]
306 for chain
in ["PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"]:
307 if not chain
in rules
.keys():
309 for rule
in rules
[chain
]:
310 restore_data
.append("-A " + rule
[0])
311 out_expect
.append("-A " + rule
[1])
312 restore_data
.append("COMMIT")
314 out_expect
= "\n".join(out_expect
)
316 # load all rules via iptables_restore
318 command
= EXECUTABLE
+ " " + iptables
+ "-restore"
320 command
= "ip netns exec " + netns
+ " " + command
322 for line
in restore_data
:
323 print(iptables
+ "-restore: " + line
, file=log_file
)
325 proc
= subprocess
.Popen(command
, shell
= True, text
= True,
326 stdin
= subprocess
.PIPE
,
327 stdout
= subprocess
.PIPE
,
328 stderr
= subprocess
.PIPE
)
329 restore_data
= "\n".join(restore_data
) + "\n"
330 out
, err
= proc
.communicate(input = restore_data
)
332 if proc
.returncode
== -11:
333 reason
= iptables
+ "-restore segfaults: " + cmd
334 print_error(reason
, filename
, lineno
)
337 if proc
.returncode
!= 0:
338 print("%s-restore returned %d: %s" % (iptables
, proc
.returncode
, err
),
342 # find all rules in iptables_save output
344 command
= EXECUTABLE
+ " " + iptables
+ "-save"
346 command
= "ip netns exec " + netns
+ " " + command
348 proc
= subprocess
.Popen(command
, shell
= True,
349 stdin
= subprocess
.PIPE
,
350 stdout
= subprocess
.PIPE
,
351 stderr
= subprocess
.PIPE
)
352 out
, err
= proc
.communicate()
354 if proc
.returncode
== -11:
355 reason
= iptables
+ "-save segfaults: " + cmd
356 print_error(reason
, filename
, lineno
)
359 cmd
= iptables
+ " -F -t " + table
360 execute_cmd(cmd
, filename
, 0, netns
)
362 out
= out
.decode('utf-8').rstrip()
363 if out
.find(out_expect
) < 0:
364 msg
= ["dumps differ!"]
365 msg
.extend(["expect: " + l
for l
in out_expect
.split("\n")])
366 msg
.extend(["got: " + l
for l
in out
.split("\n")
367 if not l
[0] in ['*', ':', '#']])
368 print("\n".join(msg
), file=log_file
)
373 def run_test_file(filename
, netns
):
377 :param filename: name of the file with the test rules
378 :param netns: network namespace to perform test run in
381 # if this is not a test file, skip.
383 if not filename
.endswith(".t"):
386 if "libipt_" in filename
:
388 elif "libip6t_" in filename
:
390 elif "libxt_" in filename
:
392 elif "libarpt_" in filename
:
393 # only supported with nf_tables backend
394 if EXECUTABLE
!= "xtables-nft-multi":
397 elif "libebt_" in filename
:
398 # only supported with nf_tables backend
399 if EXECUTABLE
!= "xtables-nft-multi":
403 # default to iptables if not known prefix
407 if fast_run_possible(filename
):
408 tests
= run_test_file_fast(iptables
, filename
, netns
)
410 print(filename
+ ": " + maybe_colored('green', "OK", STDOUT_IS_TTY
))
420 total_test_passed
= True
423 execute_cmd("ip netns add " + netns
, filename
)
425 for lineno
, line
in enumerate(f
):
426 if line
[0] == "#" or len(line
.strip()) == 0:
430 chain_array
= line
.rstrip()[1:].split(",")
433 # external command invocation, executed as is.
434 # detects iptables commands to prefix with EXECUTABLE automatically
435 if line
[0] in ["@", "%"]:
436 external_cmd
= line
.rstrip()[1:]
437 execute_cmd(external_cmd
, filename
, lineno
, netns
)
441 table
= line
.rstrip()[1:]
444 if len(chain_array
) == 0:
445 print_error("broken test, missing chain",
446 filename
= filename
, lineno
= lineno
)
447 total_test_passed
= False
453 for chain
in chain_array
:
454 item
= line
.split(";")
456 rule
= chain
+ " " + item
[0]
458 rule
= chain
+ " -t " + table
+ " " + item
[0]
461 rule_save
= chain
+ " " + item
[0]
463 rule_save
= chain
+ " " + item
[1]
465 res
= item
[2].rstrip()
467 variant
= item
[3].rstrip()
469 alt_res
= item
[4].rstrip()
472 res
= variant_res(res
, variant
, alt_res
)
474 ret
= run_test(iptables
, rule
, rule_save
,
475 res
, filename
, lineno
+ 1, netns
)
479 total_test_passed
= False
486 execute_cmd("ip netns del " + netns
, filename
)
487 if total_test_passed
:
490 suffix
= maybe_colored('red', " but fast mode failed!", STDOUT_IS_TTY
)
491 print(filename
+ ": " + maybe_colored('green', "OK", STDOUT_IS_TTY
) + suffix
)
499 Show the list of missing test files
501 file_list
= os
.listdir(EXTENSIONS_PATH
)
502 testfiles
= [i
for i
in file_list
if i
.endswith('.t')]
503 libfiles
= [i
for i
in file_list
504 if i
.startswith('lib') and i
.endswith('.c')]
507 return x
[0:-2] + '.t'
508 missing
= [test_name(i
) for i
in libfiles
509 if not test_name(i
) in testfiles
]
511 print('\n'.join(missing
))
514 # prefer unshare module
517 unshare
.unshare(unshare
.CLONE_NEWNET
)
522 # sledgehammer style:
523 # - call ourselves prefixed by 'unshare -n' if found
524 # - pass extra --no-netns parameter to avoid another recursion
528 unshare
= shutil
.which("unshare")
532 sys
.argv
.append("--no-netns")
533 os
.execv(unshare
, [unshare
, "-n", sys
.executable
] + sys
.argv
)
543 parser
= argparse
.ArgumentParser(description
='Run iptables tests')
544 parser
.add_argument('filename', nargs
='*',
545 metavar
='path/to/file.t',
546 help='Run only this test')
547 parser
.add_argument('-H', '--host', action
='store_true',
548 help='Run tests against installed binaries')
549 parser
.add_argument('-l', '--legacy', action
='store_true',
550 help='Test iptables-legacy')
551 parser
.add_argument('-m', '--missing', action
='store_true',
552 help='Check for missing tests')
553 parser
.add_argument('-n', '--nftables', action
='store_true',
554 help='Test iptables-over-nftables')
555 parser
.add_argument('-N', '--netns', action
='store_const',
556 const
='____iptables-container-test',
557 help='Test netnamespace path')
558 parser
.add_argument('--no-netns', action
='store_true',
559 help='Do not run testsuite in own network namespace')
560 args
= parser
.parse_args()
563 # show list of missing test files
571 variants
.append("legacy")
573 variants
.append("nft")
574 if len(variants
) == 0:
575 variants
= [ "legacy", "nft" ]
578 print("You need to be root to run this, sorry", file=sys
.stderr
)
581 if not args
.netns
and not args
.no_netns
and not spawn_netns():
582 print("Cannot run in own namespace, connectivity might break",
586 os
.putenv("XTABLES_LIBDIR", os
.path
.abspath(EXTENSIONS_PATH
))
587 os
.putenv("PATH", "%s/iptables:%s" % (os
.path
.abspath(os
.path
.curdir
),
593 for variant
in variants
:
595 EXECUTABLE
= "xtables-" + variant
+ "-multi"
601 # setup global var log file
604 log_file
= open(LOGFILE
, 'w')
606 print("Couldn't open log file %s" % LOGFILE
, file=sys
.stderr
)
610 file_list
= args
.filename
612 file_list
= [os
.path
.join(EXTENSIONS_PATH
, i
)
613 for i
in os
.listdir(EXTENSIONS_PATH
)
617 for filename
in file_list
:
618 file_tests
, file_passed
= run_test_file(filename
, args
.netns
)
621 passed
+= file_passed
624 print("%s: %d test files, %d unit tests, %d passed"
625 % (variant
, test_files
, tests
, passed
))
627 total_passed
+= passed
629 total_test_files
= max(total_test_files
, test_files
)
631 if len(variants
) > 1:
632 print("total: %d test files, %d unit tests, %d passed"
633 % (total_test_files
, total_tests
, total_passed
))
634 return total_passed
- total_tests
636 if __name__
== '__main__':