]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
map: use control sockets instead of pipe from parent process
authorLukáš Ježek <lukas.jezek@nic.cz>
Mon, 27 Apr 2020 12:00:17 +0000 (14:00 +0200)
committerTomas Krizek <tomas.krizek@nic.cz>
Mon, 26 Oct 2020 13:25:12 +0000 (14:25 +0100)
This change allows map() to work with systemd integration.

As a bonus the new client implementation is based on Lua cqueues
allows caller to wrap map() in worker.corroutine() and get
asynchronous execution/avoid blocking main loop.

Currently socket communication does not employ timeouts so a hang
instance will lead to hang map() call. This does not affect query
processing _if_ map() is being run in worker.corroutine.

Fixes: #554
Fixes: #620
.luacheckrc
NEWS
daemon/engine.c
daemon/engine.h
daemon/lua/distro-preconfig.lua.in
daemon/lua/kluautil.lua
daemon/lua/postconfig.lua
daemon/lua/sandbox.lua.in
daemon/main.c

index 7ff62248929e41efc4e3fdd2e8d3cbfd2c81a8e6..0cf0b884a49fa8fe5990dec06f37accf85101482 100644 (file)
@@ -2,6 +2,7 @@
 std = 'luajit'
 new_read_globals = {
        'cache',
+       'eval_cmd',
        'event',
        'help',
        '_hint_root_file',
diff --git a/NEWS b/NEWS
index b5dc884e23e4ff722384bee4b8b9c05bad90a9ac..871a4b40ba29f63851e03610cd97cad68c9ef611 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -19,6 +19,7 @@ Bugfixes
 --------
 - avoid an assert() error in stash_rrset() (!1072)
 - fix emergency cache locking bug introduced in 5.1.3 (!1078)
+- migrate map() command to control sockets; fix systemd integration (!1000)
 
 Incompatible changes
 --------------------
index 0023696d569169c8372f0a57fa301c8bb96f6405..59a6cf18936e6d494720cd312ed68fe696ac8167 100644 (file)
@@ -12,6 +12,7 @@
 #include <pwd.h>
 #include <sys/param.h>
 #include <libzscanner/scanner.h>
+#include <sys/un.h>
 
 #include <lua.h>
 #include <lualib.h>
@@ -371,62 +372,6 @@ static int l_fromjson(lua_State *L)
        return 1;
 }
 
-/** @internal Throw Lua error if expr is false */
-#define expr_checked(expr) \
-       if (!(expr)) { lua_pushboolean(L, false); lua_rawseti(L, -2, lua_objlen(L, -2) + 1); continue; }
-
-static int l_map(lua_State *L)
-{
-       /* We don't kr_log_deprecate() here for now.  Plan: after --forks gets *removed*,
-        * kill internal uses of map() (e.g. from daf module) and add deprecation here.
-        * Alternatively we might (attempt to) implement map() in another way. */
-       if (lua_gettop(L) != 1 || !lua_isstring(L, 1))
-               lua_error_p(L, "map('string with a lua expression')");
-
-       const char *cmd = lua_tostring(L, 1);
-       uint32_t len = strlen(cmd);
-       lua_newtable(L);
-
-       /* Execute on leader instance */
-       int ntop = lua_gettop(L);
-       engine_cmd(L, cmd, true);
-       lua_settop(L, ntop + 1); /* Push only one return value to table */
-       lua_rawseti(L, -2, 1);
-
-       for (size_t i = 0; i < the_worker->engine->ipc_set.len; ++i) {
-               int fd = the_worker->engine->ipc_set.at[i];
-               /* Send command */
-               expr_checked(write(fd, &len, sizeof(len)) == sizeof(len));
-               expr_checked(write(fd, cmd, len) == len);
-               /* Read response */
-               uint32_t rlen = 0;
-               if (read(fd, &rlen, sizeof(rlen)) == sizeof(rlen)) {
-                       expr_checked(rlen < UINT32_MAX);
-                       auto_free char *rbuf = malloc(rlen + 1);
-                       expr_checked(rbuf != NULL);
-                       expr_checked(read(fd, rbuf, rlen) == rlen);
-                       rbuf[rlen] = '\0';
-                       /* Unpack from JSON */
-                       JsonNode *root_node = json_decode(rbuf);
-                       if (root_node) {
-                               l_unpack_json(L, root_node);
-                       } else {
-                               lua_pushlstring(L, rbuf, rlen);
-                       }
-                       json_delete(root_node);
-                       lua_rawseti(L, -2, lua_objlen(L, -2) + 1);
-                       continue;
-               }
-               /* Didn't respond */
-               lua_pushboolean(L, false);
-               lua_rawseti(L, -2, lua_objlen(L, -2) + 1);
-       }
-       return 1;
-}
-
-#undef expr_checked
-
-
 /*
  * Engine API.
  */
