From: Oto Šťáva Date: Fri, 9 Feb 2024 09:55:17 +0000 (+0100) Subject: manager: use proper JSON values for socket communication X-Git-Tag: v6.0.7~23^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c375cf2cbc5931a70594373065aa18e5ec18f9d5;p=thirdparty%2Fknot-resolver.git manager: use proper JSON values for socket communication This commit adds a special JSON mode for control sockets. The mode is activated by issuing a special `__json` command to the socket, resulting in all Lua objects returned by all subsequent commands to be serialized into JSONs, prepended by a 32-bit unsigned integer byte-length value. This JSON mode is now exclusively utilized by Manager, removing the need to hackily strip single-quotes from the output and to read the output by lines. Instead, it can always just read the 32-bit length value and subsequently the whole JSON-formatted message, which is now automatically deserialized into a Python object. --- diff --git a/daemon/engine.c b/daemon/engine.c index 1b91d0680..84a164feb 100644 --- a/daemon/engine.c +++ b/daemon/engine.c @@ -642,7 +642,17 @@ int engine_pcall(lua_State *L, int argc) return lua_pcall(L, argc, LUA_MULTRET, 0); } -int engine_cmd(lua_State *L, const char *str, bool raw) +const char *engine_eval_mode_str(enum engine_eval_mode mode) +{ + switch (mode) { +#define XX(cid) case ENGINE_EVAL_MODE_##cid: return #cid; + ENGINE_EVAL_MODE_MAP(XX) +#undef XX + } + return "(invalid)"; +} + +int engine_cmd(struct lua_State *L, const char *str, enum engine_eval_mode mode) { if (L == NULL) { return kr_error(ENOEXEC); @@ -651,7 +661,7 @@ int engine_cmd(lua_State *L, const char *str, bool raw) /* Evaluate results */ lua_getglobal(L, "eval_cmd"); lua_pushstring(L, str); - lua_pushboolean(L, raw); + lua_pushstring(L, engine_eval_mode_str(mode)); /* Check result. */ return engine_pcall(L, 2); diff --git a/daemon/engine.h b/daemon/engine.h index a97122133..e25590a83 100644 --- a/daemon/engine.h +++ b/daemon/engine.h @@ -31,12 +31,26 @@ int engine_init(void); * this and before `network_deinit`. */ void engine_deinit(void); +#define ENGINE_EVAL_MODE_MAP(XX) \ + XX(LUA_TABLE) \ + XX(RAW) \ + XX(JSON) \ + // + +enum engine_eval_mode { +#define XX(cid) ENGINE_EVAL_MODE_##cid, + ENGINE_EVAL_MODE_MAP(XX) +#undef XX +}; + +const char *engine_eval_mode_str(enum engine_eval_mode mode); + /** Perform a lua command within the sandbox. * * @return zero on success. * The result will be returned on the lua stack - an error message in case of failure. * http://www.lua.org/manual/5.1/manual.html#lua_pcall */ -int engine_cmd(struct lua_State *L, const char *str, bool raw); +int engine_cmd(struct lua_State *L, const char *str, enum engine_eval_mode mode); /** Execute current chunk in the sandbox */ int engine_pcall(struct lua_State *L, int argc); diff --git a/daemon/io.c b/daemon/io.c index d54019d2a..ac9a08e03 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -647,8 +647,9 @@ int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bo enum io_stream_mode { - io_mode_text = 0, - io_mode_binary = 1, + IO_MODE_TEXT = 0, + IO_MODE_BINARY = 1, + IO_MODE_JSON = 2, }; struct io_stream_data { @@ -753,20 +754,28 @@ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *bu /* Pseudo-command for switching to "binary output"; */ if (strcmp(cmd, "__binary") == 0) { - data->mode = io_mode_binary; + data->mode = IO_MODE_BINARY; + goto next_iter; + } + if (strcmp(cmd, "__json") == 0) { + data->mode = IO_MODE_JSON; goto next_iter; } - const bool cmd_failed = engine_cmd(L, cmd, false); + const bool cmd_failed = engine_cmd(L, cmd, + (data->mode == IO_MODE_JSON) + ? ENGINE_EVAL_MODE_JSON + : ENGINE_EVAL_MODE_LUA_TABLE); const char *message = NULL; size_t len_s; if (lua_gettop(L) > 0) { message = lua_tolstring(L, -1, &len_s); } - /* Send back the output, either in "binary" or normal mode. */ - if (data->mode == io_mode_binary) { - /* Leader expects length field in all cases */ + switch (data->mode) { + case IO_MODE_BINARY: + case IO_MODE_JSON: + /* Length-field-prepended mode */ if (!message || len_s > UINT32_MAX) { kr_log_error(IO, "unrepresentable response on control socket, " "sending back empty block (command '%s')\n", cmd); @@ -776,13 +785,16 @@ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *bu fwrite(&len_n, sizeof(len_n), 1, out); if (len_s > 0) fwrite(message, len_s, 1, out); - } else { + break; + case IO_MODE_TEXT: + /* Human-readable and console-printable mode */ if (message) fprintf(out, "%s", message); if (message || !args->quiet) fprintf(out, "\n"); if (!args->quiet) fprintf(out, "> "); + break; } /* Duplicate command and output to logs */ @@ -826,7 +838,7 @@ struct io_stream_data *io_tty_alloc_data(void) { struct io_stream_data *data = mm_alloc(pool, sizeof(struct io_stream_data)); data->buf = mp_start(pool->ctx, 512); - data->mode = io_mode_text; + data->mode = IO_MODE_TEXT; data->blen = 0; data->pool = pool; diff --git a/daemon/lua/sandbox.lua.in b/daemon/lua/sandbox.lua.in index 60fd29925..116489f87 100644 --- a/daemon/lua/sandbox.lua.in +++ b/daemon/lua/sandbox.lua.in @@ -514,7 +514,7 @@ modules.load('extended_error') -- Load keyfile_default trust_anchors.add_file('@keyfile_default@', @unmanaged@) -local function eval_cmd_compile(line, raw) +local function eval_cmd_compile(line, mode) -- Compatibility sandbox code loading local function load_code(code) if getfenv then -- Lua 5.1 @@ -523,8 +523,19 @@ local function eval_cmd_compile(line, raw) return load(code, nil, 't', _ENV) end end + + -- See `ENGINE_EVAL_MODE_MAP(XX)` C-macro for possible values local err, chunk - chunk, err = load_code(raw and 'return '..line or 'return table_print('..line..')') + if mode == "LUA_TABLE" then + chunk, err = load_code('return table_print(('..line..'))') + elseif mode == "RAW" then + chunk, err = load_code('return ('..line..')') + elseif mode == "JSON" then + chunk, err = load_code('return tojson(('..line..'))') + else + return nil, "invalid mode" + end + if err then chunk, err = load_code(line) end @@ -532,8 +543,8 @@ local function eval_cmd_compile(line, raw) end -- Interactive command evaluation -function eval_cmd(line, raw) - local chunk, err = eval_cmd_compile(line, raw) +function eval_cmd(line, mode) + local chunk, err = eval_cmd_compile(line, mode) if not err then return chunk() else @@ -642,7 +653,7 @@ end -- must be public because it is called from eval_cmd() -- when map() commands are read from control socket function _map_luaobj_call_wrapper(cmd) - local func = eval_cmd_compile(cmd, true) + local func = eval_cmd_compile(cmd, "RAW") local ret = kluautil.kr_table_pack(xpcall(func, debug.traceback)) local ok, serial = pcall(krprint.serialize_lua, ret, 'error') if not ok then @@ -747,7 +758,7 @@ function map(cmd, format) if (#cmd <= 0) then panic('map() command must be non-empty') end -- syntax check on input command to detect typos early - local chunk, err = eval_cmd_compile(cmd, false) + local chunk, err = eval_cmd_compile(cmd, "LUA_TABLE") if not chunk then panic('failure when compiling map() command: %s', err) end @@ -785,7 +796,7 @@ function map(cmd, format) log_info(ffi.C.LOG_GRP_SYSTEM, 'executing map() on %s: command %s', path_name, cmd) local ret if local_exec then - ret = eval_cmd(cmd) + ret = eval_cmd(cmd, "LUA_TABLE") else ret = map_send_recv(cmd, path) -- skip dead sockets (leftovers from dead instances) diff --git a/manager/knot_resolver_manager/datamodel/cache_schema.py b/manager/knot_resolver_manager/datamodel/cache_schema.py index 1c8a9fc38..40aa88970 100644 --- a/manager/knot_resolver_manager/datamodel/cache_schema.py +++ b/manager/knot_resolver_manager/datamodel/cache_schema.py @@ -19,9 +19,7 @@ from knot_resolver_manager.utils.modeling import ConfigSchema from knot_resolver_manager.utils.modeling.base_schema import lazy_default _CACHE_CLEAR_TEMPLATE = template_from_str( - "{% from 'macros/common_macros.lua.j2' import tojson %}" - "{% from 'macros/cache_macros.lua.j2' import cache_clear %}" - "{{ tojson(cache_clear(params)) }}" + "{% from 'macros/cache_macros.lua.j2' import cache_clear %} {{ cache_clear(params) }}" ) diff --git a/manager/knot_resolver_manager/datamodel/templates/macros/common_macros.lua.j2 b/manager/knot_resolver_manager/datamodel/templates/macros/common_macros.lua.j2 index 4d1bcc78b..4c2ba11a8 100644 --- a/manager/knot_resolver_manager/datamodel/templates/macros/common_macros.lua.j2 +++ b/manager/knot_resolver_manager/datamodel/templates/macros/common_macros.lua.j2 @@ -1,7 +1,3 @@ -{% macro tojson(object) -%} -tojson({{ object }}) -{%- endmacro %} - {% macro quotes(string) -%} '{{ string }}' {%- endmacro %} diff --git a/manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2 b/manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2 index 0825928db..624b59ab7 100644 --- a/manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2 +++ b/manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2 @@ -24,10 +24,10 @@ function collect_lazy_statistics() modules.load('stats') end - return tojson(stats.list()) + return stats.list() end --- function used for statistics collection function collect_statistics() - return tojson(stats.list()) + return stats.list() end diff --git a/manager/knot_resolver_manager/kresd_controller/interface.py b/manager/knot_resolver_manager/kresd_controller/interface.py index 30ec03574..77197a320 100644 --- a/manager/knot_resolver_manager/kresd_controller/interface.py +++ b/manager/knot_resolver_manager/kresd_controller/interface.py @@ -1,6 +1,8 @@ import asyncio import itertools +import json import logging +import struct import sys from abc import ABC, abstractmethod # pylint: disable=no-name-in-module from enum import Enum, auto @@ -165,7 +167,7 @@ class Subprocess(ABC): def id(self) -> KresID: return self._id - async def command(self, cmd: str) -> str: + async def command(self, cmd: str) -> object: reader: asyncio.StreamReader writer: Optional[asyncio.StreamWriter] = None try: @@ -174,14 +176,18 @@ class Subprocess(ABC): # drop prompt _ = await reader.read(2) + # switch to JSON mode + writer.write("__json\n".encode("utf8")) + # write command writer.write(cmd.encode("utf8")) writer.write(b"\n") await writer.drain() # read result - result_bytes = await reader.readline() - return result_bytes.decode("utf8")[:-1] # strip trailing newline + (msg_len,) = struct.unpack(">I", await reader.read(4)) + result_bytes = await reader.readexactly(msg_len) + return json.loads(result_bytes.decode("utf8")) finally: if writer is not None: diff --git a/manager/knot_resolver_manager/kresd_controller/registered_workers.py b/manager/knot_resolver_manager/kresd_controller/registered_workers.py index 9a3a0d368..b6ea834ed 100644 --- a/manager/knot_resolver_manager/kresd_controller/registered_workers.py +++ b/manager/knot_resolver_manager/kresd_controller/registered_workers.py @@ -18,7 +18,7 @@ def get_registered_workers_kresids() -> "List[KresID]": return list(_REGISTERED_WORKERS.keys()) -async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, str]": +async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, object]": for sub in _REGISTERED_WORKERS.values(): return sub.id, await sub.command(cmd) raise SubprocessControllerException( @@ -27,8 +27,8 @@ async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, str]": ) -async def command_registered_workers(cmd: str) -> "Dict[KresID, str]": - async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]": +async def command_registered_workers(cmd: str) -> "Dict[KresID, object]": + async def single_pair(sub: "Subprocess") -> "Tuple[KresID, object]": return sub.id, await sub.command(cmd) pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_WORKERS.values())) diff --git a/manager/knot_resolver_manager/server.py b/manager/knot_resolver_manager/server.py index 4b507b459..6834e70fb 100644 --- a/manager/knot_resolver_manager/server.py +++ b/manager/knot_resolver_manager/server.py @@ -260,7 +260,7 @@ class Server: _, result = await command_single_registered_worker(config.render_lua()) return web.Response( - body=result, + body=json.dumps(result), content_type="application/json", charset="utf8", ) diff --git a/manager/knot_resolver_manager/statistics.py b/manager/knot_resolver_manager/statistics.py index 50cfc7f1c..aa51c423d 100644 --- a/manager/knot_resolver_manager/statistics.py +++ b/manager/knot_resolver_manager/statistics.py @@ -69,7 +69,7 @@ def _histogram( class ResolverCollector: def __init__(self, config_store: ConfigStore) -> None: - self._stats_raw: "Optional[Dict[KresID, str]]" = None + self._stats_raw: "Optional[Dict[KresID, object]]" = None self._config_store: ConfigStore = config_store self._collection_task: "Optional[asyncio.Task[None]]" = None self._skip_immediate_collection: bool = False @@ -148,8 +148,7 @@ class ResolverCollector: success = False try: if kresid in self._stats_raw: - raw = self._stats_raw[kresid] - metrics: Dict[str, int] = json.loads(raw[1:-1]) + metrics = self._stats_raw[kresid] yield from self._parse_resolver_metrics(kresid, metrics) success = True except json.JSONDecodeError: