def kresd_config_file(_config: "KresConfig", kres_id: "KresID") -> Path:
- return Path(f"{kres_id}.conf")
+ return Path(f"kresd{int(kres_id)}.conf")
def kresd_config_file_supervisord_pattern(_config: "KresConfig") -> Path:
# if it crashes and the startup fails, then well, it's not running anymore... :)
await self._spawn_new_worker(new)
except (SubprocessError, SubprocessControllerException):
- logger.error("kresd with the new config failed to start, rejecting config")
- return Result.err("Canary kresd instance failed to start. Config is invalid.")
+ logger.error("Kresd with the new config failed to start, rejecting config")
+ return Result.err("canary kresd process failed to start. Config might be invalid.")
logger.debug("Canary process test passed.")
return Result.ok(None)
ID object used for identifying subprocesses.
"""
- _used: "WeakValueDictionary[int, KresID]" = WeakValueDictionary()
+ _used: "Dict[SubprocessType, WeakValueDictionary[int, KresID]]" = {k: WeakValueDictionary() for k in SubprocessType}
@classmethod
def alloc(cls: Type[T], typ: SubprocessType) -> T:
- # we split them in order to make the numbers nice (no gaps, pretty naming)
- # there are no strictly technical reasons to do this
- #
- # GC - negative IDs
- # KRESD - positive IDs
- if typ is SubprocessType.GC:
- start = -1
- step = -1
- elif typ is SubprocessType.KRESD:
- start = 1
- step = 1
- else:
- raise RuntimeError(f"Unexpected subprocess type {typ}")
-
# find free ID closest to zero
- for i in itertools.count(start=start, step=step):
- if i not in cls._used:
+ for i in itertools.count(start=0, step=1):
+ if i not in cls._used[typ]:
res = cls.new(typ, i)
return res
@classmethod
def new(cls: "Type[T]", typ: SubprocessType, n: int) -> "T":
- if n in cls._used:
+ if n in cls._used[typ]:
# Ignoring typing here, because I can't find a way how to make the _used dict
# typed based on subclass. I am not even sure that it's different between subclasses,
# it's probably still the same dict. But we don't really care about it
- return cls._used[n] # type: ignore
+ return cls._used[typ][n] # type: ignore
else:
val = cls(typ, n, _i_know_what_i_am_doing=True)
- cls._used[n] = val
+ cls._used[typ][n] = val
return val
def __init__(self, typ: SubprocessType, n: int, _i_know_what_i_am_doing: bool = False):
"""
raise NotImplementedError()
+ def __int__(self) -> int:
+ return self._id
+
class Subprocess:
"""
processes = [pr for pr in processes if pr["name"] != "manager"]
# convert all the names
- return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes if pr["statename"] != "STOPPED"}
+ return {
+ SupervisordKresID.from_string(f"{pr['group']}:{pr['name']}"): convert(pr)
+ for pr in processes
+ if pr["statename"] != "STOPPED"
+ }
class SupervisordSubprocess(Subprocess):
super().__init__(config, base_id)
self._controller: "SupervisordSubprocessController" = controller
- def _name(self):
- if self.type is SubprocessType.GC:
- return str(self.id)
- else:
- return f"kresd:{self.id}"
+ @property
+ def name(self):
+ return str(self.id)
@async_in_a_thread
def _start(self) -> None:
+ # +1 for canary process (same as in config_file.py)
+ assert int(self.id) <= int(self._config.max_workers) + 1, "trying to spawn more than allowed limit of workers"
try:
supervisord = _create_fast_proxy(self._config)
- supervisord.startProcess(self._name())
+ supervisord.startProcess(self.name)
except Fault as e:
raise SubprocessControllerException(f"failed to start '{self.id}'") from e
@async_in_a_thread
def _stop(self) -> None:
supervisord = _create_supervisord_proxy(self._config)
- supervisord.stopProcess(self._name())
+ supervisord.stopProcess(self.name)
@async_in_a_thread
def _restart(self) -> None:
supervisord = _create_supervisord_proxy(self._config)
- supervisord.stopProcess(self._name())
+ supervisord.stopProcess(self.name)
fast = _create_fast_proxy(self._config)
- fast.startProcess(self._name())
+ fast.startProcess(self.name)
def get_used_config(self) -> KresConfig:
return self._config
@staticmethod
def from_string(val: str) -> "SupervisordKresID":
- if val == "gc":
- return SupervisordKresID.new(SubprocessType.GC, -1)
+ if val == "gc:gc":
+ return SupervisordKresID.new(SubprocessType.GC, 0)
else:
- val = val.replace("kresd", "")
+ val = val.replace("kresd:kresd", "")
return SupervisordKresID.new(SubprocessType.KRESD, int(val))
def __str__(self) -> str:
if self.subprocess_type is SubprocessType.GC:
return "gc"
elif self.subprocess_type is SubprocessType.KRESD:
- return f"kresd{self._id}"
+ return f"kresd:kresd{self._id}"
else:
raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")