]> git.ipfire.org Git - nitsi.git/blob - test.py
Remove unnecessary inspect
[nitsi.git] / test.py
1 #!/usr/bin/python3
2
3 import serial
4
5 import re
6 from time import sleep
7 import sys
8
9 import libvirt
10
11 import xml.etree.ElementTree as ET
12
13 import os
14
15 import configparser
16
class log():
    """Tiny console logger that prints prefixed messages to stdout.

    Debug output is gated by the numeric level given at construction
    time; error output is always printed.
    """

    def __init__(self, log_level):
        # Numeric verbosity; debug() prints only when this is >= 4.
        self.log_level = log_level

    def debug(self, string):
        """Print *string* prefixed with ``DEBUG: `` when the level is >= 4."""
        if self.log_level >= 4:
            print(f"DEBUG: {string}")

    def error(self, string):
        """Print *string* prefixed with ``ERROR: `` regardless of the level."""
        print(f"ERROR: {string}")
27
class libvirt_con():
    """Holder for a libvirt connection that is opened on first use.

    The connection is created lazily by the ``con`` property and cached
    in ``self.connection`` afterwards.
    """

    def __init__(self, uri):
        self.log = log(4)
        # URI handed to libvirt.open() when the connection is first needed.
        self.uri = uri
        # Cached connection object; None until the first successful open.
        self.connection = None

    def get_domain_from_name(self, name):
        """Return the domain object that libvirt reports for *name*.

        Raises:
            BaseException: if the lookup yields ``None``.
        """
        dom = self.con.lookupByName(name)

        if dom is None:
            raise BaseException("No such domain: {}".format(name))
        return dom

    @property
    def con(self):
        """Return the cached connection, opening it on first access.

        If opening fails the error is logged and ``None`` is returned; the
        next access tries again because nothing was cached.
        """
        if self.connection is None:
            try:
                self.connection = libvirt.open(self.uri)
            except libvirt.libvirtError:
                self.log.error("Could not connect to: {}".format(self.uri))
            else:
                # Only report success when the open really succeeded.
                self.log.debug("Connected to: {}".format(self.uri))

        return self.connection
