invoke_callback = True
continue
- if detected_subprocesses[eid] is SubprocessStatus.FAILED:
- logger.error("Subprocess '%s' is failed.", eid)
+ if detected_subprocesses[eid] is SubprocessStatus.FATAL:
+ logger.error("Subprocess '%s' is in FATAL state!", eid)
invoke_callback = True
continue
if detected_subprocesses[eid] is SubprocessStatus.UNKNOWN:
- logger.warning("Subprocess '%s' is in unknown state!", eid)
+ logger.warning("Subprocess '%s' is in UNKNOWN state!", eid)
non_registered_ids = detected_subprocesses.keys() - set(expected_ids)
if len(non_registered_ids) != 0:
return getattr(proxy, "fast")
+def _convert_subprocess_status(proc: Any) -> SubprocessStatus:
+ conversion_tbl = {
+ # "STOPPED": None, # filtered out elsewhere
+ "STARTING": SubprocessStatus.RUNNING,
+ "RUNNING": SubprocessStatus.RUNNING,
+ "BACKOFF": SubprocessStatus.RUNNING,
+ "STOPPING": SubprocessStatus.RUNNING,
+ "EXITED": SubprocessStatus.EXITED,
+ "FATAL": SubprocessStatus.FATAL,
+ "UNKNOWN": SubprocessStatus.UNKNOWN,
+ }
+
+ if proc["statename"] in conversion_tbl:
+ status = conversion_tbl[proc["statename"]]
+ else:
+ logger.warning(f"Unknown supervisord process state {proc['statename']}")
+ status = SubprocessStatus.UNKNOWN
+ return status
+
+
def _list_running_subprocesses(config: KresConfig) -> Dict[SupervisordKresID, SubprocessStatus]:
- supervisord = _create_supervisord_proxy(config)
- processes: Any = supervisord.getAllProcessInfo()
-
- def convert(proc: Any) -> SubprocessStatus:
- conversion_tbl = {
- # "STOPPED": None, # filtered out elsewhere
- "STARTING": SubprocessStatus.RUNNING,
- "RUNNING": SubprocessStatus.RUNNING,
- "BACKOFF": SubprocessStatus.RUNNING,
- "STOPPING": SubprocessStatus.RUNNING,
- "EXITED": SubprocessStatus.EXITED,
- "FATAL": SubprocessStatus.FAILED,
- "UNKNOWN": SubprocessStatus.UNKNOWN,
- }
-
- if proc["statename"] in conversion_tbl:
- status = conversion_tbl[proc["statename"]]
- else:
- logger.warning(f"Unknown supervisord process state {proc['statename']}")
- status = SubprocessStatus.UNKNOWN
- return status
+ try:
+ supervisord = _create_supervisord_proxy(config)
+ processes: Any = supervisord.getAllProcessInfo()
+ except Fault as e:
+ raise SubprocessControllerException(f"failed to get info from all running processes: {e}") from e
# there will be a manager process as well, but we don't want to report anything on ourselves
processes = [pr for pr in processes if pr["name"] != "manager"]
# convert all the names
return {
- SupervisordKresID.from_string(f"{pr['group']}:{pr['name']}"): convert(pr)
+ SupervisordKresID.from_string(f"{pr['group']}:{pr['name']}"): _convert_subprocess_status(pr)
for pr in processes
if pr["statename"] != "STOPPED"
}
def name(self):
return str(self.id)
+ def status(self) -> SubprocessStatus:
+ try:
+ supervisord = _create_supervisord_proxy(self._config)
+ status = supervisord.getProcessInfo(self.name)
+ except Fault as e:
+ raise SubprocessControllerException(f"failed to get status from '{self.id}' process: {e}") from e
+ return _convert_subprocess_status(status)
+
@async_in_a_thread
def _start(self) -> None:
# +1 for canary process (same as in config_file.py)