]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: use sendmmsg towards UDP clients
authorVladimír Čunát <vladimir.cunat@nic.cz>
Fri, 12 Jul 2019 14:22:57 +0000 (16:22 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Tue, 16 Jul 2019 11:41:10 +0000 (13:41 +0200)
.gitlab-ci.yml
daemon/main.c
daemon/meson.build
daemon/session.h
daemon/udp_queue.c [new file with mode: 0644]
daemon/udp_queue.h [new file with mode: 0644]
daemon/worker.c
daemon/worker.h
meson.build
meson_options.txt

index 04eb7419459a9e09fb67c64e3bd3be9caa9bfe6d..b998a21f94212b9d49b97c1b532a31eef8d5aec8 100644 (file)
@@ -55,7 +55,8 @@ archive:
 build:
   <<: *build
   script:
-    - meson build_ci --default-library=static --prefix=$PREFIX -Dwerror=true -Dextra_tests=enabled
+      # sendmmsg: deckard can't handle that syscall
+    - meson build_ci --default-library=static --prefix=$PREFIX -Dwerror=true -Dextra_tests=enabled -Dsendmmsg=disabled
     - ninja -C build_ci
     - ninja -C build_ci install >/dev/null
     - ${MESON_TEST} --suite unit --suite config
index 9ab8f1019ccb03c013aa1fb6b1498c7338bb360f..0678f7e88dc1633ab9cb6ce46a505f3d48b08c98 100644 (file)
     along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
 
+#include "kresconfig.h"
+
+#include "contrib/ccan/asprintf/asprintf.h"
+#include "contrib/cleanup.h"
+#include "contrib/ucw/mempool.h"
+#include "daemon/engine.h"
+#include "daemon/io.h"
+#include "daemon/network.h"
+#include "daemon/tls.h"
+#include "daemon/udp_queue.h"
+#include "daemon/worker.h"
+#include "lib/defines.h"
+#include "lib/dnssec.h"
+#include "lib/dnssec/ta.h"
+#include "lib/resolve.h"
+
 #include <arpa/inet.h>
 #include <assert.h>
 #include <getopt.h>
@@ -23,8 +39,6 @@
 #include <string.h>
 #include <unistd.h>
 
-#include "kresconfig.h"
-
 #include <lua.h>
 #include <uv.h>
 #if SYSTEMD_VERSION > 0
 #endif
 #include <libknot/error.h>
 
-#include <contrib/cleanup.h>
-#include <contrib/ucw/mempool.h>
-#include <contrib/ccan/asprintf/asprintf.h>
-#include "lib/defines.h"
-#include "lib/resolve.h"
-#include "lib/dnssec.h"
-#include "daemon/io.h"
-#include "daemon/network.h"
-#include "daemon/worker.h"
-#include "daemon/engine.h"
-#include "daemon/tls.h"
-#include "lib/dnssec/ta.h"
 
 /* @internal Array of ip address shorthand. */
 typedef array_t(char*) addr_array_t;
@@ -809,6 +811,14 @@ int main(int argc, char **argv)
                goto cleanup;
        }
 