53
54
class vm():
    """A virtual machine defined through libvirt and driven over its serial console.

    The domain and snapshot definitions are read from XML files at
    construction time; the libvirt connection uses ``qemu:///system``.
    """

    def __init__(self, vm_xml_file, snapshot_xml_file, image=None, root_uid=None):
        """Read the XML definitions and remember the image and root UID.

        Args:
            vm_xml_file: path of the domain XML file.
            snapshot_xml_file: path of the snapshot XML file.
            image: optional path of the disk image; when given, a missing
                file is reported through the log.
            root_uid: stored unchanged in ``self.root_uid``.
        """
        self.log = log(4)
        self.con = libvirt_con("qemu:///system")

        # Set by define(), create_snapshot() and login(); initialised here
        # so the attributes always exist.
        self.dom = None
        self.snapshot = None
        self.serial_con = None

        self.vm_xml = self._read_file(vm_xml_file)
        self.snapshot_xml = self._read_file(snapshot_xml_file)

        self.image = image

        if self.image is not None and not os.path.isfile(self.image):
            self.log.error("No such file: {}".format(self.image))

        self.root_uid = root_uid

    def _read_file(self, path):
        """Return the content of *path*, or None (after logging) if it is missing."""
        try:
            with open(path) as fobj:
                return fobj.read()
        except FileNotFoundError:
            self.log.error("No such file: {}".format(path))
            return None

    def define(self):
        """Define the domain from the XML read in the constructor.

        Raises:
            BaseException: if libvirt returns ``None`` for the definition.
        """
        self.dom = self.con.con.defineXML(self.vm_xml)
        if self.dom is None:
            self.log.error("Could not define VM")
            raise BaseException("Could not define VM")

    def start(self):
        """Start the defined domain; raise BaseException on a negative result."""
        if self.dom.create() < 0:
            self.log.error("Could not start VM")
            raise BaseException("Could not start VM")

    def shutdown(self):
        """Request a shutdown if the domain is running, otherwise log an error.

        Raises:
            BaseException: if the shutdown call returns a negative result.
        """
        if self.is_running():
            if self.dom.shutdown() < 0:
                self.log.error("Could not shutdown VM")
                raise BaseException("Could not shutdown VM")
        else:
            self.log.error("Domain is not running")

    def undefine(self):
        """Undefine the domain."""
        self.dom.undefine()

    def create_snapshot(self):
        """Create a snapshot from the snapshot XML and keep it in ``self.snapshot``.

        Raises:
            BaseException: if the snapshot call returns a falsy value.
        """
        self.snapshot = self.dom.snapshotCreateXML(self.snapshot_xml)

        if not self.snapshot:
            self.log.error("Could not create snapshot")
            raise BaseException("Could not create snapshot")

    def revert_snapshot(self):
        """Revert the domain to the stored snapshot and delete that snapshot."""
        self.dom.revertToSnapshot(self.snapshot)
        self.snapshot.delete()

    def is_running(self):
        """Return True when the domain state equals ``VIR_DOMAIN_RUNNING``."""
        state, _reason = self.dom.state()
        return state == libvirt.VIR_DOMAIN_RUNNING

    def get_serial_device(self):
        """Return the ``path`` attribute of the domain's serial source element.

        The element is looked up as ``./devices/serial/source`` in the
        domain's XML description.

        Raises:
            BaseException: if the domain is not running or the XML contains
                no such element.
        """
        if not self.is_running():
            raise BaseException("Domain is not running")

        xml_root = ET.fromstring(self.dom.XMLDesc(0))

        elem = xml_root.find("./devices/serial/source")
        if elem is None:
            self.log.error("No serial device found in the domain XML")
            raise BaseException("No serial device found in the domain XML")
        return elem.get("path")

    def check_is_booted_up(self):
        """Block until the serial console delivers a byte after a newline is sent."""
        serial_con = connection(self.get_serial_device())

        serial_con.write("\n")
        # This will block till the domain is booted up
        serial_con.read(1)

        # NOTE(review): the console is deliberately left open here, as in
        # the original code where the close call was disabled.

    def login(self, username, password):
        """Open the serial console and log in with the given credentials.

        Failures are logged rather than raised; ``self.serial_con`` may then
        be unusable.
        """
        try:
            self.serial_con = connection(self.get_serial_device(), username=username)
            self.serial_con.login(password)
        except (KeyboardInterrupt, SystemExit):
            # Never hide a user interrupt or interpreter exit.
            raise
        except BaseException:
            # BaseException is kept because get_serial_device() raises it.
            self.log.error("Could not connect to the domain via serial console")

    def cmd(self, cmd):
        """Run *cmd* on the serial console and return what ``command()`` returns."""
        return self.serial_con.command(cmd)
