]> git.ipfire.org Git - nitsi.git/blame_incremental - test.py
Add network class
[nitsi.git] / test.py
... / ...
CommitLineData
1#!/usr/bin/python3
2
3import serial
4
5import re
6from time import sleep
7import sys
8
9import libvirt
10
11import xml.etree.ElementTree as ET
12
13import inspect
14import os
15
16import configparser
17
18class log():
19 def __init__(self, log_level):
20 self.log_level = log_level
21
22 def debug(self, string):
23 if self.log_level >= 4:
24 print("DEBUG: {}".format(string))
25
26 def error(self, string):
27 print("ERROR: {}".format(string))
28
29class libvirt_con():
30 def __init__(self, uri):
31 self.log = log(4)
32 self.uri = uri
33 self.connection = None
34
35 def get_domain_from_name(self, name):
36 dom = self.con.lookupByName(name)
37
38 if dom == None:
39 raise BaseException
40 return dom
41
42 @property
43 def con(self):
44 if self.connection == None:
45 try:
46 self.connection = libvirt.open(self.uri)
47 except BaseException as error:
48 self.log.error("Could not connect to: {}".format(self.uri))
49
50 self.log.debug("Connected to: {}".format(self.uri))
51 return self.connection
52
53 return self.connection
54
55
56class vm():
57 def __init__(self, vm_xml_file, snapshot_xml_file, image, root_uid):
58 self.log = log(4)
59 self.con = libvirt_con("qemu:///system")
60 try:
61 with open(vm_xml_file) as fobj:
62 self.vm_xml = fobj.read()
63 except FileNotFoundError as error:
64 self.log.error("No such file: {}".format(vm_xml_file))
65
66 try:
67 with open(snapshot_xml_file) as fobj:
68 self.snapshot_xml = fobj.read()
69 except FileNotFoundError as error:
70 self.log.error("No such file: {}".format(snapshot_xml_file))
71
72 self.image = image
73
74 if not os.path.isfile(self.image):
75 self.log.error("No such file: {}".format(self.image))
76
77 self.root_uid = root_uid
78
79 def define(self):
80 self.dom = self.con.con.defineXML(self.vm_xml)
81 if self.dom == None:
82 self.log.error("Could not define VM")
83 raise BaseException
84
85 def start(self):
86 if self.dom.create() < 0:
87 self.log.error("Could not start VM")
88 raise BaseException
89
90 def shutdown(self):
91 if self.is_running():
92 if self.dom.shutdown() < 0:
93 self.log.error("Could not shutdown VM")
94 raise BaseException
95 else:
96 self.log.error("Domain is not running")
97
98 def undefine(self):
99 self.dom.undefine()
100
101 def create_snapshot(self):
102
103 self.snapshot = self.dom.snapshotCreateXML(self.snapshot_xml)
104
105 if not self.snapshot:
106 self.log.error("Could not create snapshot")
107 raise BaseException
108
109 def revert_snapshot(self):
110 print(inspect.getmembers(self.dom, predicate=inspect.ismethod))
111 self.dom.revertToSnapshot(self.snapshot)
112 self.snapshot.delete()
113
114 def is_running(self):
115
116 state, reason = self.dom.state()
117
118 if state == libvirt.VIR_DOMAIN_RUNNING:
119 return True
120 else:
121 return False
122
123 def get_serial_device(self):
124
125 if not self.is_running():
126 raise BaseException
127
128 xml_root = ET.fromstring(self.dom.XMLDesc(0))
129
130 elem = xml_root.find("./devices/serial/source")
131 return elem.get("path")
132
133 def check_is_booted_up(self):
134 serial_con = connection(self.get_serial_device())
135
136 serial_con.write("\n")
137 # This will block till the domain is booted up
138 serial_con.read(1)
139
140 #serial_con.close()
141
142 def login(self, username, password):
143 try:
144 self.serial_con = connection(self.get_serial_device(), username="root")
145 self.serial_con.login("25814@root")
146 except BaseException as e:
147 self.log.error("Could not connect to the domain via serial console")
148
149 def cmd(self, cmd):
150 return self.serial_con.command(cmd)
151
152
153class connection():
154 def __init__(self, device, username=None):
155 self.buffer = b""
156 self.back_at_prompt_pattern = None
157 self.username = username
158 self.log = log(1)
159 self.con = serial.Serial(device)
160
161 def read(self, size=1):
162 if len(self.buffer) >= size:
163 # throw away first size bytes in buffer
164 data = self.buffer[:size]
165 # Set the buffer to the non used bytes
166 self.buffer = self.buffer[size:]
167 return data
168 else:
169 data = self.buffer
170 # Set the size to the value we have to read now
171 size = size - len(self.buffer)
172 # Set the buffer empty
173 self.buffer = b""
174 return data + self.con.read(size)
175
176 def peek(self, size=1):
177 if len(self.buffer) <= size:
178 self.buffer += self.con.read(size=size - len(self.buffer))
179
180 return self.buffer[:size]
181
182 def readline(self):
183 self.log.debug(self.buffer)
184 self.buffer = self.buffer + self.con.read(self.con.in_waiting)
185 if b"\n" in self.buffer:
186 size = self.buffer.index(b"\n") + 1
187 self.log.debug("We have a whole line in the buffer")
188 self.log.debug(self.buffer)
189 self.log.debug("We split at {}".format(size))
190 data = self.buffer[:size]
191 self.buffer = self.buffer[size:]
192 self.log.debug(data)
193 self.log.debug(self.buffer)
194 return data
195
196 data = self.buffer
197 self.buffer = b""
198 return data + self.con.readline()
199
200 def back_at_prompt(self):
201 data = self.peek()
202 if not data == b"[":
203 return False
204
205 # We need to use self.in_waiting because with self.con.in_waiting we get
206 # not the complete string
207 size = len(self.buffer) + self.in_waiting
208 data = self.peek(size)
209
210
211 if self.back_at_prompt_pattern == None:
212 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
213 self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
214
215 if self.back_at_prompt_pattern.search(data.decode()):
216 return True
217 else:
218 return False
219
220 def log_console_line(self, line):
221 self.log.debug("Get in function log_console_line()")
222 sys.stdout.write(line)
223
224 @property
225 def in_waiting(self):
226 in_waiting_before = 0
227 sleep(0.5)
228
229 while in_waiting_before != self.con.in_waiting:
230 in_waiting_before = self.con.in_waiting
231 sleep(0.5)
232
233 return self.con.in_waiting
234
235
236 def readline2(self, pattern=None):
237 string = ""
238 string2 = b""
239 if pattern:
240 pattern = re.compile(pattern)
241
242 while 1:
243 char = self.con.read(1)
244 string = string + char.decode("utf-8")
245 string2 = string2 + char
246 #print(char)
247 print(char.decode("utf-8"), end="")
248
249 #print(string2)
250 if pattern and pattern.match(string):
251 #print("get here1")
252 #print(string2)
253 return {"string" : string, "return-code" : 1}
254
255 if char == b"\n":
256 #print(char)
257 #print(string2)
258 #print("get here2")
259 return {"return-code" : 0}
260
261 def check_logged_in(self, username):
262 pattern = "^\[" + username + "@.+\]#"
263 data = self.readline(pattern=pattern)
264 if data["return-code"] == 1:
265 print("We are logged in")
266 return True
267 else:
268 print("We are not logged in")
269 return False
270
271 def login(self, password):
272 if self.username == None:
273 self.log.error("Username cannot be blank")
274 return False
275
276 # Hit enter to see what we get
277 self.con.write(b'\n')
278 # We get two new lines \r\n ?
279 data = self.readline()
280 self.log_console_line(data.decode())
281
282
283 if self.back_at_prompt():
284 self.log.debug("We are already logged in.")
285 return True
286
287 # Read all line till we get login:
288 while 1:
289 data = self.peek()
290 if not data.decode() == "l":
291 self.log.debug("We get no l at the start")
292 self.log_console_line(self.readline().decode())
293
294 # We need to use self.in_waiting because with self.con.in_waiting we get
295 # not the complete string
296 size = len(self.buffer) + self.in_waiting
297 data = self.peek(size)
298
299 pattern = r"^.*login: "
300 pattern = re.compile(pattern)
301
302 if pattern.search(data.decode()):
303 break
304 else:
305 self.log.debug("The pattern does not match")
306 self.log_console_line(self.readline().decode())
307
308 # We can login
309 string = "{}\n".format(self.username)
310 self.con.write(string.encode())
311 self.con.flush()
312 # read the login out of the buffer
313 data = self.readline()
314 self.log.debug("This is the login:{}".format(data))
315 self.log_console_line(data.decode())
316
317 # We need to wait her till we get the full string "Password:"
318 #This is useless but self.in_waiting will wait the correct amount of time
319 size = self.in_waiting
320
321 string = "{}\n".format(password)
322 self.con.write(string.encode())
323 self.con.flush()
324 # Print the 'Password:' line
325 data = self.readline()
326 self.log_console_line(data.decode())
327
328 while not self.back_at_prompt():
329 # This will fail if the login failed so we need to look for the failed keyword
330 data = self.readline()
331 self.log_console_line(data.decode())
332
333 return True
334
335 def write(self, string):
336 self.log.debug(string)
337 self.con.write(string.encode())
338 self.con.flush()
339
340 def command(self, command):
341 self.write("{}\n".format(command))
342
343 # We need to read out the prompt for this command first
344 # If we do not do this we will break the loop immediately
345 # because the prompt for this command is still in the buffer
346 data = self.readline()
347 self.log_console_line(data.decode())
348
349 while not self.back_at_prompt():
350 data = self.readline()
351 self.log_console_line(data.decode())
352
353
354# A class which define and undefine a virtual network based on an xml file
355class network():
356 def __init__(self, network_xml_file):
357 self.log = log(4)
358 self.con = libvirt_con("qemu:///system")
359 try:
360 with open(network_xml_file) as fobj:
361 self.network_xml = fobj.read()
362 except FileNotFoundError as error:
363 self.log.error("No such file: {}".format(vm_xml_file))
364
365 def define(self):
366 self.network = self.con.con.networkDefineXML(self.network_xml)
367
368 if network == None:
369 self.log.error("Failed to define virtual network")
370
371 def start(self):
372 self.network.create()
373
374 def undefine(self):
375 self.network.destroy()
376
377
378
379# Should read the test, check if the syntax are valid
380# and return tuples with the ( host, command ) structure
381class recipe():
382 def __init__(self, path):
383 self.log = log(4)
384 self.recipe_file = path
385 if not os.path.isfile(self.recipe_file):
386 self.log.error("No such file: {}".format(self.recipe_file))
387
388 try:
389 with open(self.recipe_file) as fobj:
390 self.raw_recipe = fobj.readlines()
391 except FileNotFoundError as error:
392 self.log.error("No such file: {}".format(vm_xml_file))
393
394 for line in self.raw_recipe:
395 print(line)
396
397
398class test():
399 def __init__(self, path):
400 self.log = log(4)
401 try:
402 self.path = os.path.abspath(path)
403 except BaseException as e:
404 self.log.error("Could not get absolute path")
405
406 self.log.debug(self.path)
407
408 self.settings_file = "{}/settings".format(self.path)
409 if not os.path.isfile(self.settings_file):
410 self.log.error("No such file: {}".format(self.settings_file))
411
412 self.recipe_file = "{}/recipe".format(self.path)
413 if not os.path.isfile(self.recipe_file):
414 self.log.error("No such file: {}".format(self.recipe_file))
415
416 def read_settings(self):
417 self.config = configparser.ConfigParser()
418 self.config.read(self.settings_file)
419 self.name = self.config["DEFAULT"]["Name"]
420 self.description = self.config["DEFAULT"]["Description"]
421
422 self.virtual_environ_name = self.config["VIRTUAL_ENVIRONMENT"]["Name"]
423 self.virtual_environ_path = self.config["VIRTUAL_ENVIRONMENT"]["Path"]
424 self.virtual_environ_path = os.path.normpath(self.path + "/" + self.virtual_environ_path)
425
426 def virtual_environ_setup(self):
427 self.virtual_environ = virtual_environ(self.virtual_environ_path)
428
429 self.virtual_networks = self.virtual_environ.get_networks()
430
431 self.virtual_machines = self.virtual_environ.get_machines()
432
433 def virtual_environ_start(self):
434 pass
435
436 def load_recipe(self):
437 pass
438
439 def run_recipe():
440 pass
441
442 def virtual_environ_stop():
443 pass
444
445
446# Should return all vms and networks in a list
447# and should provide the path to the necessary xml files
448class virtual_environ():
449 def __init__(self, path):
450 self.log = log(4)
451 try:
452 self.path = os.path.abspath(path)
453 except BaseException as e:
454 self.log.error("Could not get absolute path")
455
456 self.log.debug(self.path)
457
458 self.settings_file = "{}/settings".format(self.path)
459 if not os.path.isfile(self.settings_file):
460 self.log.error("No such file: {}".format(self.settings_file))
461
462 self.log.debug(self.settings_file)
463 self.config = configparser.ConfigParser()
464 self.config.read(self.settings_file)
465 self.name = self.config["DEFAULT"]["name"]
466 self.machines_string = self.config["DEFAULT"]["machines"]
467 self.networks_string = self.config["DEFAULT"]["networks"]
468
469 self.machines = []
470 for machine in self.machines_string.split(","):
471 self.machines.append(machine.strip())
472
473 self.networks = []
474 for network in self.networks_string.split(","):
475 self.networks.append(network.strip())
476
477 self.log.debug(self.machines)
478 self.log.debug(self.networks)
479
480 def get_networks(self):
481 networks = {}
482 for _network in self.networks:
483 self.log.debug(_network)
484 networks.setdefault(_network, network(os.path.normpath(self.path + "/" + self.config[_network]["xml_file"])))
485 return networks
486
487 def get_machines(self):
488 machines = {}
489 for _machine in self.machines:
490 self.log.debug(_machine)
491 machines.setdefault(_machine, vm(
492 os.path.normpath(self.path + "/" + self.config[_machine]["xml_file"]),
493 os.path.normpath(self.path + "/" + self.config[_machine]["snapshot_xml_file"])))
494
495 return machines
496
497
498if __name__ == "__main__":
499 import argparse
500
501 parser = argparse.ArgumentParser()
502
503 parser.add_argument("-d", "--directory", dest="dir")
504
505 args = parser.parse_args()
506
507 _recipe = recipe("/home/jonatan/python-testing-kvm/test/recipe")
508 currenttest = test(args.dir)
509 currenttest.read_settings()
510 currenttest.virtual_environ_setup()