+       ret = udp_queue_init_global(loop);
+       if (ret) {
+               kr_log_error("[system] failed to initialize UDP queue: %s\n",
+                               kr_strerror(ret));
+               ret = EXIT_FAILURE;
+               goto cleanup;
+       }
+
        /* Start the scripting engine */
        if (engine_load_sandbox(&engine) != 0) {
                ret = EXIT_FAILURE;
index 4b9805275e2878c1849eb63b547829cddac7e71a..a7db4d361562c00e9c8260fad79c9e54f1544368 100644 (file)
@@ -16,6 +16,7 @@ kresd_src = files([
   'tls.c',
   'tls_ephemeral_credentials.c',
   'tls_session_ticket-srv.c',
+  'udp_queue.c',
   'worker.c',
   'zimport.c',
 ])
index 56f7eb4aa4c45381eea93e3c31f55f6f023fe661..7b261a4c19a1f15faf6578a9405ad421d4db92c0 100644 (file)
@@ -16,6 +16,8 @@
 
 #pragma once
 
+#include <libknot/packet/pkt.h>
+
 #include <stdbool.h>
 #include <uv.h>
 
diff --git a/daemon/udp_queue.c b/daemon/udp_queue.c
new file mode 100644 (file)
index 0000000..6d1fec0
--- /dev/null
@@ -0,0 +1,163 @@
+/*  Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "kresconfig.h"
+#include "daemon/udp_queue.h"
+
+#if !ENABLE_SENDMMSG
+int udp_queue_init_global(uv_loop_t *loop)
+{
+       return 0;
+}
+#else
+
+#include "daemon/session.h"
+#include "daemon/worker.h"
+#include "lib/generic/array.h"
+#include "lib/utils.h"
+
+struct qr_task;
+
+#include <assert.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+
+/* LATER: it might be useful to have this configurable during runtime,
+ * but the structures below would have to change a little (broken up). */
+#define UDP_QUEUE_LEN 64
+
+/** A queue of up to UDP_QUEUE_LEN messages, meant for the same socket. */
+typedef struct {
+       int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
+       struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg() */
+       struct {
+               struct qr_task *task; /**< Links for completion callbacks. */
+               struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov */
+       } items[UDP_QUEUE_LEN];
+} udp_queue_t;
+
+static udp_queue_t * udp_queue_create()
+{
+       udp_queue_t *q = calloc(1, sizeof(*q));
+       for (int i = 0; i < UDP_QUEUE_LEN; ++i) {
+               struct msghdr *mhi = &q->msgvec[i].msg_hdr;
+               /* These shall remain always the same. */
+               mhi->msg_iov = q->items[i].msg_iov;
+               mhi->msg_iovlen = 1;
+               /* msg_name and msg_namelen will be per-call,
+                * and the rest is OK to remain zeroed all the time. */
+       }
+       return q;
+}
+
+/** Global state for udp_queue_*.  Note: we never free the pointed-to memory. */
+struct {
+       /** Singleton map: fd -> udp_queue_t, as a simple array of pointers. */
+       udp_queue_t **udp_queues;
+       int udp_queues_len;
+
+       /** List of FD numbers that might have a non-empty queue. */
+       array_t(int) waiting_fds;
+
+       uv_check_t check_handle;
+} static state = {0};
+
+/** Empty the given queue.  The queue is assumed to exist (but may be empty). */
+static void udp_queue_send(int fd)
+{
+       udp_queue_t *const q = state.udp_queues[fd];
+       if (!q->len) return;
+       int sent_len = sendmmsg(fd, q->msgvec, q->len, 0);
+       /* ATM we don't really do anything about failures. */
+       int err = sent_len < 0 ? errno : EAGAIN /* unknown error, really */;
+       if (unlikely(sent_len != q->len)) {
+               if (err != EWOULDBLOCK) {
+                       kr_log_error("ERROR: udp sendmmsg() sent %d / %d; %s\n",
+                                       sent_len, q->len, strerror(err));
+               } else {
+                       const uint64_t stamp_now = kr_now();
+                       static uint64_t stamp_last = 0;
+                       if (stamp_now > stamp_last + 60*1000) {
+                               kr_log_info("WARNING: dropped UDP reply packet(s) due to network overload (reported at most once per minute)\n");
+                               stamp_last = stamp_now;
+                       }
+               }
+       }
+       for (int i = 0; i < q->len; ++i) {
+               qr_task_on_send(q->items[i].task, NULL, i < sent_len ? 0 : err);
+       }
+       q->len = 0;
+}
+
+/** Periodical callback to send all queued packets. */
+static void udp_queue_check(uv_check_t *handle)
+{
+       for (int i = 0; i < state.waiting_fds.len; ++i) {
+               udp_queue_send(state.waiting_fds.at[i]);
+       }
+       state.waiting_fds.len = 0;
+}
+
+int udp_queue_init_global(uv_loop_t *loop)
+{
+       int ret = uv_check_init(loop, &state.check_handle);
+       if (!ret) ret = uv_check_start(&state.check_handle, udp_queue_check);
+       return ret;
+}
+
+void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+{
+       if (fd < 0) {
+               kr_log_error("ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
+               abort();
+       }
+       /* Get a valid correct queue. */
+       if (fd >= state.udp_queues_len) {
+               const int new_len = fd + 1;
+               state.udp_queues = realloc(state.udp_queues,
+                                       sizeof(state.udp_queues[0]) * new_len);
+               if (!state.udp_queues) abort();
+               memset(state.udp_queues + state.udp_queues_len, 0,
+                       sizeof(state.udp_queues[0]) * (new_len - state.udp_queues_len));
+               state.udp_queues_len = new_len;
+       }
+       if (unlikely(state.udp_queues[fd] == NULL))
+               state.udp_queues[fd] = udp_queue_create();
+       udp_queue_t *const q = state.udp_queues[fd];
+
+       /* Append to the queue */
+       struct sockaddr *sa = (struct sockaddr *)/*const-cast*/req->qsource.addr;
+       q->msgvec[q->len].msg_hdr.msg_name = sa;
+       q->msgvec[q->len].msg_hdr.msg_namelen = kr_sockaddr_len(sa);
+       q->items[q->len].task = task;
+       q->items[q->len].msg_iov[0] = (struct iovec){
+               .iov_base = req->answer->wire,
+               .iov_len  = req->answer->size,
+       };
+       if (q->len == 0)
+               array_push(state.waiting_fds, fd);
+       ++(q->len);
+
+       if (q->len >= UDP_QUEUE_LEN) {
+               assert(q->len == UDP_QUEUE_LEN);
+               udp_queue_send(fd);
+               /* We don't need to search state.waiting_fds;
+                * anyway, it's more efficient to let the hook do that. */
+       }
+}
+
+#endif
+
diff --git a/daemon/udp_queue.h b/daemon/udp_queue.h
new file mode 100644 (file)
index 0000000..c1730c0
--- /dev/null
@@ -0,0 +1,28 @@
+/*  Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <uv.h>
+struct kr_request;
+struct qr_task;
+
+/** Initialize the global state for udp_queue. */
+int udp_queue_init_global(uv_loop_t *loop);
+
+/** Send req->answer via UDP, possibly not immediately. */
+void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task);
+
index 313fe86d155666d9a748d5773013896f98c0bfc5..0cbbcc7c06740b70471a06457f48f4678e68d48a 100644 (file)
@@ -14,6 +14,9 @@
     along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
 
+#include "kresconfig.h"
+#include "daemon/worker.h"
+
 #include <uv.h>
 #include <lua.h>
 #include <lauxlib.h>
 #include <sys/types.h>
 #include <unistd.h>
 #include <gnutls/gnutls.h>
-#include "lib/utils.h"
-#include "lib/layer.h"
-#include "daemon/worker.h"
+
 #include "daemon/bindings/api.h"
 #include "daemon/engine.h"
 #include "daemon/io.h"
+#include "daemon/session.h"
 #include "daemon/tls.h"
+#include "daemon/udp_queue.h"
 #include "daemon/zimport.h"
-#include "daemon/session.h"
+#include "lib/layer.h"
+#include "lib/utils.h"
 
 
 /* Magic defaults for the worker. */
@@ -510,7 +514,7 @@ static void qr_task_complete(struct qr_task *task)
 }
 
 /* This is called when we send subrequest / answer */
