]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: kresctl: 'completion' method for commands added
authorAleš Mrázek <ales.mrazek@nic.cz>
Wed, 21 Sep 2022 10:38:09 +0000 (12:38 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Tue, 10 Jan 2023 18:57:13 +0000 (19:57 +0100)
- lint and formating fixes

manager/knot_resolver_manager/cli/__main__.py
manager/knot_resolver_manager/cli/cmd/completion.py
manager/knot_resolver_manager/cli/cmd/config.py
manager/knot_resolver_manager/cli/cmd/convert.py
manager/knot_resolver_manager/cli/cmd/exit.py
manager/knot_resolver_manager/cli/cmd/metrics.py
manager/knot_resolver_manager/cli/cmd/schema.py
manager/knot_resolver_manager/cli/cmd/stop.py
manager/knot_resolver_manager/cli/command.py
manager/knot_resolver_manager/cli/kresctl.py
manager/knot_resolver_manager/utils/modeling/parsing.py

index ea07cc67ee7165eddcd9432cbe369776b41cc336..5daf4663001efac7b10248af7d8e312501f9d566 100644 (file)
@@ -41,8 +41,8 @@ def main() -> None:
     autoimport_commands()
     parser = create_main_argument_parser()
     install_commands_parsers(parser)
-    namespace = parser.parse_args()
-    kresctl = Kresctl(namespace, parser)
+    namespace, unknown_args = parser.parse_known_args()
+    kresctl = Kresctl(namespace, unknown_args, parser)
 
     if namespace.interactive or len(vars(namespace)) == 2:
         kresctl.interactive()
index 601dd97ab38a76522839868f765cfda4ae5bd84a..751c6e61c31d979001ff0222c79b2c5f250b3b05 100644 (file)
@@ -1,32 +1,70 @@
 import argparse
-from typing import Tuple, Type
+from enum import Enum
+from typing import Dict, List, Optional, Tuple, Type
 
 from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
 
 
-class Shells:
+class Shells(Enum):
     BASH = 0
     FISH = 1
 
 
 @register_command
 class CompletionCommand(Command):
-    def __init__(self, namespace: argparse.Namespace) -> None:
-        super().__init__(namespace)
+    def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+        super().__init__(namespace, unknown_args)
+        self.shell: Shells = namespace.shell
+        self.unknown_args: List[str] = unknown_args
 
     @staticmethod
     def register_args_subparser(
-        parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+        subparser: argparse._SubParsersAction[argparse.ArgumentParser],
     ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
-        completion = parser.add_parser("completion", help="commands auto-completion")
+        completion = subparser.add_parser("completion", help="commands auto-completion")
 
         shells_dest = "shell"
         shells = completion.add_mutually_exclusive_group()
         shells.add_argument("--bash", action="store_const", dest=shells_dest, const=Shells.BASH, default=Shells.BASH)
         shells.add_argument("--fish", action="store_const", dest=shells_dest, const=Shells.FISH)
 
-        completion.add_argument("values_to_complete", type=str, nargs="+", help="values to auto-complete")
         return completion, CompletionCommand
 
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+        return []
+
     def run(self, args: CommandArgs) -> None:
-        pass
+        parser = args.parser
+        comp: List[str] = []
+
+        top_comp: Dict[str, Optional[str]] = {}
+
+        if parser._subparsers:
+            for action in parser._subparsers._actions:
+                if isinstance(action, argparse._SubParsersAction):
+                    for sub in action._get_subactions():
+                        top_comp[sub.dest] = sub.help
+                else:
+                    for s in action.option_strings:
+                        top_comp[s] = action.help
+
+            for arg in self.unknown_args:
+                for action in parser._subparsers._actions:
+                    if arg in action.option_strings:
+                        continue
+                    if isinstance(action, argparse._SubParsersAction) and arg in action.choices:
+                        subparser: argparse.ArgumentParser = action.choices[arg]
+                        command: Command = subparser._defaults["command"]
+
+                        comp += command.completion(self.unknown_args, subparser)
+                    else:
+                        pass
+
+        if self.shell == Shells.BASH:
+            print(" ".join(comp))
+        elif self.shell == Shells.FISH:
+            pass
+        else:
+            pass
+            # error
index dbf60f5cb706235e10b6a1a3785c735aef1c84ca..30b7803f365700eeb861258c810ac0cf827ecd9b 100644 (file)
@@ -1,63 +1,48 @@
 import argparse
 import json
-from typing import Optional, Tuple, Type
+from enum import Enum
+from typing import List, Optional, Tuple, Type
 
 import yaml
 from typing_extensions import Literal
 
 from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
+from knot_resolver_manager.utils.modeling import try_to_parse
 from knot_resolver_manager.utils.requests import request
 
 
-class Operations:
+class Operations(Enum):
     SET = 0
     DELETE = 1
     GET = 2
 
 
-class Formats:
+class Formats(Enum):
     JSON = 0
     YAML = 1
 
 
-Methods = Literal["POST", "GET", "DELETE"]
-
-
-def _method_map(op: Operations) -> Methods:
-    if op == Operations.SET:
+def operation_to_method(operation: Operations) -> Literal["POST", "GET", "DELETE"]:
+    if operation == Operations.SET:
         return "POST"
-    elif op == Operations.DELETE:
+    elif operation == Operations.DELETE:
         return "DELETE"
-    elif op == Operations.GET:
-        return "GET"
-    else:
-        pass
 
+    return "GET"
 
-def _format(data: Optional[str], req_format: Formats) -> Optional[str]:
-    if not data:
-        return None
 
-    dic = {}
-    try:
-        dic = json.loads(data)
-    except json.JSONDecodeError:
-        try:
-            dic = yaml.load(data)
-        except yaml.YAMLError:
-            return data
+def reformat(data: str, req_format: Formats) -> str:
+    dict = try_to_parse(data).to_raw()
 
-    if req_format == Formats.JSON:
-        return json.dumps(dic, indent=4)
-    elif req_format == Formats.YAML:
-        return yaml.dump(dic, indent=4)
-    return None
+    if req_format == Formats.YAML:
+        return yaml.dump(dict, indent=4)
+    return json.dumps(dict, indent=4)
 
 
 @register_command
 class ConfigCommand(Command):
-    def __init__(self, namespace: argparse.Namespace) -> None:
-        super().__init__(namespace)
+    def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+        super().__init__(namespace, unknown_args)
         self.path: str = str(namespace.path)
         self.value_or_file: Optional[str] = namespace.value_or_file
         self.operation: Operations = namespace.operation
@@ -66,9 +51,9 @@ class ConfigCommand(Command):
 
     @staticmethod
     def register_args_subparser(
-        parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+        subparser: argparse._SubParsersAction[argparse.ArgumentParser],
     ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
-        config = parser.add_parser("config", help="change configuration of a running resolver")
+        config = subparser.add_parser("config", help="change configuration of a running resolver")
         config.add_argument(
             "path",
             type=str,
@@ -123,13 +108,17 @@ class ConfigCommand(Command):
 
         return config, ConfigCommand
 
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+        return []
+
     def run(self, args: CommandArgs) -> None:
         if not self.path.startswith("/"):
             self.path = "/" + self.path
 
         new_config = None
         url = f"{args.socket}/v1/config{self.path}"
-        method = _method_map(self.operation)
+        method = operation_to_method(self.operation)
 
         if self.operation == Operations.SET:
             # use STDIN also when value_or_file is not specified
@@ -142,12 +131,12 @@ class ConfigCommand(Command):
                 except FileNotFoundError:
                     new_config = self.value_or_file
 
-        response = request(method, url, _format(new_config, Formats.JSON))
+        response = request(method, url, reformat(new_config, Formats.JSON) if new_config else None)
         print(f"status: {response.status}")
 
         if self.operation == Operations.GET and self.value_or_file:
             with open(self.value_or_file, "w") as f:
-                f.write(_format(response.body, self.format))
+                f.write(reformat(response.body, self.format))
             print(f"response body saved to: {self.value_or_file}")
         else:
-            print(_format(response.body, self.format))
+            print(reformat(response.body, self.format))
index 2253014ff524b1b40a52ff88678762fe2ad3fbd3..2497c0b57284aaa3a0f28a657f395a23889f8849 100644 (file)
@@ -1,46 +1,32 @@
 import argparse
-import json
-from typing import Optional, Tuple, Type
-
-import yaml
+from typing import List, Optional, Tuple, Type
 
 from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
 from knot_resolver_manager.datamodel import KresConfig
-from knot_resolver_manager.utils.modeling.parsing import ParsedTree, parse_json, parse_yaml
-
-
-def _parse_data(input: str) -> Optional[ParsedTree]:
-    try:
-        return parse_yaml(input)
-    except yaml.YAMLError:
-        print(f"failed to parse input as YAML")
-        try:
-            return parse_json(input)
-        except json.JSONDecodeError:
-            print(f"failed to parse input as JSON")
-            return None
+from knot_resolver_manager.utils.modeling import try_to_parse
+from knot_resolver_manager.utils.modeling.exceptions import DataParsingError
 
 
 @register_command
 class ConvertCommand(Command):
-    def __init__(self, namespace: argparse.Namespace) -> None:
-        super().__init__(namespace)
+    def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+        super().__init__(namespace, unknown_args)
         self.input_file: str = namespace.input_file
         self.output_file: Optional[str] = namespace.output_file
 
     @staticmethod
     def register_args_subparser(
-        parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+        subparser: argparse._SubParsersAction[argparse.ArgumentParser],
     ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
-        config = parser.add_parser("convert", help="convert JSON/YAML configuration to Lua script")
-        config.add_argument(
+        convert = subparser.add_parser("convert", help="convert JSON/YAML configuration to Lua script")
+        convert.add_argument(
             "input_file",
             type=str,
             help="JSON/YAML configuration input file",
         )
 
-        config.add_argument("--stdin", help="read new config value on stdin", action="store_true", default=False)
-        config.add_argument(
+        convert.add_argument("--stdin", help="read new config value on stdin", action="store_true", default=False)
+        convert.add_argument(
             "output_file",
             type=str,
             nargs="?",
@@ -48,15 +34,21 @@ class ConvertCommand(Command):
             default=None,
         )
 
-        return config, ConvertCommand
+        return convert, ConvertCommand
+
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+        return []
 
     def run(self, args: CommandArgs) -> None:
 
         with open(self.input_file, "r") as f:
             data = f.read()
 
-        parsed = _parse_data(data)
-        if not parsed:
+        try:
+            parsed = try_to_parse(data)
+        except DataParsingError as e:
+            print(e)
             return
 
         lua = KresConfig(parsed).render_lua()
index 95dcd815b75718b4325c5f35cc0be8b4879b120e..21c66def88b254adb05b564b04a88676c0fae523 100644 (file)
@@ -1,21 +1,25 @@
 import argparse
 import sys
-from typing import Tuple, Type
+from typing import List, Tuple, Type
 
 from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
 
 
 @register_command
 class ExitCommand(Command):
-    def __init__(self, namespace: argparse.Namespace) -> None:
-        super().__init__(namespace)
+    def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+        super().__init__(namespace, unknown_args)
 
     def run(self, args: CommandArgs) -> None:
         sys.exit()
 
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+        return []
+
     @staticmethod
     def register_args_subparser(
-        parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+        subparser: argparse._SubParsersAction[argparse.ArgumentParser],
     ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
-        stop = parser.add_parser("exit", help="exit kresctl")
+        stop = subparser.add_parser("exit", help="exit kresctl")
         return stop, ExitCommand
index 8fd76d423e015f33c85ec52283f762297e45345b..27808fa9603256cd3389464bf6f7ca44ed57d4b7 100644 (file)
@@ -1,5 +1,5 @@
 import argparse
-from typing import Optional, Tuple, Type
+from typing import List, Optional, Tuple, Type
 
 from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
 from knot_resolver_manager.utils.requests import request
@@ -7,19 +7,23 @@ from knot_resolver_manager.utils.requests import request
 
 @register_command
 class MetricsCommand(Command):
-    def __init__(self, namespace: argparse.Namespace) -> None:
+    def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
         self.file: Optional[str] = namespace.file
 
-        super().__init__(namespace)
+        super().__init__(namespace, unknown_args)
 
     @staticmethod
     def register_args_subparser(
-        parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+        subparser: argparse._SubParsersAction[argparse.ArgumentParser],
     ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
-        metrics = parser.add_parser("metrics", help="get prometheus metrics data")
+        metrics = subparser.add_parser("metrics", help="get prometheus metrics data")
         metrics.add_argument("file", help="optional, file to export metrics to", nargs="?", default=None)
         return metrics, MetricsCommand
 
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+        return []
+
     def run(self, args: CommandArgs) -> None:
         url = f"{args.socket}/metrics"
         response = request("GET", url)
index 937c76ed1c0ccb90b2a47ed282ba25296ae9a147..d74668a71569e7737a805228f359d54d27353f65 100644 (file)
@@ -1,5 +1,5 @@
 import argparse
-from typing import Optional, Tuple, Type
+from typing import List, Optional, Tuple, Type
 
 from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
 from knot_resolver_manager.utils.requests import request
@@ -7,19 +7,23 @@ from knot_resolver_manager.utils.requests import request
 
 @register_command
 class SchemaCommand(Command):
-    def __init__(self, namespace: argparse.Namespace) -> None:
+    def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
         self.file: Optional[str] = namespace.file
 
-        super().__init__(namespace)
+        super().__init__(namespace, unknown_args)
 
     @staticmethod
     def register_args_subparser(
-        parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+        subparser: argparse._SubParsersAction[argparse.ArgumentParser],
     ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
-        schema = parser.add_parser("schema", help="get JSON schema reprezentation of the configuration")
+        schema = subparser.add_parser("schema", help="get JSON schema reprezentation of the configuration")
         schema.add_argument("file", help="optional, file to export JSON schema to", nargs="?", default=None)
         return schema, SchemaCommand
 
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+        return []
+
     def run(self, args: CommandArgs) -> None:
         url = f"{args.socket}/schema"
         response = request("GET", url)
index 663c574a06f71055334701e5034db13a35e3c5b3..db96d0bdad65a5502b9aef22d4fcfeded8d1aa1d 100644 (file)
@@ -1,5 +1,5 @@
 import argparse
-from typing import Tuple, Type
+from typing import List, Tuple, Type
 
 from knot_resolver_manager.cli.command import Command, CommandArgs, register_command
 from knot_resolver_manager.utils.requests import request
@@ -7,17 +7,21 @@ from knot_resolver_manager.utils.requests import request
 
 @register_command
 class StopCommand(Command):
-    def __init__(self, namespace: argparse.Namespace) -> None:
-        super().__init__(namespace)
+    def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
+        super().__init__(namespace, unknown_args)
 
     def run(self, args: CommandArgs) -> None:
         url = f"{args.socket}/stop"
         response = request("POST", url)
         print(response)
 
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+        return []
+
     @staticmethod
     def register_args_subparser(
-        parser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+        subparser: argparse._SubParsersAction[argparse.ArgumentParser],
     ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
-        stop = parser.add_parser("stop", help="shutdown everything")
+        stop = subparser.add_parser("stop", help="shutdown everything")
         return stop, StopCommand
index 6f5381862d41fef7f6fc97fd2d1885f87024be1e..c5d7d0374d8f38ba7de1f843dd2bddbafbf03b64 100644 (file)
@@ -38,14 +38,19 @@ class Command(ABC):
     @staticmethod
     @abstractmethod
     def register_args_subparser(
-        parser: argparse.ArgumentParser,
+        subparser: argparse._SubParsersAction[argparse.ArgumentParser],
     ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
         raise NotImplementedError()
 
     @abstractmethod
-    def __init__(self, namespace: argparse.Namespace) -> None:
+    def __init__(self, namespace: argparse.Namespace, unknown_args: List[str]) -> None:
         super().__init__()
 
     @abstractmethod
     def run(self, args: CommandArgs) -> None:
         raise NotImplementedError()
+
+    @staticmethod
+    @abstractmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> List[str]:
+        raise NotImplementedError()
index 6eae603ac0a1810af77ceef8316fcf6b90e4735d..caa43736dad826d3695d9c560b4886846d9074b0 100644 (file)
@@ -1,18 +1,26 @@
 import argparse
+from typing import List
 
 from knot_resolver_manager.cli.command import CommandArgs
 
 
 class Kresctl:
-    def __init__(self, namespace: argparse.Namespace, parser: argparse.ArgumentParser, prompt: str = "kresctl") -> None:
+    def __init__(
+        self,
+        namespace: argparse.Namespace,
+        unknown_args: List[str],
+        parser: argparse.ArgumentParser,
+        prompt: str = "kresctl",
+    ) -> None:
         self.path = None
         self.prompt = prompt
         self.namespace = namespace
+        self.unknown_args = unknown_args
         self.parser = parser
 
     def execute(self):
         cmd_args = CommandArgs(self.namespace, self.parser)
-        command = self.namespace.command(self.namespace)
+        command = self.namespace.command(self.namespace, self.unknown_args)
         command.run(cmd_args)
 
     def _prompt_format(self) -> str:
index 6212905d96c4150b0a66c0929a980fec90493d80..79d7ad808b5834e8793df1d580a3a4c75c70ac10 100644 (file)
@@ -98,7 +98,7 @@ def parse_json(data: str) -> Any:
     return _Format.JSON.parse_to_dict(data)
 
 
-def try_to_parse(data: str) -> ParsedTree:
+def try_to_parse(data: str) -> Any:
     """Attempt to parse the data as a YAML or JSON string."""
     try:
         return parse_yaml(data)