]>
git.ipfire.org Git - people/ms/nitsi.git/blob - test.py
11 import xml
.etree
.ElementTree
as ET
19 def __init__(self
, log_level
):
20 self
.log_level
= log_level
22 def debug(self
, string
):
23 if self
.log_level
>= 4:
24 print("DEBUG: {}".format(string
))
26 def error(self
, string
):
27 print("ERROR: {}".format(string
))
30 def __init__(self
, uri
):
33 self
.connection
= None
35 def get_domain_from_name(self
, name
):
36 dom
= self
.con
.lookupByName(name
)
44 if self
.connection
== None:
46 self
.connection
= libvirt
.open(self
.uri
)
47 except BaseException
as error
:
48 self
.log
.error("Could not connect to: {}".format(self
.uri
))
50 self
.log
.debug("Connected to: {}".format(self
.uri
))
51 return self
.connection
53 return self
.connection
57 def __init__(self
, vm_xml_file
, snapshot_xml_file
, image
, root_uid
):
59 self
.con
= libvirt_con("qemu:///system")
61 with
open(vm_xml_file
) as fobj
:
62 self
.vm_xml
= fobj
.read()
63 except FileNotFoundError
as error
:
64 self
.log
.error("No such file: {}".format(vm_xml_file
))
67 with
open(snapshot_xml_file
) as fobj
:
68 self
.snapshot_xml
= fobj
.read()
69 except FileNotFoundError
as error
:
70 self
.log
.error("No such file: {}".format(snapshot_xml_file
))
74 if not os
.path
.isfile(self
.image
):
75 self
.log
.error("No such file: {}".format(self
.image
))
77 self
.root_uid
= root_uid
80 self
.dom
= self
.con
.con
.defineXML(self
.vm_xml
)
82 self
.log
.error("Could not define VM")
86 if self
.dom
.create() < 0:
87 self
.log
.error("Could not start VM")
92 if self
.dom
.shutdown() < 0:
93 self
.log
.error("Could not shutdown VM")
96 self
.log
.error("Domain is not running")
101 def create_snapshot(self
):
103 self
.snapshot
= self
.dom
.snapshotCreateXML(self
.snapshot_xml
)
105 if not self
.snapshot
:
106 self
.log
.error("Could not create snapshot")
109 def revert_snapshot(self
):
110 print(inspect
.getmembers(self
.dom
, predicate
=inspect
.ismethod
))
111 self
.dom
.revertToSnapshot(self
.snapshot
)
112 self
.snapshot
.delete()
114 def is_running(self
):
116 state
, reason
= self
.dom
.state()
118 if state
== libvirt
.VIR_DOMAIN_RUNNING
:
123 def get_serial_device(self
):
125 if not self
.is_running():
128 xml_root
= ET
.fromstring(self
.dom
.XMLDesc(0))
130 elem
= xml_root
.find("./devices/serial/source")
131 return elem
.get("path")
133 def check_is_booted_up(self
):
134 serial_con
= connection(self
.get_serial_device())
136 serial_con
.write("\n")
137 # This will block till the domain is booted up
142 def login(self
, username
, password
):
144 self
.serial_con
= connection(self
.get_serial_device(), username
="root")
145 self
.serial_con
.login("25814@root")
146 except BaseException
as e
:
147 self
.log
.error("Could not connect to the domain via serial console")
150 return self
.serial_con
.command(cmd
)
154 def __init__(self
, device
, username
=None):
156 self
.back_at_prompt_pattern
= None
157 self
.username
= username
159 self
.con
= serial
.Serial(device
)
161 def read(self
, size
=1):
162 if len(self
.buffer) >= size
:
163 # throw away first size bytes in buffer
164 data
= self
.buffer[:size
]
165 # Set the buffer to the non used bytes
166 self
.buffer = self
.buffer[size
:]
170 # Set the size to the value we have to read now
171 size
= size
- len(self
.buffer)
172 # Set the buffer empty
174 return data
+ self
.con
.read(size
)
176 def peek(self
, size
=1):
177 if len(self
.buffer) <= size
:
178 self
.buffer += self
.con
.read(size
=size
- len(self
.buffer))
180 return self
.buffer[:size
]
183 self
.log
.debug(self
.buffer)
184 self
.buffer = self
.buffer + self
.con
.read(self
.con
.in_waiting
)
185 if b
"\n" in self
.buffer:
186 size
= self
.buffer.index(b
"\n") + 1
187 self
.log
.debug("We have a whole line in the buffer")
188 self
.log
.debug(self
.buffer)
189 self
.log
.debug("We split at {}".format(size
))
190 data
= self
.buffer[:size
]
191 self
.buffer = self
.buffer[size
:]
193 self
.log
.debug(self
.buffer)
198 return data
+ self
.con
.readline()
200 def back_at_prompt(self
):
205 # We need to use self.in_waiting because with self.con.in_waiting we get
206 # not the complete string
207 size
= len(self
.buffer) + self
.in_waiting
208 data
= self
.peek(size
)
211 if self
.back_at_prompt_pattern
== None:
212 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
213 self
.back_at_prompt_pattern
= re
.compile(r
"^\[{}@.+\]#".format(self
.username
), re
.MULTILINE
)
215 if self
.back_at_prompt_pattern
.search(data
.decode()):
220 def log_console_line(self
, line
):
221 self
.log
.debug("Get in function log_console_line()")
222 sys
.stdout
.write(line
)
225 def in_waiting(self
):
226 in_waiting_before
= 0
229 while in_waiting_before
!= self
.con
.in_waiting
:
230 in_waiting_before
= self
.con
.in_waiting
233 return self
.con
.in_waiting
236 def readline2(self
, pattern
=None):
240 pattern
= re
.compile(pattern
)
243 char
= self
.con
.read(1)
244 string
= string
+ char
.decode("utf-8")
245 string2
= string2
+ char
247 print(char
.decode("utf-8"), end
="")
250 if pattern
and pattern
.match(string
):
253 return {"string" : string
, "return-code" : 1}
259 return {"return-code" : 0}
261 def check_logged_in(self
, username
):
262 pattern
= "^\[" + username
+ "@.+\]#"
263 data
= self
.readline(pattern
=pattern
)
264 if data
["return-code"] == 1:
265 print("We are logged in")
268 print("We are not logged in")
271 def login(self
, password
):
272 if self
.username
== None:
273 self
.log
.error("Username cannot be blank")
276 # Hit enter to see what we get
277 self
.con
.write(b
'\n')
278 # We get two new lines \r\n ?
279 data
= self
.readline()
280 self
.log_console_line(data
.decode())
283 if self
.back_at_prompt():
284 self
.log
.debug("We are already logged in.")
287 # Read all line till we get login:
290 if not data
.decode() == "l":
291 self
.log
.debug("We get no l at the start")
292 self
.log_console_line(self
.readline().decode())
294 # We need to use self.in_waiting because with self.con.in_waiting we get
295 # not the complete string
296 size
= len(self
.buffer) + self
.in_waiting
297 data
= self
.peek(size
)
299 pattern
= r
"^.*login: "
300 pattern
= re
.compile(pattern
)
302 if pattern
.search(data
.decode()):
305 self
.log
.debug("The pattern does not match")
306 self
.log_console_line(self
.readline().decode())
309 string
= "{}\n".format(self
.username
)
310 self
.con
.write(string
.encode())
312 # read the login out of the buffer
313 data
= self
.readline()
314 self
.log
.debug("This is the login:{}".format(data
))
315 self
.log_console_line(data
.decode())
317 # We need to wait her till we get the full string "Password:"
318 #This is useless but self.in_waiting will wait the correct amount of time
319 size
= self
.in_waiting
321 string
= "{}\n".format(password
)
322 self
.con
.write(string
.encode())
324 # Print the 'Password:' line
325 data
= self
.readline()
326 self
.log_console_line(data
.decode())
328 while not self
.back_at_prompt():
329 # This will fail if the login failed so we need to look for the failed keyword
330 data
= self
.readline()
331 self
.log_console_line(data
.decode())
335 def write(self
, string
):
336 self
.log
.debug(string
)
337 self
.con
.write(string
.encode())
340 def command(self
, command
):
341 self
.write("{}\n".format(command
))
343 # We need to read out the prompt for this command first
344 # If we do not do this we will break the loop immediately
345 # because the prompt for this command is still in the buffer
346 data
= self
.readline()
347 self
.log_console_line(data
.decode())
349 while not self
.back_at_prompt():
350 data
= self
.readline()
351 self
.log_console_line(data
.decode())
354 # A class which define and undefine a virtual network based on an xml file
356 def __init__(self
, network_xml_file
):
358 self
.con
= libvirt_con("qemu:///system")
360 with
open(network_xml_file
) as fobj
:
361 self
.network_xml
= fobj
.read()
362 except FileNotFoundError
as error
:
363 self
.log
.error("No such file: {}".format(vm_xml_file
))
366 self
.network
= self
.con
.con
.networkDefineXML(self
.network_xml
)
369 self
.log
.error("Failed to define virtual network")
372 self
.network
.create()
375 self
.network
.destroy()
379 # Should read the test, check if the syntax are valid
380 # and return tuples with the ( host, command ) structure
382 def __init__(self
, path
):
384 self
.recipe_file
= path
385 if not os
.path
.isfile(self
.recipe_file
):
386 self
.log
.error("No such file: {}".format(self
.recipe_file
))
389 with
open(self
.recipe_file
) as fobj
:
390 self
.raw_recipe
= fobj
.readlines()
391 except FileNotFoundError
as error
:
392 self
.log
.error("No such file: {}".format(vm_xml_file
))
394 for line
in self
.raw_recipe
:
399 def __init__(self
, path
):
402 self
.path
= os
.path
.abspath(path
)
403 except BaseException
as e
:
404 self
.log
.error("Could not get absolute path")
406 self
.log
.debug(self
.path
)
408 self
.settings_file
= "{}/settings".format(self
.path
)
409 if not os
.path
.isfile(self
.settings_file
):
410 self
.log
.error("No such file: {}".format(self
.settings_file
))
412 self
.recipe_file
= "{}/recipe".format(self
.path
)
413 if not os
.path
.isfile(self
.recipe_file
):
414 self
.log
.error("No such file: {}".format(self
.recipe_file
))
416 def read_settings(self
):
417 self
.config
= configparser
.ConfigParser()
418 self
.config
.read(self
.settings_file
)
419 self
.name
= self
.config
["DEFAULT"]["Name"]
420 self
.description
= self
.config
["DEFAULT"]["Description"]
422 self
.virtual_environ_name
= self
.config
["VIRTUAL_ENVIRONMENT"]["Name"]
423 self
.virtual_environ_path
= self
.config
["VIRTUAL_ENVIRONMENT"]["Path"]
424 self
.virtual_environ_path
= os
.path
.normpath(self
.path
+ "/" + self
.virtual_environ_path
)
426 def virtual_environ_setup(self
):
427 self
.virtual_environ
= virtual_environ(self
.virtual_environ_path
)
429 self
.virtual_networks
= self
.virtual_environ
.get_networks()
431 self
.virtual_machines
= self
.virtual_environ
.get_machines()
433 def virtual_environ_start(self
):
436 def load_recipe(self
):
442 def virtual_environ_stop():
446 # Should return all vms and networks in a list
447 # and should provide the path to the necessary xml files
448 class virtual_environ():
449 def __init__(self
, path
):
452 self
.path
= os
.path
.abspath(path
)
453 except BaseException
as e
:
454 self
.log
.error("Could not get absolute path")
456 self
.log
.debug(self
.path
)
458 self
.settings_file
= "{}/settings".format(self
.path
)
459 if not os
.path
.isfile(self
.settings_file
):
460 self
.log
.error("No such file: {}".format(self
.settings_file
))
462 self
.log
.debug(self
.settings_file
)
463 self
.config
= configparser
.ConfigParser()
464 self
.config
.read(self
.settings_file
)
465 self
.name
= self
.config
["DEFAULT"]["name"]
466 self
.machines_string
= self
.config
["DEFAULT"]["machines"]
467 self
.networks_string
= self
.config
["DEFAULT"]["networks"]
470 for machine
in self
.machines_string
.split(","):
471 self
.machines
.append(machine
.strip())
474 for network
in self
.networks_string
.split(","):
475 self
.networks
.append(network
.strip())
477 self
.log
.debug(self
.machines
)
478 self
.log
.debug(self
.networks
)
480 def get_networks(self
):
482 for _network
in self
.networks
:
483 self
.log
.debug(_network
)
484 networks
.setdefault(_network
, network(os
.path
.normpath(self
.path
+ "/" + self
.config
[_network
]["xml_file"])))
487 def get_machines(self
):
489 for _machine
in self
.machines
:
490 self
.log
.debug(_machine
)
491 machines
.setdefault(_machine
, vm(
492 os
.path
.normpath(self
.path
+ "/" + self
.config
[_machine
]["xml_file"]),
493 os
.path
.normpath(self
.path
+ "/" + self
.config
[_machine
]["snapshot_xml_file"])))
498 if __name__
== "__main__":
501 parser
= argparse
.ArgumentParser()
503 parser
.add_argument("-d", "--directory", dest
="dir")
505 args
= parser
.parse_args()
507 _recipe
= recipe("/home/jonatan/python-testing-kvm/test/recipe")
508 currenttest
= test(args
.dir)
509 currenttest
.read_settings()
510 currenttest
.virtual_environ_setup()