-static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
+int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
 {
 
        if (task->finished) {
@@ -1180,10 +1184,26 @@ static int qr_task_finalize(struct qr_task *task, int state)
        /* Send back answer */
        assert(!session_flags(source_session)->closing);
        assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
-       int res = qr_task_send(task, source_session,
+
+       int ret;
+       const uv_handle_t *src_handle = session_get_handle(source_session);
+       if (src_handle->type != UV_UDP && src_handle->type != UV_TCP) {
+               assert(false);
+               ret = kr_error(EINVAL);
+       } else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
+               /* TODO: this is an ugly way of getting the FD number, as we're
+                * touching a private field of UV.  We might want to e.g. pass
+                * a pointer to struct endpoint in kr_request::qsource. */
+               const int fd = ((const uv_udp_t *)src_handle)->io_watcher.fd;
+               udp_queue_push(fd, &ctx->req, task);
+               ret = 0;
+       } else {
+               ret = qr_task_send(task, source_session,
                               (struct sockaddr *)&ctx->source.addr,
                                ctx->req.answer);
-       if (res != kr_ok()) {
+       }
+
+       if (ret != kr_ok()) {
                (void) qr_task_on_send(task, NULL, kr_error(EIO));
                /* Since source session is erroneous detach all tasks. */
                while (!session_tasklist_is_empty(source_session)) {
index 4e823c2c075be467c19a64832b1495480109f82e..7b1a84c08ca3a5884795e9fdab1b112411b71d03 100644 (file)
@@ -118,6 +118,8 @@ uint64_t worker_task_creation_time(struct qr_task *task);
 void worker_task_subreq_finalize(struct qr_task *task);
 bool worker_task_finished(struct qr_task *task);
 
+/** To be called after sending a DNS message.  It mainly deals with cleanups. */
+int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status);
 
 /** Various worker statistics.  Sync with wrk_stats() */
 struct worker_stats {
index 06868beaa3d47a53f41e62085b967149da48ed4e..f0ca01b1ad44390faecb6c87dfd750c50f39b66d 100644 (file)
@@ -83,6 +83,17 @@ verbose_log = get_option('verbose_log') == 'enabled' or get_option('verbose_log'
 user = get_option('user')
 group = get_option('group')
 
+## sendmmsg
+has_sendmmsg = meson.get_compiler('c').has_function('sendmmsg',
+  prefix: '#define _GNU_SOURCE\n#include <sys/socket.h>')
+if get_option('sendmmsg') == 'enabled' and not has_sendmmsg
+  error('missing compiler function: sendmmsg(), use -Dsendmmsg=disabled')
+elif get_option('sendmmsg') == 'auto'
+  sendmmsg = has_sendmmsg
+else
+  sendmmsg = get_option('sendmmsg') == 'enabled'
+endif
+
 ## Systemd
 message('--- systemd socket activation ---')
 libsystemd = dependency('libsystemd', required: false)
@@ -137,6 +148,7 @@ conf_data.set_quoted('libknot_SONAME',
 conf_data.set('SYSTEMD_VERSION',
   libsystemd.found() ? libsystemd.version().to_int() : -1)
 conf_data.set('NOVERBOSELOG', not verbose_log)
+conf_data.set('ENABLE_SENDMMSG', sendmmsg.to_int())
 
 kresconfig = configure_file(
   output: 'kresconfig.h',
@@ -249,6 +261,7 @@ s_build_unit_tests = build_unit_tests ? 'enabled' : 'disabled'
 s_build_config_tests = build_config_tests ? 'enabled' : 'disabled'
 s_build_extra_tests = build_extra_tests ? 'enabled' : 'disabled'
 s_install_kresd_conf = install_kresd_conf ? 'enabled' : 'disabled'
+s_sendmmsg = sendmmsg ? 'enabled': 'disabled'
 message('''
 
 ======================= SUMMARY =======================
@@ -281,6 +294,7 @@ message('''
     user:               @0@'''.format(user) + '''
     group:              @0@'''.format(group) + '''
     install_kresd_conf: @0@'''.format(s_install_kresd_conf) + '''
+    sendmmsg:           @0@'''.format(s_sendmmsg) + '''
 
 =======================================================
 
index bdb6fdf9dfa8434d61dc724b8efa19b43321e503..e48cf67a31feaf2fad8988dc549c9929331b3c9d 100644 (file)
@@ -63,6 +63,18 @@ option(
   description: 'group which is used for running kresd',
 )
 
+option(
+  'sendmmsg',
+  type: 'combo',
+  choices: [
+    'auto',
+    'enabled',
+    'disabled',
+  ],
+  value: 'auto',
+  description: 'use sendmmsg syscall towards clients',
+)
+
 ## Systemd
 option(
   'systemd_files',