]>
git.ipfire.org Git - thirdparty/iptables.git/blob - iptables-test.py
3 # (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This software has been sponsored by Sophos Astaro <http://www.sophos.com>
13 from __future__
import print_function
# Command names of the tools under test.  IPTABLES and EBTABLES are compared
# against in run_test() and run_test_file_fast(), so they must be defined
# alongside their siblings.
IPTABLES = "iptables"
IP6TABLES = "ip6tables"
ARPTABLES = "arptables"
EBTABLES = "ebtables"

# Dump commands used by run_test() to read a rule back after loading it.
IPTABLES_SAVE = "iptables-save"
IP6TABLES_SAVE = "ip6tables-save"
ARPTABLES_SAVE = "arptables-save"
EBTABLES_SAVE = "ebtables-save"
#IPTABLES_SAVE = ['xtables-save','-4']
#IP6TABLES_SAVE = ['xtables-save','-6']

# Directory scanned for test files, and the file that receives command output.
EXTENSIONS_PATH = "extensions"
LOGFILE = "/tmp/iptables-test.log"

# Evaluated once at import time; maybe_colored() only emits escape sequences
# when the corresponding stream is a terminal.
STDOUT_IS_TTY = sys.stdout.isatty()
STDERR_IS_TTY = sys.stderr.isatty()
def maybe_colored(color, text, isatty):
    '''
    Returns text wrapped in a terminal color sequence if isatty is true,
    the unmodified text otherwise.

    :param color: key into the color table ('green' or 'red')
    :param text: string to decorate
    :param isatty: whether the destination stream is a terminal
    '''
    # NOTE(review): the table entries were not visible in the reviewed copy;
    # the two colors used by callers are mapped to the usual bright ANSI
    # foreground codes -- confirm against upstream.
    terminal_sequences = {
        'green': '\033[92m',
        'red': '\033[91m',
    }

    # The table is only consulted when coloring is requested, so an unknown
    # color is harmless when output is not a terminal.
    return (
        terminal_sequences[color] + text + '\033[0m' if isatty else text
    )
def print_error(reason, filename=None, lineno=None):
    '''
    Prints an error with nice colors, indicating file and line number.

    :param reason: description of what went wrong
    :param filename: name of the file being tested, or None if not known
    :param lineno: line number being tested, or None if not known
    '''
    # Both location arguments default to None; build each part only when it
    # is available instead of failing on None + str or "%d" % None.
    if filename is not None:
        prefix = filename + ": "
    else:
        prefix = ""

    if lineno is not None:
        detail = ": line %d (%s)" % (lineno, reason)
    else:
        detail = ": (%s)" % (reason,)

    print(prefix + maybe_colored('red', "ERROR", STDERR_IS_TTY) + detail,
          file=sys.stderr)
