from . import recipe
from . import virtual_environ
from . import settings
+from . import cmd
logger = logging.getLogger("nitsi.test")
self.settings["copy_from"] = None
self.settings["copy_to"] = None
self.settings["virtual_environ_path"] = None
+ self.settings["interactive_error_handling"] = True
self.cmd_settings = cmd_settings
self.log_path = log_path
self.log.error("Failed to load recipe")
raise e
+ # This functions tries to handle an rror of the test (eg. when 'echo "Hello World"' failed)
+ # in an interactive way
+ # returns False when the test should exit right now, and True when the test should go on
+ def interactive_error_handling(self):
+ if not self.settings["interactive_error_handling"]:
+ return False
+
+ _cmd = cmd.CMD(intro="You are droppped into an interative debugging shell because of the previous errors",
+ help={"exit": "Exit the test rigth now",
+ "continue": "Continues the test without any error handling, so do not expect that the test succeeds.",
+ "debug": "Disconnects from the serial console and prints the devices to manually connect to the virtual machines." \
+ "This is useful when you can fix th error with some manual commands. Please disconnect from the serial consoles and " \
+ "choose 'exit or 'continue' when you are done"})
+
+ command = _cmd.get_input(valid_commands=["continue", "exit", "debug"])
+
+ if command == "continue":
+ # The test should go on but we do not any debugging, so we return True
+ return True
+ elif command == "exit":
+ # The test should exit right now (normal behaviour)
+ return False
+
+ # If we get here we are in debugging mode
+ # Disconnect from the serial console:
+
+ for name in self.used_machine_names:
+ _cmd.print_to_cmd("Disconnect from the serial console of {}".format(name))
+ self.virtual_machines[name].serial_disconnect()
+
+ # Print the serial device for each machine
+ for name in self.used_machine_names:
+ device = self.virtual_machines[name].get_serial_device()
+ _cmd.print_to_cmd("Serial device of {} is {}".format(name, device))
+
+ _cmd.print_to_cmd("You can now connect to all serial devices, and send custom commands to the virtual machines." \
+ "Please type 'continue' or 'exit' when you disconnected from als serial devices and want to go on.")
+
+ command = _cmd.get_input(valid_commands=["continue", "exit"])
+
+ if command == "exit":
+ return False
+
+ # We should continue whit the test
+ # Reconnect to the serial devices
+
+ for name in self.used_machine_names:
+ self.log.info("Try to reconnect to {}".format(name))
+ self.virtual_machines[name].serial_connect()
+
+ return True
+
def run_recipe(self):
for line in self.recipe.recipe:
return_value = self.virtual_machines[line[0]].cmd(line[2])
self.log.debug("Return value is: {}".format(return_value))
if return_value != "0" and line[1] == "":
- raise TestException("Failed to execute command '{}' on {}, return code: {}".format(line[2],line[0], return_value))
+ err_msg = "Failed to execute command '{}' on {}, return code: {}".format(line[2],line[0], return_value)
+ # Try to handle this error in an interactive way, if we cannot go on
+ # raise an exception and exit
+ if not self.interactive_error_handling():
+ raise TestException(err_msg)
+
elif return_value == "0" and line[1] == "!":
- raise TestException("Succeded to execute command '{}' on {}, return code: {}".format(line[2],line[0],return_value))
+ err_msg = "Succeded to execute command '{}' on {}, return code: {}".format(line[2],line[0],return_value)
+ self.log.error(err_msg)
+ # Try to handle this error in an interactive way, if we cannot go on
+ # raise an exception and exit
+ if not self.interactive_error_handling():
+ raise TestException(err_msg)
else:
self.log.debug("Command '{}' on {} returned with: {}".format(line[2],line[0],return_value))