+import ipaddress
import json
import multiprocessing
import subprocess
import urllib.parse
from pathlib import Path
from typing import Dict, List, Union
-import ipaddress
import requests
def set_num_workers(self, n: int):
response = requests.post(self._create_url("/config/server/workers"), data=str(n))
print(response.text)
-
+
def set_static_hints(self, hints: Dict[str, List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]]):
- payload = [
- {
- "name": name,
- "addresses": [str(a) for a in addrs]
- }
- for name, addrs in hints.items()
- ]
+ payload = [{"name": name, "addresses": [str(a) for a in addrs]} for name, addrs in hints.items()]
response = requests.post(self._create_url("/config/static-hints/hints"), json=payload)
print(response.text)
-
+
def set_listen_ip_address(self, ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address], port: int):
- payload = [
- {
- "listen": {
- "ip": str(ip),
- "port": port
- }
- }
- ]
+ payload = [{"listen": {"ip": str(ip), "port": port}}]
response = requests.post(self._create_url("/config/network/interfaces"), json=payload)
print(response)
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_num_workers(instances)
+
@main.command("one-static-hint", help="Set one inline static-hint hints (replaces old static hints)")
@click.argument("name", type=str, nargs=1)
@click.argument("ip", type=str, nargs=1)
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_static_hints({name: [ipaddress.ip_address(ip)]})
+
@main.command("listen-ip", help="Configure where the resolver should listen (replaces all previous locations)")
@click.argument("ip", type=str, nargs=1)
@click.argument("port", type=int, nargs=1)
_used: "weakref.WeakSet[KresID]" = weakref.WeakSet()
-def alloc(_custom_name_id: bool=False) -> KresID:
+def alloc(_custom_name_id: bool = False) -> KresID:
for i in itertools.count(start=1):
val = KresID(i if not _custom_name_id else -i)
if val not in _used:
from knot_resolver_manager import compat
from knot_resolver_manager.compat.asyncio import to_thread
-from knot_resolver_manager.kres_id import KresID, alloc, alloc_from_string
+from knot_resolver_manager.kres_id import KresID, alloc_from_string
from knot_resolver_manager.kresd_controller.interface import (
Subprocess,
SubprocessController,