]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: client: completion improvements
authorAleš Mrázek <ales.mrazek@nic.cz>
Thu, 8 Dec 2022 13:33:26 +0000 (14:33 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Tue, 10 Jan 2023 18:57:14 +0000 (19:57 +0100)
manager/knot_resolver_manager/cli/cmd/completion.py
manager/knot_resolver_manager/cli/cmd/config.py
manager/knot_resolver_manager/cli/command.py

index 56286f6fcbc255b35742237a18858e4e27f833df..a769ac350939475acd0634f9f1c9dc99fd4c891b 100644 (file)
@@ -1,8 +1,16 @@
 import argparse
 from enum import Enum
-from typing import List, Optional, Tuple, Type
+from typing import List, Tuple, Type
 
-from knot_resolver_manager.cli.command import Command, CommandArgs, CompWords, register_command
+from knot_resolver_manager.cli.command import (
+    Command,
+    CommandArgs,
+    CompWords,
+    parser_words,
+    register_command,
+    subparser_by_name,
+    subparser_command,
+)
 
 
 class Shells(Enum):
@@ -10,32 +18,6 @@ class Shells(Enum):
     FISH = 1
 
 
-def _parser_top_lvl_words(actions: List[argparse.Action]) -> CompWords:
-    words: CompWords = {}
-
-    for action in actions:
-        if isinstance(action, argparse._SubParsersAction):
-            for sub in action._get_subactions():
-                words[sub.dest] = sub.help
-        else:
-            for s in action.option_strings:
-                words[s] = action.help
-    return words
-
-
-def _subparser_words(comp_args: List[str], actions: List[argparse.Action]) -> Optional[CompWords]:
-    for arg in comp_args:
-        for action in actions:
-            if isinstance(action, argparse._SubParsersAction) and arg in action.choices:
-                subparser: argparse.ArgumentParser = action.choices[arg]
-                command: Command = subparser._defaults["command"]
-
-                subparser_args = comp_args[comp_args.index(arg) + 1 :]
-                if subparser_args:
-                    return command.completion(subparser_args, subparser)
-    return None
-
-
 @register_command
 class CompletionCommand(Command):
     def __init__(self, namespace: argparse.Namespace) -> None:
@@ -75,26 +57,38 @@ class CompletionCommand(Command):
 
     @staticmethod
     def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
-        comp: CompWords = {}
-
+        words: CompWords = {}
         for action in parser._actions:
             for opt in action.option_strings:
-                comp[opt] = action.help
-        return comp
+                words[opt] = action.help
+        return words
 
     def run(self, args: CommandArgs) -> None:
-        parser = args.parser
+        subparsers = args.parser._subparsers
         words: CompWords = {}
 
-        if parser._subparsers:
-            subparser_words = _subparser_words(self.comp_args, parser._subparsers._actions)
-
-            if subparser_words is None:
-                # parser top level options/commands
-                words = _parser_top_lvl_words(parser._subparsers._actions)
-            else:
-                # subparsers optons/commands
-                words = subparser_words
+        if subparsers:
+            words = parser_words(subparsers._actions)
+
+            uargs = iter(self.comp_args)
+            for uarg in uargs:
+                subparser = subparser_by_name(uarg, subparsers._actions)  # pylint: disable=W0212
+
+                if subparser:
+                    cmd: Command = subparser_command(subparser)
+                    subparser_args = self.comp_args[self.comp_args.index(uarg) + 1 :]
+                    if subparser_args:
+                        words = cmd.completion(subparser_args, subparser)
+                    break
+                elif uarg in ["-s", "--socket"]:
+                    # if arg is socket config, skip next arg
+                    next(uargs)
+                    continue
+                elif uarg in words:
+                    # uarg is walid arg, continue
+                    continue
+                else:
+                    raise ValueError(f"unknown argument: {uarg}")
 
         # print completion words
         # based on required bash/fish shell format
index a4a08a6bf2c53d2727020d0a3d3d282c10f5f610..6ce63a55f0ff43efba9e908f54401bb1907e7f77 100644 (file)
@@ -6,7 +6,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type
 import yaml
 from typing_extensions import Literal
 
-from knot_resolver_manager.cli.command import Command, CommandArgs, CompWords, register_command
+from knot_resolver_manager.cli.command import Command, CommandArgs, CompWords, parser_words, register_command
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.utils.modeling import try_to_parse
 from knot_resolver_manager.utils.requests import request
@@ -40,39 +40,34 @@ def reformat(data: str, req_format: Formats) -> str:
     return json.dumps(dict, indent=4)
 
 
-def properties_comp(props: Dict[str, Any]) -> CompWords:
-    comp: CompWords = {}
+def _properties_words(props: Dict[str, Any]) -> CompWords:
+    words: CompWords = {}
     for name, prop in props.items():
-        comp[name] = prop["description"] if "description" in prop else None
-    return comp
-
-
-def node_comp(nodes: List[str], props: Dict[str, Any]) -> CompWords:
-
-    if len(nodes) > 1 and nodes[0] in props:
-        prop = props[nodes[0]]
-        if "properties" in prop:
-            return node_comp(nodes[1:], prop["properties"])
-        else:
-            return {}
+        words[name] = prop["description"] if "description" in prop else None
+    return words
+
+
+def _path_comp_words(node: str, nodes: List[str], props: Dict[str, Any]) -> CompWords:
+    i = nodes.index(node)
+    ln = len(nodes[i:])
+
+    # if node is last in path, return all possible words on thi level
+    if ln == 1:
+        return _properties_words(props)
+    # if node is valid
+    elif node in props:
+        node_schema = props[node]
+
+        # arrays/lists must be handled sparately
+        if node_schema["type"] == "array":
+            if ln > 2:
+                # skip index for item in array
+                return _path_comp_words(nodes[i + 2], nodes, node_schema["items"]["properties"])
+            return {"0": "first array item", "-": "last array item"}
+        return _path_comp_words(nodes[i + 1], nodes, node_schema["properties"])
     else:
-        return properties_comp(props)
-
-
-def config_path_comp(path: str) -> Dict[str, Optional[str]]:
-    nodes = path[1:].split("/") if path.startswith("/") else path.split("/")
-    properties: Dict[str, Any] = KresConfig.json_schema()["properties"]
-    return node_comp(nodes, properties)
-
-
-def subparser_comp(parser: argparse.ArgumentParser) -> Dict[str, Optional[str]]:
-    comp: Dict[str, Optional[str]] = {}
-
-    for action in parser._actions:
-        if isinstance(action, (argparse._StoreConstAction, argparse._HelpAction)):
-            for s in action.option_strings:
-                comp[s] = action.help
-    return comp
+        # if node is not last or valid, value error
+        raise ValueError(f"unknown config path node: {node}")
 
 
 @register_command
@@ -146,15 +141,17 @@ class ConfigCommand(Command):
 
     @staticmethod
     def completion(args: List[str], parser: argparse.ArgumentParser) -> Dict[str, Optional[str]]:
-        top_comp = subparser_comp(parser)
+        words = parser_words(parser._actions)  # pylint: disable=W0212
 
         for arg in args:
-            if arg in top_comp:
+            if arg in words:
                 continue
             elif arg.startswith("-"):
-                return top_comp
+                return words
             elif arg == args[-1]:
-                return config_path_comp(arg)
+                config_path = arg[1:].split("/") if arg.startswith("/") else arg.split("/")
+                schema_props: Dict[str, Any] = KresConfig.json_schema()["properties"]
+                return _path_comp_words(config_path[0], config_path, schema_props)
             else:
                 break
         return {}
index 625ad39d52611e1960de4b6df61bf1a29f3ec3c2..f2278d403d633b93a4eee7ffff4897d8ce433c5e 100644 (file)
@@ -56,3 +56,28 @@ class Command(ABC):
     @abstractmethod
     def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
         raise NotImplementedError()
+
+
+def parser_words(actions: List[argparse.Action]) -> CompWords:
+    words: CompWords = {}
+    for action in actions:
+        if isinstance(action, argparse._SubParsersAction):  # pylint: disable=W0212
+            for sub in action._get_subactions():  # pylint: disable=W0212
+                words[sub.dest] = sub.help
+        elif isinstance(
+            action, (argparse._StoreConstAction, argparse._StoreAction, argparse._HelpAction)  # pylint: disable=W0212
+        ):  # pylint: disable=W0212
+            for s in action.option_strings:
+                words[s] = action.help
+    return words
+
+
+def subparser_by_name(subparser_name: str, actions: List[argparse.Action]) -> Optional[argparse.ArgumentParser]:
+    for action in actions:
+        if isinstance(action, argparse._SubParsersAction) and subparser_name in action.choices:  # pylint: disable=W0212
+            return action.choices[subparser_name]
+    return None
+
+
+def subparser_command(subparser: argparse.ArgumentParser) -> Command:
+    return subparser._defaults["command"]  # pylint: disable=W0212