]>
Commit | Line | Data |
---|---|---|
886b00b2 | 1 | #!/usr/bin/env python |
c8b7aaab PNA |
2 | # |
3 | # (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org> | |
4 | # | |
5 | # This program is free software; you can redistribute it and/or modify | |
6 | # it under the terms of the GNU General Public License as published by | |
7 | # the Free Software Foundation; either version 2 of the License, or | |
8 | # (at your option) any later version. | |
9 | # | |
10 | # This software has been sponsored by Sophos Astaro <http://www.sophos.com> | |
11 | # | |
12 | ||
886b00b2 | 13 | from __future__ import print_function |
c8b7aaab PNA |
14 | import sys |
15 | import os | |
16 | import subprocess | |
17 | import argparse | |
18 | ||
# Front-end commands under test.
IPTABLES = "iptables"
IP6TABLES = "ip6tables"
ARPTABLES = "arptables"
EBTABLES = "ebtables"

# Ruleset dump command for each front-end: the front-end name plus "-save".
IPTABLES_SAVE = IPTABLES + "-save"
IP6TABLES_SAVE = IP6TABLES + "-save"
ARPTABLES_SAVE = ARPTABLES + "-save"
EBTABLES_SAVE = EBTABLES + "-save"
#IPTABLES_SAVE = ['xtables-save','-4']
#IP6TABLES_SAVE = ['xtables-save','-6']

# Directory scanned for lib*.c extensions and their *.t test files.
EXTENSIONS_PATH = "extensions"
# Output of every executed command is collected in this file.
LOGFILE = "/tmp/iptables-test.log"
# Open handle on LOGFILE; assigned by main() before any command runs.
log_file = None
34 | ||
35 | ||
class Colors:
    '''
    ANSI escape sequences used to colorize the test report.

    Emit one of the color attributes before the text and ENDC after it.
    '''
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    # Resets the terminal back to its default attributes.
    ENDC = '\033[0m'
43 | ||
44 | ||
def print_error(reason, filename=None, lineno=None):
    '''
    Prints an error with nice colors, indicating file and line number.

    :param reason: string describing what went wrong
    :param filename: name of the file tested; left out of the message if None
    :param lineno: line number being tested; left out of the message if None
    '''
    # Both location arguments default to None, so neither may be assumed to
    # be usable in string concatenation or "%d" formatting.
    if filename is not None:
        prefix = filename + ": "
    else:
        prefix = ""

    if lineno is not None:
        detail = "line %d (%s)" % (lineno, reason)
    else:
        detail = reason

    print(prefix + Colors.RED + "ERROR" + Colors.ENDC + ": " + detail)
51 | ||
52 | ||
def delete_rule(iptables, rule, filename, lineno):
    '''
    Removes an iptables rule

    :param iptables: string with the iptables command to execute
    :param rule: string with the chain and arguments of the rule to remove
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :return: 0 if the rule was removed, -1 otherwise
    '''
    cmd = iptables + " -D " + rule
    ret = execute_cmd(cmd, filename, lineno)
    # Any non-zero status means the rule is still in place: besides 1 the
    # tools exit with 2 on bad parameters, and a negative value is a signal.
    if ret != 0:
        reason = "cannot delete: " + cmd
        print_error(reason, filename, lineno)
        return -1

    return 0
65 | ||
66 | ||
def _save_command(iptables):
    '''
    Returns the "-save" command matching an iptables command string, or None
    if the string is not recognized.

    Two forms are known: "<program> -4" / "<program> -6", and the bare
    front-end names.
    '''
    splitted = iptables.split(" ")
    if len(splitted) == 2:
        by_family = {'-4': IPTABLES_SAVE, '-6': IP6TABLES_SAVE}
        return by_family.get(splitted[1])
    if len(splitted) == 1:
        by_name = {
            IPTABLES: IPTABLES_SAVE,
            IP6TABLES: IP6TABLES_SAVE,
            ARPTABLES: ARPTABLES_SAVE,
            EBTABLES: EBTABLES_SAVE,
        }
        return by_name.get(splitted[0])
    return None


