AsyncGenerator,
Callable,
Coroutine,
+ Dict,
List,
Optional,
Tuple,
+ Type,
Union,
cast,
)
async for response in handler.get_responses(qctx):
yield response
return
+
+
+class ControllableAsyncDnsServer(AsyncDnsServer):
+ """
+ An AsyncDnsServer whose behavior can be dynamically changed by sending TXT
+ queries to a "magic" domain.
+ """
+
+ _CONTROL_DOMAIN = "_control."
+
+ def __init__(self, commands: List[Type["ControlCommand"]]):
+ super().__init__()
+ self._control_domain = dns.name.from_text(self._CONTROL_DOMAIN)
+ self._commands: Dict[dns.name.Name, "ControlCommand"] = {}
+ for command_class in commands:
+ command = command_class()
+ command_subdomain = dns.name.Name([command.control_subdomain])
+ control_subdomain = command_subdomain.concatenate(self._control_domain)
+ try:
+ existing_command = self._commands[control_subdomain]
+ except KeyError:
+ self._commands[control_subdomain] = command
+ else:
+ raise RuntimeError(
+ f"{control_subdomain} already handled by {existing_command}"
+ )
+
+ async def _prepare_responses(
+ self, qctx: QueryContext
+ ) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
+ """
+ Detect and handle control queries, falling back to normal processing
+ for non-control queries.
+ """
+ control_response = self._handle_control_command(qctx)
+ if control_response:
+ yield await DnsResponseSend(response=control_response).perform()
+ return
+
+ async for response in super()._prepare_responses(qctx):
+ yield response
+
+ def _handle_control_command(
+ self, qctx: QueryContext
+ ) -> Optional[dns.message.Message]:
+ """
+ Detect and handle control queries.
+
+ A control query must be of type TXT; if it is not, a FORMERR response
+ is sent back.
+
+ The list of commands that the server should respond to is passed to its
+ constructor. If the server is unable to handle the control query using
+ any of the enabled commands, an NXDOMAIN response is sent.
+
+ Otherwise, the relevant command's handler is expected to provide the
+ response via qctx.response and/or return a string that is converted to
+ a TXT RRset inserted into the ANSWER section of the response to the
+ control query. The RCODE for a command-provided response defaults to
+ NOERROR, but can be overridden by the command's handler.
+ """
+ if not qctx.qname.is_subdomain(self._control_domain):
+ return None
+
+ if qctx.qtype != dns.rdatatype.TXT:
+ logging.error("Non-TXT control query %s from %s", qctx.qname, qctx.peer)
+ qctx.response.set_rcode(dns.rcode.FORMERR)
+ return qctx.response
+
+ control_subdomain = dns.name.Name(qctx.qname.labels[-3:])
+ try:
+ command = self._commands[control_subdomain]
+ except KeyError:
+ logging.error("Unhandled control query %s from %s", qctx.qname, qctx.peer)
+ qctx.response.set_rcode(dns.rcode.NXDOMAIN)
+ return qctx.response
+
+ logging.info("Received control query %s from %s", qctx.qname, qctx.peer)
+ logging.debug("Handling control query %s using %s", qctx.qname, command)
+ qctx.response.set_rcode(dns.rcode.NOERROR)
+ qctx.response.flags |= dns.flags.AA
+
+ command_qname = qctx.qname.relativize(control_subdomain)
+ try:
+ command_args = [l.decode("ascii") for l in command_qname.labels]
+ except UnicodeDecodeError:
+ logging.error("Non-ASCII control query %s from %s", qctx.qname, qctx.peer)
+ qctx.response.set_rcode(dns.rcode.FORMERR)
+ return qctx.response
+
+ command_response = command.handle(command_args, self, qctx)
+ if command_response:
+ command_response_rrset = dns.rrset.from_text(
+ qctx.qname, 0, qctx.qclass, dns.rdatatype.TXT, f'"{command_response}"'
+ )
+ qctx.response.answer.append(command_response_rrset)
+
+ return qctx.response
+
+
+class ControlCommand(abc.ABC):
+ """
+ Base class for control commands.
+
+ The derived class must define the control query subdomain that it handles
+ and the callback that handles the control queries.
+ """
+
+ @property
+ @abc.abstractmethod
+ def control_subdomain(self) -> str:
+ """
+ The subdomain of the control domain handled by this command. Needs to
+ be defined as a string by the derived class.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def handle(
+ self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
+ ) -> Optional[str]:
+ """
+ This method is expected to carry out arbitrary actions in response to a
+ control query. Note that it is invoked synchronously (it is not a
+ coroutine).
+
+ `args` is a list of arguments for the command extracted from the
+ control query's QNAME; these arguments (and therefore the QNAME as
+ well) must only contain ASCII characters. For example, if a command's
+ subdomain is `my-command`, control query `foo.bar.my-command._control.`
+ causes `args` to be set to `["foo", "bar"]` while control query
+ `my-command._control.` causes `args` to be set to `[]`.
+
+ `server` is the server instance that received the control query. This
+ method can change the server's behavior by altering its response
+ handler list using the appropriate methods.
+
+ `qctx` is the query context for the control query. By operating on
+ qctx.response, this method can prepare the DNS response sent to
+ the client in response to the control query. Alternatively (or in
+ addition to the above), it can also return a string; if it does, the
+ returned string is converted to a TXT RRset that is inserted into the
+ ANSWER section of the response to the control query.
+ """
+ raise NotImplementedError
+
+ def __str__(self) -> str:
+ return self.__class__.__name__