def _seek_on_enter(self) -> None:
if self._fd:
self._fd.seek(0, os.SEEK_END)
-
-
-# pylint: disable=too-few-public-methods
-class RNDCExecutor(abc.ABC):
- """
- An interface which RNDC executors have to implement in order for the
- `NamedInstance` class to be able to use them.
- """
-
- @overload
- def call(self, ip: str, port: int, command: str, timeout: int = 10) -> str: ...
-
- @overload
- def call(
- self, ip: str, port: int, command: List[str], timeout: int = 10
- ) -> str: ...
-
- @abc.abstractmethod
- def call(
- self, ip: str, port: int, command: Union[str, List[str]], timeout: int = 10
- ) -> str:
- """
- Send RNDC `command` to the `named` instance at `ip:port` and return the
- server's response.
- """
-
-
-class RNDCException(Exception):
- """
- Raised by classes implementing the `RNDCExecutor` interface when sending an
- RNDC command fails for any reason.
- """
-
-
-class RNDCBinaryExecutor(RNDCExecutor):
- """
- An `RNDCExecutor` which sends RNDC commands to servers using the `rndc`
- binary.
- """
-
- def __init__(self) -> None:
- """
- This class needs the `RNDC` environment variable to be set to the path
- to the `rndc` binary to use.
- """
- rndc_path = os.environ.get("RNDC", "/bin/false")
- rndc_conf = os.path.join("..", "_common", "rndc.conf")
- self._base_cmdline = [rndc_path, "-c", rndc_conf]
-
- @overload
- def call(self, ip: str, port: int, command: str, timeout: int = 10) -> str: ...
-
- @overload
- def call(
- self, ip: str, port: int, command: List[str], timeout: int = 10
- ) -> str: ...
-
- def call(
- self, ip: str, port: int, command: Union[str, List[str]], timeout: int = 10
- ) -> str:
- """
- Send RNDC `command` to the `named` instance at `ip:port` and return the
- server's response.
- """
- cmdline = self._base_cmdline[:]
- cmdline.extend(["-s", ip])
- cmdline.extend(["-p", str(port)])
- cmdline.extend(shlex.split(command) if isinstance(command, str) else command)
-
- try:
- return subprocess.check_output(
- cmdline, stderr=subprocess.STDOUT, timeout=timeout, encoding="utf-8"
- )
- except subprocess.SubprocessError as exc:
- msg = getattr(exc, "output", "RNDC exception occurred")
- raise RNDCException(msg) from exc