We introduce a new way to init a serial console, to connect and disconnect to a serial console.
Therefore we have 3 new functions in the Machine class:
serial_init()
serial_connect()
serial_disconnet()
there are similar functions in the SerialConnection class.
This should make it possible to close a serial connection in a test and
to reopen this connection later.
Signed-off-by: Jonatan Schlag <jonatan.schlag@ipfire.org>
# self.snapshot should be also at least None
self.snapshot = None
# self.snapshot should be also at least None
self.snapshot = None
try:
with open(vm_xml_file) as fobj:
try:
with open(vm_xml_file) as fobj:
- def login(self, log_file, log_start_time=None, longest_machine_name=10):
+ # This function should initialize the serial connection
+ def serial_init(self, log_file=None, log_start_time=None, longest_machine_name=10):
try:
self.serial_con = serial_connection.SerialConnection(self.get_serial_device(),
try:
self.serial_con = serial_connection.SerialConnection(self.get_serial_device(),
- username=self.username,
- log_file=log_file,
- log_start_time=log_start_time,
- name=self.name,
- longest_machine_name=longest_machine_name)
- self.serial_con.login(self.password)
+ username=self.username,
+ password= self.password,
+ log_file=log_file,
+ log_start_time=log_start_time,
+ name=self.name,
+ longest_machine_name=longest_machine_name)
+ except BaseException as e:
+ self.log.error("Could initialize the serial console")
+ self.log.exception(e)
+ raise e
+
+
+ # This function should create a ready to use serial connection for this serial domain
+ def serial_connect(self):
+ try:
+ # Do the real connect
+ self.serial_con.connect()
except BaseException as e:
self.log.error("Could not connect to the domain via serial console")
self.log.exception(e)
raise e
except BaseException as e:
self.log.error("Could not connect to the domain via serial console")
self.log.exception(e)
raise e
+ def serial_disconnect(self):
+ try:
+ self.serial_con.disconnect()
+ except BaseException as e:
+ self.log.error("Could not disconnect from the serial console")
+ self.log.exception(e)
+ raise e
+
def cmd(self, cmd):
return self.serial_con.command(cmd)
def cmd(self, cmd):
return self.serial_con.command(cmd)
log = logging.getLogger("nitsi.serial")
class SerialConnection():
log = logging.getLogger("nitsi.serial")
class SerialConnection():
- def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
+ def __init__(self, device, username=None, password=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
self.buffer = b""
self.back_at_prompt_pattern = None
self.username = username
self.buffer = b""
self.back_at_prompt_pattern = None
self.username = username
+ self.password = password
self.log_file = log_file
self.log = log.getChild(name)
self.log.setLevel(logging.INFO)
self.log_file = log_file
self.log = log.getChild(name)
self.log.setLevel(logging.INFO)
- self.con = serial.Serial(device)
+
+ # We create here a closed serial connection
+ self.con = serial.Serial()
+ # Set the port in a second step to avoid that the connection is brought up automatically
+ self.con.port = self.device
self.log_output = self.log.getChild("output")
# Do not propagate the output to ancestor loggers as it looks ugly
self.log_output = self.log.getChild("output")
# Do not propagate the output to ancestor loggers as it looks ugly
self.log_output.addHandler(log_file_handler)
self.log_output.addHandler(stream_handler)
self.log_output.addHandler(log_file_handler)
self.log_output.addHandler(stream_handler)
+ def connect(self):
+ # Check if the serial port is open, if not open the port
+ if self.con.is_open:
+ self.log.debug("Connection to the serial port is open")
+ else:
+ self.log.debug("Connection to the serial port is closed, try to open it.")
+ self.con.open()
+ # Try to login, if we are already logged in we detect this and exit the function
+ self.login()
+
+ def disconnect(self):
+ if not self.con.is_open:
+ self.log.debug("Connection to the serial port is already closed")
+ else:
+ self.log.debug("Connection to the serial port is open, try to close it.")
+ self.con.close()
+
def read(self, size=1):
if len(self.buffer) >= size:
# throw away first size bytes in buffer
def read(self, size=1):
if len(self.buffer) >= size:
# throw away first size bytes in buffer
self.log.debug("We have printed all lines in the buffer")
break
self.log.debug("We have printed all lines in the buffer")
break
- def login(self, password):
if self.username == None:
self.log.error("Username cannot be blank")
return False
if self.username == None:
self.log.error("Username cannot be blank")
return False
#This is useless but self.in_waiting will wait the correct amount of time
size = self.in_waiting
#This is useless but self.in_waiting will wait the correct amount of time
size = self.in_waiting
- string = "{}\n".format(password)
+ string = "{}\n".format(self.password)
self.con.write(string.encode())
self.con.flush()
# Print the 'Password:' line
self.con.write(string.encode())
self.con.flush()
# Print the 'Password:' line
# Number of chars of the longest machine name
longest_machine_name = self.virtual_environ.longest_machine_name
# Number of chars of the longest machine name
longest_machine_name = self.virtual_environ.longest_machine_name
- self.log.info("Try to login on all machines")
+ self.log.info("Try to intialize the serial connection, connect and login on all machines")
for name in self.used_machine_names:
for name in self.used_machine_names:
- self.log.info("Try to login on {}".format(name))
- self.virtual_machines[name].login("{}/test.log".format(self.log_path),
+ self.log.info("Try to initialize the serial connection connect and login on {}".format(name))
+ self.virtual_machines[name].serial_init(log_file="{}/test.log".format(self.log_path),
log_start_time=log_start_time,
longest_machine_name=longest_machine_name)
log_start_time=log_start_time,
longest_machine_name=longest_machine_name)
+ self.virtual_machines[name].serial_connect()
def load_recipe(self):
self.log.info("Going to load the recipe")
def load_recipe(self):
self.log.info("Going to load the recipe")