]>
git.ipfire.org Git - thirdparty/iptables.git/blob - iptables-test.py
3 # (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This software has been sponsored by Sophos Astaro <http://www.sophos.com>
13 from __future__
import print_function
18 from difflib
import unified_diff
21 IP6TABLES
= "ip6tables"
22 ARPTABLES
= "arptables"
25 IPTABLES_SAVE
= "iptables-save"
26 IP6TABLES_SAVE
= "ip6tables-save"
27 ARPTABLES_SAVE
= "arptables-save"
28 EBTABLES_SAVE
= "ebtables-save"
29 #IPTABLES_SAVE = ['xtables-save','-4']
30 #IP6TABLES_SAVE = ['xtables-save','-6']
32 EXTENSIONS_PATH
= "extensions"
33 LOGFILE
="/tmp/iptables-test.log"
36 STDOUT_IS_TTY
= sys
.stdout
.isatty()
37 STDERR_IS_TTY
= sys
.stderr
.isatty()
39 def maybe_colored(color
, text
, isatty
):
40 terminal_sequences
= {
46 terminal_sequences
[color
] + text
+ '\033[0m' if isatty
else text
50 def print_error(reason
, filename
=None, lineno
=None):
52 Prints an error with nice colors, indicating file and line number.
54 print(filename
+ ": " + maybe_colored('red', "ERROR", STDERR_IS_TTY
) +
55 ": line %d (%s)" % (lineno
, reason
), file=sys
.stderr
)
58 def delete_rule(iptables
, rule
, filename
, lineno
, netns
= None):
60 Removes an iptables rule
62 cmd
= iptables
+ " -D " + rule
63 ret
= execute_cmd(cmd
, filename
, lineno
, netns
)
65 reason
= "cannot delete: " + iptables
+ " -I " + rule
66 print_error(reason
, filename
, lineno
)
72 def run_test(iptables
, rule
, rule_save
, res
, filename
, lineno
, netns
):
74 Executes an unit test. Returns the output of delete_rule().
77 :param iptables: string with the iptables command to execute
78 :param rule: string with iptables arguments for the rule to test
79 :param rule_save: string to find the rule in the output of iptables-save
80 :param res: expected result of the rule. Valid values: "OK", "FAIL"
81 :param filename: name of the file tested (used for print_error purposes)
82 :param lineno: line number being tested (used for print_error purposes)
83 :param netns: network namespace to call commands in (or None)
87 cmd
= iptables
+ " -A " + rule
88 ret
= execute_cmd(cmd
, filename
, lineno
, netns
)
95 reason
= "cannot load: " + cmd
96 print_error(reason
, filename
, lineno
)
99 # do not report this error
103 reason
= "should fail: " + cmd
104 print_error(reason
, filename
, lineno
)
105 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
109 tokens
= iptables
.split(" ")
111 if tokens
[1] == '-4':
112 command
= IPTABLES_SAVE
113 elif tokens
[1] == '-6':
114 command
= IP6TABLES_SAVE
115 elif len(tokens
) == 1:
116 if tokens
[0] == IPTABLES
:
117 command
= IPTABLES_SAVE
118 elif tokens
[0] == IP6TABLES
:
119 command
= IP6TABLES_SAVE
120 elif tokens
[0] == ARPTABLES
:
121 command
= ARPTABLES_SAVE
122 elif tokens
[0] == EBTABLES
:
123 command
= EBTABLES_SAVE
125 command
= EXECUTABLE
+ " " + command
128 command
= "ip netns exec " + netns
+ " " + command
131 proc
= subprocess
.Popen(command
, shell
=True,
132 stdin
=subprocess
.PIPE
,
133 stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
)
134 out
, err
= proc
.communicate()
137 # check for segfaults
139 if proc
.returncode
== -11:
140 reason
= command
+ " segfaults!"
141 print_error(reason
, filename
, lineno
)
142 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
146 matching
= out
.find("\n-A {}\n".format(rule_save
).encode('utf-8'))
150 reason
= "cannot find: " + iptables
+ " -I " + rule
151 print_error(reason
, filename
, lineno
)
152 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
155 # do not report this error
159 reason
= "should not match: " + cmd
160 print_error(reason
, filename
, lineno
)
161 delete_rule(iptables
, rule
, filename
, lineno
, netns
)
164 # Test "ip netns del NETNS" path with rules in place
168 return delete_rule(iptables
, rule
, filename
, lineno
)
170 def execute_cmd(cmd
, filename
, lineno
= 0, netns
= None):
172 Executes a command, checking for segfaults and returning the command exit
175 :param cmd: string with the command to be executed
176 :param filename: name of the file tested (used for print_error purposes)
177 :param lineno: line number being tested (used for print_error purposes)
178 :param netns: network namespace to run command in
181 if cmd
.startswith('iptables ') or cmd
.startswith('ip6tables ') or cmd
.startswith('ebtables ') or cmd
.startswith('arptables '):
182 cmd
= EXECUTABLE
+ " " + cmd
185 cmd
= "ip netns exec " + netns
+ " " + cmd
187 print("command: {}".format(cmd
), file=log_file
)
188 ret
= subprocess
.call(cmd
, shell
=True, universal_newlines
=True,
189 stderr
=subprocess
.STDOUT
, stdout
=log_file
)
192 # generic check for segfaults
194 reason
= "command segfaults: " + cmd
195 print_error(reason
, filename
, lineno
)
199 def variant_res(res
, variant
, alt_res
=None):
201 Adjust expected result with given variant
203 If expected result is scoped to a variant, the other one yields a different
204 result. Therefore map @res to itself if given variant is current, use the
205 alternate result, @alt_res, if specified, invert @res otherwise.
207 :param res: expected result from test spec ("OK", "FAIL" or "NOMATCH")
208 :param variant: variant @res is scoped to by test spec ("NFT" or "LEGACY")
209 :param alt_res: optional expected result for the alternate variant.
211 variant_executable
= {
212 "NFT": "xtables-nft-multi",
213 "LEGACY": "xtables-legacy-multi"
221 if variant_executable
[variant
] == EXECUTABLE
:
223 if alt_res
is not None:
225 return res_inverse
[res
]
227 def fast_run_possible(filename
):
229 Keep things simple, run only for simple test files:
230 - no external commands
232 - no variant-specific results
236 for line
in open(filename
):
237 if line
[0] in ["#", ":"] or len(line
.strip()) == 0:
240 if table
or rulecount
> 0:
242 table
= line
.rstrip()[1:]
243 if line
[0] in ["@", "%"]:
245 if len(line
.split(";")) > 3:
251 def run_test_file_fast(iptables
, filename
, netns
):
253 Run a test file, but fast
255 :param filename: name of the file with the test rules
256 :param netns: network namespace to perform test run in
266 for lineno
, line
in enumerate(f
):
267 if line
[0] == "#" or len(line
.strip()) == 0:
271 table
= line
.rstrip()[1:]
275 chain_array
= line
.rstrip()[1:].split(",")
278 if len(chain_array
) == 0:
283 for chain
in chain_array
:
284 item
= line
.split(";")
285 rule
= chain
+ " " + item
[0]
288 rule_save
= chain
+ " " + item
[0]
290 rule_save
= chain
+ " " + item
[1]
292 if iptables
== EBTABLES
and rule_save
.find('-j') < 0:
293 rule_save
+= " -j CONTINUE"
295 res
= item
[2].rstrip()
297 rule
= chain
+ " -t " + table
+ " " + item
[0]
298 ret
= run_test(iptables
, rule
, rule_save
,
299 res
, filename
, lineno
+ 1, netns
)
305 if not chain
in rules
.keys():
307 rules
[chain
].append((rule
, rule_save
))
309 restore_data
= ["*" + table
]
311 for chain
in ["PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"]:
312 if not chain
in rules
.keys():
314 for rule
in rules
[chain
]:
315 restore_data
.append("-A " + rule
[0])
316 out_expect
.append("-A " + rule
[1])
317 restore_data
.append("COMMIT")
319 out_expect
= "\n".join(out_expect
)
321 # load all rules via iptables_restore
323 command
= EXECUTABLE
+ " " + iptables
+ "-restore"
325 command
= "ip netns exec " + netns
+ " " + command
327 for line
in restore_data
:
328 print(iptables
+ "-restore: " + line
, file=log_file
)
330 proc
= subprocess
.Popen(command
, shell
= True, text
= True,
331 stdin
= subprocess
.PIPE
,
332 stdout
= subprocess
.PIPE
,
333 stderr
= subprocess
.PIPE
)
334 restore_data
= "\n".join(restore_data
) + "\n"
335 out
, err
= proc
.communicate(input = restore_data
)
337 if proc
.returncode
== -11:
338 reason
= iptables
+ "-restore segfaults!"
339 print_error(reason
, filename
, lineno
)
340 msg
= [iptables
+ "-restore segfault from:"]
341 msg
.extend(["input: " + l
for l
in restore_data
.split("\n")])
342 print("\n".join(msg
), file=log_file
)
345 if proc
.returncode
!= 0:
346 print("%s-restore returned %d: %s" % (iptables
, proc
.returncode
, err
),
350 # find all rules in iptables_save output
352 command
= EXECUTABLE
+ " " + iptables
+ "-save"
354 command
= "ip netns exec " + netns
+ " " + command
356 proc
= subprocess
.Popen(command
, shell
= True,
357 stdin
= subprocess
.PIPE
,
358 stdout
= subprocess
.PIPE
,
359 stderr
= subprocess
.PIPE
)
360 out
, err
= proc
.communicate()
362 if proc
.returncode
== -11:
363 reason
= iptables
+ "-save segfaults!"
364 print_error(reason
, filename
, lineno
)
367 cmd
= iptables
+ " -F -t " + table
368 execute_cmd(cmd
, filename
, 0, netns
)
370 out
= out
.decode('utf-8').rstrip()
371 if out
.find(out_expect
) < 0:
372 print("dumps differ!", file=log_file
)
373 out_clean
= [ l
for l
in out
.split("\n")
374 if not l
[0] in ['*', ':', '#']]
375 diff
= unified_diff(out_expect
.split("\n"), out_clean
,
376 fromfile
="expect", tofile
="got", lineterm
='')
377 print("\n".join(diff
), file=log_file
)
382 def run_test_file(filename
, netns
):
386 :param filename: name of the file with the test rules
387 :param netns: network namespace to perform test run in
390 # if this is not a test file, skip.
392 if not filename
.endswith(".t"):
395 if "libipt_" in filename
:
397 elif "libip6t_" in filename
:
399 elif "libxt_" in filename
:
401 elif "libarpt_" in filename
:
402 # only supported with nf_tables backend
403 if EXECUTABLE
!= "xtables-nft-multi":
406 elif "libebt_" in filename
:
407 # only supported with nf_tables backend
408 if EXECUTABLE
!= "xtables-nft-multi":
412 # default to iptables if not known prefix
416 if fast_run_possible(filename
):
417 tests
= run_test_file_fast(iptables
, filename
, netns
)
419 print(filename
+ ": " + maybe_colored('green', "OK", STDOUT_IS_TTY
))
429 total_test_passed
= True
432 execute_cmd("ip netns add " + netns
, filename
)
434 for lineno
, line
in enumerate(f
):
435 if line
[0] == "#" or len(line
.strip()) == 0:
439 chain_array
= line
.rstrip()[1:].split(",")
442 # external command invocation, executed as is.
443 # detects iptables commands to prefix with EXECUTABLE automatically
444 if line
[0] in ["@", "%"]:
445 external_cmd
= line
.rstrip()[1:]
446 execute_cmd(external_cmd
, filename
, lineno
, netns
)
450 table
= line
.rstrip()[1:]
453 if len(chain_array
) == 0:
454 print_error("broken test, missing chain",
455 filename
= filename
, lineno
= lineno
)
456 total_test_passed
= False
462 for chain
in chain_array
:
463 item
= line
.split(";")
465 rule
= chain
+ " " + item
[0]
467 rule
= chain
+ " -t " + table
+ " " + item
[0]
470 rule_save
= chain
+ " " + item
[0]
472 rule_save
= chain
+ " " + item
[1]
474 if iptables
== EBTABLES
and rule_save
.find('-j') < 0:
475 rule_save
+= " -j CONTINUE"
477 res
= item
[2].rstrip()
479 variant
= item
[3].rstrip()
481 alt_res
= item
[4].rstrip()
484 res
= variant_res(res
, variant
, alt_res
)
486 ret
= run_test(iptables
, rule
, rule_save
,
487 res
, filename
, lineno
+ 1, netns
)
491 total_test_passed
= False
498 execute_cmd("ip netns del " + netns
, filename
)
499 if total_test_passed
:
502 suffix
= maybe_colored('red', " but fast mode failed!", STDOUT_IS_TTY
)
503 print(filename
+ ": " + maybe_colored('green', "OK", STDOUT_IS_TTY
) + suffix
)
511 Show the list of missing test files
513 file_list
= os
.listdir(EXTENSIONS_PATH
)
514 testfiles
= [i
for i
in file_list
if i
.endswith('.t')]
515 libfiles
= [i
for i
in file_list
516 if i
.startswith('lib') and i
.endswith('.c')]
519 return x
[0:-2] + '.t'
520 missing
= [test_name(i
) for i
in libfiles
521 if not test_name(i
) in testfiles
]
523 print('\n'.join(missing
))
526 # prefer unshare module
529 unshare
.unshare(unshare
.CLONE_NEWNET
)
534 # sledgehammer style:
535 # - call ourselves prefixed by 'unshare -n' if found
536 # - pass extra --no-netns parameter to avoid another recursion
540 unshare
= shutil
.which("unshare")
544 sys
.argv
.append("--no-netns")
545 os
.execv(unshare
, [unshare
, "-n", sys
.executable
] + sys
.argv
)
555 parser
= argparse
.ArgumentParser(description
='Run iptables tests')
556 parser
.add_argument('filename', nargs
='*',
557 metavar
='path/to/file.t',
558 help='Run only this test')
559 parser
.add_argument('-H', '--host', action
='store_true',
560 help='Run tests against installed binaries')
561 parser
.add_argument('-l', '--legacy', action
='store_true',
562 help='Test iptables-legacy')
563 parser
.add_argument('-m', '--missing', action
='store_true',
564 help='Check for missing tests')
565 parser
.add_argument('-n', '--nftables', action
='store_true',
566 help='Test iptables-over-nftables')
567 parser
.add_argument('-N', '--netns', action
='store_const',
568 const
='____iptables-container-test',
569 help='Test netnamespace path')
570 parser
.add_argument('--no-netns', action
='store_true',
571 help='Do not run testsuite in own network namespace')
572 args
= parser
.parse_args()
575 # show list of missing test files
583 variants
.append("legacy")
585 variants
.append("nft")
586 if len(variants
) == 0:
587 variants
= [ "legacy", "nft" ]
590 print("You need to be root to run this, sorry", file=sys
.stderr
)
593 if not args
.netns
and not args
.no_netns
and not spawn_netns():
594 print("Cannot run in own namespace, connectivity might break",
598 os
.putenv("XTABLES_LIBDIR", os
.path
.abspath(EXTENSIONS_PATH
))
599 os
.putenv("PATH", "%s/iptables:%s" % (os
.path
.abspath(os
.path
.curdir
),
605 for variant
in variants
:
607 EXECUTABLE
= "xtables-" + variant
+ "-multi"
613 # setup global var log file
616 log_file
= open(LOGFILE
, 'w')
618 print("Couldn't open log file %s" % LOGFILE
, file=sys
.stderr
)
622 file_list
= args
.filename
624 file_list
= [os
.path
.join(EXTENSIONS_PATH
, i
)
625 for i
in os
.listdir(EXTENSIONS_PATH
)
629 for filename
in file_list
:
630 file_tests
, file_passed
= run_test_file(filename
, args
.netns
)
633 passed
+= file_passed
636 print("%s: %d test files, %d unit tests, %d passed"
637 % (variant
, test_files
, tests
, passed
))
639 total_passed
+= passed
641 total_test_files
= max(total_test_files
, test_files
)
643 if len(variants
) > 1:
644 print("total: %d test files, %d unit tests, %d passed"
645 % (total_test_files
, total_tests
, total_passed
))
646 return total_passed
- total_tests
648 if __name__
== '__main__':