This change allows map() to work with systemd integration.
As a bonus the new client implementation is based on Lua cqueues
allows caller to wrap map() in worker.corroutine() and get
asynchronous execution/avoid blocking main loop.
Currently socket communication does not employ timeouts so a hang
instance will lead to hang map() call. This does not affect query
processing _if_ map() is being run in worker.corroutine.
Fixes: #554
Fixes: #620
std = 'luajit'
new_read_globals = {
'cache',
+ 'eval_cmd',
'event',
'help',
'_hint_root_file',
--------
- avoid an assert() error in stash_rrset() (!1072)
- fix emergency cache locking bug introduced in 5.1.3 (!1078)
+- migrate map() command to control sockets; fix systemd integration (!1000)
Incompatible changes
--------------------
#include <pwd.h>
#include <sys/param.h>
#include <libzscanner/scanner.h>
+#include <sys/un.h>
#include <lua.h>
#include <lualib.h>
return 1;
}
-/** @internal Throw Lua error if expr is false */
-#define expr_checked(expr) \
- if (!(expr)) { lua_pushboolean(L, false); lua_rawseti(L, -2, lua_objlen(L, -2) + 1); continue; }
-
-static int l_map(lua_State *L)
-{
- /* We don't kr_log_deprecate() here for now. Plan: after --forks gets *removed*,
- * kill internal uses of map() (e.g. from daf module) and add deprecation here.
- * Alternatively we might (attempt to) implement map() in another way. */
- if (lua_gettop(L) != 1 || !lua_isstring(L, 1))
- lua_error_p(L, "map('string with a lua expression')");
-
- const char *cmd = lua_tostring(L, 1);
- uint32_t len = strlen(cmd);
- lua_newtable(L);
-
- /* Execute on leader instance */
- int ntop = lua_gettop(L);
- engine_cmd(L, cmd, true);
- lua_settop(L, ntop + 1); /* Push only one return value to table */
- lua_rawseti(L, -2, 1);
-
- for (size_t i = 0; i < the_worker->engine->ipc_set.len; ++i) {
- int fd = the_worker->engine->ipc_set.at[i];
- /* Send command */
- expr_checked(write(fd, &len, sizeof(len)) == sizeof(len));
- expr_checked(write(fd, cmd, len) == len);
- /* Read response */
- uint32_t rlen = 0;
- if (read(fd, &rlen, sizeof(rlen)) == sizeof(rlen)) {
- expr_checked(rlen < UINT32_MAX);
- auto_free char *rbuf = malloc(rlen + 1);
- expr_checked(rbuf != NULL);
- expr_checked(read(fd, rbuf, rlen) == rlen);
- rbuf[rlen] = '\0';
- /* Unpack from JSON */
- JsonNode *root_node = json_decode(rbuf);
- if (root_node) {
- l_unpack_json(L, root_node);
- } else {
- lua_pushlstring(L, rbuf, rlen);
- }
- json_delete(root_node);
- lua_rawseti(L, -2, lua_objlen(L, -2) + 1);
- continue;
- }
- /* Didn't respond */
- lua_pushboolean(L, false);
- lua_rawseti(L, -2, lua_objlen(L, -2) + 1);
- }
- return 1;
-}
-
-#undef expr_checked
-
-
/*
* Engine API.
*/
lua_setglobal(engine->L, "tojson");
lua_pushcfunction(engine->L, l_fromjson);
lua_setglobal(engine->L, "fromjson");
- lua_pushcfunction(engine->L, l_map);
- lua_setglobal(engine->L, "map");
/* Random number generator */
lua_getfield(engine->L, LUA_GLOBALSINDEX, "math");
lua_getfield(engine->L, -1, "randomseed");
* e.g. the endpoint kind registry to work (inside ->net),
* and this registry deinitization uses the lua state. */
network_close_force(&engine->net);
- for (size_t i = 0; i < engine->ipc_set.len; ++i) {
- close(engine->ipc_set.at[i]);
- }
for (size_t i = 0; i < engine->modules.len; ++i) {
engine_unload(engine, engine->modules.at[i]);
}
/* Free data structures */
array_clear(engine->modules);
array_clear(engine->backends);
- array_clear(engine->ipc_set);
kr_ta_clear(&engine->resolver.trust_anchors);
kr_ta_clear(&engine->resolver.negative_anchors);
free(engine->hostname);
return engine_pcall(L, 2);
}
-int engine_ipc(struct engine *engine, const char *expr)
-{
- if (engine == NULL || engine->L == NULL) {
- return kr_error(ENOEXEC);
- }
-
- /* Run expression and serialize response. */
- engine_cmd(engine->L, expr, true);
- if (lua_gettop(engine->L) > 0) {
- l_tojson(engine->L);
- return 1;
- } else {
- return 0;
- }
-}
-
int engine_load_sandbox(struct engine *engine)
{
/* Init environment */
#include "lib/resolve.h"
#include "daemon/network.h"
-/* @internal Array of file descriptors shorthand. */
-typedef array_t(int) fd_array_t;
-
struct engine {
struct kr_context resolver;
struct network net;
module_array_t modules;
array_t(const struct kr_cdb_api *) backends;
- fd_array_t ipc_set;
knot_mm_t *pool;
char *hostname;
struct lua_State *L;
/** Execute current chunk in the sandbox */
int engine_pcall(struct lua_State *L, int argc);
-int engine_ipc(struct engine *engine, const char *expr);
-
-
int engine_load_sandbox(struct engine *engine);
int engine_loadconf(struct engine *engine, const char *config_path);
warn('environment variable $SYSTEMD_INSTANCE not set')
else
-- Bind to control socket in run_dir
- local path = '@run_dir@/control/'..id
+ worker.control_path = '@run_dir@/control/'
+ local path = worker.control_path..id
local ok, err = pcall(net.listen, path, nil, { kind = 'control' })
if not ok then
warn('bind to '..path..' failed '..err)
-- SPDX-License-Identifier: GPL-3.0-or-later
local cqerrno = require('cqueues.errno')
+local ffi = require('ffi')
local kluautil = {}
-- Get length of table
end
-- Fetch over HTTPS
+ffi.cdef([[
+ typedef struct __dirstream DIR;
+ struct dirent {
+ unsigned long int d_ino;
+ long int d_off;
+ unsigned short d_reclen;
+ unsigned char d_type;
+ char d_name[256];
+ };
+ DIR *opendir(const char *name);
+ struct dirent *readdir(DIR *dirp);
+ int closedir(DIR *dirp);
+ char *strerror(int errnum);
+]])
+
function kluautil.kr_https_fetch(url, out_file, ca_file)
local http_ok, http_request = pcall(require, 'http.request')
local httptls_ok, http_tls = pcall(require, 'http.tls')
return nil, errmsg
end
- out_file:seek("set", 0)
+ out_file:seek('set', 0)
return true
end
+-- List directory
+function kluautil.list_dir (path)
+ local results = {}
+ local dir = ffi.C.opendir(path)
+ if dir == nil then
+ return results
+ end
+
+ local entry = ffi.C.readdir(dir)
+ while entry ~= nil do
+ local entry_name = ffi.string(entry.d_name)
+ if entry_name ~= '.' and entry_name ~= '..' then
+ table.insert(results, entry_name)
+ end
+ entry = ffi.C.readdir(dir)
+ end
+
+ ffi.C.closedir(dir)
+
+ return results
+end
+
return kluautil
local n_dns_socks, n_control_socks = count_sockets()
+-- Check and set control sockets path
+worker.control_path = worker.control_path or (worker.cwd .. '/control/')
+
-- Bind to control socket by default
if not C.the_args.interactive and n_control_socks == 0 and not env.KRESD_NO_LISTEN then
- local path = worker.cwd..'/control/'..worker.pid
+ local path = worker.control_path..worker.pid
local ok, err = pcall(net.listen, path, nil, { kind = 'control' })
if not ok then
warn('bind to '..path..' failed '..err)
worker.coroutine = disabled
worker.bg_worker = setmetatable({}, { __index = disabled })
end
+
+-- Global commands for map()
+
+function map(cmd, format)
+ local socket = require('cqueues.socket')
+ local kluautil = require('kluautil')
+ local bit = require("bit")
+ local local_sockets = {}
+ local results = {}
+
+ format = format or 'luaobj'
+ if format ~= 'luaobj' and format ~= 'strings' then
+ warn('warning: Unknown map format. Used "luaobj".')
+ format = 'luaobj'
+ end
+
+ for _,v in pairs(net.list()) do
+ if (v['kind'] == 'control') and (v['transport']['family'] == 'unix') then
+ table.insert(local_sockets, string.match(v['transport']['path'], '^.*/([^/]+)$'))
+ end
+ end
+
+ local filetab = kluautil.list_dir(worker.control_path)
+ if next(filetab) == nil then
+ local ret = eval_cmd(cmd, true)
+ if ret == nil then
+ results = {}
+ else
+ table.insert(results, ret)
+ end
+ return results
+ end
+
+ for _,file in ipairs(filetab) do
+ local local_exec = false
+ for _,lsoc in ipairs(local_sockets) do
+ if file == lsoc then
+ local_exec = true
+ end
+ end
+
+ if local_exec then
+ table.insert(results, eval_cmd(cmd, true))
+ else
+ local s = socket.connect({ path = worker.control_path..file })
+ s:setmode('bn', 'bn')
+ local status, err = pcall(s.connect, s)
+ if not status then
+ print(err)
+ else
+ s:write('__binary\n')
+ recv = s:read(2)
+ if format == 'luaobj' then
+ cmd = 'tojson('..cmd..')'
+ end
+ s:write(cmd..'\n')
+ local recv = s:read(4)
+ local len = tonumber(recv:byte(1))
+ for i=2,4 do
+ len = bit.bor(bit.lshift(len, 8), tonumber(recv:byte(i)))
+ end
+ recv = s:read(len)
+ if format == 'strings' then
+ table.insert(results, recv)
+ else
+ table.insert(results, fromjson(recv))
+ end
+
+ s:close()
+ end
+ end
+ end
+
+ return results
+end
struct args the_args_value; /** Static allocation for the_args singleton. */
-
-/* @internal AF_LOCAL reads may still be interrupted, loop it. */
-static bool ipc_readall(int fd, char *dst, size_t len)
-{
- while (len > 0) {
- int rb = read(fd, dst, len);
- if (rb > 0) {
- dst += rb;
- len -= rb;
- } else if (errno != EAGAIN && errno != EINTR) {
- return false;
- }
- }
- return true;
-}
-
-static void ipc_activity(uv_poll_t *handle, int status, int events)
-{
- struct engine *engine = handle->data;
- if (status != 0) {
- kr_log_error("[system] ipc: %s\n", uv_strerror(status));
- return;
- }
- /* Get file descriptor from handle */
- uv_os_fd_t fd = 0;
- (void) uv_fileno((uv_handle_t *)(handle), &fd);
- /* Read expression from IPC pipe */
- uint32_t len = 0;
- auto_free char *rbuf = NULL;
- if (!ipc_readall(fd, (char *)&len, sizeof(len))) {
- goto failure;
- }
- if (len < UINT32_MAX) {
- rbuf = malloc(len + 1);
- } else {
- errno = EINVAL;
- }
- if (!rbuf) {
- goto failure;
- }
- if (!ipc_readall(fd, rbuf, len)) {
- goto failure;
- }
- rbuf[len] = '\0';
- /* Run expression */
- const char *message = "";
- int ret = engine_ipc(engine, rbuf);
- if (ret > 0) {
- message = lua_tostring(engine->L, -1);
- }
- /* Clear the Lua stack */
- lua_settop(engine->L, 0);
- /* Send response back */
- len = strlen(message);
- if (write(fd, &len, sizeof(len)) != sizeof(len) ||
- write(fd, message, len) != len) {
- goto failure;
- }
- return; /* success! */
-failure:
- /* Note that if the piped command got read or written partially,
- * we would get out of sync and only receive rubbish now.
- * Therefore we prefer to stop IPC, but we try to continue with all else.
- */
- kr_log_error("[system] stopping ipc because of: %s\n", strerror(errno));
- uv_poll_stop(handle);
- uv_close((uv_handle_t *)handle, (uv_close_cb)free);
-}
-
-static bool ipc_watch(uv_loop_t *loop, struct engine *engine, int fd)
-{
- uv_poll_t *poller = malloc(sizeof(*poller));
- if (!poller) {
- return false;
- }
- int ret = uv_poll_init(loop, poller, fd);
- if (ret != 0) {
- free(poller);
- return false;
- }
- poller->data = engine;
- ret = uv_poll_start(poller, UV_READABLE, ipc_activity);
- if (ret != 0) {
- free(poller);
- return false;
- }
- /* libuv sets O_NONBLOCK whether we want it or not */
- (void) fcntl(fd, F_SETFD, fcntl(fd, F_GETFL) & ~O_NONBLOCK);
- return true;
-}
-
static void signal_handler(uv_signal_t *handle, int signum)
{
uv_stop(uv_default_loop());
* Server operation.
*/
-static int fork_workers(fd_array_t *ipc_set, int forks)
+static int fork_workers(int forks)
{
/* Fork subprocesses if requested */
while (--forks > 0) {
- int sv[2] = {-1, -1};
- if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) < 0) {
- perror("[system] socketpair");
- return kr_error(errno);
- }
int pid = fork();
if (pid < 0) {
perror("[system] fork");
/* Forked process */
if (pid == 0) {
- array_clear(*ipc_set);
- array_push(*ipc_set, sv[0]);
- close(sv[1]);
return forks;
- /* Parent process */
- } else {
- array_push(*ipc_set, sv[1]);
- /* Do not share parent-end with other forks. */
- (void) fcntl(sv[1], F_SETFD, FD_CLOEXEC);
- close(sv[0]);
}
}
return 0;
}
/** \return exit code for main() */
-static int run_worker(uv_loop_t *loop, struct engine *engine, fd_array_t *ipc_set, bool leader, struct args *args)
+static int run_worker(uv_loop_t *loop, struct engine *engine, bool leader, struct args *args)
{
/* Only some kinds of stdin work with uv_pipe_t.
* Otherwise we would abort() from libuv e.g. with </dev/null */
} else if (args->control_fd != -1 && uv_pipe_open(pipe, args->control_fd) == 0) {
uv_listen((uv_stream_t *)pipe, 16, io_tty_accept);
}
- /* Watch IPC pipes (or just assign them if leading the pgroup). */
- if (!leader) {
- for (size_t i = 0; i < ipc_set->len; ++i) {
- if (!ipc_watch(loop, engine, ipc_set->at[i])) {
- kr_log_error("[system] failed to create poller: %s\n", strerror(errno));
- close(ipc_set->at[i]);
- }
- }
- }
- memcpy(&engine->ipc_set, ipc_set, sizeof(*ipc_set));
/* Notify supervisor. */
#if ENABLE_LIBSYSTEMD
(long)rlim.rlim_cur);
}
- /* Connect forks with local socket */
- fd_array_t ipc_set;
- array_init(ipc_set);
/* Fork subprocesses if requested */
- int fork_id = fork_workers(&ipc_set, the_args->forks);
+ int fork_id = fork_workers(the_args->forks);
if (fork_id < 0) {
return EXIT_FAILURE;
}
}
/* Run the event loop */
- ret = run_worker(loop, &engine, &ipc_set, fork_id == 0, the_args);
+ ret = run_worker(loop, &engine, fork_id == 0, the_args);
cleanup:/* Cleanup. */
engine_deinit(&engine);