build:
<<: *build
script:
- - meson build_ci --default-library=static --prefix=$PREFIX -Dwerror=true -Dextra_tests=enabled
+ # sendmmsg: deckard can't handle that syscall
+ - meson build_ci --default-library=static --prefix=$PREFIX -Dwerror=true -Dextra_tests=enabled -Dsendmmsg=disabled
- ninja -C build_ci
- ninja -C build_ci install >/dev/null
- ${MESON_TEST} --suite unit --suite config
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
+#include "kresconfig.h"
+
+#include "contrib/ccan/asprintf/asprintf.h"
+#include "contrib/cleanup.h"
+#include "contrib/ucw/mempool.h"
+#include "daemon/engine.h"
+#include "daemon/io.h"
+#include "daemon/network.h"
+#include "daemon/tls.h"
+#include "daemon/udp_queue.h"
+#include "daemon/worker.h"
+#include "lib/defines.h"
+#include "lib/dnssec.h"
+#include "lib/dnssec/ta.h"
+#include "lib/resolve.h"
+
#include <arpa/inet.h>
#include <assert.h>
#include <getopt.h>
#include <string.h>
#include <unistd.h>
-#include "kresconfig.h"
-
#include <lua.h>
#include <uv.h>
#if SYSTEMD_VERSION > 0
#endif
#include <libknot/error.h>
-#include <contrib/cleanup.h>
-#include <contrib/ucw/mempool.h>
-#include <contrib/ccan/asprintf/asprintf.h>
-#include "lib/defines.h"
-#include "lib/resolve.h"
-#include "lib/dnssec.h"
-#include "daemon/io.h"
-#include "daemon/network.h"
-#include "daemon/worker.h"
-#include "daemon/engine.h"
-#include "daemon/tls.h"
-#include "lib/dnssec/ta.h"
/* @internal Array of ip address shorthand. */
typedef array_t(char*) addr_array_t;
goto cleanup;
}
+ ret = udp_queue_init_global(loop);
+ if (ret) {
+ kr_log_error("[system] failed to initialize UDP queue: %s\n",
+ kr_strerror(ret));
+ ret = EXIT_FAILURE;
+ goto cleanup;
+ }
+
/* Start the scripting engine */
if (engine_load_sandbox(&engine) != 0) {
ret = EXIT_FAILURE;
'tls.c',
'tls_ephemeral_credentials.c',
'tls_session_ticket-srv.c',
+ 'udp_queue.c',
'worker.c',
'zimport.c',
])
#pragma once
+#include <libknot/packet/pkt.h>
+
#include <stdbool.h>
#include <uv.h>
--- /dev/null
+/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "kresconfig.h"
+#include "daemon/udp_queue.h"
+
+#if !ENABLE_SENDMMSG
+int udp_queue_init_global(uv_loop_t *loop)
+{
+ return 0;
+}
+#else
+
+#include "daemon/session.h"
+#include "daemon/worker.h"
+#include "lib/generic/array.h"
+#include "lib/utils.h"
+
+struct qr_task;
+
+#include <assert.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+
+/* LATER: it might be useful to have this configurable during runtime,
+ * but the structures below would have to change a little (broken up). */
+#define UDP_QUEUE_LEN 64
+
+/** A queue of up to UDP_QUEUE_LEN messages, meant for the same socket. */
+typedef struct {
+ int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
+ struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg() */
+ struct {
+ struct qr_task *task; /**< Links for completion callbacks. */
+ struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov */
+ } items[UDP_QUEUE_LEN];
+} udp_queue_t;
+
+static udp_queue_t * udp_queue_create()
+{
+ udp_queue_t *q = calloc(1, sizeof(*q));
+ for (int i = 0; i < UDP_QUEUE_LEN; ++i) {
+ struct msghdr *mhi = &q->msgvec[i].msg_hdr;
+ /* These shall remain always the same. */
+ mhi->msg_iov = q->items[i].msg_iov;
+ mhi->msg_iovlen = 1;
+ /* msg_name and msg_namelen will be per-call,
+ * and the rest is OK to remain zeroed all the time. */
+ }
+ return q;
+}
+
+/** Global state for udp_queue_*. Note: we never free the pointed-to memory. */
+struct {
+ /** Singleton map: fd -> udp_queue_t, as a simple array of pointers. */
+ udp_queue_t **udp_queues;
+ int udp_queues_len;
+
+ /** List of FD numbers that might have a non-empty queue. */
+ array_t(int) waiting_fds;
+
+ uv_check_t check_handle;
+} static state = {0};
+
+/** Empty the given queue. The queue is assumed to exist (but may be empty). */
+static void udp_queue_send(int fd)
+{
+ udp_queue_t *const q = state.udp_queues[fd];
+ if (!q->len) return;
+ int sent_len = sendmmsg(fd, q->msgvec, q->len, 0);
+ /* ATM we don't really do anything about failures. */
+ int err = sent_len < 0 ? errno : EAGAIN /* unknown error, really */;
+ if (unlikely(sent_len != q->len)) {
+ if (err != EWOULDBLOCK) {
+ kr_log_error("ERROR: udp sendmmsg() sent %d / %d; %s\n",
+ sent_len, q->len, strerror(err));
+ } else {
+ const uint64_t stamp_now = kr_now();
+ static uint64_t stamp_last = 0;
+ if (stamp_now > stamp_last + 60*1000) {
+ kr_log_info("WARNING: dropped UDP reply packet(s) due to network overload (reported at most once per minute)\n");
+ stamp_last = stamp_now;
+ }
+ }
+ }
+ for (int i = 0; i < q->len; ++i) {
+ qr_task_on_send(q->items[i].task, NULL, i < sent_len ? 0 : err);
+ }
+ q->len = 0;
+}
+
+/** Periodical callback to send all queued packets. */
+static void udp_queue_check(uv_check_t *handle)
+{
+ for (int i = 0; i < state.waiting_fds.len; ++i) {
+ udp_queue_send(state.waiting_fds.at[i]);
+ }
+ state.waiting_fds.len = 0;
+}
+
+int udp_queue_init_global(uv_loop_t *loop)
+{
+ int ret = uv_check_init(loop, &state.check_handle);
+ if (!ret) ret = uv_check_start(&state.check_handle, udp_queue_check);
+ return ret;
+}
+
+void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+{
+ if (fd < 0) {
+ kr_log_error("ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
+ abort();
+ }
+ /* Get a valid correct queue. */
+ if (fd >= state.udp_queues_len) {
+ const int new_len = fd + 1;
+ state.udp_queues = realloc(state.udp_queues,
+ sizeof(state.udp_queues[0]) * new_len);
+ if (!state.udp_queues) abort();
+ memset(state.udp_queues + state.udp_queues_len, 0,
+ sizeof(state.udp_queues[0]) * (new_len - state.udp_queues_len));
+ state.udp_queues_len = new_len;
+ }
+ if (unlikely(state.udp_queues[fd] == NULL))
+ state.udp_queues[fd] = udp_queue_create();
+ udp_queue_t *const q = state.udp_queues[fd];
+
+ /* Append to the queue */
+ struct sockaddr *sa = (struct sockaddr *)/*const-cast*/req->qsource.addr;
+ q->msgvec[q->len].msg_hdr.msg_name = sa;
+ q->msgvec[q->len].msg_hdr.msg_namelen = kr_sockaddr_len(sa);
+ q->items[q->len].task = task;
+ q->items[q->len].msg_iov[0] = (struct iovec){
+ .iov_base = req->answer->wire,
+ .iov_len = req->answer->size,
+ };
+ if (q->len == 0)
+ array_push(state.waiting_fds, fd);
+ ++(q->len);
+
+ if (q->len >= UDP_QUEUE_LEN) {
+ assert(q->len == UDP_QUEUE_LEN);
+ udp_queue_send(fd);
+ /* We don't need to search state.waiting_fds;
+ * anyway, it's more efficient to let the hook do that. */
+ }
+}
+
+#endif
+
--- /dev/null
+/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <uv.h>
+struct kr_request;
+struct qr_task;
+
+/** Initialize the global state for udp_queue. */
+int udp_queue_init_global(uv_loop_t *loop);
+
+/** Send req->answer via UDP, possibly not immediately. */
+void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task);
+
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
+#include "kresconfig.h"
+#include "daemon/worker.h"
+
#include <uv.h>
#include <lua.h>
#include <lauxlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <gnutls/gnutls.h>
-#include "lib/utils.h"
-#include "lib/layer.h"
-#include "daemon/worker.h"
+
#include "daemon/bindings/api.h"
#include "daemon/engine.h"
#include "daemon/io.h"
+#include "daemon/session.h"
#include "daemon/tls.h"
+#include "daemon/udp_queue.h"
#include "daemon/zimport.h"
-#include "daemon/session.h"
+#include "lib/layer.h"
+#include "lib/utils.h"
/* Magic defaults for the worker. */
}
/* This is called when we send subrequest / answer */
-static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
+int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
if (task->finished) {
/* Send back answer */
assert(!session_flags(source_session)->closing);
assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
- int res = qr_task_send(task, source_session,
+
+ int ret;
+ const uv_handle_t *src_handle = session_get_handle(source_session);
+ if (src_handle->type != UV_UDP && src_handle->type != UV_TCP) {
+ assert(false);
+ ret = kr_error(EINVAL);
+ } else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
+ /* TODO: this is an ugly way of getting the FD number, as we're
+ * touching a private field of UV. We might want to e.g. pass
+ * a pointer to struct endpoint in kr_request::qsource. */
+ const int fd = ((const uv_udp_t *)src_handle)->io_watcher.fd;
+ udp_queue_push(fd, &ctx->req, task);
+ ret = 0;
+ } else {
+ ret = qr_task_send(task, source_session,
(struct sockaddr *)&ctx->source.addr,
ctx->req.answer);
- if (res != kr_ok()) {
+ }
+
+ if (ret != kr_ok()) {
(void) qr_task_on_send(task, NULL, kr_error(EIO));
/* Since source session is erroneous detach all tasks. */
while (!session_tasklist_is_empty(source_session)) {
void worker_task_subreq_finalize(struct qr_task *task);
bool worker_task_finished(struct qr_task *task);
+/** To be called after sending a DNS message. It mainly deals with cleanups. */
+int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status);
/** Various worker statistics. Sync with wrk_stats() */
struct worker_stats {
user = get_option('user')
group = get_option('group')
+## sendmmsg
+has_sendmmsg = meson.get_compiler('c').has_function('sendmmsg',
+ prefix: '#define _GNU_SOURCE\n#include <sys/socket.h>')
+if get_option('sendmmsg') == 'enabled' and not has_sendmmsg
+ error('missing compiler function: sendmmsg(), use -Dsendmmsg=disabled')
+elif get_option('sendmmsg') == 'auto'
+ sendmmsg = has_sendmmsg
+else
+ sendmmsg = get_option('sendmmsg') == 'enabled'
+endif
+
## Systemd
message('--- systemd socket activation ---')
libsystemd = dependency('libsystemd', required: false)
conf_data.set('SYSTEMD_VERSION',
libsystemd.found() ? libsystemd.version().to_int() : -1)
conf_data.set('NOVERBOSELOG', not verbose_log)
+conf_data.set('ENABLE_SENDMMSG', sendmmsg.to_int())
kresconfig = configure_file(
output: 'kresconfig.h',
s_build_config_tests = build_config_tests ? 'enabled' : 'disabled'
s_build_extra_tests = build_extra_tests ? 'enabled' : 'disabled'
s_install_kresd_conf = install_kresd_conf ? 'enabled' : 'disabled'
+s_sendmmsg = sendmmsg ? 'enabled': 'disabled'
message('''
======================= SUMMARY =======================
user: @0@'''.format(user) + '''
group: @0@'''.format(group) + '''
install_kresd_conf: @0@'''.format(s_install_kresd_conf) + '''
+ sendmmsg: @0@'''.format(s_sendmmsg) + '''
=======================================================
description: 'group which is used for running kresd',
)
+option(
+ 'sendmmsg',
+ type: 'combo',
+ choices: [
+ 'auto',
+ 'enabled',
+ 'disabled',
+ ],
+ value: 'auto',
+ description: 'use sendmmsg syscall towards clients',
+)
+
## Systemd
option(
'systemd_files',