57 def delete_rule(iptables
, rule
, filename
, lineno
, netns
= None):
59 Removes an iptables rule
61 cmd
= iptables
+ " -D " + rule
62 ret
= execute_cmd(cmd
, filename
, lineno
, netns
)
64 reason
= "cannot delete: " + iptables
+ " -I " + rule
65 print_error(reason
, filename
, lineno
)
71 def run_test(iptables
, rule
, rule_save
, res
, filename
, lineno
, netns
):
73 Executes an unit test. Returns the output of delete_rule().
76 :param iptables: string with the iptables command to execute
77 :param rule: string with iptables arguments for the rule to test
78 :param rule_save: string to find the rule in the output of iptables-save
79 :param res: expected result of the rule. Valid values: "OK", "FAIL"
80 :param filename: name of the file tested (used for print_error purposes)
81 :param lineno: line number being tested (used for print_error purposes)
82 :param netns: network namespace to call commands in (or None)
86 cmd
= iptables
+ " -A " + rule
87 ret
= execute_cmd(cmd
, filename
, lineno
, netns
)
94 reason
= "cannot load: " + cmd
95 print_error(reason
, filename
, lineno
)
98 # do not report this error
102 reason
= "should fail: " + cmd
103 print_error(reason
, filename
, lineno
)
104 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
108 tokens
= iptables
.split(" ")
110 if tokens
[1] == '-4':
111 command
= IPTABLES_SAVE
112 elif tokens
[1] == '-6':
113 command
= IP6TABLES_SAVE
114 elif len(tokens
) == 1:
115 if tokens
[0] == IPTABLES
:
116 command
= IPTABLES_SAVE
117 elif tokens
[0] == IP6TABLES
:
118 command
= IP6TABLES_SAVE
119 elif tokens
[0] == ARPTABLES
:
120 command
= ARPTABLES_SAVE
121 elif tokens
[0] == EBTABLES
:
122 command
= EBTABLES_SAVE
124 command
= EXECUTABLE
+ " " + command
127 command
= "ip netns exec " + netns
+ " " + command
130 proc
= subprocess
.Popen(command
, shell
=True,
131 stdin
=subprocess
.PIPE
,
132 stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
)
133 out
, err
= proc
.communicate()
136 # check for segfaults
138 if proc
.returncode
== -11:
139 reason
= "iptables-save segfaults: " + cmd
140 print_error(reason
, filename
, lineno
)
141 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
145 matching
= out
.find(rule_save
.encode('utf-8'))
148 reason
= "cannot find: " + iptables
+ " -I " + rule
149 print_error(reason
, filename
, lineno
)
150 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
153 # do not report this error
157 reason
= "should not match: " + cmd
158 print_error(reason
, filename
, lineno
)
159 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
162 # Test "ip netns del NETNS" path with rules in place
166 return delete_rule(iptables
, rule
, filename
, lineno
)
def execute_cmd(cmd, filename, lineno = 0, netns = None):
    '''
    Executes a command, checking for segfaults and returning the command exit
    code.

    :param cmd: string with the command to be executed
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: network namespace to run command in
    '''
    # Commands naming one of the xtables front-ends are run through the
    # selected multi-call binary.
    if cmd.startswith(('iptables ', 'ip6tables ', 'ebtables ', 'arptables ')):
        cmd = EXECUTABLE + " " + cmd

    if netns:
        cmd = "ip netns exec " + netns + " " + cmd

    print("command: {}".format(cmd), file=log_file)
    # The child writes straight to the log's file descriptor; flush our own
    # buffered header first so it lands ahead of the command's output.
    log_file.flush()
    ret = subprocess.call(cmd, shell=True, universal_newlines=True,
                          stderr=subprocess.STDOUT, stdout=log_file)

    # generic check for segfaults
    # (subprocess reports death by signal N as -N; 11 is SIGSEGV)
    if ret == -11:
        reason = "command segfaults: " + cmd
        print_error(reason, filename, lineno)

    return ret
def variant_res(res, variant, alt_res=None):
    '''
    Adjust expected result with given variant

    If expected result is scoped to a variant, the other one yields a different
    result. Therefore map @res to itself if given variant is current, use the
    alternate result, @alt_res, if specified, invert @res otherwise.

    :param res: expected result from test spec ("OK", "FAIL" or "NOMATCH")
    :param variant: variant @res is scoped to by test spec ("NFT" or "LEGACY")
    :param alt_res: optional expected result for the alternate variant.
    '''
    variant_executable = {
        "NFT": "xtables-nft-multi",
        "LEGACY": "xtables-legacy-multi"
    }
    # NOTE(review): the inversion table was not visible in the reviewed copy;
    # only the OK/FAIL pair is assumed here, so a "NOMATCH" result scoped to
    # the other variant needs an explicit @alt_res -- confirm against upstream.
    res_inverse = {
        "OK": "FAIL",
        "FAIL": "OK"
    }

    # Result applies to the variant currently being tested: use it unchanged.
    if variant_executable[variant] == EXECUTABLE:
        return res
    # Other variant: prefer the explicitly given alternate result.
    if alt_res is not None:
        return alt_res
    return res_inverse[res]
