autoimport_commands()
parser = create_main_argument_parser()
install_commands_parsers(parser)
- namespace = parser.parse_args()
- kresctl = Kresctl(namespace, parser)
+ namespace, unknown_args = parser.parse_known_args()
+ kresctl = Kresctl(namespace, unknown_args, parser)
if namespace.interactive or len(vars(namespace)) == 2:
kresctl.interactive()
import argparse
-from typing import Tuple, Type
+from enum import Enum
+from typing import Dict, List, Optional, Tuple, Type
from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
-class Shells:
+class Shells(Enum):
BASH = 0
FISH = 1
@register_command
class CompletionCommand(Command):
- def __init__(self, namespace: argparse.Namespace) -> None:
- super().__init__(namespace)
+ def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+ super().__init__(namespace, unknown_args)
+ self.shell: Shells = namespace.shell
+ self.unknown_args: List[str] = unknown_args
@staticmethod
def register_args_subparser(
- parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+ subparser: argparse._SubParsersAction[argparse.ArgumentParser],
) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
- completion = parser.add_parser("completion", help="commands auto-completion")
+ completion = subparser.add_parser("completion", help="commands auto-completion")
shells_dest = "shell"
shells = completion.add_mutually_exclusive_group()
shells.add_argument("--bash", action="store_const", dest=shells_dest, const=Shells.BASH, default=Shells.BASH)
shells.add_argument("--fish", action="store_const", dest=shells_dest, const=Shells.FISH)
- completion.add_argument("values_to_complete", type=str, nargs="+", help="values to auto-complete")
return completion, CompletionCommand
+ @staticmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+ return []
+
def run(self, args: CommandArgs) -> None:
- pass
+ parser = args.parser
+ comp: List[str] = []
+
+ top_comp: Dict[str, Optional[str]] = {}
+
+ if parser._subparsers:
+ for action in parser._subparsers._actions:
+ if isinstance(action, argparse._SubParsersAction):
+ for sub in action._get_subactions():
+ top_comp[sub.dest] = sub.help
+ else:
+ for s in action.option_strings:
+ top_comp[s] = action.help
+
+ for arg in self.unknown_args:
+ for action in parser._subparsers._actions:
+ if arg in action.option_strings:
+ continue
+ if isinstance(action, argparse._SubParsersAction) and arg in action.choices:
+ subparser: argparse.ArgumentParser = action.choices[arg]
+ command: Command = subparser._defaults["command"]
+
+ comp += command.completion(self.unknown_args, subparser)
+ else:
+ pass
+
+ if self.shell == Shells.BASH:
+ print(" ".join(comp))
+ elif self.shell == Shells.FISH:
+ pass
+ else:
+ pass
+ # error
import argparse
import json
-from typing import Optional, Tuple, Type
+from enum import Enum
+from typing import List, Optional, Tuple, Type
import yaml
from typing_extensions import Literal
from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
+from knot_resolver_manager.utils.modeling import try_to_parse
from knot_resolver_manager.utils.requests import request
-class Operations:
+class Operations(Enum):
SET = 0
DELETE = 1
GET = 2
-class Formats:
+class Formats(Enum):
JSON = 0
YAML = 1
-Methods = Literal["POST", "GET", "DELETE"]
-
-
-def _method_map(op: Operations) -> Methods:
- if op == Operations.SET:
+def operation_to_method(operation: Operations) -> Literal["POST", "GET", "DELETE"]:
+ if operation == Operations.SET:
return "POST"
- elif op == Operations.DELETE:
+ elif operation == Operations.DELETE:
return "DELETE"
- elif op == Operations.GET:
- return "GET"
- else:
- pass
+ return "GET"
-def _format(data: Optional[str], req_format: Formats) -> Optional[str]:
- if not data:
- return None
- dic = {}
- try:
- dic = json.loads(data)
- except json.JSONDecodeError:
- try:
- dic = yaml.load(data)
- except yaml.YAMLError:
- return data
+def reformat(data: str, req_format: Formats) -> str:
+ dict = try_to_parse(data).to_raw()
- if req_format == Formats.JSON:
- return json.dumps(dic, indent=4)
- elif req_format == Formats.YAML:
- return yaml.dump(dic, indent=4)
- return None
+ if req_format == Formats.YAML:
+ return yaml.dump(dict, indent=4)
+ return json.dumps(dict, indent=4)
@register_command
class ConfigCommand(Command):
- def __init__(self, namespace: argparse.Namespace) -> None:
- super().__init__(namespace)
+ def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+ super().__init__(namespace, unknown_args)
self.path: str = str(namespace.path)
self.value_or_file: Optional[str] = namespace.value_or_file
self.operation: Operations = namespace.operation
@staticmethod
def register_args_subparser(
- parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+ subparser: argparse._SubParsersAction[argparse.ArgumentParser],
) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
- config = parser.add_parser("config", help="change configuration of a running resolver")
+ config = subparser.add_parser("config", help="change configuration of a running resolver")
config.add_argument(
"path",
type=str,
return config, ConfigCommand
+ @staticmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+ return []
+
def run(self, args: CommandArgs) -> None:
if not self.path.startswith("/"):
self.path = "/" + self.path
new_config = None
url = f"{args.socket}/v1/config{self.path}"
- method = _method_map(self.operation)
+ method = operation_to_method(self.operation)
if self.operation == Operations.SET:
# use STDIN also when value_or_file is not specified
except FileNotFoundError:
new_config = self.value_or_file
- response = request(method, url, _format(new_config, Formats.JSON))
+ response = request(method, url, reformat(new_config, Formats.JSON) if new_config else None)
print(f"status: {response.status}")
if self.operation == Operations.GET and self.value_or_file:
with open(self.value_or_file, "w") as f:
- f.write(_format(response.body, self.format))
+ f.write(reformat(response.body, self.format))
print(f"response body saved to: {self.value_or_file}")
else:
- print(_format(response.body, self.format))
+ print(reformat(response.body, self.format))
import argparse
-import json
-from typing import Optional, Tuple, Type
-
-import yaml
+from typing import List, Optional, Tuple, Type
from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
from knot_resolver_manager.datamodel import KresConfig
-from knot_resolver_manager.utils.modeling.parsing import ParsedTree, parse_json, parse_yaml
-
-
-def _parse_data(input: str) -> Optional[ParsedTree]:
- try:
- return parse_yaml(input)
- except yaml.YAMLError:
- print(f"failed to parse input as YAML")
- try:
- return parse_json(input)
- except json.JSONDecodeError:
- print(f"failed to parse input as JSON")
- return None
+from knot_resolver_manager.utils.modeling import try_to_parse
+from knot_resolver_manager.utils.modeling.exceptions import DataParsingError
@register_command
class ConvertCommand(Command):
- def __init__(self, namespace: argparse.Namespace) -> None:
- super().__init__(namespace)
+ def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+ super().__init__(namespace, unknown_args)
self.input_file: str = namespace.input_file
self.output_file: Optional[str] = namespace.output_file
@staticmethod
def register_args_subparser(
- parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+ subparser: argparse._SubParsersAction[argparse.ArgumentParser],
) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
- config = parser.add_parser("convert", help="convert JSON/YAML configuration to Lua script")
- config.add_argument(
+ convert = subparser.add_parser("convert", help="convert JSON/YAML configuration to Lua script")
+ convert.add_argument(
"input_file",
type=str,
help="JSON/YAML configuration input file",
)
- config.add_argument("--stdin", help="read new config value on stdin", action="store_true", default=False)
- config.add_argument(
+ convert.add_argument("--stdin", help="read new config value on stdin", action="store_true", default=False)
+ convert.add_argument(
"output_file",
type=str,
nargs="?",
default=None,
)
- return config, ConvertCommand
+ return convert, ConvertCommand
+
+ @staticmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+ return []
def run(self, args: CommandArgs) -> None:
with open(self.input_file, "r") as f:
data = f.read()
- parsed = _parse_data(data)
- if not parsed:
+ try:
+ parsed = try_to_parse(data)
+ except DataParsingError as e:
+ print(e)
return
lua = KresConfig(parsed).render_lua()
import argparse
import sys
-from typing import Tuple, Type
+from typing import List, Tuple, Type
from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
@register_command
class ExitCommand(Command):
- def __init__(self, namespace: argparse.Namespace) -> None:
- super().__init__(namespace)
+ def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+ super().__init__(namespace, unknown_args)
def run(self, args: CommandArgs) -> None:
sys.exit()
+ @staticmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+ return []
+
@staticmethod
def register_args_subparser(
- parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+ subparser: argparse._SubParsersAction[argparse.ArgumentParser],
) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
- stop = parser.add_parser("exit", help="exit kresctl")
+ stop = subparser.add_parser("exit", help="exit kresctl")
return stop, ExitCommand
import argparse
-from typing import Optional, Tuple, Type
+from typing import List, Optional, Tuple, Type
from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
from knot_resolver_manager.utils.requests import request
@register_command
class MetricsCommand(Command):
- def __init__(self, namespace: argparse.Namespace) -> None:
+ def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
self.file: Optional[str] = namespace.file
- super().__init__(namespace)
+ super().__init__(namespace, unknown_args)
@staticmethod
def register_args_subparser(
- parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+ subparser: argparse._SubParsersAction[argparse.ArgumentParser],
) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
- metrics = parser.add_parser("metrics", help="get prometheus metrics data")
+ metrics = subparser.add_parser("metrics", help="get prometheus metrics data")
metrics.add_argument("file", help="optional, file to export metrics to", nargs="?", default=None)
return metrics, MetricsCommand
+ @staticmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+ return []
+
def run(self, args: CommandArgs) -> None:
url = f"{args.socket}/metrics"
response = request("GET", url)
import argparse
-from typing import Optional, Tuple, Type
+from typing import List, Optional, Tuple, Type
from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
from knot_resolver_manager.utils.requests import request
@register_command
class SchemaCommand(Command):
- def __init__(self, namespace: argparse.Namespace) -> None:
+ def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
self.file: Optional[str] = namespace.file
- super().__init__(namespace)
+ super().__init__(namespace, unknown_args)
@staticmethod
def register_args_subparser(
- parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+ subparser: argparse._SubParsersAction[argparse.ArgumentParser],
) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
- schema = parser.add_parser("schema", help="get JSON schema reprezentation of the configuration")
+ schema = subparser.add_parser("schema", help="get JSON schema reprezentation of the configuration")
schema.add_argument("file", help="optional, file to export JSON schema to", nargs="?", default=None)
return schema, SchemaCommand
+ @staticmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+ return []
+
def run(self, args: CommandArgs) -> None:
url = f"{args.socket}/schema"
response = request("GET", url)
import argparse
-from typing import Tuple, Type
+from typing import List, Tuple, Type
from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
from knot_resolver_manager.utils.requests import request
@register_command
class StopCommand(Command):
- def __init__(self, namespace: argparse.Namespace) -> None:
- super().__init__(namespace)
+ def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+ super().__init__(namespace, unknown_args)
def run(self, args: CommandArgs) -> None:
url = f"{args.socket}/stop"
response = request("POST", url)
print(response)
+ @staticmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+ return []
+
@staticmethod
def register_args_subparser(
- parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+ subparser: argparse._SubParsersAction[argparse.ArgumentParser],
) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
- stop = parser.add_parser("stop", help="shutdown everything")
+ stop = subparser.add_parser("stop", help="shutdown everything")
return stop, StopCommand
@staticmethod
@abstractmethod
def register_args_subparser(
- parser: argparse.ArgumentParser,
+ subparser: argparse._SubParsersAction[argparse.ArgumentParser],
) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
raise NotImplementedError()
@abstractmethod
- def __init__(self, namespace: argparse.Namespace) -> None:
+ def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
super().__init__()
@abstractmethod
def run(self, args: CommandArgs) -> None:
raise NotImplementedError()
+
+ @staticmethod
+ @abstractmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+ raise NotImplementedError()
import argparse
+from typing import List
from knot_resolver_manager.cli.command import CommandArgs
class Kresctl:
- def __init__(self, namespace: argparse.Namespace, parser: argparse.ArgumentParser, prompt: str = "kresctl") -> None:
+ def __init__(
+ self,
+ namespace: argparse.Namespace,
+ unknown_args: List[str],
+ parser: argparse.ArgumentParser,
+ prompt: str = "kresctl",
+ ) -> None:
self.path = None
self.prompt = prompt
self.namespace = namespace
+ self.unknown_args = unknown_args
self.parser = parser
def execute(self):
cmd_args = CommandArgs(self.namespace, self.parser)
- command = self.namespace.command(self.namespace)
+ command = self.namespace.command(self.namespace, self.unknown_args)
command.run(cmd_args)
def _prompt_format(self) -> str:
return _Format.JSON.parse_to_dict(data)
-def try_to_parse(data: str) -> ParsedTree:
+def try_to_parse(data: str) -> Any:
"""Attempt to parse the data as a YAML or JSON string."""
try:
return parse_yaml(data)