]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: use proper JSON values for socket communication
authorOto Šťáva <oto.stava@nic.cz>
Fri, 9 Feb 2024 09:55:17 +0000 (10:55 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Thu, 15 Feb 2024 09:51:05 +0000 (10:51 +0100)
This commit adds a special JSON mode for control sockets.

The mode is activated by issuing a special `__json` command to the
socket, resulting in all Lua objects returned by all subsequent commands
to be serialized into JSONs, prepended by a 32-bit unsigned integer
byte-length value.

This JSON mode is now exclusively utilized by Manager, removing the need
to hackily strip single-quotes from the output and to read the output by
lines. Instead, it can always just read the 32-bit length value and
subsequently the whole JSON-formatted message, which is now
automatically deserialized into a Python object.

daemon/engine.c
daemon/engine.h
daemon/io.c
daemon/lua/sandbox.lua.in
manager/knot_resolver_manager/datamodel/cache_schema.py
manager/knot_resolver_manager/datamodel/templates/macros/common_macros.lua.j2
manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2
manager/knot_resolver_manager/kresd_controller/interface.py
manager/knot_resolver_manager/kresd_controller/registered_workers.py
manager/knot_resolver_manager/server.py
manager/knot_resolver_manager/statistics.py

index 1b91d068037602007955279e511f29a485a6aaa1..84a164febe58b16d32c12aed3c2ee1d4b5404d81 100644 (file)
@@ -642,7 +642,17 @@ int engine_pcall(lua_State *L, int argc)
        return lua_pcall(L, argc, LUA_MULTRET, 0);
 }
 