def run_test(iptables, rule, rule_save, res, filename, lineno, netns):
    '''
    Executes an unit test: appends the rule, looks it up in the output of the
    matching -save command and removes it again.

    Parameters:
    :param iptables: string with the iptables command to execute
    :param rule: string with iptables arguments for the rule to test
    :param rule_save: string to find the rule in the output of iptables -save
    :param res: expected result of the rule. Valid values: "OK", "FAIL"
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :param netns: if true, run the commands inside the
                  ____iptables-container-test network namespace
    :return: 0 if the test behaved as expected, -1 otherwise
    '''
    def cleanup():
        # The plain "-D" command would act on the host ruleset, where the
        # rule was never added when testing inside the namespace. There the
        # rule is simply left behind, as on the success path below.
        if not netns:
            delete_rule(iptables, rule, filename, lineno)

    cmd = iptables + " -A " + rule
    if netns:
        cmd = "ip netns exec ____iptables-container-test " + EXECUTEABLE + " " + cmd

    ret = execute_cmd(cmd, filename, lineno)

    #
    # report failed test
    #
    if ret:
        if res == "OK":
            reason = "cannot load: " + cmd
            print_error(reason, filename, lineno)
            return -1
        # the rule was expected to be rejected, do not report this error
        return 0

    if res == "FAIL":
        reason = "should fail: " + cmd
        print_error(reason, filename, lineno)
        cleanup()
        return -1

    save = _save_command(iptables)
    if save is None:
        reason = "no save command known for: " + iptables
        print_error(reason, filename, lineno)
        cleanup()
        return -1

    path = os.path.abspath(os.path.curdir) + "/iptables/" + EXECUTEABLE
    command = path + " " + save

    if netns:
        command = "ip netns exec ____iptables-container-test " + command

    proc = subprocess.Popen(command, shell=True,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, _ = proc.communicate()

    #
    # check for segfaults
    #
    if proc.returncode == -11:
        # cmd is the command that added the rule being dumped
        reason = "iptables-save segfaults: " + cmd
        print_error(reason, filename, lineno)
        cleanup()
        return -1

    # find the rule; the output is bytes, so encode the expected string
    if out.find(rule_save.encode('utf-8')) < 0:
        reason = "cannot find: " + iptables + " -A " + rule
        print_error(reason, filename, lineno)
        cleanup()
        return -1

    # Test "ip netns del NETNS" path with rules in place
    if netns:
        return 0

    return delete_rule(iptables, rule, filename, lineno)
156 | ||
c8b7aaab PNA |
def execute_cmd(cmd, filename, lineno):
    '''
    Executes a command, checking for segfaults and returning the command exit
    code. The command and its output are written to the log file.

    :param cmd: string with the command to be executed
    :param filename: name of the file tested (used for print_error purposes)
    :param lineno: line number being tested (used for print_error purposes)
    :return: exit status of the command as reported by subprocess.call()
    '''
    global log_file
    # Front-end invocations are run through the multi-call binary that was
    # built in ./iptables; anything else is executed as given.
    if cmd.startswith(('iptables ', 'ip6tables ', 'ebtables ', 'arptables ')):
        cmd = os.path.abspath(os.path.curdir) + "/iptables/" + EXECUTEABLE + " " + cmd

    print("command: {}".format(cmd), file=log_file)
    # The child writes to the log's file descriptor directly. Flush the
    # header first, otherwise it lands in the log after the command output.
    log_file.flush()
    ret = subprocess.call(cmd, shell=True, universal_newlines=True,
                          stderr=subprocess.STDOUT, stdout=log_file)
    log_file.flush()

    # generic check for segfaults
    if ret == -11:
        reason = "command segfaults: " + cmd
        print_error(reason, filename, lineno)
    return ret
180 | ||
181 | ||
def run_test_file(filename, netns):
    '''
    Runs a test file

    Lines are dispatched on their first character:
      "#" comment, ":" comma-separated list of chains, "*" table name,
      "@" external command, "%" external iptables command;
      anything else is a test: "rule;expected-save-output-or-=;OK|FAIL".

    :param filename: name of the file with the test rules
    :param netns: if true, run everything inside a temporary network
                  namespace named ____iptables-container-test
    :return: tuple (number of tests run, number of tests passed)
    '''
    #
    # if this is not a test file, skip.
    #
    if not filename.endswith(".t"):
        return 0, 0

    if "libipt_" in filename:
        iptables = IPTABLES
    elif "libip6t_" in filename:
        iptables = IP6TABLES
    elif "libxt_" in filename:
        iptables = IPTABLES
    elif "libarpt_" in filename:
        # only supported with nf_tables backend
        if EXECUTEABLE != "xtables-nft-multi":
            return 0, 0
        iptables = ARPTABLES
    elif "libebt_" in filename:
        # only supported with nf_tables backend
        if EXECUTEABLE != "xtables-nft-multi":
            return 0, 0
        iptables = EBTABLES
    else:
        # default to iptables if not known prefix
        iptables = IPTABLES

    tests = 0
    passed = 0
    table = ""
    # stays empty until a ":" line is seen, which the check below relies on
    chain_array = []
    total_test_passed = True

    with open(filename) as f:
        if netns:
            execute_cmd("ip netns add ____iptables-container-test", filename, 0)

        try:
            # line numbers are 1-based in every report
            for lineno, line in enumerate(f, 1):
                # a blank line carries no test
                if not line.strip():
                    continue

                if line[0] == "#":
                    continue

                if line[0] == ":":
                    chain_array = [chain
                                   for chain in line.rstrip()[1:].split(",")
                                   if chain]
                    continue

                # external non-iptables invocation, executed as is.
                if line[0] == "@":
                    external_cmd = line.rstrip()[1:]
                    if netns:
                        external_cmd = "ip netns exec ____iptables-container-test " + external_cmd
                    execute_cmd(external_cmd, filename, lineno)
                    continue

                # external iptables invocation, executed as is.
                if line[0] == "%":
                    external_cmd = line.rstrip()[1:]
                    if netns:
                        external_cmd = "ip netns exec ____iptables-container-test " + EXECUTEABLE + " " + external_cmd
                    execute_cmd(external_cmd, filename, lineno)
                    continue

                if line[0] == "*":
                    table = line.rstrip()[1:]
                    continue

                if not chain_array:
                    print("broken test, missing chain, leaving")
                    sys.exit()

                test_passed = True
                tests += 1

                item = line.split(";")
                if len(item) < 3:
                    # count a malformed line as a failed test, do not crash
                    print_error("malformed test line: " + line.rstrip(),
                                filename, lineno)
                    total_test_passed = False
                    continue

                res = item[2].rstrip()

                for chain in chain_array:
                    if table == "":
                        rule = chain + " " + item[0]
                    else:
                        rule = chain + " -t " + table + " " + item[0]

                    # "=" means the saved rule reads exactly as it was given
                    if item[1] == "=":
                        rule_save = chain + " " + item[0]
                    else:
                        rule_save = chain + " " + item[1]

                    ret = run_test(iptables, rule, rule_save,
                                   res, filename, lineno, netns)

                    if ret < 0:
                        test_passed = False
                        total_test_passed = False
                        break

                if test_passed:
                    passed += 1
        finally:
            # also reached on sys.exit(), so the namespace never lingers
            if netns:
                execute_cmd("ip netns del ____iptables-container-test", filename, 0)

    if total_test_passed:
        print(filename + ": " + Colors.GREEN + "OK" + Colors.ENDC)

    return tests, passed
290 | ||
291 | ||
def show_missing():
    '''
    Prints, one per line, the name of the test file that is missing for each
    lib*.c file found in the extensions directory.
    '''
    entries = os.listdir(EXTENSIONS_PATH)
    existing_tests = [name for name in entries if name.endswith('.t')]

    missing = []
    for name in entries:
        if not (name.startswith('lib') and name.endswith('.c')):
            continue
        # libfoo.c is expected to be covered by libfoo.t
        wanted = name[:-2] + '.t'
        if wanted not in existing_tests:
            missing.append(wanted)

    print('\n'.join(missing))
c8b7aaab PNA |
307 | |
308 | ||
309 | # | |
310 | # main | |
311 | # | |
def main():
    '''
    Parses the command line, then runs either the given test file or every
    file found in the extensions directory and prints a summary.

    Requires root. Output of the executed commands goes to LOGFILE.
    '''
    global EXECUTEABLE
    global log_file

    parser = argparse.ArgumentParser(description='Run iptables tests')
    parser.add_argument('filename', nargs='?',
                        metavar='path/to/file.t',
                        help='Run only this test')
    parser.add_argument('-l', '--legacy', action='store_true',
                        help='Test iptables-legacy')
    parser.add_argument('-m', '--missing', action='store_true',
                        help='Check for missing tests')
    parser.add_argument('-n', '--nftables', action='store_true',
                        help='Test iptables-over-nftables')
    parser.add_argument('-N', '--netns', action='store_true',
                        help='Test netnamespace path')
    args = parser.parse_args()

    #
    # show list of missing test files
    #
    if args.missing:
        show_missing()
        return

    # the legacy binary is the default, --nftables selects the other one
    EXECUTEABLE = "xtables-legacy-multi"
    if args.nftables:
        EXECUTEABLE = "xtables-nft-multi"

    if os.getuid() != 0:
        print("You need to be root to run this, sorry")
        return

    # Assigning to os.environ exports the variable to child processes and,
    # unlike os.putenv(), keeps os.environ itself in sync.
    os.environ["XTABLES_LIBDIR"] = os.path.abspath(EXTENSIONS_PATH)
    iptables_dir = os.path.abspath(os.path.curdir) + "/iptables"
    old_path = os.environ.get("PATH")
    if old_path:
        os.environ["PATH"] = iptables_dir + ":" + old_path
    else:
        os.environ["PATH"] = iptables_dir

    test_files = 0
    tests = 0
    passed = 0

    # only scan the extensions directory when no single file was requested
    if args.filename:
        file_list = [args.filename]
    else:
        # sorted, so that runs and their logs are comparable
        file_list = sorted(os.path.join(EXTENSIONS_PATH, i)
                           for i in os.listdir(EXTENSIONS_PATH))

    # setup global var log file
    try:
        log_file = open(LOGFILE, 'w')
    except IOError:
        print("Couldn't open log file %s" % LOGFILE)
        return

    try:
        for filename in file_list:
            file_tests, file_passed = run_test_file(filename, args.netns)
            if file_tests:
                tests += file_tests
                passed += file_passed
                test_files += 1
    finally:
        log_file.close()

    print("%d test files, %d unit tests, %d passed" % (test_files, tests, passed))
c8b7aaab PNA |
370 | |
371 | ||
# Run the tests only when executed as a script, not when imported.
if __name__ == "__main__":
    main()