225 def fast_run_possible(filename
):
227 Keep things simple, run only for simple test files:
228 - no external commands
230 - no variant-specific results
234 for line
in open(filename
):
235 if line
[0] in ["#", ":"] or len(line
.strip()) == 0:
238 if table
or rulecount
> 0:
240 table
= line
.rstrip()[1:]
241 if line
[0] in ["@", "%"]:
243 if len(line
.split(";")) > 3:
249 def run_test_file_fast(iptables
, filename
, netns
):
251 Run a test file, but fast
253 :param filename: name of the file with the test rules
254 :param netns: network namespace to perform test run in
264 for lineno
, line
in enumerate(f
):
265 if line
[0] == "#" or len(line
.strip()) == 0:
269 table
= line
.rstrip()[1:]
273 chain_array
= line
.rstrip()[1:].split(",")
276 if len(chain_array
) == 0:
281 for chain
in chain_array
:
282 item
= line
.split(";")
283 rule
= chain
+ " " + item
[0]
286 rule_save
= chain
+ " " + item
[0]
288 rule_save
= chain
+ " " + item
[1]
290 if iptables
== EBTABLES
and rule_save
.find('-j') < 0:
291 rule_save
+= " -j CONTINUE"
293 res
= item
[2].rstrip()
295 rule
= chain
+ " -t " + table
+ " " + item
[0]
296 ret
= run_test(iptables
, rule
, rule_save
,
297 res
, filename
, lineno
+ 1, netns
)
303 if not chain
in rules
.keys():
305 rules
[chain
].append((rule
, rule_save
))
307 restore_data
= ["*" + table
]
309 for chain
in ["PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"]:
310 if not chain
in rules
.keys():
312 for rule
in rules
[chain
]:
313 restore_data
.append("-A " + rule
[0])
314 out_expect
.append("-A " + rule
[1])
315 restore_data
.append("COMMIT")
317 out_expect
= "\n".join(out_expect
)
319 # load all rules via iptables_restore
321 command
= EXECUTABLE
+ " " + iptables
+ "-restore"
323 command
= "ip netns exec " + netns
+ " " + command
325 for line
in restore_data
:
326 print(iptables
+ "-restore: " + line
, file=log_file
)
328 proc
= subprocess
.Popen(command
, shell
= True, text
= True,
329 stdin
= subprocess
.PIPE
,
330 stdout
= subprocess
.PIPE
,
331 stderr
= subprocess
.PIPE
)
332 restore_data
= "\n".join(restore_data
) + "\n"
333 out
, err
= proc
.communicate(input = restore_data
)
335 if proc
.returncode
== -11:
336 reason
= iptables
+ "-restore segfaults: " + cmd
337 print_error(reason
, filename
, lineno
)
340 if proc
.returncode
!= 0:
341 print("%s-restore returned %d: %s" % (iptables
, proc
.returncode
, err
),
345 # find all rules in iptables_save output
347 command
= EXECUTABLE
+ " " + iptables
+ "-save"
349 command
= "ip netns exec " + netns
+ " " + command
351 proc
= subprocess
.Popen(command
, shell
= True,
352 stdin
= subprocess
.PIPE
,
353 stdout
= subprocess
.PIPE
,
354 stderr
= subprocess
.PIPE
)
355 out
, err
= proc
.communicate()
357 if proc
.returncode
== -11:
358 reason
= iptables
+ "-save segfaults: " + cmd
359 print_error(reason
, filename
, lineno
)
362 cmd
= iptables
+ " -F -t " + table
363 execute_cmd(cmd
, filename
, 0, netns
)
365 out
= out
.decode('utf-8').rstrip()
366 if out
.find(out_expect
) < 0:
367 msg
= ["dumps differ!"]
368 msg
.extend(["expect: " + l
for l
in out_expect
.split("\n")])
369 msg
.extend(["got: " + l
for l
in out
.split("\n")
370 if not l
[0] in ['*', ':', '#']])
371 print("\n".join(msg
), file=log_file
)
376 def run_test_file(filename
, netns
):
380 :param filename: name of the file with the test rules
381 :param netns: network namespace to perform test run in
384 # if this is not a test file, skip.
386 if not filename
.endswith(".t"):
389 if "libipt_" in filename
:
391 elif "libip6t_" in filename
:
393 elif "libxt_" in filename
:
395 elif "libarpt_" in filename
:
396 # only supported with nf_tables backend
397 if EXECUTABLE
!= "xtables-nft-multi":
400 elif "libebt_" in filename
:
401 # only supported with nf_tables backend
402 if EXECUTABLE
!= "xtables-nft-multi":
406 # default to iptables if not known prefix
410 if fast_run_possible(filename
):
411 tests
= run_test_file_fast(iptables
, filename
, netns
)
413 print(filename
+ ": " + maybe_colored('green', "OK", STDOUT_IS_TTY
))
423 total_test_passed
= True
426 execute_cmd("ip netns add " + netns
, filename
)
428 for lineno
, line
in enumerate(f
):
429 if line
[0] == "#" or len(line
.strip()) == 0:
433 chain_array
= line
.rstrip()[1:].split(",")
436 # external command invocation, executed as is.
437 # detects iptables commands to prefix with EXECUTABLE automatically
438 if line
[0] in ["@", "%"]:
439 external_cmd
= line
.rstrip()[1:]
440 execute_cmd(external_cmd
, filename
, lineno
, netns
)
444 table
= line
.rstrip()[1:]
447 if len(chain_array
) == 0:
448 print_error("broken test, missing chain",
449 filename
= filename
, lineno
= lineno
)
450 total_test_passed
= False
456 for chain
in chain_array
:
457 item
= line
.split(";")
459 rule
= chain
+ " " + item
[0]
461 rule
= chain
+ " -t " + table
+ " " + item
[0]
464 rule_save
= chain
+ " " + item
[0]
466 rule_save
= chain
+ " " + item
[1]
468 res
= item
[2].rstrip()
470 variant
= item
[3].rstrip()
472 alt_res
= item
[4].rstrip()
475 res
= variant_res(res
, variant
, alt_res
)
477 ret
= run_test(iptables
, rule
, rule_save
,
478 res
, filename
, lineno
+ 1, netns
)
482 total_test_passed
= False
489 execute_cmd("ip netns del " + netns
, filename
)
490 if total_test_passed
:
493 suffix
= maybe_colored('red', " but fast mode failed!", STDOUT_IS_TTY
)
494 print(filename
+ ": " + maybe_colored('green', "OK", STDOUT_IS_TTY
) + suffix
)
502 Show the list of missing test files
504 file_list
= os
.listdir(EXTENSIONS_PATH
)
505 testfiles
= [i
for i
in file_list
if i
.endswith('.t')]
506 libfiles
= [i
for i
in file_list
507 if i
.startswith('lib') and i
.endswith('.c')]
510 return x
[0:-2] + '.t'
511 missing
= [test_name(i
) for i
in libfiles
512 if not test_name(i
) in testfiles
]
514 print('\n'.join(missing
))
517 # prefer unshare module
520 unshare
.unshare(unshare
.CLONE_NEWNET
)
525 # sledgehammer style:
526 # - call ourselves prefixed by 'unshare -n' if found
527 # - pass extra --no-netns parameter to avoid another recursion
531 unshare
= shutil
.which("unshare")
535 sys
.argv
.append("--no-netns")
536 os
.execv(unshare
, [unshare
, "-n", sys
.executable
] + sys
.argv
)
546 parser
= argparse
.ArgumentParser(description
='Run iptables tests')
547 parser
.add_argument('filename', nargs
='*',
548 metavar
='path/to/file.t',
549 help='Run only this test')
550 parser
.add_argument('-H', '--host', action
='store_true',
551 help='Run tests against installed binaries')
552 parser
.add_argument('-l', '--legacy', action
='store_true',
553 help='Test iptables-legacy')
554 parser
.add_argument('-m', '--missing', action
='store_true',
555 help='Check for missing tests')
556 parser
.add_argument('-n', '--nftables', action
='store_true',
557 help='Test iptables-over-nftables')
558 parser
.add_argument('-N', '--netns', action
='store_const',
559 const
='____iptables-container-test',
560 help='Test netnamespace path')
561 parser
.add_argument('--no-netns', action
='store_true',
562 help='Do not run testsuite in own network namespace')
563 args
= parser
.parse_args()
566 # show list of missing test files
574 variants
.append("legacy")
576 variants
.append("nft")
577 if len(variants
) == 0:
578 variants
= [ "legacy", "nft" ]
581 print("You need to be root to run this, sorry", file=sys
.stderr
)
584 if not args
.netns
and not args
.no_netns
and not spawn_netns():
585 print("Cannot run in own namespace, connectivity might break",
589 os
.putenv("XTABLES_LIBDIR", os
.path
.abspath(EXTENSIONS_PATH
))
590 os
.putenv("PATH", "%s/iptables:%s" % (os
.path
.abspath(os
.path
.curdir
),
596 for variant
in variants
:
598 EXECUTABLE
= "xtables-" + variant
+ "-multi"
604 # setup global var log file
607 log_file
= open(LOGFILE
, 'w')
609 print("Couldn't open log file %s" % LOGFILE
, file=sys
.stderr
)
613 file_list
= args
.filename
615 file_list
= [os
.path
.join(EXTENSIONS_PATH
, i
)
616 for i
in os
.listdir(EXTENSIONS_PATH
)
620 for filename
in file_list
:
621 file_tests
, file_passed
= run_test_file(filename
, args
.netns
)
624 passed
+= file_passed
627 print("%s: %d test files, %d unit tests, %d passed"
628 % (variant
, test_files
, tests
, passed
))
630 total_passed
+= passed
632 total_test_files
= max(total_test_files
, test_files
)
634 if len(variants
) > 1:
635 print("total: %d test files, %d unit tests, %d passed"
636 % (total_test_files
, total_tests
, total_passed
))
637 return total_passed
- total_tests
639 if __name__
== '__main__':