]> git.ipfire.org Git - thirdparty/iptables.git/blob - iptables-test.py
tests: iptables-test: Cover for obligatory -j CONTINUE in ebtables
[thirdparty/iptables.git] / iptables-test.py
1 #!/usr/bin/env python
2 #
3 # (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This software has been sponsored by Sophos Astaro <http://www.sophos.com>
11 #
12
13 from __future__ import print_function
14 import sys
15 import os
16 import subprocess
17 import argparse
18
# Names of the front-end commands, one per supported family.
IPTABLES = "iptables"
IP6TABLES = "ip6tables"
ARPTABLES = "arptables"
EBTABLES = "ebtables"

# Matching "-save" commands, used to dump the ruleset after adding a rule.
IPTABLES_SAVE = "iptables-save"
IP6TABLES_SAVE = "ip6tables-save"
ARPTABLES_SAVE = "arptables-save"
EBTABLES_SAVE = "ebtables-save"
# Alternative spelling of the IPv4/IPv6 pair, kept for reference:
#IPTABLES_SAVE = ['xtables-save','-4']
#IP6TABLES_SAVE = ['xtables-save','-6']

# Directory searched for "*.t" test files (and lib*.c extension sources).
EXTENSIONS_PATH = "extensions"
# Path of the log collecting the executed commands and their output.
LOGFILE = "/tmp/iptables-test.log"
# File object for LOGFILE; stays None until main() has opened it.
log_file = None

# Colored output is only produced if the respective stream is a terminal.
STDOUT_IS_TTY = sys.stdout.isatty()
STDERR_IS_TTY = sys.stderr.isatty()
37
def maybe_colored(color, text, isatty):
    '''
    Wraps text into the terminal escape sequences for the given color.

    :param color: name of the color to use, either 'green' or 'red'
    :param text: string to colorize
    :param isatty: if false, text is returned unmodified
    '''
    if not isatty:
        return text

    escape_codes = {
        'green': '\033[92m',
        'red': '\033[91m',
    }
    # '\033[0m' resets the terminal attributes after the text
    return escape_codes[color] + text + '\033[0m'
47
48
def print_error(reason, filename=None, lineno=None):
    '''
    Prints an error with nice colors, indicating file and line number.

    :param reason: string describing what went wrong
    :param filename: name of the file the error relates to; the file name
                     prefix is left out if None
    :param lineno: line number the error relates to; the line indication is
                   left out if None
    '''
    message = maybe_colored('red', "ERROR", STDERR_IS_TTY)

    # The parameters default to None, so avoid concatenating or formatting
    # None, which would raise TypeError instead of printing the error.
    if filename is not None:
        message = filename + ": " + message

    if lineno is not None:
        message += ": line %d (%s)" % (lineno, reason)
    else:
        message += ": (%s)" % reason

    print(message, file=sys.stderr)
55
56
def delete_rule(iptables, rule, filename, lineno, netns = None):
    '''
    Removes an iptables rule. Returns 0 on success, -1 if the rule could not
    be deleted.

    :param iptables: string with the iptables command to execute
    :param rule: string with iptables arguments of the rule to delete
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: network namespace to call commands in (or None)
    '''
    cmd = iptables + " -D " + rule
    ret = execute_cmd(cmd, filename, lineno, netns)

    # Any non-zero status means the rule is still in place: not only exit
    # code 1, also e.g. parameter problems or death by signal.
    if ret != 0:
        reason = "cannot delete: " + iptables + " -I " + rule
        print_error(reason, filename, lineno)
        return -1

    return 0
