]> git.ipfire.org Git - thirdparty/iptables.git/blob - iptables-test.py
configure: Add option to enable/disable libnfnetlink
[thirdparty/iptables.git] / iptables-test.py
1 #!/usr/bin/env python3
2 #
3 # (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This software has been sponsored by Sophos Astaro <http://www.sophos.com>
11 #
12
13 from __future__ import print_function
14 import sys
15 import os
16 import subprocess
17 import argparse
18 from difflib import unified_diff
19
# Names of the front-end commands under test. run_test() compares against
# these to choose the matching *-save tool.
IPTABLES = "iptables"
IP6TABLES = "ip6tables"
ARPTABLES = "arptables"
EBTABLES = "ebtables"

# Dump commands whose output run_test() searches for the rule just added.
IPTABLES_SAVE = "iptables-save"
IP6TABLES_SAVE = "ip6tables-save"
ARPTABLES_SAVE = "arptables-save"
EBTABLES_SAVE = "ebtables-save"
#IPTABLES_SAVE = ['xtables-save','-4']
#IP6TABLES_SAVE = ['xtables-save','-6']

# Directory listed for *.t test files and lib*.c extension sources.
EXTENSIONS_PATH = "extensions"
# Path of the log file; main() opens it and stores the handle in log_file,
# execute_cmd() redirects the output of every command into that handle.
LOGFILE="/tmp/iptables-test.log"
log_file = None

# Evaluated once at import time; maybe_colored() callers pass these so escape
# sequences are only emitted when the respective stream is a terminal.
STDOUT_IS_TTY = sys.stdout.isatty()
STDERR_IS_TTY = sys.stderr.isatty()
38
def maybe_colored(color, text, isatty):
    '''
    Returns text wrapped in the terminal escape sequence for color.

    :param color: 'green' or 'red'; only looked up when isatty is true
    :param text: string to decorate
    :param isatty: when false, text is returned unchanged
    '''
    if not isatty:
        return text

    escape_codes = {'green': '\033[92m', 'red': '\033[91m'}
    return escape_codes[color] + text + '\033[0m'
48
49
def print_error(reason, filename=None, lineno=None):
    '''
    Prints an error with nice colors, indicating file and line number.

    The file prefix and the line number are left out when the respective
    argument is None, so the advertised defaults no longer raise TypeError.

    :param reason: text describing what went wrong
    :param filename: name of the test file the error belongs to (or None)
    :param lineno: line number within that file (or None)
    '''
    message = "" if filename is None else filename + ": "
    message += maybe_colored('red', "ERROR", STDERR_IS_TTY) + ":"
    if lineno is not None:
        message += " line %d" % lineno
    message += " (%s)" % reason
    print(message, file=sys.stderr)
56
57
def delete_rule(iptables, rule, filename, lineno, netns = None):
    '''
    Removes an iptables rule

    Runs "<iptables> -D <rule>" through execute_cmd().

    :param iptables: string with the iptables command to execute
    :param rule: string with iptables arguments identifying the rule
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: network namespace to call the command in (or None)
    :return: 0 if the command exited with status 0, -1 otherwise
    '''
    cmd = iptables + " -D " + rule
    ret = execute_cmd(cmd, filename, lineno, netns)
    # Any non-zero status means the rule was not removed, not just status 1.
    if ret != 0:
        # Quote the command that actually ran (-D), not an insert command.
        reason = "cannot delete: " + cmd
        print_error(reason, filename, lineno)
        return -1

    return 0
70
71
def _save_command_for(iptables):
    '''
    Maps an iptables command string to the matching *-save command.

    Accepts either a bare front-end name (IPTABLES, IP6TABLES, ARPTABLES,
    EBTABLES) or a two-token form whose second token is '-4' or '-6'.
    Returns None if the command is not recognized.
    '''
    tokens = iptables.split(" ")
    if len(tokens) == 2:
        by_family = {'-4': IPTABLES_SAVE, '-6': IP6TABLES_SAVE}
        return by_family.get(tokens[1])
    if len(tokens) == 1:
        by_name = {
            IPTABLES: IPTABLES_SAVE,
            IP6TABLES: IP6TABLES_SAVE,
            ARPTABLES: ARPTABLES_SAVE,
            EBTABLES: EBTABLES_SAVE,
        }
        return by_name.get(tokens[0])
    return None