149
150
class connection():
    """Buffered reader/writer on top of a serial device.

    Bytes that were peeked at or read ahead are kept in ``self.buffer`` and
    are served before the device is read again.
    """

    def __init__(self, device, username=None):
        # Bytes already taken from the device but not yet consumed.
        self.buffer = b""
        # Compiled lazily by back_at_prompt().
        self.back_at_prompt_pattern = None
        self.username = username
        self.log = log(1)
        self.con = serial.Serial(device)

    def read(self, size=1):
        """Return *size* bytes, taking them from the buffer first."""
        if len(self.buffer) >= size:
            # Serve the request from the buffer and keep the unused rest.
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            return data

        data = self.buffer
        # Only the part the buffer could not satisfy is read from the device.
        size = size - len(self.buffer)
        self.buffer = b""
        return data + self.con.read(size)

    def peek(self, size=1):
        """Return the next *size* bytes without consuming them."""
        # Touch the device only when the buffer is really too short.
        if len(self.buffer) < size:
            self.buffer += self.con.read(size=size - len(self.buffer))

        return self.buffer[:size]

    def readline(self):
        """Return one line (including its newline) from buffer and device.

        Everything the device reports as waiting is pulled into the buffer
        first. If the buffer then contains a newline, the bytes up to and
        including it are returned; otherwise the buffer is returned together
        with the result of the device's own ``readline()``.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at {}".format(size))
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(data)
            self.log.debug(self.buffer)
            return data

        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True when the pending data matches ``[<username>@...]#``.

        The username is interpolated into the pattern as is (unescaped).
        """
        data = self.peek()
        if not data == b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we
        # do not get the complete string
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        if self.back_at_prompt_pattern is None:
            self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)

        # Undecodable console bytes must not abort the prompt detection.
        text = data.decode(errors="replace")
        return bool(self.back_at_prompt_pattern.search(text))

    def log_console_line(self, line):
        """Write *line* (a str) to stdout unchanged."""
        self.log.debug("Get in function log_console_line()")
        sys.stdout.write(line)

    @property
    def in_waiting(self):
        """Return the device's ``in_waiting`` once it has stopped changing.

        Sleeps 0.5 seconds, then keeps sampling every 0.5 seconds until the
        sampled value equals the previous one.
        """
        in_waiting_before = 0
        sleep(0.5)

        while in_waiting_before != self.con.in_waiting:
            in_waiting_before = self.con.in_waiting
            sleep(0.5)

        return self.con.in_waiting

    def readline2(self, pattern=None):
        """Read and echo the device byte by byte until a match or a newline.

        This reads from the device directly and bypasses ``self.buffer``.

        Returns:
            ``{"string": text, "return-code": 1}`` as soon as *pattern*
            matches the text read so far, or ``{"return-code": 0}`` when a
            newline byte is read first.
        """
        import codecs

        if pattern:
            pattern = re.compile(pattern)

        # An incremental decoder copes with multi-byte UTF-8 sequences that
        # arrive one byte at a time.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        string = ""

        while True:
            char = self.con.read(1)
            text = decoder.decode(char)
            string = string + text
            print(text, end="")

            if pattern and pattern.match(string):
                return {"string": string, "return-code": 1}

            if char == b"\n":
                return {"return-code": 0}

    def check_logged_in(self, username):
        """Return True if the next console line looks like *username*'s prompt."""
        pattern = r"^\[" + username + r"@.+\]#"
        # readline2() is the reader that accepts a pattern.
        data = self.readline2(pattern=pattern)
        if data["return-code"] == 1:
            print("We are logged in")
            return True
        else:
            print("We are not logged in")
            return False

    def login(self, password):
        """Log in as ``self.username`` with *password* over the console.

        Returns:
            False if no username is set, True once a prompt is detected.

        NOTE(review): as the original comment states, a rejected login is
        not detected; the final loop then keeps waiting for a prompt.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(data.decode(errors="replace"))

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        login_pattern = re.compile(r"^.*login: ")

        # Read all lines till we get login:
        while True:
            data = self.peek()
            if data != b"l":
                self.log.debug("We get no l at the start")
                self.log_console_line(self.readline().decode(errors="replace"))

            # We need to use self.in_waiting because with self.con.in_waiting
            # we do not get the complete string
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if login_pattern.search(data.decode(errors="replace")):
                break
            else:
                self.log.debug("The pattern does not match")
                self.log_console_line(self.readline().decode(errors="replace"))

        # We can login
        string = "{}\n".format(self.username)
        self.con.write(string.encode())
        self.con.flush()
        # read the login out of the buffer
        data = self.readline()
        self.log.debug("This is the login:{}".format(data))
        self.log_console_line(data.decode(errors="replace"))

        # We need to wait here till we get the full string "Password:".
        # The value is not needed; the property is read only because it
        # waits until the incoming data has settled.
        _ = self.in_waiting

        string = "{}\n".format(password)
        self.con.write(string.encode())
        self.con.flush()
        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(data.decode(errors="replace"))

        while not self.back_at_prompt():
            # This will fail if the login failed so we need to look for the failed keyword
            data = self.readline()
            self.log_console_line(data.decode(errors="replace"))

        return True

    def write(self, string):
        """Encode *string*, write it to the device and flush."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Send *command* and echo the console output until the prompt returns."""
        self.write("{}\n".format(command))

        # We need to read out the prompt for this command first
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer
        data = self.readline()
        self.log_console_line(data.decode(errors="replace"))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(data.decode(errors="replace"))
