]> git.ipfire.org Git - thirdparty/iptables.git/blob - iptables-test.py
tests: Test compat mode
[thirdparty/iptables.git] / iptables-test.py
1 #!/usr/bin/env python3
2 #
3 # (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This software has been sponsored by Sophos Astaro <http://www.sophos.com>
11 #
12
13 from __future__ import print_function
14 import sys
15 import os
16 import subprocess
17 import argparse
18
19 IPTABLES = "iptables"
20 IP6TABLES = "ip6tables"
21 ARPTABLES = "arptables"
22 EBTABLES = "ebtables"
23
24 IPTABLES_SAVE = "iptables-save"
25 IP6TABLES_SAVE = "ip6tables-save"
26 ARPTABLES_SAVE = "arptables-save"
27 EBTABLES_SAVE = "ebtables-save"
28 #IPTABLES_SAVE = ['xtables-save','-4']
29 #IP6TABLES_SAVE = ['xtables-save','-6']
30
31 COMPAT_ARG = ""
32
33 EXTENSIONS_PATH = "extensions"
34 LOGFILE="/tmp/iptables-test.log"
35 log_file = None
36
37 STDOUT_IS_TTY = sys.stdout.isatty()
38 STDERR_IS_TTY = sys.stderr.isatty()
39
40 def maybe_colored(color, text, isatty):
41 terminal_sequences = {
42 'green': '\033[92m',
43 'red': '\033[91m',
44 }
45
46 return (
47 terminal_sequences[color] + text + '\033[0m' if isatty else text
48 )
49
50
51 def print_error(reason, filename=None, lineno=None):
52 '''
53 Prints an error with nice colors, indicating file and line number.
54 '''
55 print(filename + ": " + maybe_colored('red', "ERROR", STDERR_IS_TTY) +
56 ": line %d (%s)" % (lineno, reason), file=sys.stderr)
57
58
59 def delete_rule(iptables, rule, filename, lineno, netns = None):
60 '''
61 Removes an iptables rule
62 '''
63 cmd = iptables + " -D " + rule
64 ret = execute_cmd(cmd, filename, lineno, netns)
65 if ret == 1:
66 reason = "cannot delete: " + iptables + " -I " + rule
67 print_error(reason, filename, lineno)
68 return -1
69
70 return 0
71
72
73 def run_test(iptables, rule, rule_save, res, filename, lineno, netns):
74 '''
75 Executes an unit test. Returns the output of delete_rule().
76
77 Parameters:
78 :param iptables: string with the iptables command to execute
79 :param rule: string with iptables arguments for the rule to test
80 :param rule_save: string to find the rule in the output of iptables-save
81 :param res: expected result of the rule. Valid values: "OK", "FAIL"
82 :param filename: name of the file tested (used for print_error purposes)
83 :param lineno: line number being tested (used for print_error purposes)
84 :param netns: network namespace to call commands in (or None)
85 '''
86 ret = 0
87
88 cmd = iptables + COMPAT_ARG + " -A " + rule
89 ret = execute_cmd(cmd, filename, lineno, netns)
90
91 #
92 # report failed test
93 #
94 if ret:
95 if res != "FAIL":
96 reason = "cannot load: " + cmd
97 print_error(reason, filename, lineno)
98 return -1
99 else:
100 # do not report this error
101 return 0
102 else:
103 if res == "FAIL":
104 reason = "should fail: " + cmd
105 print_error(reason, filename, lineno)
106 delete_rule(iptables, rule, filename, lineno, netns)
107 return -1
108
109 matching = 0
110 tokens = iptables.split(" ")
111 if len(tokens) == 2:
112 if tokens[1] == '-4':
113 command = IPTABLES_SAVE
114 elif tokens[1] == '-6':
115 command = IP6TABLES_SAVE
116 elif len(tokens) == 1:
117 if tokens[0] == IPTABLES:
118 command = IPTABLES_SAVE
119 elif tokens[0] == IP6TABLES:
120 command = IP6TABLES_SAVE
121 elif tokens[0] == ARPTABLES:
122 command = ARPTABLES_SAVE
123 elif tokens[0] == EBTABLES:
124 command = EBTABLES_SAVE
125
126 command = EXECUTABLE + " " + command
127
128 if netns:
129 command = "ip netns exec " + netns + " " + command
130
131 args = tokens[1:]
132 proc = subprocess.Popen(command, shell=True,
133 stdin=subprocess.PIPE,
134 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
135 out, err = proc.communicate()
136
137 #
138 # check for segfaults
139 #
140 if proc.returncode == -11:
141 reason = command + " segfaults!"
142 print_error(reason, filename, lineno)
143 delete_rule(iptables, rule, filename, lineno, netns)
144 return -1
145
146 # find the rule
147 matching = out.find(rule_save.encode('utf-8'))
148 if matching < 0:
149 if res == "OK":
150 reason = "cannot find: " + iptables + " -I " + rule
151 print_error(reason, filename, lineno)
152 delete_rule(iptables, rule, filename, lineno, netns)
153 return -1
154 else:
155 # do not report this error
156 return 0
157 else:
158 if res != "OK":
159 reason = "should not match: " + cmd
160 print_error(reason, filename, lineno)
161 delete_rule(iptables, rule, filename, lineno, netns)
162 return -1
163
164 # Test "ip netns del NETNS" path with rules in place
165 if netns:
166 return 0
167
168 return delete_rule(iptables, rule, filename, lineno)
169
170 def execute_cmd(cmd, filename, lineno = 0, netns = None):
171 '''
172 Executes a command, checking for segfaults and returning the command exit
173 code.
174
175 :param cmd: string with the command to be executed
176 :param filename: name of the file tested (used for print_error purposes)
177 :param lineno: line number being tested (used for print_error purposes)
178 :param netns: network namespace to run command in
179 '''
180 global log_file
181 if cmd.startswith('iptables ') or cmd.startswith('ip6tables ') or cmd.startswith('ebtables ') or cmd.startswith('arptables '):
182 cmd = EXECUTABLE + " " + cmd
183
184 if netns:
185 cmd = "ip netns exec " + netns + " " + cmd
186
187 print("command: {}".format(cmd), file=log_file)
188 ret = subprocess.call(cmd, shell=True, universal_newlines=True,
189 stderr=subprocess.STDOUT, stdout=log_file)
190 log_file.flush()
191
192 # generic check for segfaults
193 if ret == -11:
194 reason = "command segfaults: " + cmd
195 print_error(reason, filename, lineno)
196 return ret
197
198
199 def variant_res(res, variant, alt_res=None):
200 '''
201 Adjust expected result with given variant
202
203 If expected result is scoped to a variant, the other one yields a different
204 result. Therefore map @res to itself if given variant is current, use the
205 alternate result, @alt_res, if specified, invert @res otherwise.
206
207 :param res: expected result from test spec ("OK", "FAIL" or "NOMATCH")
208 :param variant: variant @res is scoped to by test spec ("NFT" or "LEGACY")
209 :param alt_res: optional expected result for the alternate variant.
210 '''
211 variant_executable = {
212 "NFT": "xtables-nft-multi",
213 "LEGACY": "xtables-legacy-multi"
214 }
215 res_inverse = {
216 "OK": "FAIL",
217 "FAIL": "OK",
218 "NOMATCH": "OK"
219 }
220
221 if variant_executable[variant] == EXECUTABLE:
222 return res
223 if alt_res is not None:
224 return alt_res
225 return res_inverse[res]
226
227 def fast_run_possible(filename):
228 '''
229 Keep things simple, run only for simple test files:
230 - no external commands
231 - no multiple tables
232 - no variant-specific results
233 '''
234 table = None
235 rulecount = 0
236 for line in open(filename):
237 if line[0] in ["#", ":"] or len(line.strip()) == 0:
238 continue
239 if line[0] == "*":
240 if table or rulecount > 0:
241 return False
242 table = line.rstrip()[1:]
243 if line[0] in ["@", "%"]:
244 return False
245 if len(line.split(";")) > 3:
246 return False
247 rulecount += 1
248
249 return True
250
251 def run_test_file_fast(iptables, filename, netns):
252 '''
253 Run a test file, but fast
254
255 :param filename: name of the file with the test rules
256 :param netns: network namespace to perform test run in
257 '''
258
259 f = open(filename)
260
261 rules = {}
262 table = "filter"
263 chain_array = []
264 tests = 0
265
266 for lineno, line in enumerate(f):
267 if line[0] == "#" or len(line.strip()) == 0:
268 continue
269
270 if line[0] == "*":
271 table = line.rstrip()[1:]
272 continue
273
274 if line[0] == ":":
275 chain_array = line.rstrip()[1:].split(",")
276 continue
277
278 if len(chain_array) == 0:
279 return -1
280
281 tests += 1
282
283 for chain in chain_array:
284 item = line.split(";")
285 rule = chain + " " + item[0]
286
287 if item[1] == "=":
288 rule_save = chain + " " + item[0]
289 else:
290 rule_save = chain + " " + item[1]
291
292 if iptables == EBTABLES and rule_save.find('-j') < 0:
293 rule_save += " -j CONTINUE"
294
295 res = item[2].rstrip()
296 if res != "OK":
297 rule = chain + " -t " + table + " " + item[0]
298 ret = run_test(iptables, rule, rule_save,
299 res, filename, lineno + 1, netns)
300
301 if ret < 0:
302 return -1
303 continue
304
305 if not chain in rules.keys():
306 rules[chain] = []
307 rules[chain].append((rule, rule_save))
308
309 restore_data = ["*" + table]
310 out_expect = []
311 for chain in ["PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"]:
312 if not chain in rules.keys():
313 continue
314 for rule in rules[chain]:
315 restore_data.append("-A " + rule[0])
316 out_expect.append("-A " + rule[1])
317 restore_data.append("COMMIT")
318
319 out_expect = "\n".join(out_expect)
320
321 # load all rules via iptables_restore
322
323 command = EXECUTABLE + " " + iptables + "-restore" + COMPAT_ARG
324 if netns:
325 command = "ip netns exec " + netns + " " + command
326
327 for line in restore_data:
328 print(iptables + "-restore: " + line, file=log_file)
329
330 proc = subprocess.Popen(command, shell = True, text = True,
331 stdin = subprocess.PIPE,
332 stdout = subprocess.PIPE,
333 stderr = subprocess.PIPE)
334 restore_data = "\n".join(restore_data) + "\n"
335 out, err = proc.communicate(input = restore_data)
336
337 if proc.returncode == -11:
338 reason = iptables + "-restore segfaults!"
339 print_error(reason, filename, lineno)
340 msg = [iptables + "-restore segfault from:"]
341 msg.extend(["input: " + l for l in restore_data.split("\n")])
342 print("\n".join(msg), file=log_file)
343 return -1
344
345 if proc.returncode != 0:
346 print("%s-restore returned %d: %s" % (iptables, proc.returncode, err),
347 file=log_file)
348 return -1
349
350 # find all rules in iptables_save output
351
352 command = EXECUTABLE + " " + iptables + "-save"
353 if netns:
354 command = "ip netns exec " + netns + " " + command
355
356 proc = subprocess.Popen(command, shell = True,
357 stdin = subprocess.PIPE,
358 stdout = subprocess.PIPE,
359 stderr = subprocess.PIPE)
360 out, err = proc.communicate()
361
362 if proc.returncode == -11:
363 reason = iptables + "-save segfaults!"
364 print_error(reason, filename, lineno)
365 return -1
366
367 cmd = iptables + " -F -t " + table
368 execute_cmd(cmd, filename, 0, netns)
369
370 out = out.decode('utf-8').rstrip()
371 if out.find(out_expect) < 0:
372 msg = ["dumps differ!"]
373 msg.extend(["expect: " + l for l in out_expect.split("\n")])
374 msg.extend(["got: " + l for l in out.split("\n")
375 if not l[0] in ['*', ':', '#']])
376 print("\n".join(msg), file=log_file)
377 return -1
378
379 return tests
380
381 def run_test_file(filename, netns):
382 '''
383 Runs a test file
384
385 :param filename: name of the file with the test rules
386 :param netns: network namespace to perform test run in
387 '''
388 #
389 # if this is not a test file, skip.
390 #
391 if not filename.endswith(".t"):
392 return 0, 0
393
394 if "libipt_" in filename:
395 iptables = IPTABLES
396 elif "libip6t_" in filename:
397 iptables = IP6TABLES
398 elif "libxt_" in filename:
399 iptables = IPTABLES
400 elif "libarpt_" in filename:
401 # only supported with nf_tables backend
402 if EXECUTABLE != "xtables-nft-multi":
403 return 0, 0
404 iptables = ARPTABLES
405 elif "libebt_" in filename:
406 # only supported with nf_tables backend
407 if EXECUTABLE != "xtables-nft-multi":
408 return 0, 0
409 iptables = EBTABLES
410 else:
411 # default to iptables if not known prefix
412 iptables = IPTABLES
413
414 fast_failed = False
415 if fast_run_possible(filename):
416 tests = run_test_file_fast(iptables, filename, netns)
417 if tests > 0:
418 print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY))
419 return tests, tests
420 fast_failed = True
421
422 f = open(filename)
423
424 tests = 0
425 passed = 0
426 table = ""
427 chain_array = []
428 total_test_passed = True
429
430 if netns:
431 execute_cmd("ip netns add " + netns, filename)
432
433 for lineno, line in enumerate(f):
434 if line[0] == "#" or len(line.strip()) == 0:
435 continue
436
437 if line[0] == ":":
438 chain_array = line.rstrip()[1:].split(",")
439 continue
440
441 # external command invocation, executed as is.
442 # detects iptables commands to prefix with EXECUTABLE automatically
443 if line[0] in ["@", "%"]:
444 external_cmd = line.rstrip()[1:]
445 execute_cmd(external_cmd, filename, lineno, netns)
446 continue
447
448 if line[0] == "*":
449 table = line.rstrip()[1:]
450 continue
451
452 if len(chain_array) == 0:
453 print_error("broken test, missing chain",
454 filename = filename, lineno = lineno)
455 total_test_passed = False
456 break
457
458 test_passed = True
459 tests += 1
460
461 for chain in chain_array:
462 item = line.split(";")
463 if table == "":
464 rule = chain + " " + item[0]
465 else:
466 rule = chain + " -t " + table + " " + item[0]
467
468 if item[1] == "=":
469 rule_save = chain + " " + item[0]
470 else:
471 rule_save = chain + " " + item[1]
472
473 res = item[2].rstrip()
474 if len(item) > 3:
475 variant = item[3].rstrip()
476 if len(item) > 4:
477 alt_res = item[4].rstrip()
478 else:
479 alt_res = None
480 res = variant_res(res, variant, alt_res)
481
482 ret = run_test(iptables, rule, rule_save,
483 res, filename, lineno + 1, netns)
484
485 if ret < 0:
486 test_passed = False
487 total_test_passed = False
488 break
489
490 if test_passed:
491 passed += 1
492
493 if netns:
494 execute_cmd("ip netns del " + netns, filename)
495 if total_test_passed:
496 suffix = ""
497 if fast_failed:
498 suffix = maybe_colored('red', " but fast mode failed!", STDOUT_IS_TTY)
499 print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY) + suffix)
500
501 f.close()
502 return tests, passed
503
504
505 def show_missing():
506 '''
507 Show the list of missing test files
508 '''
509 file_list = os.listdir(EXTENSIONS_PATH)
510 testfiles = [i for i in file_list if i.endswith('.t')]
511 libfiles = [i for i in file_list
512 if i.startswith('lib') and i.endswith('.c')]
513
514 def test_name(x):
515 return x[0:-2] + '.t'
516 missing = [test_name(i) for i in libfiles
517 if not test_name(i) in testfiles]
518
519 print('\n'.join(missing))
520
521 def spawn_netns():
522 # prefer unshare module
523 try:
524 import unshare
525 unshare.unshare(unshare.CLONE_NEWNET)
526 return True
527 except:
528 pass
529
530 # sledgehammer style:
531 # - call ourselves prefixed by 'unshare -n' if found
532 # - pass extra --no-netns parameter to avoid another recursion
533 try:
534 import shutil
535
536 unshare = shutil.which("unshare")
537 if unshare is None:
538 return False
539
540 sys.argv.append("--no-netns")
541 os.execv(unshare, [unshare, "-n", sys.executable] + sys.argv)
542 except:
543 pass
544
545 return False
546
547 #
548 # main
549 #
550 def main():
551 parser = argparse.ArgumentParser(description='Run iptables tests')
552 parser.add_argument('filename', nargs='*',
553 metavar='path/to/file.t',
554 help='Run only this test')
555 parser.add_argument('-H', '--host', action='store_true',
556 help='Run tests against installed binaries')
557 parser.add_argument('-l', '--legacy', action='store_true',
558 help='Test iptables-legacy')
559 parser.add_argument('-m', '--missing', action='store_true',
560 help='Check for missing tests')
561 parser.add_argument('-n', '--nftables', action='store_true',
562 help='Test iptables-over-nftables')
563 parser.add_argument('-c', '--nft-compat', action='store_true',
564 help='Test iptables-over-nftables in compat mode')
565 parser.add_argument('-N', '--netns', action='store_const',
566 const='____iptables-container-test',
567 help='Test netnamespace path')
568 parser.add_argument('--no-netns', action='store_true',
569 help='Do not run testsuite in own network namespace')
570 args = parser.parse_args()
571
572 #
573 # show list of missing test files
574 #
575 if args.missing:
576 show_missing()
577 return
578
579 variants = []
580 if args.legacy:
581 variants.append("legacy")
582 if args.nftables:
583 variants.append("nft")
584 if args.nft_compat:
585 variants.append("nft_compat")
586 if len(variants) == 0:
587 variants = [ "legacy", "nft", "nft_compat" ]
588
589 if os.getuid() != 0:
590 print("You need to be root to run this, sorry", file=sys.stderr)
591 return 77
592
593 if not args.netns and not args.no_netns and not spawn_netns():
594 print("Cannot run in own namespace, connectivity might break",
595 file=sys.stderr)
596
597 if not args.host:
598 os.putenv("XTABLES_LIBDIR", os.path.abspath(EXTENSIONS_PATH))
599 os.putenv("PATH", "%s/iptables:%s" % (os.path.abspath(os.path.curdir),
600 os.getenv("PATH")))
601
602 total_test_files = 0
603 total_passed = 0
604 total_tests = 0
605 for variant in variants:
606 global EXECUTABLE
607 global COMPAT_ARG
608 if variant == "nft_compat":
609 EXECUTABLE = "xtables-nft-multi"
610 COMPAT_ARG = " --compat"
611 else:
612 EXECUTABLE = "xtables-" + variant + "-multi"
613
614 test_files = 0
615 tests = 0
616 passed = 0
617
618 # setup global var log file
619 global log_file
620 try:
621 log_file = open(LOGFILE, 'w')
622 except IOError:
623 print("Couldn't open log file %s" % LOGFILE, file=sys.stderr)
624 return
625
626 if args.filename:
627 file_list = args.filename
628 else:
629 file_list = [os.path.join(EXTENSIONS_PATH, i)
630 for i in os.listdir(EXTENSIONS_PATH)
631 if i.endswith('.t')]
632 file_list.sort()
633
634 for filename in file_list:
635 file_tests, file_passed = run_test_file(filename, args.netns)
636 if file_tests:
637 tests += file_tests
638 passed += file_passed
639 test_files += 1
640
641 print("%s: %d test files, %d unit tests, %d passed"
642 % (variant, test_files, tests, passed))
643
644 total_passed += passed
645 total_tests += tests
646 total_test_files = max(total_test_files, test_files)
647
648 if len(variants) > 1:
649 print("total: %d test files, %d unit tests, %d passed"
650 % (total_test_files, total_tests, total_passed))
651 return total_passed - total_tests
652
653 if __name__ == '__main__':
654 sys.exit(main())