cache:
storage: ../cache
logging:
- level: debug
+ level: notice
groups:
- manager
+ - supervisord
network:
listen:
- interface: 127.0.0.1@5353
return Path("supervisord.conf.tmp")
-def supervisord_log_file(_config: "KresConfig") -> Path:
- return Path("supervisord.log")
-
-
def supervisord_pid_file(_config: "KresConfig") -> Path:
return Path("supervisord.pid")
-from typing import List, Optional, Set, Union
+import os
+from typing import Any, List, Optional, Set, Type, Union, cast
from typing_extensions import Literal
from knot_resolver_manager.datamodel.types import CheckedPath, TimeUnit
from knot_resolver_manager.utils.modeling import BaseSchema
+from knot_resolver_manager.utils.modeling.base_schema import is_obj_type_Valid
try:
# On Debian 10, the typing_extensions library does not contain TypeAlias.
LogTargetEnum = Literal["syslog", "stderr", "stdout"]
LogGroupsEnum: TypeAlias = Literal[
"manager",
+ "supervisord",
"system",
"cache",
"io",
class LoggingSchema(BaseSchema):
- """
- Logging and debugging configuration.
-
- ---
- level: Global logging level.
- target: Global logging stream target.
- groups: List of groups for which 'debug' logging level is set.
- dnssec_bogus: Logging a message for each DNSSEC validation failure.
- dnstap: Logging DNS requests and responses to a unix socket.
- debugging: Advanced debugging parameters for kresd (Knot Resolver daemon).
- """
-
- level: LogLevelEnum = "notice"
- target: Optional[LogTargetEnum] = None
- groups: Optional[List[LogGroupsEnum]] = None
- dnssec_bogus: bool = False
- dnstap: Union[Literal[False], DnstapSchema] = False
- debugging: DebuggingSchema = DebuggingSchema()
+ class Raw(BaseSchema):
+ """
+ Logging and debugging configuration.
+
+ ---
+ level: Global logging level.
+ target: Global logging stream target. "from-env" reads $KRES_LOGGING_TARGET and defaults to "stdout".
+ groups: List of groups for which 'debug' logging level is set.
+ dnssec_bogus: Logging a message for each DNSSEC validation failure.
+ dnstap: Logging DNS requests and responses to a unix socket.
+ debugging: Advanced debugging parameters for kresd (Knot Resolver daemon).
+ """
+
+ level: LogLevelEnum = "notice"
+ target: Union[LogTargetEnum, Literal["from-env"]] = "from-env"
+ groups: Optional[List[LogGroupsEnum]] = None
+ dnssec_bogus: bool = False
+ dnstap: Union[Literal[False], DnstapSchema] = False
+ debugging: DebuggingSchema = DebuggingSchema()
+
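+ # _LAYER points the schema machinery at the user-facing Raw layer above; the fields below
+ # hold the final values, produced by the _<field>() transform methods such as _target()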
+ _LAYER = Raw
+
+ level: LogLevelEnum
+ target: LogTargetEnum
+ groups: Optional[List[LogGroupsEnum]]
+ dnssec_bogus: bool
+ dnstap: Union[Literal[False], DnstapSchema]
+ debugging: DebuggingSchema
+
+ def _target(self, raw: Raw) -> LogTargetEnum:
+ if raw.target == "from-env":
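+ # read the target from the environment, falling back to "stdout" when unset or empty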
+ target = os.environ.get("KRES_LOGGING_TARGET") or "stdout"
+ if not is_obj_type_Valid(target, cast(Type[Any], LogTargetEnum)):
+ raise ValueError(f"logging target '{target}' read from $KRES_LOGGING_TARGET is invalid")
+ return cast(LogTargetEnum, target)
+ else:
+ return raw.target
def _validate(self):
if self.groups is None:
-- logging.groups
log_groups({
{% for g in cfg.logging.groups %}
-{% if g != "manager" %}
+{% if g != "manager" and g != "supervisord" %}
'{{ g }}',
{% endif %}
{% endfor %}
import os
from jinja2 import Template
+from typing_extensions import Literal
from knot_resolver_manager.compat.dataclasses import dataclass
from knot_resolver_manager.constants import (
kresd_executable,
supervisord_config_file,
supervisord_config_file_tmp,
- supervisord_log_file,
supervisord_pid_file,
supervisord_sock_file,
supervisord_subprocess_log_dir,
user_constants,
)
from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.datamodel.logging_schema import LogTargetEnum
from knot_resolver_manager.kresd_controller.interface import KresID, SubprocessType
from knot_resolver_manager.utils.async_utils import read_resource, writefile
pid_file: str
workdir: str
logfile: str
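+ # logging options rendered into supervisord.conf and consumed by the patch_logger plugin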
+ loglevel: Literal["critical", "error", "warn", "info", "debug", "trace", "blather"]
+ target: LogTargetEnum
@staticmethod
def create(config: KresConfig) -> "SupervisordConfig":
+ # determine the correct logging level
+ if config.logging.groups and "supervisord" in config.logging.groups:
+ loglevel = "info"
+ else:
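+ # supervisord knows no "notice" level, so map the config levels onto its nearest equivalents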
+ loglevel = {
+ "crit": "critical",
+ "err": "error",
+ "warning": "warn",
+ "notice": "warn",
+ "info": "info",
+ "debug": "debug",
+ }[config.logging.level]
+
cwd = str(os.getcwd())
return SupervisordConfig( # type: ignore[call-arg]
unix_http_server=supervisord_sock_file(config),
pid_file=supervisord_pid_file(config),
workdir=cwd,
- logfile=supervisord_log_file(config),
+ logfile="syslog" if config.logging.target == "syslog" else "/dev/null",
+ loglevel=loglevel,
+ target=config.logging.target,
)
systemd_notify(READY="1", STATUS="Ready")
-def make_rpcinterface(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
+def inject(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
global superd
superd = supervisord
--- /dev/null
+# type: ignore
+# pylint: disable=protected-access
+
+import sys
+import traceback
+from typing import Any, Union
+
+from supervisor.dispatchers import POutputDispatcher
+from supervisor.loggers import Handler, LevelsByName, StreamHandler, SyslogHandler
+from supervisor.supervisord import Supervisor
+from typing_extensions import Literal
+
+FORWARD_LOG_LEVEL = LevelsByName.CRIT # to make sure it's always printed
+
+
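+# no-op replacement for POutputDispatcher._log, used when subprocess output goes to syslog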
+def empty_function(*args, **kwargs):
+ pass
+
+
+def POutputDispatcher_log(self: POutputDispatcher, data: Union[bytes, str]):
+ if data:
+ # parse the input
+ if not isinstance(data, bytes):
+ text = data
+ else:
+ try:
+ text = data.decode("utf-8")
+ except UnicodeDecodeError:
+ text = "Undecodable: %r" % data
+
+ # print line by line prepending correct prefix to match the style
+ config = self.process.config
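+ # temporarily swap in the forwarding handlers so the line is emitted in the forward format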
+ config.options.logger.handlers = forward_handlers
+ for line in text.splitlines():
+ msg = "[%(name)s:%(channel)s] %(data)s"
+ config.options.logger.log(FORWARD_LOG_LEVEL, msg, name=config.name, channel=self.channel[3:], data=line)
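+ # restore supervisord's own handlers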
+ config.options.logger.handlers = supervisord_handlers
+
+
+def _create_handler(fmt, level, target: Literal["stdout", "stderr", "syslog"]) -> Handler:
+ if target == "syslog":
+ handler = SyslogHandler()
+ else:
+ handler = StreamHandler(sys.stdout if target == "stdout" else sys.stderr)
+ handler.setFormat(fmt)
+ handler.setLevel(level)
+ return handler
+
+
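+# handler sets installed by inject(): one for supervisord's own records, one for forwarded subprocess output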
+supervisord_handlers = []
+forward_handlers = []
+
+
+def inject(supervisord: Supervisor, **config: Any) -> Any: # pylint: disable=useless-return
+ try:
+ # reconfigure log handlers
+ supervisord.options.logger.info("reconfiguring log handlers")
+ supervisord_handlers.append(
+ _create_handler(
+ "[%(asctime)s][supervisor] [%(levelname)s] %(message)s\n",
+ supervisord.options.loglevel,
+ config["target"],
+ )
+ )
+ forward_handlers.append(
+ _create_handler("[%(asctime)s]%(message)s\n", supervisord.options.loglevel, config["target"])
+ )
+ supervisord.options.logger.handlers = supervisord_handlers
+
+ # replace output handler for subprocesses
+ if config["target"] == "syslog":
+ POutputDispatcher._log = empty_function
+ else:
+ POutputDispatcher._log = POutputDispatcher_log
+
+ # this method is called by supervisord when loading the plugin,
+ # it should return an XML-RPC object, which we don't care about.
+ # That's why we are returning just None.
+ return None
+
+ # if we fail to load the module, print some explanation;
+ # this should not happen when run by end users
+ except BaseException:
+ traceback.print_exc()
+ raise
def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]:
if is_type_notify(slf):
slf.config.environment["NOTIFY_SOCKET"] = "@knot-resolver-control-socket"
- slf.config.options.logger.info("setting env")
return (slf, *args)
subscribe(ProcessStateEvent, keep_track_of_starting_processes)
-def make_rpcinterface(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
+def inject(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
monkeypatch(supervisord)
# this method is called by supervisord when loading the plugin,
+++ /dev/null
-# type: ignore
-# pylint: disable=protected-access
-"""
-Plugin which creates a new fd at `NEW_STDOUT_FD` and a thread copying data from there to actual stdout.
-Why would we want this? Because when running under systemd, stdout FD is a socket and we can't open it
-by calling `open("/proc/self/fd/1")`. We can do this with pipes though. So in order to transparently pass
-stdout from manager to stdout of supervisord, we are configuring manager to use /proc/self/fd/42001 as its
-logfile. Then we are routing the data to the actual supervisord's stdout.
-
-Performance should not be a problem as this is not a performance critical component.
-"""
-import os
-import sys
-from threading import Thread
-from typing import Any
-
-from supervisor.supervisord import Supervisor
-
-# when changing this, change it in supervisord.conf.j2 as well
-NEW_STDOUT_FD = 42
-
-
-class SplicingThread(Thread):
- def __init__(self, source_fd: int, target_fd: int) -> None:
- super().__init__(daemon=True, name=f"FD-splice-{source_fd}->{target_fd}")
- self.source_fd = source_fd
- self.dest_fd = target_fd
-
- def run(self) -> None:
- if sys.version_info.major >= 3 and sys.version_info.minor >= 10:
- while True:
- os.splice(self.source_fd, self.dest_fd, 1024) # type: ignore[attr-defined]
- else:
- while True:
- buf = os.read(self.source_fd, 1024)
- os.write(self.dest_fd, buf)
-
-
-def make_rpcinterface(_supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
- # create pipe
- (r, w) = os.pipe()
- os.dup2(w, NEW_STDOUT_FD)
- os.close(w)
-
- # start splicing
- t = SplicingThread(r, sys.stdout.fileno())
- t.start()
-
- # this method is called by supervisord when loading the plugin,
- # it should return XML-RPC object, which we don't care about
- # That's why why are returning just None
- return None
pidfile = {{ config.pid_file }}
directory = {{ config.workdir }}
nodaemon = true
-logfile = {{ config.logfile }}
-logfile_maxbytes = 50MB
-loglevel = info
-silent=true
-{# user=root #}
+
+{# disable initial logging until patch_logger.py takes over #}
+logfile = /dev/null
+logfile_maxbytes = 0
+silent = true
+
+{# config for patch_logger.py #}
+loglevel = {{ config.loglevel }}
+{# there are more options in the plugin section #}
+
[unix_http_server]
file = {{ config.unix_http_server }}
[supervisorctl]
serverurl = unix://{{ config.unix_http_server }}
+{# Extensions changing the supervisord behavior #}
+[rpcinterface:patch_logger]
+supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin.patch_logger:inject
+target = {{ config.target }}
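+{# the target option above is handed to patch_logger's inject() via its **config kwargs #}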
+
+[rpcinterface:manager_lifecycle_monitor]
+supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin.manager_lifecycle_monitor:inject
+
+[rpcinterface:sd_notify]
+supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin.sd_notify:inject
+
{# Extensions for actual API control #}
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[rpcinterface:fast]
supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin.fast_rpcinterface:make_main_rpcinterface
-{# Extensions to changing the supervisord behavior #}
-[rpcinterface:manager_lifecycle_monitor]
-supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin.manager_lifecycle_monitor:make_rpcinterface
-
-[rpcinterface:notify]
-supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin:make_rpcinterface
-
-[rpcinterface:stdout_pipe]
-supervisor.rpcinterface_factory = knot_resolver_manager.kresd_controller.supervisord.plugin.stdout_pipe_log:make_rpcinterface
[program:manager]
redirect_stderr=false
-stdout_logfile=/proc/self/fd/42 # handled by stdout_pipe plugin
-stdout_logfile_maxbytes=0
directory={{ manager.workdir }}
command={{ manager.command }}
stopsignal=SIGINT
autorestart=true
autostart=true
startsecs=5
-environment={{ manager.environment }}
+environment={{ manager.environment }},KRES_SUPRESS_LOG_PREFIX=true
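+{# KRES_SUPRESS_LOG_PREFIX tells the manager that supervisord already prefixes its output #}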
[program:kresd]
process_name=%(program_name)s%(process_num)d
numprocs={{ kresd.max_procs }}
redirect_stderr=false
-stdout_logfile={{ kresd.logfile }}
-stderr_logfile={{ kresd.logfile }}
directory={{ kresd.workdir }}
command={{ kresd.command }}
autostart=false
[program:gc]
redirect_stderr=false
-stdout_logfile={{ gc.logfile }}
-stderr_logfile={{ gc.logfile }}
directory={{ gc.workdir }}
command={{ gc.command }}
autostart=false
import logging
import logging.handlers
+import os
import sys
from typing import Optional
logger = logging.getLogger(__name__)
-FORMAT = "%(relativeCreated)dms:%(levelname)s:%(name)s:%(message)s"
+def get_log_format(config: KresConfig) -> str:
+ """
+ Based on the environment variable $KRES_SUPRESS_LOG_PREFIX, return the appropriate format string for the logger.
+ """
+
+ if os.environ.get("KRES_SUPRESS_LOG_PREFIX") == "true":
+ # In this case, we are running under supervisord and it's adding prefixes to our output
+ return "[%(levelname)s] %(name)s: %(message)s"
+ else:
+ # In this case, we are running standalone during initialization and we need to add a prefix
+ # to each line ourselves to keep the output consistent
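+ # syslog gets its own formatter in _set_log_level(), so only stdout/stderr can reach this point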
+ assert config.logging.target != "syslog"
+ channel = config.logging.target[3:]
+ return f"[%(asctime)s][manager:{channel}] [%(levelname)s] %(name)s: %(message)s"
async def _set_log_level(config: KresConfig) -> None:
"debug": "DEBUG",
}
- # when logging is configured but not for us, still log all WARNING
- if config.logging.groups and "manager" not in config.logging.groups:
- target = "WARNING"
+ # when the 'manager' logging group is enabled, log with DEBUG
+ if config.logging.groups and "manager" in config.logging.groups:
+ target = "DEBUG"
# otherwise, follow the standard log level
else:
target = levels_map[config.logging.level]
handler: logging.Handler
if target == "syslog":
handler = logging.handlers.SysLogHandler(address="/dev/log")
- handler.setFormatter(logging.Formatter("%(name)s:%(message)s"))
+ handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
elif target == "stdout":
handler = logging.StreamHandler(sys.stdout)
- handler.setFormatter(logging.Formatter(FORMAT))
+ handler.setFormatter(logging.Formatter(get_log_format(config)))
elif target == "stderr":
handler = logging.StreamHandler(sys.stderr)
- handler.setFormatter(logging.Formatter(FORMAT))
+ handler.setFormatter(logging.Formatter(get_log_format(config)))
else:
raise RuntimeError(f"Unexpected value '{target}' for log target in the config")
for name in annot:
res[name] = Serializable.serialize(getattr(self, name))
return res
+
+
+def is_obj_type_Valid(obj: Any, tp: Type[Any]) -> bool:
+ """
+ Runtime type check that validates whether a given object is of a given type.
+ """
+
+ try:
+ _validated_object_type(tp, obj)
+ return True
+ except (DataValidationError, ValueError):
+ return False
from typing import Iterable, List
+from knot_resolver_manager.exceptions import KresManagerException
-class DataModelingBaseException(Exception):
+
+class DataModelingBaseException(KresManagerException):
"""
Base class for all exceptions used in modelling.
"""