--- /dev/null
+
+
+class CMD():
+ def __init__(self, prompt="", help={}, intro = ""):
+ self.prompt = "nitsi: "
+ self.help_min = {"help": "Shows this help message", "?": "Same as 'help'"}
+ self.help = help
+
+ if prompt != "":
+ self.prompt = prompt
+
+ self.intro = ""
+
+ if intro != "":
+ self.intro = intro
+
+ def print_intro(self, intro=""):
+ if intro == "":
+ intro = self.intro
+ self.print_to_cmd(intro)
+
+ def print_to_cmd(self, string):
+ print(string, end="\n")
+
+ def read_from_cmd(self, prompt=""):
+ if prompt == "":
+ prompt = self.prompt
+ return input(prompt)
+
+ def get_input(self, valid_commands=[], help={}):
+ valid_commands = valid_commands + [ "?", "help" ]
+ input=""
+
+ while True:
+ input = self.read_from_cmd()
+ if input not in valid_commands:
+ self.print_to_cmd("{} is not valid command.".format(input))
+ continue
+
+ # print help
+ if input == "help" or input == "?":
+ self.print_help(help=help)
+ continue
+
+ # if we get here we get a valid input
+ break
+
+ return input
+
+ def print_help(self, help={}):
+ if len(help) == 0:
+ help = self.help
+
+ # Update help with help_min
+ tmp_help = self.help_min
+ tmp_help.update(help)
+ help = tmp_help
+
+ for key in help:
+ self.print_to_cmd("{}: {}".format(key, help[key]))
from . import recipe
from . import virtual_environ
from . import settings
+from . import cmd
logger = logging.getLogger("nitsi.test")
self.settings["copy_from"] = None
self.settings["copy_to"] = None
self.settings["virtual_environ_path"] = None
+ self.settings["interactive_error_handling"] = True
self.cmd_settings = cmd_settings
self.log_path = log_path
self.log.error("Failed to load recipe")
raise e
+ # This functions tries to handle an rror of the test (eg. when 'echo "Hello World"' failed)
+ # in an interactive way
+ # returns False when the test should exit right now, and True when the test should go on
+ def interactive_error_handling(self):
+ if not self.settings["interactive_error_handling"]:
+ return False
+
+ _cmd = cmd.CMD(intro="You are droppped into an interative debugging shell because of the previous errors",
+ help={"exit": "Exit the test rigth now",
+ "continue": "Continues the test without any error handling, so do not expect that the test succeeds.",
+ "debug": "Disconnects from the serial console and prints the devices to manually connect to the virtual machines." \
+ "This is useful when you can fix th error with some manual commands. Please disconnect from the serial consoles and " \
+ "choose 'exit or 'continue' when you are done"})
+
+ command = _cmd.get_input(valid_commands=["continue", "exit", "debug"])
+
+ if command == "continue":
+ # The test should go on but we do not any debugging, so we return True
+ return True
+ elif command == "exit":
+ # The test should exit right now (normal behaviour)
+ return False
+
+ # If we get here we are in debugging mode
+ # Disconnect from the serial console:
+
+ for name in self.used_machine_names:
+ _cmd.print_to_cmd("Disconnect from the serial console of {}".format(name))
+ self.virtual_machines[name].serial_disconnect()
+
+ # Print the serial device for each machine
+ for name in self.used_machine_names:
+ device = self.virtual_machines[name].get_serial_device()
+ _cmd.print_to_cmd("Serial device of {} is {}".format(name, device))
+
+ _cmd.print_to_cmd("You can now connect to all serial devices, and send custom commands to the virtual machines." \
+ "Please type 'continue' or 'exit' when you disconnected from als serial devices and want to go on.")
+
+ command = _cmd.get_input(valid_commands=["continue", "exit"])
+
+ if command == "exit":
+ return False
+
+ # We should continue whit the test
+ # Reconnect to the serial devices
+
+ for name in self.used_machine_names:
+ self.log.info("Try to reconnect to {}".format(name))
+ self.virtual_machines[name].serial_connect()
+
+ return True
+
def run_recipe(self):
for line in self.recipe.recipe:
return_value = self.virtual_machines[line[0]].cmd(line[2])
self.log.debug("Return value is: {}".format(return_value))
if return_value != "0" and line[1] == "":
- raise TestException("Failed to execute command '{}' on {}, return code: {}".format(line[2],line[0], return_value))
+ err_msg = "Failed to execute command '{}' on {}, return code: {}".format(line[2],line[0], return_value)
+ # Try to handle this error in an interactive way, if we cannot go on
+ # raise an exception and exit
+ if not self.interactive_error_handling():
+ raise TestException(err_msg)
+
elif return_value == "0" and line[1] == "!":
- raise TestException("Succeded to execute command '{}' on {}, return code: {}".format(line[2],line[0],return_value))
+ err_msg = "Succeded to execute command '{}' on {}, return code: {}".format(line[2],line[0],return_value)
+ self.log.error(err_msg)
+ # Try to handle this error in an interactive way, if we cannot go on
+ # raise an exception and exit
+ if not self.interactive_error_handling():
+ raise TestException(err_msg)
else:
self.log.debug("Command '{}' on {} returned with: {}".format(line[2],line[0],return_value))