-int engine_cmd(lua_State *L, const char *str, bool raw)
+const char *engine_eval_mode_str(enum engine_eval_mode mode)
+{
+       switch (mode) {
+#define XX(cid) case ENGINE_EVAL_MODE_##cid: return #cid;
+               ENGINE_EVAL_MODE_MAP(XX)
+#undef XX
+       }
+       return "(invalid)";
+}
+
+int engine_cmd(struct lua_State *L, const char *str, enum engine_eval_mode mode)
 {
        if (L == NULL) {
                return kr_error(ENOEXEC);
@@ -651,7 +661,7 @@ int engine_cmd(lua_State *L, const char *str, bool raw)
        /* Evaluate results */
        lua_getglobal(L, "eval_cmd");
        lua_pushstring(L, str);
-       lua_pushboolean(L, raw);
+       lua_pushstring(L, engine_eval_mode_str(mode));
 
        /* Check result. */
        return engine_pcall(L, 2);
index a97122133040e4a3756c5e493e880e32c4dddfa5..e25590a833f2a894d948a5724823f3874b08260e 100644 (file)
@@ -31,12 +31,26 @@ int engine_init(void);
  * this and before `network_deinit`. */
 void engine_deinit(void);
 
+#define ENGINE_EVAL_MODE_MAP(XX) \
+       XX(LUA_TABLE) \
+       XX(RAW) \
+       XX(JSON) \
+       //
+
+enum engine_eval_mode {
+#define XX(cid) ENGINE_EVAL_MODE_##cid,
+       ENGINE_EVAL_MODE_MAP(XX)
+#undef XX
+};
+
+const char *engine_eval_mode_str(enum engine_eval_mode mode);
+
 /** Perform a lua command within the sandbox.
  *
  *  @return zero on success.
  *  The result will be returned on the lua stack - an error message in case of failure.
  *  http://www.lua.org/manual/5.1/manual.html#lua_pcall */
-int engine_cmd(struct lua_State *L, const char *str, bool raw);
+int engine_cmd(struct lua_State *L, const char *str, enum engine_eval_mode mode);
 
 /** Execute current chunk in the sandbox */
 int engine_pcall(struct lua_State *L, int argc);
index d54019d2a3b8fdb948e40f704176c5a530f3e27e..ac9a08e03737f223da3d35df01e5170c60dfb12a 100644 (file)
@@ -647,8 +647,9 @@ int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bo
 
 
 enum io_stream_mode {
-       io_mode_text = 0,
-       io_mode_binary = 1,
+       IO_MODE_TEXT   = 0,
+       IO_MODE_BINARY = 1,
+       IO_MODE_JSON   = 2,
 };
 
 struct io_stream_data {
@@ -753,20 +754,28 @@ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *bu
 
                /* Pseudo-command for switching to "binary output"; */
                if (strcmp(cmd, "__binary") == 0) {
-                       data->mode = io_mode_binary;
+                       data->mode = IO_MODE_BINARY;
+                       goto next_iter;
+               }
+               if (strcmp(cmd, "__json") == 0) {
+                       data->mode = IO_MODE_JSON;
                        goto next_iter;
                }
 
-               const bool cmd_failed = engine_cmd(L, cmd, false);
+               const bool cmd_failed = engine_cmd(L, cmd,
+                               (data->mode == IO_MODE_JSON)
+                                       ? ENGINE_EVAL_MODE_JSON
+                                       : ENGINE_EVAL_MODE_LUA_TABLE);
                const char *message = NULL;
                size_t len_s;
                if (lua_gettop(L) > 0) {
                        message = lua_tolstring(L, -1, &len_s);
                }
 
-               /* Send back the output, either in "binary" or normal mode. */
-               if (data->mode == io_mode_binary) {
-                       /* Leader expects length field in all cases */
+               switch (data->mode) {
+               case IO_MODE_BINARY:
+               case IO_MODE_JSON:
+                       /* Length-field-prepended mode */
                        if (!message || len_s > UINT32_MAX) {
                                kr_log_error(IO, "unrepresentable response on control socket, "
                                                "sending back empty block (command '%s')\n", cmd);
@@ -776,13 +785,16 @@ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *bu
                        fwrite(&len_n, sizeof(len_n), 1, out);
                        if (len_s > 0)
                                fwrite(message, len_s, 1, out);
-               } else {
+                       break;
+               case IO_MODE_TEXT:
+                       /* Human-readable and console-printable mode */
                        if (message)
                                fprintf(out, "%s", message);
                        if (message || !args->quiet)
                                fprintf(out, "\n");
                        if (!args->quiet)
                                fprintf(out, "> ");
+                       break;
                }
 
                /* Duplicate command and output to logs */
@@ -826,7 +838,7 @@ struct io_stream_data *io_tty_alloc_data(void) {
        struct io_stream_data *data = mm_alloc(pool, sizeof(struct io_stream_data));
 
        data->buf = mp_start(pool->ctx, 512);
-       data->mode = io_mode_text;
+       data->mode = IO_MODE_TEXT;
        data->blen = 0;
        data->pool = pool;
 
index 60fd299254799680befe4b32300d8a9c3a124b71..116489f875793b60145ff5ce462aa52fb6001187 100644 (file)
@@ -514,7 +514,7 @@ modules.load('extended_error')
 -- Load keyfile_default
 trust_anchors.add_file('@keyfile_default@', @unmanaged@)
 
-local function eval_cmd_compile(line, raw)
+local function eval_cmd_compile(line, mode)
        -- Compatibility sandbox code loading
        local function load_code(code)
            if getfenv then -- Lua 5.1
@@ -523,8 +523,19 @@ local function eval_cmd_compile(line, raw)
                return load(code, nil, 't', _ENV)
            end
        end
+
+       -- See `ENGINE_EVAL_MODE_MAP(XX)` C-macro for possible values
        local err, chunk
-       chunk, err = load_code(raw and 'return '..line or 'return table_print('..line..')')
+       if mode == "LUA_TABLE" then
+               chunk, err = load_code('return table_print(('..line..'))')
+       elseif mode == "RAW" then
+               chunk, err = load_code('return ('..line..')')
+       elseif mode == "JSON" then
+               chunk, err = load_code('return tojson(('..line..'))')
+       else
+               return nil, "invalid mode"
+       end
+
        if err then
                chunk, err = load_code(line)
        end
@@ -532,8 +543,8 @@ local function eval_cmd_compile(line, raw)
 end
 
 -- Interactive command evaluation
-function eval_cmd(line, raw)
-       local chunk, err = eval_cmd_compile(line, raw)
+function eval_cmd(line, mode)
+       local chunk, err = eval_cmd_compile(line, mode)
        if not err then
                return chunk()
        else
@@ -642,7 +653,7 @@ end
 -- must be public because it is called from eval_cmd()
 -- when map() commands are read from control socket
 function _map_luaobj_call_wrapper(cmd)
-       local func = eval_cmd_compile(cmd, true)
+       local func = eval_cmd_compile(cmd, "RAW")
        local ret = kluautil.kr_table_pack(xpcall(func, debug.traceback))
        local ok, serial = pcall(krprint.serialize_lua, ret, 'error')
        if not ok then
@@ -747,7 +758,7 @@ function map(cmd, format)
        if (#cmd <= 0) then
                panic('map() command must be non-empty') end
        -- syntax check on input command to detect typos early
-       local chunk, err = eval_cmd_compile(cmd, false)
+       local chunk, err = eval_cmd_compile(cmd, "LUA_TABLE")
        if not chunk then
                panic('failure when compiling map() command: %s', err)
        end
@@ -785,7 +796,7 @@ function map(cmd, format)
                log_info(ffi.C.LOG_GRP_SYSTEM, 'executing map() on %s: command %s', path_name, cmd)
                local ret
                if local_exec then
-                       ret = eval_cmd(cmd)
+                       ret = eval_cmd(cmd, "LUA_TABLE")
                else
                        ret = map_send_recv(cmd, path)
                        -- skip dead sockets (leftovers from dead instances)
index 1c8a9fc38e86899ec7afafa72de9dd0e4ee353f8..40aa88970f23d4878faf64271f9ef2b39553d19e 100644 (file)
@@ -19,9 +19,7 @@ from knot_resolver_manager.utils.modeling import ConfigSchema
 from knot_resolver_manager.utils.modeling.base_schema import lazy_default
 
 _CACHE_CLEAR_TEMPLATE = template_from_str(
-    "{% from 'macros/common_macros.lua.j2' import tojson %}"
-    "{% from 'macros/cache_macros.lua.j2' import cache_clear %}"
-    "{{ tojson(cache_clear(params)) }}"
+    "{% from 'macros/cache_macros.lua.j2' import cache_clear %} {{ cache_clear(params) }}"
 )
 
 
index 4d1bcc78bebe8382062cadc2b1ebff580b2d3f72..4c2ba11a8e55c90d37c47a44f53e864a8cb10697 100644 (file)
@@ -1,7 +1,3 @@
-{% macro tojson(object) -%}
-tojson({{ object }})
-{%- endmacro %}
-
 {% macro quotes(string) -%}
 '{{ string }}'
 {%- endmacro %}
index 0825928db6f39413326cd643dd2cfa6ee599af95..624b59ab721ce61e87d55fe3a80a2e125cd1cad6 100644 (file)
@@ -24,10 +24,10 @@ function collect_lazy_statistics()
                modules.load('stats')
        end
 
-       return tojson(stats.list())
+       return stats.list()
 end
 
 --- function used for statistics collection
 function collect_statistics()
-       return tojson(stats.list())
+       return stats.list()
 end
index 30ec03574883bbb96fe55948d17566cd5f420a7b..77197a32047844596b4c27747644552b59c93871 100644 (file)
@@ -1,6 +1,8 @@
 import asyncio
 import itertools
+import json
 import logging
+import struct
 import sys
 from abc import ABC, abstractmethod  # pylint: disable=no-name-in-module
 from enum import Enum, auto
@@ -165,7 +167,7 @@ class Subprocess(ABC):
     def id(self) -> KresID:
         return self._id
 
-    async def command(self, cmd: str) -> str:
+    async def command(self, cmd: str) -> object:
         reader: asyncio.StreamReader
         writer: Optional[asyncio.StreamWriter] = None
         try:
@@ -174,14 +176,18 @@ class Subprocess(ABC):
             # drop prompt
             _ = await reader.read(2)
 
+            # switch to JSON mode
+            writer.write("__json\n".encode("utf8"))
+
             # write command
             writer.write(cmd.encode("utf8"))
             writer.write(b"\n")
             await writer.drain()
 
             # read result
-            result_bytes = await reader.readline()
-            return result_bytes.decode("utf8")[:-1]  # strip trailing newline
+            (msg_len,) = struct.unpack(">I", await reader.read(4))
+            result_bytes = await reader.readexactly(msg_len)
+            return json.loads(result_bytes.decode("utf8"))
 
         finally:
             if writer is not None:
index 9a3a0d36809ce02cc92d028a2dfe3cf45ea83b4f..b6ea834eddff72e57cc525acf1807ac80eb77a37 100644 (file)
@@ -18,7 +18,7 @@ def get_registered_workers_kresids() -> "List[KresID]":
     return list(_REGISTERED_WORKERS.keys())
 
 
-async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, str]":
+async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, object]":
     for sub in _REGISTERED_WORKERS.values():
         return sub.id, await sub.command(cmd)
     raise SubprocessControllerException(
@@ -27,8 +27,8 @@ async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, str]":
     )
 
 
-async def command_registered_workers(cmd: str) -> "Dict[KresID, str]":
-    async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]":
+async def command_registered_workers(cmd: str) -> "Dict[KresID, object]":
+    async def single_pair(sub: "Subprocess") -> "Tuple[KresID, object]":
         return sub.id, await sub.command(cmd)
 
     pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_WORKERS.values()))
index 4b507b459124fa9897ca0b1e917543af12b62384..6834e70fb830d441a90b39b45115f7082c38ac4c 100644 (file)
@@ -260,7 +260,7 @@ class Server:
 
         _, result = await command_single_registered_worker(config.render_lua())
         return web.Response(
-            body=result,
+            body=json.dumps(result),
             content_type="application/json",
             charset="utf8",
         )
index 50cfc7f1cddf31221fe1b3d45402e5278807cdd0..aa51c423dcd38ee2cd24ef888979bda13bfefa8b 100644 (file)
@@ -69,7 +69,7 @@ def _histogram(
 
 class ResolverCollector:
     def __init__(self, config_store: ConfigStore) -> None:
-        self._stats_raw: "Optional[Dict[KresID, str]]" = None
+        self._stats_raw: "Optional[Dict[KresID, object]]" = None
         self._config_store: ConfigStore = config_store
         self._collection_task: "Optional[asyncio.Task[None]]" = None
         self._skip_immediate_collection: bool = False
@@ -148,8 +148,7 @@ class ResolverCollector:
             success = False
             try:
                 if kresid in self._stats_raw:
-                    raw = self._stats_raw[kresid]
-                    metrics: Dict[str, int] = json.loads(raw[1:-1])
+                    metrics = self._stats_raw[kresid]
                     yield from self._parse_resolver_metrics(kresid, metrics)
                     success = True
             except json.JSONDecodeError: