]> git.ipfire.org Git - people/arne_f/kernel.git/blob - tools/testing/selftests/tc-testing/tdc.py
License cleanup: add SPDX GPL-2.0 license identifier to files with no license
[people/arne_f/kernel.git] / tools / testing / selftests / tc-testing / tdc.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: GPL-2.0
3
4 """
5 tdc.py - Linux tc (Traffic Control) unit test driver
6
7 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8 """
9
10 import re
11 import os
12 import sys
13 import argparse
14 import json
15 import subprocess
16 from collections import OrderedDict
17 from string import Template
18
19 from tdc_config import *
20 from tdc_helper import *
21
22
23 USE_NS = True
24
25
26 def replace_keywords(cmd):
27 """
28 For a given executable command, substitute any known
29 variables contained within NAMES with the correct values
30 """
31 tcmd = Template(cmd)
32 subcmd = tcmd.safe_substitute(NAMES)
33 return subcmd
34
35
36 def exec_cmd(command, nsonly=True):
37 """
38 Perform any required modifications on an executable command, then run
39 it in a subprocess and return the results.
40 """
41 if (USE_NS and nsonly):
42 command = 'ip netns exec $NS ' + command
43
44 if '$' in command:
45 command = replace_keywords(command)
46
47 proc = subprocess.Popen(command,
48 shell=True,
49 stdout=subprocess.PIPE,
50 stderr=subprocess.PIPE)
51 (rawout, serr) = proc.communicate()
52
53 if proc.returncode != 0:
54 foutput = serr.decode("utf-8")
55 else:
56 foutput = rawout.decode("utf-8")
57
58 proc.stdout.close()
59 proc.stderr.close()
60 return proc, foutput
61
62
63 def prepare_env(cmdlist):
64 """
65 Execute the setup/teardown commands for a test case. Optionally
66 terminate test execution if the command fails.
67 """
68 for cmdinfo in cmdlist:
69 if (type(cmdinfo) == list):
70 exit_codes = cmdinfo[1:]
71 cmd = cmdinfo[0]
72 else:
73 exit_codes = [0]
74 cmd = cmdinfo
75
76 if (len(cmd) == 0):
77 continue
78
79 (proc, foutput) = exec_cmd(cmd)
80
81 if proc.returncode not in exit_codes:
82 print
83 print("Could not execute:")
84 print(cmd)
85 print("\nError message:")
86 print(foutput)
87 print("\nAborting test run.")
88 ns_destroy()
89 exit(1)
90
91
92 def test_runner(filtered_tests):
93 """
94 Driver function for the unit tests.
95
96 Prints information about the tests being run, executes the setup and
97 teardown commands and the command under test itself. Also determines
98 success/failure based on the information in the test case and generates
99 TAP output accordingly.
100 """
101 testlist = filtered_tests
102 tcount = len(testlist)
103 index = 1
104 tap = str(index) + ".." + str(tcount) + "\n"
105
106 for tidx in testlist:
107 result = True
108 tresult = ""
109 print("Test " + tidx["id"] + ": " + tidx["name"])
110 prepare_env(tidx["setup"])
111 (p, procout) = exec_cmd(tidx["cmdUnderTest"])
112 exit_code = p.returncode
113
114 if (exit_code != int(tidx["expExitCode"])):
115 result = False
116 print("exit:", exit_code, int(tidx["expExitCode"]))
117 print(procout)
118 else:
119 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
120 (p, procout) = exec_cmd(tidx["verifyCmd"])
121 match_index = re.findall(match_pattern, procout)
122 if len(match_index) != int(tidx["matchCount"]):
123 result = False
124
125 if result == True:
126 tresult += "ok "
127 else:
128 tresult += "not ok "
129 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
130
131 if result == False:
132 tap += procout
133
134 prepare_env(tidx["teardown"])
135 index += 1
136
137 return tap
138
139
140 def ns_create():
141 """
142 Create the network namespace in which the tests will be run and set up
143 the required network devices for it.
144 """
145 if (USE_NS):
146 cmd = 'ip netns add $NS'
147 exec_cmd(cmd, False)
148 cmd = 'ip link add $DEV0 type veth peer name $DEV1'
149 exec_cmd(cmd, False)
150 cmd = 'ip link set $DEV1 netns $NS'
151 exec_cmd(cmd, False)
152 cmd = 'ip link set $DEV0 up'
153 exec_cmd(cmd, False)
154 cmd = 'ip -s $NS link set $DEV1 up'
155 exec_cmd(cmd, False)
156
157
158 def ns_destroy():
159 """
160 Destroy the network namespace for testing (and any associated network
161 devices as well)
162 """
163 if (USE_NS):
164 cmd = 'ip netns delete $NS'
165 exec_cmd(cmd, False)
166
167
168 def has_blank_ids(idlist):
169 """
170 Search the list for empty ID fields and return true/false accordingly.
171 """
172 return not(all(k for k in idlist))
173
174
175 def load_from_file(filename):
176 """
177 Open the JSON file containing the test cases and return them as an
178 ordered dictionary object.
179 """
180 with open(filename) as test_data:
181 testlist = json.load(test_data, object_pairs_hook=OrderedDict)
182 idlist = get_id_list(testlist)
183 if (has_blank_ids(idlist)):
184 for k in testlist:
185 k['filename'] = filename
186 return testlist
187
188
189 def args_parse():
190 """
191 Create the argument parser.
192 """
193 parser = argparse.ArgumentParser(description='Linux TC unit tests')
194 return parser
195
196
197 def set_args(parser):
198 """
199 Set the command line arguments for tdc.
200 """
201 parser.add_argument('-p', '--path', type=str,
202 help='The full path to the tc executable to use')
203 parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
204 help='Run tests only from the specified category, or if no category is specified, list known categories.')
205 parser.add_argument('-f', '--file', type=str,
206 help='Run tests from the specified file')
207 parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
208 help='List all test cases, or those only within the specified category')
209 parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
210 help='Display the test case with specified id')
211 parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
212 help='Execute the single test case with specified ID')
213 parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
214 help='Generate ID numbers for new test cases')
215 return parser
216 return parser
217
218
219 def check_default_settings(args):
220 """
221 Process any arguments overriding the default settings, and ensure the
222 settings are correct.
223 """
224 # Allow for overriding specific settings
225 global NAMES
226
227 if args.path != None:
228 NAMES['TC'] = args.path
229 if not os.path.isfile(NAMES['TC']):
230 print("The specified tc path " + NAMES['TC'] + " does not exist.")
231 exit(1)
232
233
234 def get_id_list(alltests):
235 """
236 Generate a list of all IDs in the test cases.
237 """
238 return [x["id"] for x in alltests]
239
240
241 def check_case_id(alltests):
242 """
243 Check for duplicate test case IDs.
244 """
245 idl = get_id_list(alltests)
246 return [x for x in idl if idl.count(x) > 1]
247
248
249 def does_id_exist(alltests, newid):
250 """
251 Check if a given ID already exists in the list of test cases.
252 """
253 idl = get_id_list(alltests)
254 return (any(newid == x for x in idl))
255
256
257 def generate_case_ids(alltests):
258 """
259 If a test case has a blank ID field, generate a random hex ID for it
260 and then write the test cases back to disk.
261 """
262 import random
263 for c in alltests:
264 if (c["id"] == ""):
265 while True:
266 newid = str('%04x' % random.randrange(16**4))
267 if (does_id_exist(alltests, newid)):
268 continue
269 else:
270 c['id'] = newid
271 break
272
273 ufilename = []
274 for c in alltests:
275 if ('filename' in c):
276 ufilename.append(c['filename'])
277 ufilename = get_unique_item(ufilename)
278 for f in ufilename:
279 testlist = []
280 for t in alltests:
281 if 'filename' in t:
282 if t['filename'] == f:
283 del t['filename']
284 testlist.append(t)
285 outfile = open(f, "w")
286 json.dump(testlist, outfile, indent=4)
287 outfile.close()
288
289
290 def get_test_cases(args):
291 """
292 If a test case file is specified, retrieve tests from that file.
293 Otherwise, glob for all json files in subdirectories and load from
294 each one.
295 """
296 import fnmatch
297 if args.file != None:
298 if not os.path.isfile(args.file):
299 print("The specified test case file " + args.file + " does not exist.")
300 exit(1)
301 flist = [args.file]
302 else:
303 flist = []
304 for root, dirnames, filenames in os.walk('tc-tests'):
305 for filename in fnmatch.filter(filenames, '*.json'):
306 flist.append(os.path.join(root, filename))
307 alltests = list()
308 for casefile in flist:
309 alltests = alltests + (load_from_file(casefile))
310 return alltests
311
312
313 def set_operation_mode(args):
314 """
315 Load the test case data and process remaining arguments to determine
316 what the script should do for this run, and call the appropriate
317 function.
318 """
319 alltests = get_test_cases(args)
320
321 if args.gen_id:
322 idlist = get_id_list(alltests)
323 if (has_blank_ids(idlist)):
324 alltests = generate_case_ids(alltests)
325 else:
326 print("No empty ID fields found in test files.")
327 exit(0)
328
329 duplicate_ids = check_case_id(alltests)
330 if (len(duplicate_ids) > 0):
331 print("The following test case IDs are not unique:")
332 print(str(set(duplicate_ids)))
333 print("Please correct them before continuing.")
334 exit(1)
335
336 ucat = get_test_categories(alltests)
337
338 if args.showID:
339 show_test_case_by_id(alltests, args.showID[0])
340 exit(0)
341
342 if args.execute:
343 target_id = args.execute[0]
344 else:
345 target_id = ""
346
347 if args.category:
348 if (args.category == '+c'):
349 print("Available categories:")
350 print_sll(ucat)
351 exit(0)
352 else:
353 target_category = args.category
354 else:
355 target_category = ""
356
357
358 testcases = get_categorized_testlist(alltests, ucat)
359
360 if args.list:
361 if (len(args.list) == 0):
362 list_test_cases(alltests)
363 exit(0)
364 elif(len(args.list > 0)):
365 if (args.list not in ucat):
366 print("Unknown category " + args.list)
367 print("Available categories:")
368 print_sll(ucat)
369 exit(1)
370 list_test_cases(testcases[args.list])
371 exit(0)
372
373 if (os.geteuid() != 0):
374 print("This script must be run with root privileges.\n")
375 exit(1)
376
377 ns_create()
378
379 if (len(target_category) == 0):
380 if (len(target_id) > 0):
381 alltests = list(filter(lambda x: target_id in x['id'], alltests))
382 if (len(alltests) == 0):
383 print("Cannot find a test case with ID matching " + target_id)
384 exit(1)
385 catresults = test_runner(alltests)
386 print("All test results: " + "\n\n" + catresults)
387 elif (len(target_category) > 0):
388 if (target_category not in ucat):
389 print("Specified category is not present in this file.")
390 exit(1)
391 else:
392 catresults = test_runner(testcases[target_category])
393 print("Category " + target_category + "\n\n" + catresults)
394
395 ns_destroy()
396
397
398 def main():
399 """
400 Start of execution; set up argument parser and get the arguments,
401 and start operations.
402 """
403 parser = args_parse()
404 parser = set_args(parser)
405 (args, remaining) = parser.parse_known_args()
406 check_default_settings(args)
407
408 set_operation_mode(args)
409
410 exit(0)
411
412
413 if __name__ == "__main__":
414 main()