]> git.ipfire.org Git - thirdparty/iptables.git/blob - iptables-test.py
xtables-save: Unify *-save header/footer comments
[thirdparty/iptables.git] / iptables-test.py
1 #!/usr/bin/env python
2 #
3 # (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This software has been sponsored by Sophos Astaro <http://www.sophos.com>
11 #
12
13 from __future__ import print_function
14 import sys
15 import os
16 import subprocess
17 import argparse
18
19 IPTABLES = "iptables"
20 IP6TABLES = "ip6tables"
21 ARPTABLES = "arptables"
22 EBTABLES = "ebtables"
23
24 IPTABLES_SAVE = "iptables-save"
25 IP6TABLES_SAVE = "ip6tables-save"
26 ARPTABLES_SAVE = "arptables-save"
27 EBTABLES_SAVE = "ebtables-save"
28 #IPTABLES_SAVE = ['xtables-save','-4']
29 #IP6TABLES_SAVE = ['xtables-save','-6']
30
31 EXTENSIONS_PATH = "extensions"
32 LOGFILE="/tmp/iptables-test.log"
33 log_file = None
34
35
36 class Colors:
37 HEADER = '\033[95m'
38 BLUE = '\033[94m'
39 GREEN = '\033[92m'
40 YELLOW = '\033[93m'
41 RED = '\033[91m'
42 ENDC = '\033[0m'
43
44
45 def print_error(reason, filename=None, lineno=None):
46 '''
47 Prints an error with nice colors, indicating file and line number.
48 '''
49 print(filename + ": " + Colors.RED + "ERROR" +
50 Colors.ENDC + ": line %d (%s)" % (lineno, reason))
51
52
53 def delete_rule(iptables, rule, filename, lineno):
54 '''
55 Removes an iptables rule
56 '''
57 cmd = iptables + " -D " + rule
58 ret = execute_cmd(cmd, filename, lineno)
59 if ret == 1:
60 reason = "cannot delete: " + iptables + " -I " + rule
61 print_error(reason, filename, lineno)
62 return -1
63
64 return 0
65
66
67 def run_test(iptables, rule, rule_save, res, filename, lineno, netns):
68 '''
69 Executes an unit test. Returns the output of delete_rule().
70
71 Parameters:
72 :param iptables: string with the iptables command to execute
73 :param rule: string with iptables arguments for the rule to test
74 :param rule_save: string to find the rule in the output of iptables -save
75 :param res: expected result of the rule. Valid values: "OK", "FAIL"
76 :param filename: name of the file tested (used for print_error purposes)
77 :param lineno: line number being tested (used for print_error purposes)
78 '''
79 ret = 0
80
81 cmd = iptables + " -A " + rule
82 if netns:
83 cmd = "ip netns exec ____iptables-container-test " + EXECUTEABLE + " " + cmd
84
85 ret = execute_cmd(cmd, filename, lineno)
86
87 #
88 # report failed test
89 #
90 if ret:
91 if res == "OK":
92 reason = "cannot load: " + cmd
93 print_error(reason, filename, lineno)
94 return -1
95 else:
96 # do not report this error
97 return 0
98 else:
99 if res == "FAIL":
100 reason = "should fail: " + cmd
101 print_error(reason, filename, lineno)
102 delete_rule(iptables, rule, filename, lineno)
103 return -1
104
105 matching = 0
106 splitted = iptables.split(" ")
107 if len(splitted) == 2:
108 if splitted[1] == '-4':
109 command = IPTABLES_SAVE
110 elif splitted[1] == '-6':
111 command = IP6TABLES_SAVE
112 elif len(splitted) == 1:
113 if splitted[0] == IPTABLES:
114 command = IPTABLES_SAVE
115 elif splitted[0] == IP6TABLES:
116 command = IP6TABLES_SAVE
117 elif splitted[0] == ARPTABLES:
118 command = ARPTABLES_SAVE
119 elif splitted[0] == EBTABLES:
120 command = EBTABLES_SAVE
121
122 path = os.path.abspath(os.path.curdir) + "/iptables/" + EXECUTEABLE
123 command = path + " " + command
124
125 if netns:
126 command = "ip netns exec ____iptables-container-test " + command
127
128 args = splitted[1:]
129 proc = subprocess.Popen(command, shell=True,
130 stdin=subprocess.PIPE,
131 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
132 out, err = proc.communicate()
133
134 #
135 # check for segfaults
136 #
137 if proc.returncode == -11:
138 reason = "iptables-save segfaults: " + cmd
139 print_error(reason, filename, lineno)
140 delete_rule(iptables, rule, filename, lineno)
141 return -1
142
143 # find the rule
144 matching = out.find(rule_save.encode('utf-8'))
145 if matching < 0:
146 reason = "cannot find: " + iptables + " -I " + rule
147 print_error(reason, filename, lineno)
148 delete_rule(iptables, rule, filename, lineno)
149 return -1
150
151 # Test "ip netns del NETNS" path with rules in place
152 if netns:
153 return 0
154
155 return delete_rule(iptables, rule, filename, lineno)
156
157 def execute_cmd(cmd, filename, lineno):
158 '''
159 Executes a command, checking for segfaults and returning the command exit
160 code.
161
162 :param cmd: string with the command to be executed
163 :param filename: name of the file tested (used for print_error purposes)
164 :param lineno: line number being tested (used for print_error purposes)
165 '''
166 global log_file
167 if cmd.startswith('iptables ') or cmd.startswith('ip6tables ') or cmd.startswith('ebtables ') or cmd.startswith('arptables '):
168 cmd = os.path.abspath(os.path.curdir) + "/iptables/" + EXECUTEABLE + " " + cmd
169
170 print("command: {}".format(cmd), file=log_file)
171 ret = subprocess.call(cmd, shell=True, universal_newlines=True,
172 stderr=subprocess.STDOUT, stdout=log_file)
173 log_file.flush()
174
175 # generic check for segfaults
176 if ret == -11:
177 reason = "command segfaults: " + cmd
178 print_error(reason, filename, lineno)
179 return ret
180
181
182 def run_test_file(filename, netns):
183 '''
184 Runs a test file
185
186 :param filename: name of the file with the test rules
187 '''
188 #
189 # if this is not a test file, skip.
190 #
191 if not filename.endswith(".t"):
192 return 0, 0
193
194 if "libipt_" in filename:
195 iptables = IPTABLES
196 elif "libip6t_" in filename:
197 iptables = IP6TABLES
198 elif "libxt_" in filename:
199 iptables = IPTABLES
200 elif "libarpt_" in filename:
201 # only supported with nf_tables backend
202 if EXECUTEABLE != "xtables-nft-multi":
203 return 0, 0
204 iptables = ARPTABLES
205 elif "libebt_" in filename:
206 # only supported with nf_tables backend
207 if EXECUTEABLE != "xtables-nft-multi":
208 return 0, 0
209 iptables = EBTABLES
210 else:
211 # default to iptables if not known prefix
212 iptables = IPTABLES
213
214 f = open(filename)
215
216 tests = 0
217 passed = 0
218 table = ""
219 total_test_passed = True
220
221 if netns:
222 execute_cmd("ip netns add ____iptables-container-test", filename, 0)
223
224 for lineno, line in enumerate(f):
225 if line[0] == "#":
226 continue
227
228 if line[0] == ":":
229 chain_array = line.rstrip()[1:].split(",")
230 continue
231
232 # external non-iptables invocation, executed as is.
233 if line[0] == "@":
234 external_cmd = line.rstrip()[1:]
235 if netns:
236 external_cmd = "ip netns exec ____iptables-container-test " + external_cmd
237 execute_cmd(external_cmd, filename, lineno)
238 continue
239
240 # external iptables invocation, executed as is.
241 if line[0] == "%":
242 external_cmd = line.rstrip()[1:]
243 if netns:
244 external_cmd = "ip netns exec ____iptables-container-test " + EXECUTEABLE + " " + external_cmd
245 execute_cmd(external_cmd, filename, lineno)
246 continue
247
248 if line[0] == "*":
249 table = line.rstrip()[1:]
250 continue
251
252 if len(chain_array) == 0:
253 print("broken test, missing chain, leaving")
254 sys.exit()
255
256 test_passed = True
257 tests += 1
258
259 for chain in chain_array:
260 item = line.split(";")
261 if table == "":
262 rule = chain + " " + item[0]
263 else:
264 rule = chain + " -t " + table + " " + item[0]
265
266 if item[1] == "=":
267 rule_save = chain + " " + item[0]
268 else:
269 rule_save = chain + " " + item[1]
270
271 res = item[2].rstrip()
272 ret = run_test(iptables, rule, rule_save,
273 res, filename, lineno + 1, netns)
274
275 if ret < 0:
276 test_passed = False
277 total_test_passed = False
278 break
279
280 if test_passed:
281 passed += 1
282
283 if netns:
284 execute_cmd("ip netns del ____iptables-container-test", filename, 0)
285 if total_test_passed:
286 print(filename + ": " + Colors.GREEN + "OK" + Colors.ENDC)
287
288 f.close()
289 return tests, passed
290
291
292 def show_missing():
293 '''
294 Show the list of missing test files
295 '''
296 file_list = os.listdir(EXTENSIONS_PATH)
297 testfiles = [i for i in file_list if i.endswith('.t')]
298 libfiles = [i for i in file_list
299 if i.startswith('lib') and i.endswith('.c')]
300
301 def test_name(x):
302 return x[0:-2] + '.t'
303 missing = [test_name(i) for i in libfiles
304 if not test_name(i) in testfiles]
305
306 print('\n'.join(missing))
307
308
309 #
310 # main
311 #
312 def main():
313 parser = argparse.ArgumentParser(description='Run iptables tests')
314 parser.add_argument('filename', nargs='?',
315 metavar='path/to/file.t',
316 help='Run only this test')
317 parser.add_argument('-l', '--legacy', action='store_true',
318 help='Test iptables-legacy')
319 parser.add_argument('-m', '--missing', action='store_true',
320 help='Check for missing tests')
321 parser.add_argument('-n', '--nftables', action='store_true',
322 help='Test iptables-over-nftables')
323 parser.add_argument('-N', '--netns', action='store_true',
324 help='Test netnamespace path')
325 args = parser.parse_args()
326
327 #
328 # show list of missing test files
329 #
330 if args.missing:
331 show_missing()
332 return
333
334 global EXECUTEABLE
335 EXECUTEABLE = "xtables-legacy-multi"
336 if args.nftables:
337 EXECUTEABLE = "xtables-nft-multi"
338
339 if os.getuid() != 0:
340 print("You need to be root to run this, sorry")
341 return
342
343 os.putenv("XTABLES_LIBDIR", os.path.abspath(EXTENSIONS_PATH))
344 os.putenv("PATH", "%s/iptables:%s" % (os.path.abspath(os.path.curdir), os.getenv("PATH")))
345
346 test_files = 0
347 tests = 0
348 passed = 0
349
350 # setup global var log file
351 global log_file
352 try:
353 log_file = open(LOGFILE, 'w')
354 except IOError:
355 print("Couldn't open log file %s" % LOGFILE)
356 return
357
358 file_list = [os.path.join(EXTENSIONS_PATH, i)
359 for i in os.listdir(EXTENSIONS_PATH)]
360 if args.filename:
361 file_list = [args.filename]
362 for filename in file_list:
363 file_tests, file_passed = run_test_file(filename, args.netns)
364 if file_tests:
365 tests += file_tests
366 passed += file_passed
367 test_files += 1
368
369 print("%d test files, %d unit tests, %d passed" % (test_files, tests, passed))
370
371
372 if __name__ == '__main__':
373 main()