69
70
def run_test(iptables, rule, rule_save, res, filename, lineno, netns):
    '''
    Executes an unit test: adds the rule, looks it up in the output of the
    matching save command and removes the rule again.

    Returns 0 if the outcome matches the expected result, -1 otherwise. If
    the rule was found as expected, the return value is that of
    delete_rule() (or 0 when running in a network namespace).

    Parameters:
    :param iptables: string with the iptables command to execute
    :param rule: string with iptables arguments for the rule to test
    :param rule_save: string to find the rule in the output of iptables-save
    :param res: expected result of the rule. Valid values: "OK", "FAIL"
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: network namespace to call commands in (or None)
    '''
    cmd = iptables + " -A " + rule
    ret = execute_cmd(cmd, filename, lineno, netns)

    #
    # report failed test
    #
    if ret:
        if res != "FAIL":
            reason = "cannot load: " + cmd
            print_error(reason, filename, lineno)
            return -1
        # the rule was expected to fail, do not report this error
        return 0

    if res == "FAIL":
        reason = "should fail: " + cmd
        print_error(reason, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    #
    # pick the save command matching the command under test
    #
    save_by_family_flag = {
        '-4': IPTABLES_SAVE,
        '-6': IP6TABLES_SAVE,
    }
    save_by_command = {
        IPTABLES: IPTABLES_SAVE,
        IP6TABLES: IP6TABLES_SAVE,
        ARPTABLES: ARPTABLES_SAVE,
        EBTABLES: EBTABLES_SAVE,
    }

    command = None
    tokens = iptables.split(" ")
    if len(tokens) == 2:
        command = save_by_family_flag.get(tokens[1])
    elif len(tokens) == 1:
        command = save_by_command.get(tokens[0])

    if command is None:
        # Without a save command the rule cannot be verified. Report this
        # as a test failure instead of tripping over an unbound variable.
        reason = "no save command known for: " + iptables
        print_error(reason, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    command = EXECUTABLE + " " + command

    if netns:
        command = "ip netns exec " + netns + " " + command

    proc = subprocess.Popen(command, shell=True,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()

    #
    # check for segfaults
    #
    if proc.returncode == -11:
        reason = "iptables-save segfaults: " + cmd
        print_error(reason, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    # find the rule, the save output is bytes so encode the needle
    matching = out.find(rule_save.encode('utf-8'))
    if matching < 0:
        if res == "OK":
            reason = "cannot find: " + iptables + " -I " + rule
            print_error(reason, filename, lineno)
            delete_rule(iptables, rule, filename, lineno, netns)
            return -1
        # do not report this error
        # NOTE(review): the rule is left in place on this path - confirm
        # whether that is intended
        return 0

    if res != "OK":
        reason = "should not match: " + cmd
        print_error(reason, filename, lineno)
        delete_rule(iptables, rule, filename, lineno, netns)
        return -1

    # Test "ip netns del NETNS" path with rules in place
    if netns:
        return 0

    return delete_rule(iptables, rule, filename, lineno, netns)
167
def execute_cmd(cmd, filename, lineno = 0, netns = None):
    '''
    Executes a command, checking for segfaults and returning the command exit
    code.

    The command and its output (stdout and stderr) are written to the log
    file.

    :param cmd: string with the command to be executed
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: network namespace to run command in
    '''
    global log_file

    # iptables commands are applets of the multi-call binary under test
    xtables_prefixes = ('iptables ', 'ip6tables ', 'ebtables ', 'arptables ')
    if cmd.startswith(xtables_prefixes):
        cmd = EXECUTABLE + " " + cmd

    if netns:
        cmd = "ip netns exec " + netns + " " + cmd

    print("command: {}".format(cmd), file=log_file)
    # The child writes to the log's file descriptor directly, bypassing the
    # buffer of log_file. Flush first so the "command:" line precedes the
    # output it belongs to.
    log_file.flush()
    ret = subprocess.call(cmd, shell=True, universal_newlines=True,
                          stderr=subprocess.STDOUT, stdout=log_file)

    # generic check for segfaults
    if ret == -11:
        reason = "command segfaults: " + cmd
        print_error(reason, filename, lineno)
    return ret
195
196
def variant_res(res, variant, alt_res=None):
    '''
    Translate an expected result scoped to a variant into the result to
    expect from the variant currently under test.

    If @variant is the one being tested, @res applies as is. For the other
    variant, @alt_res is used if given, the inverse of @res otherwise.

    :param res: expected result from test spec ("OK", "FAIL" or "NOMATCH")
    :param variant: variant @res is scoped to by test spec ("NFT" or "LEGACY")
    :param alt_res: optional expected result for the alternate variant.
    '''
    scoped_executable = {
        "NFT": "xtables-nft-multi",
        "LEGACY": "xtables-legacy-multi"
    }[variant]

    if EXECUTABLE == scoped_executable:
        return res

    if alt_res is None:
        # no explicit alternative given, fall back to the opposite result
        return {
            "OK": "FAIL",
            "FAIL": "OK",
            "NOMATCH": "OK"
        }[res]

    return alt_res
224
def fast_run_possible(filename):
    '''
    Keep things simple, run only for simple test files:
    - no external commands
    - no multiple tables
    - no variant-specific results

    :param filename: name of the file with the test rules
    Returns True if the file qualifies for a fast run, False otherwise.
    '''
    table = None
    rulecount = 0

    # use a context manager so the file is closed on every return path
    with open(filename) as f:
        for line in f:
            # comments, chain definitions and empty lines are irrelevant
            if line[0] in ["#", ":"] or len(line.strip()) == 0:
                continue
            if line[0] == "*":
                # a second table, or a table following rules
                if table or rulecount > 0:
                    return False
                table = line.rstrip()[1:]
            # external command
            if line[0] in ["@", "%"]:
                return False
            # more than three fields means a variant-specific result
            if len(line.split(";")) > 3:
                return False
            # note: a table line is counted here as well
            rulecount += 1

    return True
248
def run_test_file_fast(iptables, filename, netns):
    '''
    Run a test file, but fast

    Rules expected to load ("OK") are collected and loaded in one go via the
    restore command, then the save output is compared against the expected
    dump. Rules with any other expected result are passed to run_test()
    individually.

    Returns the number of tests in the file, or -1 on failure.

    :param iptables: string with the iptables command to execute
    :param filename: name of the file with the test rules
    :param netns: network namespace to perform test run in
    '''
    rules = {}
    table = "filter"
    chain_array = []
    tests = 0
    # keep lineno defined for error reporting even if the file is empty
    lineno = 0

    with open(filename) as f:
        for lineno, line in enumerate(f):
            if line[0] == "#" or len(line.strip()) == 0:
                continue

            if line[0] == "*":
                table = line.rstrip()[1:]
                continue

            if line[0] == ":":
                chain_array = line.rstrip()[1:].split(",")
                continue

            # a rule without any chain to add it to
            if len(chain_array) == 0:
                return -1

            tests += 1

            for chain in chain_array:
                item = line.split(";")
                rule = chain + " " + item[0]

                # "=" means the rule is dumped the same way it was entered
                if item[1] == "=":
                    rule_save = chain + " " + item[0]
                else:
                    rule_save = chain + " " + item[1]

                if iptables == EBTABLES and rule_save.find('-j') < 0:
                    rule_save += " -j CONTINUE"

                res = item[2].rstrip()
                if res != "OK":
                    # not expected to simply load, test this one separately
                    rule = chain + " -t " + table + " " + item[0]
                    ret = run_test(iptables, rule, rule_save,
                                   res, filename, lineno + 1, netns)

                    if ret < 0:
                        return -1
                    continue

                rules.setdefault(chain, []).append((rule, rule_save))

    restore_data = ["*" + table]
    out_expect = []
    # only rules in these chains are loaded and compared
    for chain in ["PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"]:
        for rule, rule_save in rules.get(chain, []):
            restore_data.append("-A " + rule)
            out_expect.append("-A " + rule_save)
    restore_data.append("COMMIT")

    out_expect = "\n".join(out_expect)

    # load all rules via iptables_restore

    command = EXECUTABLE + " " + iptables + "-restore"
    if netns:
        command = "ip netns exec " + netns + " " + command

    for line in restore_data:
        print(iptables + "-restore: " + line, file=log_file)

    proc = subprocess.Popen(command, shell = True, text = True,
                            stdin = subprocess.PIPE,
                            stdout = subprocess.PIPE,
                            stderr = subprocess.PIPE)
    restore_data = "\n".join(restore_data) + "\n"
    out, err = proc.communicate(input = restore_data)

    if proc.returncode == -11:
        # report the command which was run, "cmd" is not assigned yet here
        reason = iptables + "-restore segfaults: " + command
        print_error(reason, filename, lineno)
        return -1

    if proc.returncode != 0:
        print("%s-restore returned %d: %s" % (iptables, proc.returncode, err),
              file=log_file)
        return -1

    # find all rules in iptables_save output

    command = EXECUTABLE + " " + iptables + "-save"
    if netns:
        command = "ip netns exec " + netns + " " + command

    proc = subprocess.Popen(command, shell = True,
                            stdin = subprocess.PIPE,
                            stdout = subprocess.PIPE,
                            stderr = subprocess.PIPE)
    out, err = proc.communicate()

    if proc.returncode == -11:
        reason = iptables + "-save segfaults: " + command
        print_error(reason, filename, lineno)
        return -1

    # the dump is captured, remove the loaded rules again
    cmd = iptables + " -F -t " + table
    execute_cmd(cmd, filename, 0, netns)

    out = out.decode('utf-8').rstrip()
    if out.find(out_expect) < 0:
        msg = ["dumps differ!"]
        msg.extend(["expect: " + l for l in out_expect.split("\n")])
        # skip table, chain and comment lines; startswith() copes with
        # empty lines, which indexing l[0] does not
        msg.extend(["got: " + l for l in out.split("\n")
                    if not l.startswith(('*', ':', '#'))])
        print("\n".join(msg), file=log_file)
        return -1

    return tests
375
def run_test_file(filename, netns):
    '''
    Runs a test file

    Returns a tuple holding the number of tests in the file and the number
    of tests passed. Files not ending in ".t", and arptables/ebtables tests
    with a backend other than nf_tables, yield (0, 0).

    :param filename: name of the file with the test rules
    :param netns: network namespace to perform test run in
    '''
    #
    # if this is not a test file, skip.
    #
    if not filename.endswith(".t"):
        return 0, 0

    if "libipt_" in filename:
        iptables = IPTABLES
    elif "libip6t_" in filename:
        iptables = IP6TABLES
    elif "libxt_" in filename:
        iptables = IPTABLES
    elif "libarpt_" in filename:
        # only supported with nf_tables backend
        if EXECUTABLE != "xtables-nft-multi":
            return 0, 0
        iptables = ARPTABLES
    elif "libebt_" in filename:
        # only supported with nf_tables backend
        if EXECUTABLE != "xtables-nft-multi":
            return 0, 0
        iptables = EBTABLES
    else:
        # default to iptables if not known prefix
        iptables = IPTABLES

    # NOTE(review): the fast path runs before "ip netns add" below, yet it
    # prefixes its commands with "ip netns exec" if netns is set - confirm
    # whether fast mode is meant to be used together with a netns.
    fast_failed = False
    if fast_run_possible(filename):
        tests = run_test_file_fast(iptables, filename, netns)
        if tests > 0:
            print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY))
            return tests, tests
        # fall back to testing rule by rule
        fast_failed = True

    tests = 0
    passed = 0
    table = ""
    chain_array = []
    total_test_passed = True

    with open(filename) as f:
        if netns:
            execute_cmd("ip netns add " + netns, filename)

        # count lines from one so all reported line numbers match the file
        for lineno, line in enumerate(f, 1):
            if line[0] == "#" or len(line.strip()) == 0:
                continue

            if line[0] == ":":
                chain_array = line.rstrip()[1:].split(",")
                continue

            # external command invocation, executed as is.
            # detects iptables commands to prefix with EXECUTABLE automatically
            if line[0] in ["@", "%"]:
                external_cmd = line.rstrip()[1:]
                execute_cmd(external_cmd, filename, lineno, netns)
                continue

            if line[0] == "*":
                table = line.rstrip()[1:]
                continue

            if len(chain_array) == 0:
                print_error("broken test, missing chain",
                            filename = filename, lineno = lineno)
                total_test_passed = False
                break

            test_passed = True
            tests += 1

            for chain in chain_array:
                item = line.split(";")
                if table == "":
                    rule = chain + " " + item[0]
                else:
                    rule = chain + " -t " + table + " " + item[0]

                # "=" means the rule is dumped the same way it was entered
                if item[1] == "=":
                    rule_save = chain + " " + item[0]
                else:
                    rule_save = chain + " " + item[1]

                res = item[2].rstrip()
                if len(item) > 3:
                    # result is scoped to a variant, adjust it if needed
                    variant = item[3].rstrip()
                    if len(item) > 4:
                        alt_res = item[4].rstrip()
                    else:
                        alt_res = None
                    res = variant_res(res, variant, alt_res)

                ret = run_test(iptables, rule, rule_save,
                               res, filename, lineno, netns)

                if ret < 0:
                    test_passed = False
                    total_test_passed = False
                    break

            if test_passed:
                passed += 1

    if netns:
        execute_cmd("ip netns del " + netns, filename)
    if total_test_passed:
        suffix = ""
        if fast_failed:
            suffix = maybe_colored('red', " but fast mode failed!", STDOUT_IS_TTY)
        print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY) + suffix)

    return tests, passed
