-def main():
- print("Knot Resolver CLI successfully running...")
- print("... unfortunatelly, it does nothing at the moment")
+from typing import TYPE_CHECKING, cast
+
+from typing_extensions import Literal
+
+from knot_resolver_manager.utils.requests import request
+
+if TYPE_CHECKING:
+ from knot_resolver_manager.cli.__main__ import Args, ConfigArgs
+
+
+def config(args: "Args") -> None:
+ cfg: "ConfigArgs" = cast("ConfigArgs", args.command)
+
+ if not cfg.path.startswith("/"):
+ cfg.path = "/" + cfg.path
+
+ method: Literal["GET", "POST"] = "GET" if cfg.replacement_value is None else "POST"
+ url = f"{args.socket}/v1/config{cfg.path}"
+ response = request(method, url, cfg.replacement_value)
+ print(response)
+
+
+def stop(args: "Args") -> None:
+ url = f"{args.socket}/stop"
+ response = request("POST", url)
+ print(response)
-from knot_resolver_manager.cli import main
+import argparse
+from abc import ABC
+from typing import Optional
+
+from knot_resolver_manager.cli import config, stop
+from knot_resolver_manager.compat.dataclasses import dataclass
+
+
+class Cmd(ABC):
+ def __init__(self, ns: argparse.Namespace) -> None:
+ pass
+
+ def run(self, args: "Args") -> None:
+ raise NotImplementedError()
+
+
+class ConfigArgs(Cmd):
+ def __init__(self, ns: argparse.Namespace) -> None:
+ super().__init__(ns)
+ self.path: str = str(ns.path)
+ self.replacement_value: Optional[str] = ns.new_value
+ self.delete: bool = ns.delete
+ self.stdin: bool = ns.stdin
+
+ def run(self, args: "Args") -> None:
+ config(args)
+
+
+class StopArgs(Cmd):
+ def run(self, args: "Args") -> None:
+ stop(args)
+
+
+@dataclass
+class Args:
+ socket: str
+ command: Cmd # union in the future
+
+
+def parse_args() -> Args:
+ # pylint: disable=redefined-outer-name
+
+ parser = argparse.ArgumentParser("kresctl", description="CLI for controlling Knot Resolver")
+ parser.add_argument(
+ "-s",
+ "--socket",
+ action="store",
+ type=str,
+ help="manager API listen address",
+ default="http+unix://%2Fvar%2Frun%2Fknot-resolver%2Fmanager.sock",
+ nargs=1,
+ required=True,
+ )
+ subparsers = parser.add_subparsers()
+
+ config = subparsers.add_parser(
+ "config", help="dynamically change configuration of a running resolver", aliases=["c", "conf"]
+ )
+ config.add_argument("path", type=str, help="which part of config should we work with")
+ config.add_argument(
+ "new_value",
+ type=str,
+ nargs="?",
+ help="optional, what value should we set for the given path (JSON)",
+ default=None,
+ )
+ config.add_argument("-d", "--delete", action="store_true", help="delete part of the config tree", default=False)
+ config.add_argument("--stdin", help="read new config value on stdin", action="store_true", default=False)
+ config.set_defaults(command_type=ConfigArgs)
+
+ stop = subparsers.add_parser("stop", help="shutdown everything")
+ stop.set_defaults(command_type=StopArgs)
+
+ ns = parser.parse_args()
+ return Args(socket=ns.socket[0], command=ns.command_type(ns)) # type: ignore[call-arg]
+
if __name__ == "__main__":
- main()
+ _args = parse_args()
+ _args.command.run(_args)
+++ /dev/null
-import ipaddress
-import json
-import multiprocessing
-import subprocess
-import time
-import urllib.parse
-from pathlib import Path
-from typing import Dict, List, Union
-
-import requests
-
-from knot_resolver_manager import compat
-from knot_resolver_manager.server import start_server
-from knot_resolver_manager.utils.modeling import ParsedTree
-
-
-class KnotManagerClient:
- def __init__(self, url: str):
- self._url = url
-
- def _create_url(self, path: str) -> str:
- return urllib.parse.urljoin(self._url, path)
-
- def stop(self) -> None:
- response = requests.post(self._create_url("/stop"))
- print(response.text)
-
- def set_num_workers(self, n: int) -> None:
- response = requests.post(self._create_url("/config/server/workers"), data=str(n))
- print(response.text)
-
- def set_groupid(self, gid: str) -> None:
- response = requests.post(self._create_url("/config/server/groupid"), data=f'"{gid}"')
- print(response.text)
-
- def set_static_hints(self, hints: Dict[str, List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]]) -> None:
- payload = {name: [str(a) for a in addrs] for name, addrs in hints.items()}
- response = requests.post(self._create_url("/config/static-hints/hints"), json=payload)
- print(response.text)
-
- def set_listen_ip_address(self, ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address], port: int) -> None:
- payload = [{"listen": {"ip": str(ip), "port": port}}]
- response = requests.post(self._create_url("/config/network/interfaces"), json=payload)
- print(response)
-
- def wait_for_initialization(self, timeout_sec: float = 5, time_step: float = 0.4) -> None:
- started = time.time()
- while True:
- try:
- response = requests.get(self._create_url("/"))
- data = json.loads(response.text)
- if data["status"] == "RUNNING":
- return
- except BaseException:
- pass
-
- if time.time() - started > timeout_sec:
- raise TimeoutError("The manager did not start in time")
-
- time.sleep(time_step)
-
-
-def count_running_kresds() -> int:
- """
- Inteded use-case is testing... Nothing more
-
- Looks at running processes in the system and returns the number of kresd instances observed.
- """
- cmd = subprocess.run(
- "ps aux | grep kresd | grep -v grep", shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=False
- )
- return len(str(cmd.stdout, "utf8").strip().split("\n"))
-
-
-class _DefaultSentinel:
- pass
-
-
-_DEFAULT_SENTINEL = _DefaultSentinel()
-
-
-def start_manager_in_background(
- initial_config: Union[Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL
-) -> multiprocessing.Process:
- if isinstance(initial_config, _DefaultSentinel):
- p = multiprocessing.Process(target=compat.asyncio.run, args=(start_server(),))
- else:
- p = multiprocessing.Process(target=compat.asyncio.run, args=(start_server(config=initial_config),))
- p.start()
- return p
+++ /dev/null
-import ipaddress
-import sys
-
-import click
-from click.exceptions import ClickException
-
-from knot_resolver_manager.client import KnotManagerClient
-from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.exceptions import KresManagerException
-from knot_resolver_manager.utils.modeling import parse_yaml
-
-BASE_URL = "base_url"
-
-
-@click.group()
-@click.option(
- "-u",
- "--url",
- "base_url",
- nargs=1,
- default="http://localhost:5000/",
- help="Set base URL on which the manager communicates",
-)
-@click.pass_context
-def main(ctx: click.Context, base_url: str) -> None:
- ctx.ensure_object(dict)
- ctx.obj[BASE_URL] = base_url
-
-
-@main.command(help="Shutdown the manager and all workers")
-@click.pass_context
-def stop(ctx: click.Context) -> None:
- client = KnotManagerClient(ctx.obj[BASE_URL])
- client.stop()
-
-
-@main.command("gen-lua", help="Generate LUA config from a given declarative config")
-@click.argument("config_path", type=str, nargs=1)
-def gen_lua(config_path: str) -> None:
- try:
- with open(config_path, "r", encoding="utf8") as f:
- data = f.read()
- parsed = parse_yaml(data)
- config = KresConfig(parsed)
- lua = config.render_lua()
- click.echo_via_pager(lua)
- except KresManagerException as e:
- ne = ClickException(str(e))
- ne.exit_code = 1
- raise ne
-
-
-@main.command(help="Set number of workers")
-@click.argument("instances", type=int, nargs=1)
-@click.pass_context
-def workers(ctx: click.Context, instances: int) -> None:
- client = KnotManagerClient(ctx.obj[BASE_URL])
- client.set_num_workers(instances)
-
-
-@main.command(help="Set the manager groupid")
-@click.argument("gid", type=str, nargs=1)
-@click.pass_context
-def groupid(ctx: click.Context, gid: str) -> None:
- client = KnotManagerClient(ctx.obj[BASE_URL])
- client.set_groupid(gid)
-
-
-@main.command("one-static-hint", help="Set one inline static-hint hints (replaces old static hints)")
-@click.argument("name", type=str, nargs=1)
-@click.argument("ip", type=str, nargs=1)
-@click.pass_context
-def one_static_hint(ctx: click.Context, name: str, ip: str) -> None:
- client = KnotManagerClient(ctx.obj[BASE_URL])
- client.set_static_hints({name: [ipaddress.ip_address(ip)]})
-
-
-@main.command("listen-ip", help="Configure where the resolver should listen (replaces all previous locations)")
-@click.argument("ip", type=str, nargs=1)
-@click.argument("port", type=int, nargs=1)
-@click.pass_context
-def listen_ip(ctx: click.Context, ip: str, port: int) -> None:
- client = KnotManagerClient(ctx.obj[BASE_URL])
- client.set_listen_ip_address(ipaddress.ip_address(ip), port)
-
-
-@main.command(help="Wait for manager initialization")
-@click.pass_context
-def wait(ctx: click.Context) -> None:
- client = KnotManagerClient(ctx.obj[BASE_URL])
- try:
- client.wait_for_initialization()
- except TimeoutError as e:
- click.echo(f"ERR: {e}")
- sys.exit(1)
-
-
-if __name__ == "__main__":
- main() # pylint: disable=no-value-for-parameter
err_res = filter(lambda r: r.is_err(), results)
errs = list(map(lambda r: r.unwrap_err(), err_res))
if len(errs) > 0:
- raise KresManagerException("Validation of the new config failed. The reasons are:", *errs)
+ raise KresManagerException(
+ "Validation of the new config failed. The reasons are:\n - " + "\n - ".join(errs)
+ )
async with self._update_lock:
# update the stored config with the new version
try:
return await handler(request)
except DataValidationError as e:
- return web.Response(text=f"validation of configuration failed: {e}", status=HTTPStatus.BAD_REQUEST)
+ return web.Response(text=f"validation of configuration failed:\n{e}", status=HTTPStatus.BAD_REQUEST)
+ except DataParsingError as e:
+ return web.Response(text=f"request processing error:\n{e}", status=HTTPStatus.BAD_REQUEST)
except KresManagerException as e:
- logger.error("Request processing failed", exc_info=True)
- return web.Response(text=f"Request processing failed: {e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
+ return web.Response(text=f"request processing failed:\n{e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
class Server:
if self._LAYER is not None:
source = self._LAYER(source, object_path=object_path) # pylint: disable=not-callable
+ # prevent failure when user provides a different type than object
+ if isinstance(source, ParsedTree) and not source.is_dict():
+ raise DataValidationError(f"expected object, found '{source.type()}'", object_path)
+
# assign fields
used_keys = self._assign_fields(source, object_path)
import json
from enum import Enum, auto
from hashlib import blake2b
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
import yaml
from typing_extensions import Literal
assert isinstance(self._data, dict)
return self._data[ParsedTree._convert_internal_field_name_to_external(key)]
+ def is_dict(self) -> bool:
+ return isinstance(self._data, dict)
+
+ def type(self) -> Type[Any]:
+ return type(self._data)
+
def __contains__(self, key: str) -> bool:
assert isinstance(self._data, dict)
return ParsedTree._convert_internal_field_name_to_external(key) in self._data
"text/vnd.yaml": _Format.YAML,
}
if mime_type not in formats:
- raise DataParsingError("Unsupported MIME type")
+ raise DataParsingError(
+ f"unsupported MIME type '{mime_type}', expected 'application/json' or 'text/vnd.yaml'"
+ )
return formats[mime_type]
--- /dev/null
+import socket
+from http.client import HTTPConnection
+from typing import Any, Optional, Union
+from urllib.error import HTTPError
+from urllib.request import AbstractHTTPHandler, Request, build_opener, install_opener, urlopen
+
+from typing_extensions import Literal
+
+
+class Response:
+ def __init__(self, status: int, body: str) -> None:
+ self.status = status
+ self.body = body
+
+ def __repr__(self) -> str:
+ return f"status: {self.status}\nbody:\n{self.body}"
+
+
+def request(
+ method: Literal["GET", "POST", "HEAD", "PUT", "DELETE"],
+ url: str,
+ body: Optional[str] = None,
+ content_type: str = "application/json",
+) -> Response:
+ req = Request(
+ url,
+ method=method,
+ data=body.encode("utf8") if body is not None else None,
+ headers={"Content-Type": content_type},
+ )
+ # req.add_header("Authorization", _authorization_header)
+
+ try:
+ with urlopen(req) as response:
+ return Response(response.status, response.read().decode("utf8"))
+ except HTTPError as err:
+ return Response(err.code, response.read().decode("utf8"))
+
+
+# Code heavily inspired by requests-unixsocket
+# https://github.com/msabramo/requests-unixsocket/blob/master/requests_unixsocket/adapters.py
+class UnixHTTPConnection(HTTPConnection):
+ def __init__(self, unix_socket_url: str, timeout: Union[int, float] = 60):
+ """Create an HTTP connection to a unix domain socket
+ :param unix_socket_url: A URL with a scheme of 'http+unix' and the
+ netloc is a percent-encoded path to a unix domain socket. E.g.:
+ 'http+unix://%2Ftmp%2Fprofilesvc.sock/status/pid'
+ """
+ super().__init__("localhost", timeout=timeout)
+ self.unix_socket_path = unix_socket_url
+ self.timeout = timeout
+ self.sock: Optional[socket.socket] = None
+
+ def __del__(self): # base class does not have d'tor
+ if self.sock:
+ self.sock.close()
+
+ def connect(self):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.settimeout(1) # there is something weird stored in self.timeout
+ sock.connect(self.unix_socket_path)
+ self.sock = sock
+
+
+class UnixHTTPHandler(AbstractHTTPHandler):
+ def __init__(self) -> None:
+ super().__init__()
+
+ def open_(self: UnixHTTPHandler, req: Any) -> Any:
+ return self.do_open(UnixHTTPConnection, req)
+
+ setattr(UnixHTTPHandler, "http+unix_open", open_)
+ setattr(UnixHTTPHandler, "http+unix_request", AbstractHTTPHandler.do_request_)
+
+
+opener = build_opener(UnixHTTPHandler())
+install_opener(opener)
python = "^3.6.8"
aiohttp = "^3.6.12"
Jinja2 = "^2.11.3"
-click = "^7.1.2"
PyYAML = "^5.4.1"
-requests = "^2.25.1"
typing-extensions = ">=3.7.2"
prometheus-client = "^0.6"
supervisor = "^4.2.2"
tox = "^3.21.4"
tox-pyenv = "^1.1.0"
poethepoet = "^0.13.0"
-requests = "^2.25.1"
-requests-unixsocket = "^0.2.0"
-click = "^7.1.2"
toml = "^0.10.2"
debugpy = "^1.2.1"
Sphinx = "^4.0.2"
pylint = "^2.11.1"
pytest-asyncio = "^0.16.0"
pytest = "^6.2.5"
-types-requests = "^2.26.3"
types-PyYAML = "^6.0.1"
mypy = "^0.930"
-types-click = "^7.1.8"
types-Jinja2 = "^2.11.9"
types-dataclasses = "^0.6.4"
poetry = "^1.1.12"
packages = \
['knot_resolver_manager',
'knot_resolver_manager.cli',
- 'knot_resolver_manager.client',
'knot_resolver_manager.compat',
'knot_resolver_manager.datamodel',
'knot_resolver_manager.datamodel.types',
['Jinja2>=2.11.3',
'PyYAML>=5.4.1',
'aiohttp>=3.6.12',
- 'click>=7.1.2',
'prometheus-client>=0.6',
- 'requests>=2.25.1',
'supervisor>=4.2.2',
'typing-extensions>=3.7.2']