350
351
# A class which defines and undefines a virtual network based on an XML file
class network():
    """A libvirt virtual network described by an XML file.

    The libvirt connection uses ``qemu:///system``.
    """

    def __init__(self, network_xml_file):
        """Read the network XML from *network_xml_file*.

        A missing file is logged and leaves ``self.network_xml`` as None.
        """
        self.log = log(4)
        self.con = libvirt_con("qemu:///system")
        # Set by define().
        self.network = None
        self.network_xml = None
        try:
            with open(network_xml_file) as fobj:
                self.network_xml = fobj.read()
        except FileNotFoundError:
            self.log.error("No such file: {}".format(network_xml_file))

    def define(self):
        """Define the network from the XML and log an error if that yields None."""
        self.network = self.con.con.networkDefineXML(self.network_xml)

        # Check the object that was just returned, not the class itself.
        if self.network is None:
            self.log.error("Failed to define virtual network")

    def start(self):
        """Start the defined network."""
        self.network.create()

    def undefine(self):
        """Call ``destroy()`` on the network.

        NOTE(review): despite the method name only ``destroy()`` is called,
        exactly as before; confirm whether ``undefine()`` is wanted as well.
        """
        self.network.destroy()
374
375
376
class RecipeExeption(Exception):
    """Error type raised when a recipe cannot be parsed."""
379
380
381
# Reads the recipe file, checks that its syntax is valid
# and provides tuples with the (machine, extra, command) structure
class recipe():
    """Parser for a recipe file.

    Each line has the form ``<machine>[ <extra>]: <command>``. Everything
    after the first colon is the command, so commands may contain colons.
    """

    def __init__(self, path):
        """Read the raw lines of the recipe file at *path*.

        A missing file is logged and treated as an empty recipe.
        """
        self.log = log(4)
        self.recipe_file = path
        # Parsed result; None until parse() has succeeded once.
        self._recipe = None

        try:
            with open(self.recipe_file) as fobj:
                self.raw_recipe = fobj.readlines()
        except FileNotFoundError:
            self.log.error("No such file: {}".format(self.recipe_file))
            self.raw_recipe = []

    @property
    def recipe(self):
        """Return the parsed recipe, parsing it on first access."""
        # Compare with None so that an empty recipe is not parsed again.
        if self._recipe is None:
            self.parse()

        return self._recipe

    def parse(self):
        """Parse the raw lines into ``(machine, extra, command)`` tuples.

        Raises:
            RecipeExeption: if a line has no colon, or does not have exactly
                one or two words in front of the colon.
        """
        parsed = []
        for lineno, line in enumerate(self.raw_recipe, start=1):
            # Split at the first colon only; the command keeps later colons.
            target, colon, cmd = line.partition(":")
            if not colon:
                self.log.error("Error parsing the recipe in line {}".format(lineno))
                raise RecipeExeption("Error parsing the recipe in line {}".format(lineno))

            words = target.split()
            if len(words) == 1:
                machine, extra = words[0], ""
            elif len(words) == 2:
                machine, extra = words
            else:
                self.log.error("Failed to parse the recipe in line {}".format(lineno))
                raise RecipeExeption("Failed to parse the recipe in line {}".format(lineno))

            parsed.append((machine, extra, cmd.strip()))

        # Publish the result only after every line was parsed successfully.
        self._recipe = parsed
431
432
class test():
    """A test case located in a directory containing ``settings`` and ``recipe``."""

    def __init__(self, path):
        """Resolve *path* and report missing ``settings`` / ``recipe`` files.

        Raises:
            Exception: whatever ``os.path.abspath`` raised for *path*; the
                failure is logged first.
        """
        self.log = log(4)
        try:
            self.path = os.path.abspath(path)
        except Exception:
            # Without a path nothing below can work, so do not continue.
            self.log.error("Could not get absolute path")
            raise

        self.log.debug(self.path)

        self.settings_file = "{}/settings".format(self.path)
        if not os.path.isfile(self.settings_file):
            self.log.error("No such file: {}".format(self.settings_file))

        self.recipe_file = "{}/recipe".format(self.path)
        if not os.path.isfile(self.recipe_file):
            self.log.error("No such file: {}".format(self.recipe_file))

    def read_settings(self):
        """Load name, description and virtual environment data from the settings file.

        The virtual environment path is resolved relative to the test
        directory.
        """
        self.config = configparser.ConfigParser()
        self.config.read(self.settings_file)
        self.name = self.config["DEFAULT"]["Name"]
        self.description = self.config["DEFAULT"]["Description"]

        self.virtual_environ_name = self.config["VIRTUAL_ENVIRONMENT"]["Name"]
        self.virtual_environ_path = self.config["VIRTUAL_ENVIRONMENT"]["Path"]
        self.virtual_environ_path = os.path.normpath(self.path + "/" + self.virtual_environ_path)

    def virtual_environ_setup(self):
        """Create the virtual environment and collect its networks and machines."""
        self.virtual_environ = virtual_environ(self.virtual_environ_path)

        self.virtual_networks = self.virtual_environ.get_networks()

        self.virtual_machines = self.virtual_environ.get_machines()

    def virtual_environ_start(self):
        """Placeholder; not implemented yet."""
        pass

    def load_recipe(self):
        """Placeholder; not implemented yet."""
        pass

    def run_recipe(self):
        """Placeholder; not implemented yet."""
        pass

    def virtual_environ_stop(self):
        """Placeholder; not implemented yet."""
        pass
