]> git.ipfire.org Git - thirdparty/iptables.git/blob - xlate-test.py
configure: Add option to enable/disable libnfnetlink
[thirdparty/iptables.git] / xlate-test.py
1 #!/usr/bin/env python3
2 # encoding: utf-8
3
4 import os
5 import sys
6 import shlex
7 import argparse
8 from subprocess import Popen, PIPE
9
10 def run_proc(args, shell = False, input = None):
11 """A simple wrapper around Popen, returning (rc, stdout, stderr)"""
12 process = Popen(args, text = True, shell = shell,
13 stdin = PIPE, stdout = PIPE, stderr = PIPE)
14 output, error = process.communicate(input)
15 return (process.returncode, output, error)
16
17 keywords = ("iptables-translate", "ip6tables-translate", "arptables-translate", "ebtables-translate")
18 xtables_nft_multi = 'xtables-nft-multi'
19
20 if sys.stdout.isatty():
21 colors = {"magenta": "\033[95m", "green": "\033[92m", "yellow": "\033[93m",
22 "red": "\033[91m", "end": "\033[0m"}
23 else:
24 colors = {"magenta": "", "green": "", "yellow": "", "red": "", "end": ""}
25
26
27 def magenta(string):
28 return colors["magenta"] + string + colors["end"]
29
30
31 def red(string):
32 return colors["red"] + string + colors["end"]
33
34
35 def yellow(string):
36 return colors["yellow"] + string + colors["end"]
37
38
39 def green(string):
40 return colors["green"] + string + colors["end"]
41
42
43 def test_one_xlate(name, sourceline, expected, result):
44 cmd = [xtables_nft_multi] + shlex.split(sourceline)
45 rc, output, error = run_proc(cmd)
46 if rc != 0:
47 result.append(name + ": " + red("Error: ") + "Call failed: " + " ".join(cmd))
48 result.append(error)
49 return False
50
51 translation = output.rstrip(" \n")
52 if translation != expected:
53 result.append(name + ": " + red("Fail"))
54 result.append(magenta("src: ") + sourceline.rstrip(" \n"))
55 result.append(magenta("exp: ") + expected)
56 result.append(magenta("res: ") + translation + "\n")
57 return False
58
59 return True
60
61 def test_one_replay(name, sourceline, expected, result):
62 global args
63
64 searchline = None
65 if sourceline.find(';') >= 0:
66 sourceline, searchline = sourceline.split(';')
67
68 srcwords = shlex.split(sourceline)
69
70 srccmd = srcwords[0]
71 ipt = srccmd.split('-')[0]
72 table_idx = -1
73 chain_idx = -1
74 table_name = "filter"
75 chain_name = None
76 for idx in range(1, len(srcwords)):
77 if srcwords[idx] in ["-A", "-I", "--append", "--insert"]:
78 chain_idx = idx
79 chain_name = srcwords[idx + 1]
80 elif srcwords[idx] in ["-t", "--table"]:
81 table_idx = idx
82 table_name = srcwords[idx + 1]
83
84 if not chain_name:
85 return True # nothing to do?
86
87 if searchline is None:
88 # adjust sourceline as required
89 checkcmd = srcwords[:]
90 checkcmd[0] = ipt
91 checkcmd[chain_idx] = "--check"
92 else:
93 checkcmd = [ipt, "-t", table_name]
94 checkcmd += ["--check", chain_name, searchline]
95
96 fam = ""
97 if srccmd.startswith("ip6"):
98 fam = "ip6 "
99 elif srccmd.startswith("arp"):
100 fam = "arp "
101 elif srccmd.startswith("ebt"):
102 fam = "bridge "
103
104 expected = [ l.removeprefix("nft ").strip(" '") for l in expected.split("\n") ]
105 nft_input = [
106 "flush ruleset",
107 "add table " + fam + table_name,
108 "add chain " + fam + table_name + " " + chain_name,
109 ] + expected
110
111 rc, output, error = run_proc([args.nft, "-f", "-"], shell = False, input = "\n".join(nft_input))
112 if rc != 0:
113 result.append(name + ": " + red("Replay Fail"))
114 result.append(args.nft + " call failed: " + error.rstrip('\n'))
115 for line in nft_input:
116 result.append(magenta("input: ") + line)
117 return False
118
119 rc, output, error = run_proc([xtables_nft_multi] + checkcmd)
120 if rc != 0:
121 result.append(name + ": " + red("Check Fail"))
122 result.append(magenta("check: ") + " ".join(checkcmd))
123 result.append(magenta("error: ") + error)
124 rc, output, error = run_proc([xtables_nft_multi, ipt + "-save"])
125 for l in output.split("\n"):
126 result.append(magenta("ipt: ") + l)
127 rc, output, error = run_proc([args.nft, "list", "ruleset"])
128 for l in output.split("\n"):
129 result.append(magenta("nft: ") + l)
130 return False
131
132 return True
133
134
135 def run_test(name, payload):
136 global xtables_nft_multi
137 global args
138
139 test_passed = True
140 tests = passed = failed = errors = 0
141 result = []
142
143 line = payload.readline()
144 while line:
145 if not line.startswith(keywords):
146 line = payload.readline()
147 continue
148
149 sourceline = replayline = line.rstrip("\n")
150 if line.find(';') >= 0:
151 sourceline = line.split(';')[0]
152
153 expected = payload.readline().rstrip(" \n")
154 next_expected = payload.readline()
155 if next_expected.startswith("nft"):
156 expected += "\n" + next_expected.rstrip(" \n")
157 line = payload.readline()
158 else:
159 line = next_expected
160
161 tests += 1
162 if test_one_xlate(name, sourceline, expected, result):
163 passed += 1
164 else:
165 errors += 1
166 test_passed = False
167 continue
168
169 if args.replay:
170 tests += 1
171 if test_one_replay(name, replayline, expected, result):
172 passed += 1
173 else:
174 errors += 1
175 test_passed = False
176
177 rc, output, error = run_proc([args.nft, "flush", "ruleset"])
178 if rc != 0:
179 result.append(name + ": " + red("Fail"))
180 result.append("nft flush ruleset call failed: " + error)
181
182 if (passed == tests):
183 print(name + ": " + green("OK"))
184 if not test_passed:
185 print("\n".join(result), file=sys.stderr)
186 return tests, passed, failed, errors
187
188
189 def load_test_files():
190 test_files = total_tests = total_passed = total_error = total_failed = 0
191 tests = sorted(os.listdir("extensions"))
192 for test in ['extensions/' + f for f in tests if f.endswith(".txlate")]:
193 with open(test, "r") as payload:
194 tests, passed, failed, errors = run_test(test, payload)
195 test_files += 1
196 total_tests += tests
197 total_passed += passed
198 total_failed += failed
199 total_error += errors
200 return (test_files, total_tests, total_passed, total_failed, total_error)
201
202
203 def spawn_netns():
204 # prefer unshare module
205 try:
206 import unshare
207 unshare.unshare(unshare.CLONE_NEWNET)
208 return True
209 except:
210 pass
211
212 # sledgehammer style:
213 # - call ourselves prefixed by 'unshare -n' if found
214 # - pass extra --no-netns parameter to avoid another recursion
215 try:
216 import shutil
217
218 unshare = shutil.which("unshare")
219 if unshare is None:
220 return False
221
222 sys.argv.append("--no-netns")
223 os.execv(unshare, [unshare, "-n", sys.executable] + sys.argv)
224 except:
225 pass
226
227 return False
228
229
230 def main():
231 global xtables_nft_multi
232
233 if args.replay:
234 if os.getuid() != 0:
235 print("Replay test requires root, sorry", file=sys.stderr)
236 return
237 if not args.no_netns and not spawn_netns():
238 print("Cannot run in own namespace, connectivity might break",
239 file=sys.stderr)
240
241 if not args.host:
242 os.putenv("XTABLES_LIBDIR", os.path.abspath("extensions"))
243 xtables_nft_multi = os.path.abspath(os.path.curdir) \
244 + '/iptables/' + xtables_nft_multi
245
246 files = tests = passed = failed = errors = 0
247 for test in args.test:
248 if not test.endswith(".txlate"):
249 test += ".txlate"
250 try:
251 with open(test, "r") as payload:
252 t, p, f, e = run_test(test, payload)
253 files += 1
254 tests += t
255 passed += p
256 failed += f
257 errors += e
258 except IOError:
259 print(red("Error: ") + "test file does not exist", file=sys.stderr)
260 return 99
261
262 if files == 0:
263 files, tests, passed, failed, errors = load_test_files()
264
265 if files > 1:
266 file_word = "files"
267 else:
268 file_word = "file"
269 print("%d test %s, %d tests, %d tests passed, %d tests failed, %d errors"
270 % (files, file_word, tests, passed, failed, errors))
271 return passed - tests
272
273
274 parser = argparse.ArgumentParser()
275 parser.add_argument('-H', '--host', action='store_true',
276 help='Run tests against installed binaries')
277 parser.add_argument('-R', '--replay', action='store_true',
278 help='Replay tests to check iptables-nft parser')
279 parser.add_argument('-n', '--nft', type=str, default='nft',
280 help='Replay using given nft binary (default: \'%(default)s\')')
281 parser.add_argument('--no-netns', action='store_true',
282 help='Do not run testsuite in own network namespace')
283 parser.add_argument("test", nargs="*", help="run only the specified test file(s)")
284 args = parser.parse_args()
285 sys.exit(main())