@@ -497,8 +442,6 @@ static int init_state(struct engine *engine)
        lua_setglobal(engine->L, "tojson");
        lua_pushcfunction(engine->L, l_fromjson);
        lua_setglobal(engine->L, "fromjson");
-       lua_pushcfunction(engine->L, l_map);
-       lua_setglobal(engine->L, "map");
        /* Random number generator */
        lua_getfield(engine->L, LUA_GLOBALSINDEX, "math");
        lua_getfield(engine->L, -1, "randomseed");
@@ -628,9 +571,6 @@ void engine_deinit(struct engine *engine)
         * e.g. the endpoint kind registry to work (inside ->net),
         * and this registry deinitization uses the lua state. */
        network_close_force(&engine->net);
-       for (size_t i = 0; i < engine->ipc_set.len; ++i) {
-               close(engine->ipc_set.at[i]);
-       }
        for (size_t i = 0; i < engine->modules.len; ++i) {
                engine_unload(engine, engine->modules.at[i]);
        }
@@ -649,7 +589,6 @@ void engine_deinit(struct engine *engine)
        /* Free data structures */
        array_clear(engine->modules);
        array_clear(engine->backends);
-       array_clear(engine->ipc_set);
        kr_ta_clear(&engine->resolver.trust_anchors);
        kr_ta_clear(&engine->resolver.negative_anchors);
        free(engine->hostname);
@@ -675,22 +614,6 @@ int engine_cmd(lua_State *L, const char *str, bool raw)
        return engine_pcall(L, 2);
 }
 
