List,
Optional,
Tuple,
- Type,
Union,
cast,
)
_CONTROL_DOMAIN = "_control."
- def __init__(self, commands: List[Type["ControlCommand"]]):
- super().__init__()
- self._control_domain = dns.name.from_text(self._CONTROL_DOMAIN)
- self._commands: Dict[dns.name.Name, "ControlCommand"] = {}
- for command_class in commands:
- command = command_class()
- command_subdomain = dns.name.Name([command.control_subdomain])
- control_subdomain = command_subdomain.concatenate(self._control_domain)
- try:
- existing_command = self._commands[control_subdomain]
- except KeyError:
- self._commands[control_subdomain] = command
- else:
- raise RuntimeError(
- f"{control_subdomain} already handled by {existing_command}"
- )
+ @functools.cached_property
+ def _control_domain(self) -> dns.name.Name:
+ return dns.name.from_text(self._CONTROL_DOMAIN)
+
+ @functools.cached_property
+ def _commands(self) -> Dict[dns.name.Name, "ControlCommand"]:
+ return {}
+
+ def install_control_commands(self, commands: List["ControlCommand"]) -> None:
+ for command in commands:
+ self.install_control_command(command)
+
+ def install_control_command(self, command: "ControlCommand") -> None:
+ command_subdomain = dns.name.Name([command.control_subdomain])
+ control_subdomain = command_subdomain.concatenate(self._control_domain)
+ try:
+ existing_command = self._commands[control_subdomain]
+ except KeyError:
+ self._commands[control_subdomain] = command
+ else:
+ raise RuntimeError(
+ f"{control_subdomain} already handled by {existing_command}"
+ )
async def _prepare_responses(
self, qctx: QueryContext