def run_test(iptables, rule, rule_save, res, filename, lineno, netns):
    '''
    Executes an unit test. Returns the output of delete_rule().

    The rule is appended with "-A", then looked up in the output of the
    matching *-save command, and finally deleted again. Returns 0 if the
    observed behaviour agrees with res and -1 otherwise.

    Parameters:
    :param iptables: string with the iptables command to execute
    :param rule: string with iptables arguments for the rule to test
    :param rule_save: string to find the rule in the output of iptables-save
    :param res: expected result of the rule. Valid values: "OK", "FAIL"
                (any other value, e.g. "NOMATCH", expects the rule to load
                but not to show up as rule_save in the dump)
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: network namespace to call commands in (or None)
    '''
    cmd = iptables + " -A " + rule
    ret = execute_cmd(cmd, filename, lineno, netns)

    #
    # report failed test
    #
    if ret:
        if res != "FAIL":
            print_error("cannot load: " + cmd, filename, lineno)
            return -1
        # loading was expected to fail, do not report this error
        return 0

    if res == "FAIL":
        print_error("should fail: " + cmd, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    save_command = _save_command_for(iptables)
    if save_command is None:
        # Previously this path crashed with UnboundLocalError.
        print_error("no save command known for: " + iptables,
                    filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    command = EXECUTABLE + " " + save_command
    if netns:
        command = "ip netns exec " + netns + " " + command

    proc = subprocess.Popen(command, shell=True,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, _ = proc.communicate()

    #
    # check for segfaults
    #
    if proc.returncode == -11:
        print_error(command + " segfaults!", filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    # find the rule; out is bytes, so the needle is encoded as well
    matching = out.find("\n-A {}\n".format(rule_save).encode('utf-8'))

    if matching < 0:
        if res == "OK":
            # Quote the command that was really run (-A).
            print_error("cannot find: " + cmd, filename, lineno)
            delete_rule(iptables, rule, filename, lineno, netns)
            return -1
        # do not report this error
        # NOTE(review): the loaded rule is not deleted on this path - confirm
        # whether that is intended.
        return 0

    if res != "OK":
        print_error("should not match: " + cmd, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    # Test "ip netns del NETNS" path with rules in place
    if netns:
        return 0

    return delete_rule(iptables, rule, filename, lineno)
169
def execute_cmd(cmd, filename, lineno = 0, netns = None):
    '''
    Runs a shell command with its output redirected to the log file and
    returns the exit code, reporting segfaults via print_error().

    Commands starting with one of the xtables front-end names are prefixed
    with EXECUTABLE; if netns is set, the command is wrapped in
    "ip netns exec".

    :param cmd: string with the command to be executed
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: network namespace to run command in
    '''
    frontends = ('iptables ', 'ip6tables ', 'ebtables ', 'arptables ')
    if cmd.startswith(frontends):
        cmd = " ".join([EXECUTABLE, cmd])

    if netns:
        cmd = " ".join(["ip netns exec", netns, cmd])

    print("command: {}".format(cmd), file=log_file)
    status = subprocess.call(cmd, shell=True, universal_newlines=True,
                             stderr=subprocess.STDOUT, stdout=log_file)
    log_file.flush()

    # generic check for segfaults
    if status == -11:
        print_error("command segfaults: " + cmd, filename, lineno)
    return status
197
198
def variant_res(res, variant, alt_res=None):
    '''
    Translate an expected result scoped to one variant for the current run

    When the variant named in the test spec is the one being run (its
    executable equals EXECUTABLE), @res applies unchanged. For the other
    variant, @alt_res is used if given, otherwise @res is inverted
    ("OK" -> "FAIL", "FAIL" -> "OK", "NOMATCH" -> "OK").

    :param res: expected result from test spec ("OK", "FAIL" or "NOMATCH")
    :param variant: variant @res is scoped to by test spec ("NFT" or "LEGACY")
    :param alt_res: optional expected result for the alternate variant.
    '''
    scoped_executable = {
        "NFT": "xtables-nft-multi",
        "LEGACY": "xtables-legacy-multi",
    }[variant]

    if scoped_executable == EXECUTABLE:
        return res

    if alt_res is None:
        return {"OK": "FAIL", "FAIL": "OK", "NOMATCH": "OK"}[res]
    return alt_res
226
def fast_run_possible(filename):
    '''
    Keep things simple, run only for simple test files:
    - no external commands
    - no multiple tables
    - no variant-specific results

    :param filename: name of the file with the test rules
    :return: True if the file qualifies for run_test_file_fast()
    '''
    table = None
    rulecount = 0
    # Context manager so the file is closed on every return path.
    with open(filename) as f:
        for line in f:
            if line[0] in ["#", ":"] or len(line.strip()) == 0:
                continue
            if line[0] == "*":
                # a second table, or a table selected after rules were seen
                if table or rulecount > 0:
                    return False
                table = line.rstrip()[1:]
                # no 'continue': the table line is counted below as well
            if line[0] in ["@", "%"]:
                return False
            # more than three ';'-separated fields: variant-specific result
            if len(line.split(";")) > 3:
                return False
            rulecount += 1

    return True
250
def run_test_file_fast(iptables, filename, netns):
    '''
    Run a test file, but fast

    Rules expected to be "OK" are collected and loaded in one go through
    "<iptables>-restore"; the output of "<iptables>-save" is then searched
    for the expected dump. Rules with any other expected result are run
    individually through run_test().

    :param iptables: string with the iptables command the file is tested with
    :param filename: name of the file with the test rules
    :param netns: network namespace to perform test run in
    :return: number of test lines on success, -1 on any failure
    '''
    rules = {}
    table = "filter"
    chain_array = []
    tests = 0
    # Defined up front so the error paths below work for an empty file too.
    lineno = 0

    # Context manager: the file used to stay open on every return path.
    with open(filename) as f:
        for lineno, line in enumerate(f, 1):
            if line[0] == "#" or len(line.strip()) == 0:
                continue

            if line[0] == "*":
                table = line.rstrip()[1:]
                continue

            if line[0] == ":":
                chain_array = line.rstrip()[1:].split(",")
                continue

            if len(chain_array) == 0:
                return -1

            tests += 1

            for chain in chain_array:
                item = line.split(";")
                rule = chain + " " + item[0]

                if item[1] == "=":
                    rule_save = chain + " " + item[0]
                else:
                    rule_save = chain + " " + item[1]

                if iptables == EBTABLES and rule_save.find('-j') < 0:
                    rule_save += " -j CONTINUE"

                res = item[2].rstrip()
                if res != "OK":
                    # not expected to load cleanly: test it on its own
                    rule = chain + " -t " + table + " " + item[0]
                    ret = run_test(iptables, rule, rule_save,
                                   res, filename, lineno, netns)

                    if ret < 0:
                        return -1
                    continue

                rules.setdefault(chain, []).append((rule, rule_save))

    restore_data = ["*" + table]
    out_expect = []
    # Only rules collected for these chain names are loaded and compared.
    for chain in ["PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"]:
        for rule, rule_save in rules.get(chain, []):
            restore_data.append("-A " + rule)
            out_expect.append("-A " + rule_save)
    restore_data.append("COMMIT")

    out_expect = "\n".join(out_expect)

    # load all rules via iptables_restore

    command = EXECUTABLE + " " + iptables + "-restore"
    if netns:
        command = "ip netns exec " + netns + " " + command

    for line in restore_data:
        print(iptables + "-restore: " + line, file=log_file)

    proc = subprocess.Popen(command, shell = True, text = True,
                            stdin = subprocess.PIPE,
                            stdout = subprocess.PIPE,
                            stderr = subprocess.PIPE)
    restore_data = "\n".join(restore_data) + "\n"
    out, err = proc.communicate(input = restore_data)

    if proc.returncode == -11:
        reason = iptables + "-restore segfaults!"
        print_error(reason, filename, lineno)
        msg = [iptables + "-restore segfault from:"]
        msg.extend(["input: " + l for l in restore_data.split("\n")])
        print("\n".join(msg), file=log_file)
        return -1

    if proc.returncode != 0:
        print("%s-restore returned %d: %s" % (iptables, proc.returncode, err),
              file=log_file)
        return -1

    # find all rules in iptables_save output

    command = EXECUTABLE + " " + iptables + "-save"
    if netns:
        command = "ip netns exec " + netns + " " + command

    proc = subprocess.Popen(command, shell = True,
                            stdin = subprocess.PIPE,
                            stdout = subprocess.PIPE,
                            stderr = subprocess.PIPE)
    out, err = proc.communicate()

    if proc.returncode == -11:
        reason = iptables + "-save segfaults!"
        print_error(reason, filename, lineno)
        return -1

    # flush the table again before evaluating the dump
    cmd = iptables + " -F -t " + table
    execute_cmd(cmd, filename, 0, netns)

    out = out.decode('utf-8').rstrip()
    if out.find(out_expect) < 0:
        print("dumps differ!", file=log_file)
        # Skip empty lines too: indexing l[0] on them raised IndexError,
        # e.g. when the save command printed nothing at all.
        out_clean = [ l for l in out.split("\n")
                      if l and l[0] not in ['*', ':', '#']]
        diff = unified_diff(out_expect.split("\n"), out_clean,
                            fromfile="expect", tofile="got", lineterm='')
        print("\n".join(diff), file=log_file)
        return -1

    return tests
381
def run_test_file(filename, netns):
    '''
    Runs a test file

    The front-end command is chosen from the extension prefix found in the
    file name. If fast_run_possible() approves, run_test_file_fast() is tried
    first; when that does not succeed, every rule is run individually
    through run_test().

    :param filename: name of the file with the test rules
    :param netns: network namespace to perform test run in
    :return: tuple (number of tests, number of passed tests); (0, 0) if the
             file is skipped
    '''
    #
    # if this is not a test file, skip.
    #
    if not filename.endswith(".t"):
        return 0, 0

    if "libipt_" in filename:
        iptables = IPTABLES
    elif "libip6t_" in filename:
        iptables = IP6TABLES
    elif "libxt_" in filename:
        iptables = IPTABLES
    elif "libarpt_" in filename:
        # only supported with nf_tables backend
        if EXECUTABLE != "xtables-nft-multi":
            return 0, 0
        iptables = ARPTABLES
    elif "libebt_" in filename:
        # only supported with nf_tables backend
        if EXECUTABLE != "xtables-nft-multi":
            return 0, 0
        iptables = EBTABLES
    else:
        # default to iptables if not known prefix
        iptables = IPTABLES

    fast_failed = False
    if fast_run_possible(filename):
        tests = run_test_file_fast(iptables, filename, netns)
        if tests > 0:
            print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY))
            return tests, tests
        fast_failed = True

    tests = 0
    passed = 0
    table = ""
    chain_array = []
    total_test_passed = True

    if netns:
        execute_cmd("ip netns add " + netns, filename)

    # Line numbers are 1-based throughout, so every error message points at
    # the same line a text editor would show.
    with open(filename) as f:
        for lineno, line in enumerate(f, 1):
            if line[0] == "#" or len(line.strip()) == 0:
                continue

            if line[0] == ":":
                chain_array = line.rstrip()[1:].split(",")
                continue

            # external command invocation, executed as is.
            # detects iptables commands to prefix with EXECUTABLE automatically
            if line[0] in ["@", "%"]:
                external_cmd = line.rstrip()[1:]
                execute_cmd(external_cmd, filename, lineno, netns)
                continue

            if line[0] == "*":
                table = line.rstrip()[1:]
                continue

            if len(chain_array) == 0:
                print_error("broken test, missing chain",
                            filename = filename, lineno = lineno)
                total_test_passed = False
                break

            test_passed = True
            tests += 1

            for chain in chain_array:
                item = line.split(";")
                if table == "":
                    rule = chain + " " + item[0]
                else:
                    rule = chain + " -t " + table + " " + item[0]

                if item[1] == "=":
                    rule_save = chain + " " + item[0]
                else:
                    rule_save = chain + " " + item[1]

                if iptables == EBTABLES and rule_save.find('-j') < 0:
                    rule_save += " -j CONTINUE"

                res = item[2].rstrip()
                if len(item) > 3:
                    # result is scoped to a variant, optionally followed by
                    # the result expected from the other variant
                    variant = item[3].rstrip()
                    if len(item) > 4:
                        alt_res = item[4].rstrip()
                    else:
                        alt_res = None
                    res = variant_res(res, variant, alt_res)

                ret = run_test(iptables, rule, rule_save,
                               res, filename, lineno, netns)

                if ret < 0:
                    test_passed = False
                    total_test_passed = False
                    break

            if test_passed:
                passed += 1

    if netns:
        execute_cmd("ip netns del " + netns, filename)
    if total_test_passed:
        suffix = ""
        if fast_failed:
            suffix = maybe_colored('red', " but fast mode failed!", STDOUT_IS_TTY)
        print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY) + suffix)

    return tests, passed
507
508
def show_missing():
    '''
    Prints, one per line, the name of the .t test file expected for every
    lib*.c file in EXTENSIONS_PATH that has no such test file next to it
    '''
    entries = os.listdir(EXTENSIONS_PATH)
    existing_tests = [name for name in entries if name.endswith('.t')]

    missing = []
    for name in entries:
        if not (name.startswith('lib') and name.endswith('.c')):
            continue
        # swap the trailing ".c" for ".t"
        expected = name[0:-2] + '.t'
        if expected not in existing_tests:
            missing.append(expected)

    print('\n'.join(missing))
524
def spawn_netns():
    '''
    Tries to move the test run into a network namespace of its own

    First tries the "unshare" Python module. If that does not work, the
    script re-executes itself via the "unshare -n" binary; in that case this
    function does not return, as os.execv() replaces the running process.

    :return: True if the current process was unshared, False if neither
             approach worked
    '''
    # prefer unshare module
    try:
        import unshare
        unshare.unshare(unshare.CLONE_NEWNET)
        return True
    except Exception:
        # Best effort: module missing or call failed. Unlike a bare
        # "except:", this lets KeyboardInterrupt and SystemExit through.
        pass

    # sledgehammer style:
    # - call ourselves prefixed by 'unshare -n' if found
    # - pass extra --no-netns parameter to avoid another recursion
    try:
        import shutil

        unshare_bin = shutil.which("unshare")
        if unshare_bin is None:
            return False

        # Build the new argument list without touching sys.argv, so a failed
        # execv() leaves the process state untouched.
        argv = [unshare_bin, "-n", sys.executable] + sys.argv + ["--no-netns"]
        os.execv(unshare_bin, argv)
    except Exception:
        pass

    return False
550
#
# main
#
def main():
    '''
    Parses the command line and runs the selected test files once for each
    selected variant ("legacy" and/or "nft").

    :return: process exit status: 0 if all tests passed (or only the list of
             missing tests was requested), 1 if any test failed or the log
             file cannot be opened, 77 if not run as root
    '''
    global EXECUTABLE
    global log_file

    parser = argparse.ArgumentParser(description='Run iptables tests')
    parser.add_argument('filename', nargs='*',
                        metavar='path/to/file.t',
                        help='Run only this test')
    parser.add_argument('-H', '--host', action='store_true',
                        help='Run tests against installed binaries')
    parser.add_argument('-l', '--legacy', action='store_true',
                        help='Test iptables-legacy')
    parser.add_argument('-m', '--missing', action='store_true',
                        help='Check for missing tests')
    parser.add_argument('-n', '--nftables', action='store_true',
                        help='Test iptables-over-nftables')
    parser.add_argument('-N', '--netns', action='store_const',
                        const='____iptables-container-test',
                        help='Test netnamespace path')
    parser.add_argument('--no-netns', action='store_true',
                        help='Do not run testsuite in own network namespace')
    args = parser.parse_args()

    #
    # show list of missing test files
    #
    if args.missing:
        show_missing()
        return 0

    variants = []
    if args.legacy:
        variants.append("legacy")
    if args.nftables:
        variants.append("nft")
    if len(variants) == 0:
        variants = [ "legacy", "nft" ]

    if os.getuid() != 0:
        print("You need to be root to run this, sorry", file=sys.stderr)
        return 77

    if not args.netns and not args.no_netns and not spawn_netns():
        print("Cannot run in own namespace, connectivity might break",
              file=sys.stderr)

    if not args.host:
        os.putenv("XTABLES_LIBDIR", os.path.abspath(EXTENSIONS_PATH))
        os.putenv("PATH", "%s/iptables:%s" % (os.path.abspath(os.path.curdir),
                                              os.getenv("PATH")))

    # The list of test files does not depend on the variant.
    if args.filename:
        file_list = args.filename
    else:
        file_list = [os.path.join(EXTENSIONS_PATH, i)
                     for i in os.listdir(EXTENSIONS_PATH)
                     if i.endswith('.t')]
        file_list.sort()

    # setup global var log file
    # Opened once for the whole run: reopening with 'w' for each variant
    # truncated the log of the variant tested before.
    try:
        log_file = open(LOGFILE, 'w')
    except IOError:
        print("Couldn't open log file %s" % LOGFILE, file=sys.stderr)
        # non-zero, so the caller does not mistake this for a passing run
        return 1

    total_test_files = 0
    total_passed = 0
    total_tests = 0
    try:
        for variant in variants:
            EXECUTABLE = "xtables-" + variant + "-multi"

            test_files = 0
            tests = 0
            passed = 0

            for filename in file_list:
                file_tests, file_passed = run_test_file(filename, args.netns)
                if file_tests:
                    tests += file_tests
                    passed += file_passed
                    test_files += 1

            print("%s: %d test files, %d unit tests, %d passed"
                  % (variant, test_files, tests, passed))

            total_passed += passed
            total_tests += tests
            total_test_files = max(total_test_files, test_files)
    finally:
        log_file.close()

    if len(variants) > 1:
        print("total: %d test files, %d unit tests, %d passed"
              % (total_test_files, total_tests, total_passed))

    # A negative failure count wraps modulo 256 as an exit status: 179
    # failures would read as 77 and 256 failures as 0.
    return 0 if total_passed == total_tests else 1

if __name__ == '__main__':
    sys.exit(main())