479
480
# Provides all vms and networks of a virtual environment
# and the paths to the necessary xml files
class virtual_environ():
    """A virtual environment described by a ``settings`` file in a directory.

    The ``DEFAULT`` section supplies ``name`` plus comma separated
    ``machines`` and ``networks`` lists; each listed name is expected to
    have its own section holding the XML file paths.
    """

    def __init__(self, path):
        """Read the settings file below *path* and split the name lists.

        Raises:
            Exception: whatever ``os.path.abspath`` raised for *path*; the
                failure is logged first.
        """
        self.log = log(4)
        try:
            self.path = os.path.abspath(path)
        except Exception:
            # Without a path nothing below can work, so do not continue.
            self.log.error("Could not get absolute path")
            raise

        self.log.debug(self.path)

        self.settings_file = "{}/settings".format(self.path)
        if not os.path.isfile(self.settings_file):
            self.log.error("No such file: {}".format(self.settings_file))

        self.log.debug(self.settings_file)
        self.config = configparser.ConfigParser()
        self.config.read(self.settings_file)
        self.name = self.config["DEFAULT"]["name"]
        self.machines_string = self.config["DEFAULT"]["machines"]
        self.networks_string = self.config["DEFAULT"]["networks"]

        self.machines = self._split_list(self.machines_string)
        self.networks = self._split_list(self.networks_string)

        self.log.debug(self.machines)
        self.log.debug(self.networks)

    @staticmethod
    def _split_list(value):
        """Split a comma separated string into stripped, non-empty items."""
        return [item.strip() for item in value.split(",") if item.strip()]

    def get_networks(self):
        """Return a dict mapping each network name to a ``network`` object.

        The XML path comes from the ``xml_file`` key of the network's own
        section, relative to the environment directory.
        """
        networks = {}
        for _network in self.networks:
            self.log.debug(_network)
            networks.setdefault(_network, network(os.path.normpath(self.path + "/" + self.config[_network]["xml_file"])))
        return networks

    def get_machines(self):
        """Return a dict mapping each machine name to a ``vm`` object.

        The paths come from the ``xml_file`` and ``snapshot_xml_file`` keys
        of the machine's own section, relative to the environment directory.
        """
        machines = {}
        for _machine in self.machines:
            self.log.debug(_machine)
            # NOTE(review): only the two XML paths are passed to vm();
            # confirm this matches vm's constructor signature.
            machines.setdefault(_machine, vm(
                os.path.normpath(self.path + "/" + self.config[_machine]["xml_file"]),
                os.path.normpath(self.path + "/" + self.config[_machine]["snapshot_xml_file"])))

        return machines
531
532
# Script entry point: set up the test found in the directory given by -d.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()

    # The directory is mandatory; without it there is no test to load.
    parser.add_argument("-d", "--directory", dest="dir", required=True)

    args = parser.parse_args()

    currenttest = test(args.dir)
    currenttest.read_settings()
    currenttest.virtual_environ_setup()

    # Load the recipe that belongs to this test instead of a fixed path
    # on the developer's machine; test() already reported a missing file.
    if os.path.isfile(currenttest.recipe_file):
        _recipe = recipe(currenttest.recipe_file)