-int engine_ipc(struct engine *engine, const char *expr)
-{
-       if (engine == NULL || engine->L == NULL) {
-               return kr_error(ENOEXEC);
-       }
-
-       /* Run expression and serialize response. */
-       engine_cmd(engine->L, expr, true);
-       if (lua_gettop(engine->L) > 0) {
-               l_tojson(engine->L);
-               return 1;
-       } else {
-               return 0;
-       }
-}
-
 int engine_load_sandbox(struct engine *engine)
 {
        /* Init environment */
index 3de30320be93e1be65ae80c68d009eb6c31327a4..d724f9255b4453bdf4551b7c835bfbdd27d2add9 100644 (file)
@@ -13,15 +13,11 @@ struct lua_State;
 #include "lib/resolve.h"
 #include "daemon/network.h"
 
-/* @internal Array of file descriptors shorthand. */
-typedef array_t(int) fd_array_t;
-
 struct engine {
     struct kr_context resolver;
     struct network net;
     module_array_t modules;
     array_t(const struct kr_cdb_api *) backends;
-    fd_array_t ipc_set;
     knot_mm_t *pool;
     char *hostname;
     struct lua_State *L;
@@ -40,9 +36,6 @@ int engine_cmd(struct lua_State *L, const char *str, bool raw);
 /** Execute current chunk in the sandbox */
 int engine_pcall(struct lua_State *L, int argc);
 
-int engine_ipc(struct engine *engine, const char *expr);
-
-
 int engine_load_sandbox(struct engine *engine);
 int engine_loadconf(struct engine *engine, const char *config_path);
 
index e017b35561e3e975714c8a477154a6c214c2b997..a27974a2b4eddc6b01892f07240046444c2b2257 100644 (file)
@@ -4,7 +4,8 @@ if not id then
        warn('environment variable $SYSTEMD_INSTANCE not set')
 else
        -- Bind to control socket in run_dir
-       local path = '@run_dir@/control/'..id
+       worker.control_path = '@run_dir@/control/'
+       local path = worker.control_path..id
        local ok, err = pcall(net.listen, path, nil, { kind = 'control' })
        if not ok then
                warn('bind to '..path..' failed '..err)
index 0d63c07fc0a34dd0ff5c37384215f739eb36f2ed..0b6a114fb910e732f6ae212d2963a1c72fa79fe0 100644 (file)
@@ -1,6 +1,7 @@
 -- SPDX-License-Identifier: GPL-3.0-or-later
 
 local cqerrno = require('cqueues.errno')
+local ffi = require('ffi')
 local kluautil = {}
 
 -- Get length of table
@@ -17,6 +18,21 @@ function kluautil.kr_table_len(t)
 end
 
 -- Fetch over HTTPS
+ffi.cdef([[
+       typedef struct __dirstream DIR;
+       struct dirent {
+               unsigned long int       d_ino;
+               long int                d_off;
+               unsigned short          d_reclen;
+               unsigned char           d_type;
+               char                    d_name[256];
+       };
+       DIR *opendir(const char *name);
+       struct dirent *readdir(DIR *dirp);
+       int closedir(DIR *dirp);
+       char *strerror(int errnum);
+]])
+
 function kluautil.kr_https_fetch(url, out_file, ca_file)
        local http_ok, http_request = pcall(require, 'http.request')
        local httptls_ok, http_tls = pcall(require, 'http.tls')
@@ -62,9 +78,31 @@ function kluautil.kr_https_fetch(url, out_file, ca_file)
                return nil, errmsg
        end
 
-       out_file:seek("set", 0)
+       out_file:seek('set', 0)
 
        return true
 end
 
+-- List directory
+function kluautil.list_dir (path)
+       local results = {}
+       local dir = ffi.C.opendir(path)
+       if dir == nil then
+               return results
+       end
+
+       local entry = ffi.C.readdir(dir)
+       while entry ~= nil do
+               local entry_name = ffi.string(entry.d_name)
+               if entry_name ~= '.' and entry_name ~= '..' then
+                       table.insert(results, entry_name)
+               end
+               entry = ffi.C.readdir(dir)
+       end
+
+       ffi.C.closedir(dir)
+
+       return results
+end
+
 return kluautil
index 7fff3d1e5cc1b8f157d85e0f71081506c7dd9cc5..48ac65aa497b57afb2201024b8a220be976a2cd0 100644 (file)
@@ -21,9 +21,12 @@ end
 
 local n_dns_socks, n_control_socks = count_sockets()
 
+-- Check and set control sockets path
+worker.control_path = worker.control_path or (worker.cwd .. '/control/')
+
 -- Bind to control socket by default
 if not C.the_args.interactive and n_control_socks == 0 and not env.KRESD_NO_LISTEN then
-       local path = worker.cwd..'/control/'..worker.pid
+       local path = worker.control_path..worker.pid
        local ok, err = pcall(net.listen, path, nil, { kind = 'control' })
        if not ok then
                warn('bind to '..path..' failed '..err)
index b20068194de3cfd837ea96e3d9addfb255614c40..a02584d22cde266296ab18788aa77a258f9ce8f8 100644 (file)
@@ -642,3 +642,78 @@ else
        worker.coroutine = disabled
        worker.bg_worker = setmetatable({}, { __index = disabled })
 end
+
+-- Global commands for map()
+
+function map(cmd, format)
+       local socket = require('cqueues.socket')
+       local kluautil = require('kluautil')
+       local bit = require("bit")
+       local local_sockets = {}
+       local results = {}
+
+       format = format or 'luaobj'
+       if format ~= 'luaobj' and format ~= 'strings' then
+               warn('warning: Unknown map format. Used "luaobj".')
+               format = 'luaobj'
+       end
+
+       for _,v in pairs(net.list()) do
+               if (v['kind'] == 'control') and (v['transport']['family'] == 'unix') then
+                       table.insert(local_sockets, string.match(v['transport']['path'], '^.*/([^/]+)$'))
+               end
+       end
+
+       local filetab = kluautil.list_dir(worker.control_path)
+       if next(filetab) == nil then
+               local ret = eval_cmd(cmd, true)
+               if ret == nil then
+                       results = {}
+               else
+                       table.insert(results, ret)
+               end
+               return results
+       end
+
+       for _,file in ipairs(filetab) do
+               local local_exec = false
+               for _,lsoc in ipairs(local_sockets) do
+                       if file == lsoc then
+                               local_exec = true
+                       end
+               end
+
+               if local_exec then
+                       table.insert(results, eval_cmd(cmd, true))
+               else
+                       local s = socket.connect({ path = worker.control_path..file })
+                       s:setmode('bn', 'bn')
+                       local status, err = pcall(s.connect, s)
+                       if not status then
+                               print(err)
+                       else
+                               s:write('__binary\n')
+                               recv = s:read(2)
+                               if format == 'luaobj' then
+                                       cmd = 'tojson('..cmd..')'
+                               end
+                               s:write(cmd..'\n')
+                               local recv = s:read(4)
+                               local len = tonumber(recv:byte(1))
+                               for i=2,4 do
+                                       len = bit.bor(bit.lshift(len, 8), tonumber(recv:byte(i)))
+                               end
+                               recv = s:read(len)
+                               if format == 'strings' then
+                                       table.insert(results, recv)
+                               else
+                                       table.insert(results, fromjson(recv))
+                               end
+
+                               s:close()
+                       end
+               end
+       end
+
+       return results
+end
index f464effbd2fe57aa5cef21f8fea4a7a2624d4148..cd83a976dfee2ca5dc3ee41b0f371cf4b4b15eef 100644 (file)
 
 struct args the_args_value;  /** Static allocation for the_args singleton. */
 
-
-/* @internal AF_LOCAL reads may still be interrupted, loop it. */
-static bool ipc_readall(int fd, char *dst, size_t len)
-{
-       while (len > 0) {
-               int rb = read(fd, dst, len);
-               if (rb > 0) {
-                       dst += rb;
-                       len -= rb;
-               } else if (errno != EAGAIN && errno != EINTR) {
-                       return false;
-               }
-       }
-       return true;
-}
-
-static void ipc_activity(uv_poll_t *handle, int status, int events)
-{
-       struct engine *engine = handle->data;
-       if (status != 0) {
-               kr_log_error("[system] ipc: %s\n", uv_strerror(status));
-               return;
-       }
-       /* Get file descriptor from handle */
-       uv_os_fd_t fd = 0;
-       (void) uv_fileno((uv_handle_t *)(handle), &fd);
-       /* Read expression from IPC pipe */
-       uint32_t len = 0;
-       auto_free char *rbuf = NULL;
-       if (!ipc_readall(fd, (char *)&len, sizeof(len))) {
-               goto failure;
-       }
-       if (len < UINT32_MAX) {
-               rbuf = malloc(len + 1);
-       } else {
-               errno = EINVAL;
-       }
-       if (!rbuf) {
-               goto failure;
-       }
-       if (!ipc_readall(fd, rbuf, len)) {
-               goto failure;
-       }
-       rbuf[len] = '\0';
-       /* Run expression */
-       const char *message = "";
-       int ret = engine_ipc(engine, rbuf);
-       if (ret > 0) {
-               message = lua_tostring(engine->L, -1);
-       }
-       /* Clear the Lua stack */
-       lua_settop(engine->L, 0);
-       /* Send response back */
-       len = strlen(message);
-       if (write(fd, &len, sizeof(len)) != sizeof(len) ||
-               write(fd, message, len) != len) {
-               goto failure;
-       }
-       return; /* success! */
-failure:
-       /* Note that if the piped command got read or written partially,
-        * we would get out of sync and only receive rubbish now.
-        * Therefore we prefer to stop IPC, but we try to continue with all else.
-        */
-       kr_log_error("[system] stopping ipc because of: %s\n", strerror(errno));
-       uv_poll_stop(handle);
-       uv_close((uv_handle_t *)handle, (uv_close_cb)free);
-}
-
-static bool ipc_watch(uv_loop_t *loop, struct engine *engine, int fd)
-{
-       uv_poll_t *poller = malloc(sizeof(*poller));
-       if (!poller) {
-               return false;
-       }
-       int ret = uv_poll_init(loop, poller, fd);
-       if (ret != 0) {
-               free(poller);
-               return false;
-       }
-       poller->data = engine;
-       ret = uv_poll_start(poller, UV_READABLE, ipc_activity);
-       if (ret != 0) {
-               free(poller);
-               return false;
-       }
-       /* libuv sets O_NONBLOCK whether we want it or not */
-       (void) fcntl(fd, F_SETFD, fcntl(fd, F_GETFL) & ~O_NONBLOCK);
-       return true;
-}
-
 static void signal_handler(uv_signal_t *handle, int signum)
 {
        uv_stop(uv_default_loop());
@@ -180,15 +89,10 @@ end:
  * Server operation.
  */
 
-static int fork_workers(fd_array_t *ipc_set, int forks)
+static int fork_workers(int forks)
 {
        /* Fork subprocesses if requested */
        while (--forks > 0) {
-               int sv[2] = {-1, -1};
-               if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) < 0) {
-                       perror("[system] socketpair");
-                       return kr_error(errno);
-               }
                int pid = fork();
                if (pid < 0) {
                        perror("[system] fork");
@@ -197,16 +101,7 @@ static int fork_workers(fd_array_t *ipc_set, int forks)
 
                /* Forked process */
                if (pid == 0) {
-                       array_clear(*ipc_set);
-                       array_push(*ipc_set, sv[0]);
-                       close(sv[1]);
                        return forks;
-               /* Parent process */
-               } else {
-                       array_push(*ipc_set, sv[1]);
-                       /* Do not share parent-end with other forks. */
-                       (void) fcntl(sv[1], F_SETFD, FD_CLOEXEC);
-                       close(sv[0]);
                }
        }
        return 0;
@@ -234,7 +129,7 @@ static void help(int argc, char *argv[])
 }
 
 /** \return exit code for main()  */
-static int run_worker(uv_loop_t *loop, struct engine *engine, fd_array_t *ipc_set, bool leader, struct args *args)
+static int run_worker(uv_loop_t *loop, struct engine *engine, bool leader, struct args *args)
 {
        /* Only some kinds of stdin work with uv_pipe_t.
         * Otherwise we would abort() from libuv e.g. with </dev/null */
@@ -265,16 +160,6 @@ static int run_worker(uv_loop_t *loop, struct engine *engine, fd_array_t *ipc_se
        } else if (args->control_fd != -1 && uv_pipe_open(pipe, args->control_fd) == 0) {
                uv_listen((uv_stream_t *)pipe, 16, io_tty_accept);
        }
-       /* Watch IPC pipes (or just assign them if leading the pgroup). */
-       if (!leader) {
-               for (size_t i = 0; i < ipc_set->len; ++i) {
-                       if (!ipc_watch(loop, engine, ipc_set->at[i])) {
-                               kr_log_error("[system] failed to create poller: %s\n", strerror(errno));
-                               close(ipc_set->at[i]);
-                       }
-               }
-       }
-       memcpy(&engine->ipc_set, ipc_set, sizeof(*ipc_set));
 
        /* Notify supervisor. */
 #if ENABLE_LIBSYSTEMD
@@ -591,11 +476,8 @@ int main(int argc, char **argv)
                                (long)rlim.rlim_cur);
        }
 
-       /* Connect forks with local socket */
-       fd_array_t ipc_set;
-       array_init(ipc_set);
        /* Fork subprocesses if requested */
-       int fork_id = fork_workers(&ipc_set, the_args->forks);
+       int fork_id = fork_workers(the_args->forks);
        if (fork_id < 0) {
                return EXIT_FAILURE;
        }
@@ -696,7 +578,7 @@ int main(int argc, char **argv)
        }
 
        /* Run the event loop */
-       ret = run_worker(loop, &engine, &ipc_set, fork_id == 0, the_args);
+       ret = run_worker(loop, &engine, fork_id == 0, the_args);
 
 cleanup:/* Cleanup. */
        engine_deinit(&engine);