498
499
def show_missing():
    '''
    Prints the names of test files missing in EXTENSIONS_PATH, i.e. one
    'libfoo.t' for each 'libfoo.c' without a test file of that name.
    '''
    entries = os.listdir(EXTENSIONS_PATH)
    present = [name for name in entries if name.endswith('.t')]

    missing = []
    for name in entries:
        if not (name.startswith('lib') and name.endswith('.c')):
            continue
        # replace the trailing '.c' by '.t'
        expected = name[:-2] + '.t'
        if expected not in present:
            missing.append(expected)

    print('\n'.join(missing))
515
def spawn_netns():
    '''
    Tries to move the test run into a network namespace of its own.

    Returns True if the current process entered a new network namespace,
    False if that was not possible. If the script gets re-executed via
    'unshare -n', this function does not return at all.
    '''
    # prefer unshare module
    try:
        import unshare
        unshare.unshare(unshare.CLONE_NEWNET)
        return True
    except Exception:
        # Best effort: module missing or unsharing not possible, try the
        # fallback below. Exception instead of a bare except keeps
        # KeyboardInterrupt and SystemExit working.
        pass

    # sledgehammer style:
    # - call ourselves prefixed by 'unshare -n' if found
    # - pass extra --no-netns parameter to avoid another recursion
    try:
        import shutil

        unshare_bin = shutil.which("unshare")
        if unshare_bin is None:
            return False

        # build the new argument list without touching sys.argv, so a
        # failing exec does not leave a stray option behind
        argv = [unshare_bin, "-n", sys.executable] + sys.argv + ["--no-netns"]
        os.execv(unshare_bin, argv)
    except Exception:
        # best effort again, the caller handles the False return value
        pass

    return False
541
542 #
543 # main
544 #
def main():
    '''
    Parses the command line and runs the requested test files against the
    selected variants (legacy and/or nft, both by default).

    Returns the number of passed tests minus the number of tests run, i.e.
    zero if all tests passed and a negative value otherwise. Returns None if
    no tests were run (missing tests listed, not root, log file not usable).
    '''
    global EXECUTABLE
    global log_file

    parser = argparse.ArgumentParser(description='Run iptables tests')
    parser.add_argument('filename', nargs='*',
                        metavar='path/to/file.t',
                        help='Run only this test')
    parser.add_argument('-H', '--host', action='store_true',
                        help='Run tests against installed binaries')
    parser.add_argument('-l', '--legacy', action='store_true',
                        help='Test iptables-legacy')
    parser.add_argument('-m', '--missing', action='store_true',
                        help='Check for missing tests')
    parser.add_argument('-n', '--nftables', action='store_true',
                        help='Test iptables-over-nftables')
    parser.add_argument('-N', '--netns', action='store_const',
                        const='____iptables-container-test',
                        help='Test netnamespace path')
    parser.add_argument('--no-netns', action='store_true',
                        help='Do not run testsuite in own network namespace')
    args = parser.parse_args()

    #
    # show list of missing test files
    #
    if args.missing:
        show_missing()
        return

    variants = []
    if args.legacy:
        variants.append("legacy")
    if args.nftables:
        variants.append("nft")
    if len(variants) == 0:
        variants = [ "legacy", "nft" ]

    if os.getuid() != 0:
        print("You need to be root to run this, sorry", file=sys.stderr)
        return

    if not args.netns and not args.no_netns and not spawn_netns():
        print("Cannot run in own namespace, connectivity might break",
              file=sys.stderr)

    if not args.host:
        # use the extensions and binaries below the current directory
        os.putenv("XTABLES_LIBDIR", os.path.abspath(EXTENSIONS_PATH))
        os.putenv("PATH", "%s/iptables:%s" % (os.path.abspath(os.path.curdir),
                                              os.getenv("PATH")))

    # the list of test files does not depend on the variant
    if args.filename:
        file_list = args.filename
    else:
        file_list = [os.path.join(EXTENSIONS_PATH, i)
                     for i in os.listdir(EXTENSIONS_PATH)
                     if i.endswith('.t')]
    file_list.sort()

    # Setup global var log file. Open it just once: reopening it in write
    # mode for every variant would truncate the log of the previous one.
    try:
        log_file = open(LOGFILE, 'w')
    except IOError:
        print("Couldn't open log file %s" % LOGFILE, file=sys.stderr)
        return

    total_test_files = 0
    total_passed = 0
    total_tests = 0
    try:
        for variant in variants:
            EXECUTABLE = "xtables-" + variant + "-multi"

            test_files = 0
            tests = 0
            passed = 0

            for filename in file_list:
                file_tests, file_passed = run_test_file(filename, args.netns)
                if file_tests:
                    tests += file_tests
                    passed += file_passed
                    test_files += 1

            print("%s: %d test files, %d unit tests, %d passed"
                  % (variant, test_files, tests, passed))

            total_passed += passed
            total_tests += tests
            total_test_files = max(total_test_files, test_files)
    finally:
        log_file.close()

    if len(variants) > 1:
        print("total: %d test files, %d unit tests, %d passed"
              % (total_test_files, total_tests, total_passed))
    return total_passed - total_tests
638
# Run the tests only if executed as a script, using the return value of
# main() as exit status.
if __name__ == "__main__":
    sys.exit(main())