class Machine:
- def __init__(self, args: Sequence[str] = []) -> None:
+ def __init__(self, args: Sequence[str] = [], *, num_retries: int = 30) -> None:
# Remains None until image is built and booted, then receives pexpect process.
self._serial: Optional[pexpect.spawn] = None
+ self.num_retries = num_retries
self.exit_code: int = -1
self.stack = contextlib.ExitStack()
self.config: MkosiConfig
# The retry logic only applies when running commands against a VM.
- for _ in range(0, 30):
+ for _ in range(0, self.num_retries):
try:
return run(cmdline, check=check, stdout=stdout, stderr=stderr, text=True, timeout=timeout)
except subprocess.CalledProcessError as e:
class MkosiMachineTest(unittest.TestCase):
args: Sequence[str]
+ num_retries: int
machine: Machine
- def __init_subclass__(cls, args: Sequence[str] = []) -> None:
+ def __init_subclass__(cls, args: Sequence[str] = [], *, num_retries: int = 30) -> None:
cls.args = args
+ cls.num_retries = num_retries
@classmethod
def setUpClass(cls) -> None:
with skip_not_supported():
- cls.machine = Machine(cls.args)
+ cls.machine = Machine(cls.args, num_retries = cls.num_retries)
verb = cls.machine.config.verb
no_nspawn = parse_boolean(os.getenv("MKOSI_TEST_NO_NSPAWN", "0"))