]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
XDP: add backend parts
authorVladimír Čunát <vladimir.cunat@nic.cz>
Wed, 21 Oct 2020 15:25:18 +0000 (17:25 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Tue, 10 Nov 2020 16:16:46 +0000 (17:16 +0100)
Logging strings: I originally wanted to have four chars inside [],
but it doesn't really matter in these cases where logs don't happen
within a request, so "[xdp]" won due to uniformity and simplicity.

14 files changed:
daemon/io.c
daemon/io.h
daemon/lua/kres-gen.lua
daemon/lua/kres-gen.sh
daemon/network.h
daemon/session.c
daemon/session.h
daemon/worker.c
daemon/worker.h
lib/resolve.c
lib/resolve.h
meson.build
modules/stats/README.rst
modules/stats/stats.c

index ff732c5de38bb6bd5289ba8b77adac02a24edb7a..f02e6568de4179f4d57a813333198cca409722c5 100644 (file)
@@ -2,13 +2,19 @@
  *  SPDX-License-Identifier: GPL-3.0-or-later
  */
 
-#include <string.h>
-#include <libknot/errcode.h>
+#include "daemon/io.h"
+
+#include <assert.h>
 #include <contrib/ucw/lib.h>
 #include <contrib/ucw/mempool.h>
-#include <assert.h>
+#include <libknot/errcode.h>
+#include <string.h>
+#include <sys/resource.h>
+
+#if ENABLE_XDP
+       #include <libknot/xdp/xdp.h>
+#endif
 
-#include "daemon/io.h"
 #include "daemon/network.h"
 #include "daemon/worker.h"
 #include "daemon/tls.h"
@@ -790,6 +796,103 @@ int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd)
        return 0;
 }
 
+#if ENABLE_XDP
+static void xdp_rx(uv_poll_t* handle, int status, int events)
+{
+       const int XDP_RX_BATCH_SIZE = 64;
+       if (status < 0) {
+               kr_log_error("[xdp] poll status %d: %s\n", status, uv_strerror(status));
+               return;
+       }
+       if (events != UV_READABLE) {
+               kr_log_error("[xdp] poll unexpected events: %d\n", events);
+               return;
+       }
+
+       xdp_handle_data_t *xhd = handle->data;
+       assert(xhd && xhd->session && xhd->socket);
+       uint32_t rcvd;
+       knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
+       int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd);
+       if (ret == KNOT_EOK) {
+               kr_log_verbose("[xdp] poll triggered, processing a batch of %d packets\n",
+                       (int)rcvd);
+       } else {
+               kr_log_error("[xdp] knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
+               assert(false); // ATM it can only be returned when called incorrectly
+               return;
+       }
+       assert(rcvd <= XDP_RX_BATCH_SIZE);
+       for (int i = 0; i < rcvd; ++i) {
+               const knot_xdp_msg_t *msg = &msgs[i];
+               assert(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
+               knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len,
+                                               &the_worker->pkt_pool);
+               if (kpkt == NULL) {
+                       ret = kr_error(ENOMEM);
+               } else {
+                       ret = worker_submit(xhd->session,
+                                       (const struct sockaddr *)&msg->ip_from,
+                                       (const struct sockaddr *)&msg->ip_to,
+                                       msg->eth_from, msg->eth_to, kpkt);
+               }
+               if (ret)
+                       kr_log_verbose("[xdp] worker_submit() == %d: %s\n", ret, kr_strerror(ret));
+               mp_flush(the_worker->pkt_pool.ctx);
+       }
+       knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
+}
+int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
+{
+       if (!ep || !ep->handle) {
+               return kr_error(EINVAL);
+       }
+
+       // RLIMIT_MEMLOCK often needs raising when operating on BPF
+       static int ret_limit = 1;
+       if (ret_limit == 1) {
+               struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
+               ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
+                       ? kr_error(errno) : 0;
+       }
+       if (ret_limit) return ret_limit;
+
+       xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
+       if (!xhd) return kr_error(ENOMEM);
+
+       const int port = ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL;
+       xhd->socket = NULL; // needed for some reason
+       int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue, port,
+                               KNOT_XDP_LOAD_BPF_MAYBE);
+
+       if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
+       if (ret) {
+               free(xhd);
+               return kr_error(ret);
+       }
+       assert(xhd->socket);
+       xhd->tx_waker.data = xhd->socket;
+
+       ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
+       ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
+       if (ret) {
+               knot_xdp_deinit(xhd->socket);
+               free(xhd);
+               return kr_error(ret);
+       }
+
+       // beware: this sets poll_handle->data
+       xhd->session = session_new(ep->handle, false, false);
+       assert(!session_flags(xhd->session)->outgoing);
+       session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there
+
+       ep->handle->data = xhd;
+       ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
+       return ret;
+}
+#endif
+
+
 int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls, bool has_http)
 {
        int ret = -1;
@@ -811,12 +914,22 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, b
 
 void io_deinit(uv_handle_t *handle)
 {
-       if (!handle) {
+       if (!handle || !handle->data) {
                return;
        }
-       if (handle->data) {
+       if (handle->type != UV_POLL) {
                session_free(handle->data);
-               handle->data = NULL;
+       } else {
+       #if ENABLE_XDP
+               xdp_handle_data_t *xhd = handle->data;
+               uv_idle_stop(&xhd->tx_waker);
+               uv_close((uv_handle_t *)&xhd->tx_waker, NULL);
+               session_free(xhd->session);
+               knot_xdp_deinit(xhd->socket);
+               free(xhd);
+       #else
+               assert(false);
+       #endif
        }
 }
 
index 541cd2ae69de3f62ac1de3cc7531157256dc7d8b..c83eabd623f26686be23ee6cc3375e4684cd570d 100644 (file)
@@ -24,6 +24,9 @@ int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd);
 int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bool has_tls, bool has_http);
 /** Initialize a pipe handle and start listening. */
 int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd);
+/** Initialize a poll handle (ep->handle) and start listening over AF_XDP on ifname.
+ * Sets ep->session. */
+int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname);
 
 /** Control socket / TTY - related functions. */
 void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
@@ -44,3 +47,12 @@ void io_free(uv_handle_t *handle);
 
 int io_start_read(uv_handle_t *handle);
 int io_stop_read(uv_handle_t *handle);
+
+/** When uv_handle_t::type == UV_POLL, ::data points to this malloc-ed helper.
+ * (Other cases store a direct struct session pointer in ::data.) */
+typedef struct {
+       struct knot_xdp_socket *socket;
+       struct session *session;
+       uv_idle_t tx_waker;
+} xdp_handle_data_t;
+
index 2eb58737a2d9acb8b1d10a63be93c5738290d685..084f7bd0f25245763e57f3b6db73b8d02037902e 100644 (file)
@@ -16,6 +16,7 @@ typedef void *(*map_alloc_f)(void *, size_t);
 typedef void (*map_free_f)(void *baton, void *ptr);
 typedef void (*trace_log_f) (const struct kr_request *, const char *);
 typedef void (*trace_callback_f)(struct kr_request *);
+typedef uint8_t * (*alloc_wire_f)(struct kr_request *req, uint16_t *maxlen);
 typedef enum {KNOT_ANSWER, KNOT_AUTHORITY, KNOT_ADDITIONAL} knot_section_t;
 typedef struct {
        uint16_t pos;
@@ -160,6 +161,7 @@ struct kr_request_qsource_flags {
        _Bool tcp : 1;
        _Bool tls : 1;
        _Bool http : 1;
+       _Bool xdp : 1;
 };
 struct kr_request {
        struct kr_context *ctx;
@@ -193,6 +195,7 @@ struct kr_request {
        unsigned int uid;
        unsigned int count_no_nsaddr;
        unsigned int count_fail_row;
+       alloc_wire_f alloc_wire_cb;
 };
 enum kr_rank {KR_RANK_INITIAL, KR_RANK_OMIT, KR_RANK_TRY, KR_RANK_INDET = 4, KR_RANK_BOGUS, KR_RANK_MISMATCH, KR_RANK_MISSING, KR_RANK_INSECURE, KR_RANK_AUTH = 16, KR_RANK_SECURE = 32};
 typedef struct kr_cdb * kr_cdb_pt;
@@ -437,11 +440,14 @@ struct endpoint {
        int fd;
        int family;
        uint16_t port;
+       int16_t nic_queue;
        _Bool engaged;
        endpoint_flags_t flags;
 };
 struct request_ctx {
        struct kr_request req;
+       struct worker_ctx *worker;
+       struct qr_task *task;
        /* beware: hidden stub, to avoid hardcoding sockaddr lengths */
 };
 struct qr_task {
index 7bc6c737438929d1fe9af24bb656c898fc68ab40..ad58d238df24185aa5d65f018ee127cd37000d3e 100755 (executable)
@@ -70,6 +70,7 @@ typedef void *(*map_alloc_f)(void *, size_t);
 typedef void (*map_free_f)(void *baton, void *ptr);
 typedef void (*trace_log_f) (const struct kr_request *, const char *);
 typedef void (*trace_callback_f)(struct kr_request *);
+typedef uint8_t * (*alloc_wire_f)(struct kr_request *req, uint16_t *maxlen);
 "
 
 ${CDEFS} ${LIBKRES} types <<-EOF
index a960effa4c59738f3c5748f0d64b3ff9b9e87eda..66d2578013ea7e2d151913709fce335237bd1777 100644 (file)
@@ -46,6 +46,7 @@ struct endpoint {
        int fd;              /**< POSIX file-descriptor; always used. */
        int family;          /**< AF_INET or AF_INET6 or AF_UNIX */
        uint16_t port;       /**< TCP/UDP port.  Meaningless with AF_UNIX. */
+       int16_t nic_queue;   /**< -1 or queue number of the interface for AF_XDP use. */
        bool engaged;        /**< to some module or internally */
        endpoint_flags_t flags;
 };
index 9aa53f2148333b4a9f37a1034833cd1df0473b0b..8f97b6818706221f6df8e624f0ddd32df31f61b7 100644 (file)
 /* Per-socket (TCP or UDP) persistent structure.
  *
  * In particular note that for UDP clients it's just one session (per socket)
- * shared for all clients.  For TCP/TLS it's for the connection-specific socket,
+ * shared for all clients.  For TCP/TLS it's also for the connection-specific socket,
  * i.e one session per connection.
+ *
+ * LATER(optim.): the memory here is used a bit wastefully.
  */
 struct session {
        struct session_flags sflags;  /**< miscellaneous flags. */
@@ -43,7 +45,7 @@ struct session {
        trie_t *tasks;                /**< list of tasks assotiated with given session. */
        queue_t(struct qr_task *) waiting;  /**< list of tasks waiting for sending to upstream. */
 
-       uint8_t *wire_buf;            /**< Buffer for DNS message. */
+       uint8_t *wire_buf;            /**< Buffer for DNS message, except for XDP. */
        ssize_t wire_buf_size;        /**< Buffer size. */
        ssize_t wire_buf_start_idx;   /**< Data start offset in wire_buf. */
        ssize_t wire_buf_end_idx;     /**< Data end offset in wire_buf. */
@@ -364,6 +366,11 @@ struct session *session_new(uv_handle_t *handle, bool has_tls, bool has_http)
                assert(handle->loop->data);
                session->wire_buf = the_worker->wire_buf;
                session->wire_buf_size = sizeof(the_worker->wire_buf);
+       } else {
+               assert(handle->type == UV_POLL/*XDP*/);
+               /* - wire_buf* are left zeroed, as they make no sense
+                * - timer is unused but OK for simplicity (server-side sessions are few)
+                */
        }
 
        uv_timer_init(handle->loop, &session->timeout);
@@ -749,7 +756,7 @@ int session_wirebuf_process(struct session *session, const struct sockaddr *peer
        while (((pkt = session_produce_packet(session, &the_worker->pkt_pool)) != NULL) &&
               (ret < max_iterations)) {
                assert (!session_wirebuf_error(session));
-               int res = worker_submit(session, peer, pkt);
+               int res = worker_submit(session, peer, NULL, NULL, NULL, pkt);
                /* Errors from worker_submit() are intetionally *not* handled in order to
                 * ensure the entire wire buffer is processed. */
                if (res == kr_ok())
index 773180e7e162f57f2b60a9e87233026a94fa15a3..7c4bae4cef60dc6b1bc1efa3839a1a578e97f0ce 100644 (file)
@@ -23,12 +23,12 @@ struct session_flags {
        bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */
 };
 
-/* Allocate new session for a libuv handle.
- * If handle->tyoe is UV_UDP, tls parameter will be ignored. */
+/** Allocate new session for a libuv handle.
+ * If handle->type isn't UV_TCP, has_* parameters will be ignored. */
 struct session *session_new(uv_handle_t *handle, bool has_tls, bool has_http);
-/* Clear and free given session. */
+/** Clear and free given session. */
 void session_free(struct session *session);
-/* Clear session. */
+/** Clear session. */
 void session_clear(struct session *session);
 /** Close session. */
 void session_close(struct session *session);
index c420350dd34659a6c40b500462f03999ecf60232..2b13ef24f2f680c5e5efd90e74a2af652e4b36bc 100644 (file)
 #include <unistd.h>
 #include <gnutls/gnutls.h>
 
+#if ENABLE_XDP
+       #include <libknot/xdp/xdp.h>
+#endif
+
 #include "daemon/bindings/api.h"
 #include "daemon/engine.h"
 #include "daemon/io.h"
@@ -56,15 +60,19 @@ struct request_ctx
 {
        struct kr_request req;
 
+       struct worker_ctx *worker;
+       struct qr_task *task;
        struct {
-               /** Requestor's address; separate because of UDP session "sharing". */
-               union inaddr addr;
                /** NULL if the request didn't come over network. */
                struct session *session;
+               /** Requestor's address; separate because of UDP session "sharing". */
+               union inaddr addr;
+               /** Local address.  For AF_XDP we couldn't use session's,
+                * as the address might be different every time. */
+               union inaddr dst_addr;
+               /** MAC addresses - ours [0] and router's [1], in case of AF_XDP socket. */
+               uint8_t eth_addrs[2][6];
        } source;
-
-       struct worker_ctx *worker;
-       struct qr_task *task;
 };
 
 /** Query resolution task. */
@@ -258,14 +266,63 @@ static int subreq_key(char *dst, knot_pkt_t *pkt)
                        knot_pkt_qtype(pkt), knot_pkt_qtype(pkt));
 }
 
+#if ENABLE_XDP
+static uint8_t *alloc_wire_cb(struct kr_request *req, uint16_t *maxlen)
+{
+       assert(maxlen);
+       struct request_ctx *ctx = (struct request_ctx *)req;
+       /* We know it's an AF_XDP socket; otherwise this CB isn't assigned. */
+       uv_handle_t *handle = session_get_handle(ctx->source.session);
+       assert(handle->type == UV_POLL);
+       xdp_handle_data_t *xhd = handle->data;
+       knot_xdp_msg_t out;
+       int ret = knot_xdp_send_alloc(xhd->socket, ctx->source.addr.ip.sa_family == AF_INET6,
+                                       &out, NULL);
+       if (ret != KNOT_EOK) {
+               assert(ret == KNOT_ENOMEM);
+               *maxlen = 0;
+               return NULL;
+       }
+       *maxlen = MIN(*maxlen, out.payload.iov_len);
+       /* It's most convenient to fill the MAC addresses at this point. */
+       memcpy(out.eth_from, &ctx->source.eth_addrs[0], 6);
+       memcpy(out.eth_to,   &ctx->source.eth_addrs[1], 6);
+       return out.payload.iov_base;
+}
+static void free_wire(const struct request_ctx *ctx)
+{
+       assert(ctx->req.alloc_wire_cb == alloc_wire_cb);
+       knot_pkt_t *ans = ctx->req.answer;
+       if (unlikely(ans == NULL)) /* dropped */
+               return;
+       if (likely(ans->wire == NULL)) /* sent most likely */
+               return;
+       /* We know it's an AF_XDP socket; otherwise alloc_wire_cb isn't assigned. */
+       uv_handle_t *handle = session_get_handle(ctx->source.session);
+       assert(handle->type == UV_POLL);
+       xdp_handle_data_t *xhd = handle->data;
+       /* Freeing is done by sending an empty packet (the API won't really send it). */
+       knot_xdp_msg_t out;
+       out.payload.iov_base = ans->wire;
+       out.payload.iov_len = 0;
+       uint32_t sent;
+       int ret = knot_xdp_send(xhd->socket, &out, 1, &sent);
+       assert(ret == KNOT_EOK && sent == 0); (void)ret;
+       kr_log_verbose("[xdp] freed unsent buffer, ret = %d\n", ret);
+}
+#endif
+
 /** Create and initialize a request_ctx (on a fresh mempool).
  *
- * handle and addr point to the source of the request, and they are NULL
+ * session and addr point to the source of the request, and they are NULL
  * in case the request didn't come from network.
  */
 static struct request_ctx *request_create(struct worker_ctx *worker,
                                          struct session *session,
-                                         const struct sockaddr *peer,
+                                         const struct sockaddr *addr,
+                                         const struct sockaddr *dst_addr,
+                                         const uint8_t *eth_from,
+                                         const uint8_t *eth_to,
                                          uint32_t uid)
 {
        knot_mm_t pool = {
@@ -288,14 +345,26 @@ static struct request_ctx *request_create(struct worker_ctx *worker,
                assert(session_flags(session)->outgoing == false);
        }
        ctx->source.session = session;
+       assert(!!eth_to == !!eth_from);
+       const bool is_xdp = eth_to != NULL;
+       if (is_xdp) {
+       #if ENABLE_XDP
+               assert(session);
+               memcpy(&ctx->source.eth_addrs[0], eth_to,   sizeof(ctx->source.eth_addrs[0]));
+               memcpy(&ctx->source.eth_addrs[1], eth_from, sizeof(ctx->source.eth_addrs[1]));
+               ctx->req.alloc_wire_cb = alloc_wire_cb;
+       #else
+               assert(!EINVAL);
+               return NULL;
+       #endif
+       }
 
        struct kr_request *req = &ctx->req;
        req->pool = pool;
        req->vars_ref = LUA_NOREF;
        req->uid = uid;
+       req->qsource.flags.xdp = is_xdp;
        if (session) {
-               /* We assume the session will be alive during the whole life of the request. */
-               req->qsource.dst_addr = session_get_sockname(session);
                req->qsource.flags.tcp = session_get_handle(session)->type == UV_TCP;
                req->qsource.flags.tls = session_flags(session)->has_tls;
                req->qsource.flags.http = session_flags(session)->has_http;
@@ -307,8 +376,12 @@ static struct request_ctx *request_create(struct worker_ctx *worker,
                }
 #endif
                /* We need to store a copy of peer address. */
-               memcpy(&ctx->source.addr.ip, peer, kr_sockaddr_len(peer));
+               memcpy(&ctx->source.addr.ip, addr, kr_sockaddr_len(addr));
                req->qsource.addr = &ctx->source.addr.ip;
+               if (!dst_addr) /* We wouldn't have to copy in this case, but for consistency. */
+                       dst_addr = session_get_sockname(session);
+               memcpy(&ctx->source.dst_addr.ip, dst_addr, kr_sockaddr_len(dst_addr));
+               req->qsource.dst_addr = &ctx->source.dst_addr.ip;
        }
 
        worker->stats.rconcurrent += 1;
@@ -367,6 +440,14 @@ static void request_free(struct request_ctx *ctx)
                lua_pop(L, 1);
                ctx->req.vars_ref = LUA_NOREF;
        }
+       /* Make sure to free XDP buffer in case it wasn't sent. */
+       if (ctx->req.alloc_wire_cb) {
+       #if ENABLE_XDP
+               free_wire(ctx);
+       #else
+               assert(!EINVAL);
+       #endif
+       }
        /* Return mempool to ring or free it if it's full */
        pool_release(worker, ctx->req.pool.ctx);
        /* @note The 'task' is invalidated from now on. */
@@ -477,7 +558,7 @@ static void qr_task_complete(struct qr_task *task)
 }
 
 /* This is called when we send subrequest / answer */
-int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
+int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
 {
 
        if (task->finished) {
@@ -1164,6 +1245,52 @@ static bool subreq_enqueue(struct qr_task *task)
        return true;
 }
 
+#if ENABLE_XDP
+static void xdp_tx_waker(uv_idle_t *handle)
+{
+       int ret = knot_xdp_send_finish(handle->data);
+       if (ret != KNOT_EAGAIN && ret != KNOT_EOK)
+               kr_log_error("[xdp] check: ret = %d, %s\n", ret, knot_strerror(ret));
+       /* Apparently some drivers need many explicit wake-up calls
+        * even if we push no additional packets (in case they accumulated a lot) */
+       if (ret != KNOT_EAGAIN)
+               uv_idle_stop(handle);
+       knot_xdp_send_prepare(handle->data);
+       /* LATER(opt.): it _might_ be better for performance to do these two steps
+        * at different points in time */
+}
+#endif
+/** Send an answer packet over XDP. */
+static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
+{
+#if ENABLE_XDP
+       struct request_ctx *ctx = task->ctx;
+       if (unlikely(ctx->req.answer == NULL)) /* meant to be dropped */
+               return kr_ok();
+       knot_xdp_msg_t msg;
+       const struct sockaddr *ip_from = &ctx->source.dst_addr.ip;
+       const struct sockaddr *ip_to   = &ctx->source.addr.ip;
+       memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from));
+       memcpy(&msg.ip_to,   ip_to,   kr_sockaddr_len(ip_to));
+       msg.payload.iov_base = ctx->req.answer->wire;
+       msg.payload.iov_len  = ctx->req.answer->size;
+
+       xdp_handle_data_t *xhd = src_handle->data;
+       assert(xhd && xhd->socket && xhd->session == ctx->source.session);
+       uint32_t sent;
+       int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent);
+       ctx->req.answer->wire = NULL; /* it's been freed */
+
+       uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
+       kr_log_verbose("[xdp] pushed a packet, ret = %d\n", ret);
+
+       return qr_task_on_send(task, src_handle, ret);
+#else
+       assert(!EINVAL);
+       return kr_error(EINVAL);
+#endif
+}
+
 static int qr_task_finalize(struct qr_task *task, int state)
 {
        assert(task && task->leading == false);
@@ -1190,9 +1317,14 @@ static int qr_task_finalize(struct qr_task *task, int state)
        /* Send back answer */
        int ret;
        const uv_handle_t *src_handle = session_get_handle(source_session);
-       if (src_handle->type != UV_UDP && src_handle->type != UV_TCP) {
+       if (src_handle->type != UV_UDP && src_handle->type != UV_TCP
+                                      && src_handle->type != UV_POLL) {
                assert(false);
                ret = kr_error(EINVAL);
+
+       } else if (src_handle->type == UV_POLL) {
+               ret = xdp_push(task, src_handle);
+
        } else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
                int fd;
                ret = uv_fileno(src_handle, &fd);
@@ -1584,7 +1716,9 @@ static int parse_packet(knot_pkt_t *query)
        return ret;
 }
 
-int worker_submit(struct session *session, const struct sockaddr *peer, knot_pkt_t *pkt)
+int worker_submit(struct session *session,
+                 const struct sockaddr *peer, const struct sockaddr *dst_addr,
+                 const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt)
 {
        if (!session || !pkt)
                return kr_error(EINVAL);
@@ -1622,8 +1756,9 @@ int worker_submit(struct session *session, const struct sockaddr *peer, knot_pkt
        struct qr_task *task = NULL;
        const struct sockaddr *addr = NULL;
        if (!is_outgoing) { /* request from a client */
-               struct request_ctx *ctx = request_create(the_worker, session, peer,
-                                                        knot_wire_get_id(pkt->wire));
+               struct request_ctx *ctx =
+                       request_create(the_worker, session, peer, dst_addr,
+                                       eth_from, eth_to, knot_wire_get_id(pkt->wire));
                if (http_ctx)
                        queue_pop(http_ctx->streams);
                if (!ctx)
@@ -1863,7 +1998,8 @@ struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options
        }
 
 
-       struct request_ctx *ctx = request_create(worker, NULL, NULL, worker->next_request_uid);
+       struct request_ctx *ctx = request_create(worker, NULL, NULL, NULL, NULL, NULL,
+                                                worker->next_request_uid);
        if (!ctx) {
                return NULL;
        }
index 2e93617df3019e57f76de67412ff4e18d567cdbb..0e3e275801c2e5a320c7f8565dd8d0eace25731f 100644 (file)
@@ -31,12 +31,15 @@ void worker_deinit(void);
 /**
  * Process an incoming packet (query from a client or answer from upstream).
  *
- * @param session  session the packet came from
- * @param peer     address the packet came from
- * @param pkt      the packet, or NULL on an error from the transport layer
+ * @param session  session the packet came from, or NULL (not from network)
+ * @param peer     address the packet came from, or NULL (not from network)
+ * @param eth_*    MAC addresses or NULL (they're useful for XDP)
+ * @param pkt      the packet, or NULL (an error from the transport layer)
  * @return 0 or an error code
  */
-int worker_submit(struct session *session, const struct sockaddr *peer, knot_pkt_t *pkt);
+int worker_submit(struct session *session,
+                 const struct sockaddr *peer, const struct sockaddr *dst_addr,
+                 const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt);
 
 /**
  * End current DNS/TCP session, this disassociates pending tasks from this session
@@ -108,7 +111,7 @@ void worker_task_subreq_finalize(struct qr_task *task);
 bool worker_task_finished(struct qr_task *task);
 
 /** To be called after sending a DNS message.  It mainly deals with cleanups. */
-int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status);
+int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status);
 
 /** Various worker statistics.  Sync with wrk_stats() */
 struct worker_stats {
index ba199477dc4ee9a5271c6636445d7f7e0e4d0a96..79438b26e38a6749d03b90e723ffa57f7c483888 100644 (file)
@@ -784,7 +784,7 @@ knot_pkt_t * kr_request_ensure_answer(struct kr_request *request)
        const knot_pkt_t *qs_pkt = request->qsource.packet;
        assert(qs_pkt);
        // Find answer_max: limit on DNS wire length.
-       size_t answer_max;
+       uint16_t answer_max;
        const struct kr_request_qsource_flags *qs_flags = &request->qsource.flags;
        assert((qs_flags->tls || qs_flags->http) ? qs_flags->tcp : true);
        if (!request->qsource.addr || qs_flags->tcp) {
@@ -801,14 +801,22 @@ knot_pkt_t * kr_request_ensure_answer(struct kr_request *request)
        }
 
        // Allocate the packet.
+       uint8_t *wire = NULL;
+       if (request->alloc_wire_cb) {
+               wire = request->alloc_wire_cb(request, &answer_max);
+               if (!wire)
+                       goto enomem;
+       }
        knot_pkt_t *answer = request->answer =
-               knot_pkt_new(NULL, answer_max, &request->pool);
+               knot_pkt_new(wire, answer_max, &request->pool);
        if (!answer || knot_pkt_init_response(answer, qs_pkt) != 0) {
                assert(!answer); // otherwise we messed something up
                goto enomem;
        }
+       if (!wire)
+               wire = answer->wire;
 
-       uint8_t *wire = answer->wire; // much was done by knot_pkt_init_response()
+       // Much was done by knot_pkt_init_response()
        knot_wire_set_ra(wire);
        knot_wire_set_rcode(wire, KNOT_RCODE_NOERROR);
        if (knot_wire_get_cd(qs_pkt->wire)) {
index 7d9df64efb22ce77f8f9225b2a485deb404002e5..d18bd64790450739105b16f718e558b957a2ecf6 100644 (file)
  */
 
 
+struct kr_request;
+/** Allocate buffer for answer's wire (*maxlen may get lowered).
+ *
+ * Motivation: XDP wire allocation is an overlap of library and daemon:
+ *  - it needs to be called from the library
+ *  - it needs to rely on some daemon's internals
+ *  - the library (currently) isn't allowed to directly use symbols from daemon
+ *    (contrary to modules), e.g. some of our lib-using tests run without daemon
+ *
+ * Note: after we obtain the wire, we're obliged to send it out.
+ * (So far there's no use case to allow cancelling at that point.)
+ */
+typedef uint8_t * (*alloc_wire_f)(struct kr_request *req, uint16_t *maxlen);
+
 /**
  * RRset rank - for cache and ranked_rr_*.
  *
@@ -165,6 +179,7 @@ struct kr_request_qsource_flags {
        bool tcp:1; /**< true if the request is not on UDP; only meaningful if (dst_addr). */
        bool tls:1; /**< true if the request is encrypted; only meaningful if (dst_addr). */
        bool http:1; /**< true if the request is on HTTP; only meaningful if (dst_addr). */
+       bool xdp:1; /**< true if the request is on AF_XDP; only meaningful if (dst_addr). */
 };
 
 /**
@@ -219,9 +234,10 @@ struct kr_request {
        trace_callback_f trace_finish; /**< Request finish tracepoint */
        int vars_ref; /**< Reference to per-request variable table. LUA_NOREF if not set. */
        knot_mm_t pool;
-       unsigned int uid; /** for logging purposes only */
+       unsigned int uid; /**< for logging purposes only */
        unsigned int count_no_nsaddr;
        unsigned int count_fail_row;
+       alloc_wire_f alloc_wire_cb; /**< CB to allocate answer wire (can be NULL). */
 };
 
 /** Initializer for an array of *_selected. */
index 1c431b9d6fd5a1ec24dc27d5187516df2819e39b..df16fe0332e17b538109e339a0965133c3bd2ffa 100644 (file)
@@ -110,6 +110,9 @@ else
   sendmmsg = get_option('sendmmsg') == 'enabled'
 endif
 
+### XDP: not configurable - we just check if libknot supports it
+xdp = meson.get_compiler('c').has_header('libknot/xdp/xdp.h')
+
 ### Systemd
 systemd_files = get_option('systemd_files')
 libsystemd = dependency('libsystemd', required: systemd_files == 'enabled')
@@ -171,6 +174,7 @@ conf_data.set_quoted('libknot_SONAME',
 conf_data.set('ENABLE_LIBSYSTEMD', libsystemd.found().to_int())
 conf_data.set('NOVERBOSELOG', not verbose_log)
 conf_data.set('ENABLE_SENDMMSG', sendmmsg.to_int())
+conf_data.set('ENABLE_XDP', xdp.to_int())
 conf_data.set('ENABLE_CAP_NG', capng.found().to_int())
 conf_data.set('ENABLE_DOH2', nghttp2.found().to_int())
 
@@ -286,6 +290,7 @@ s_build_config_tests = build_config_tests ? 'enabled' : 'disabled'
 s_build_extra_tests = build_extra_tests ? 'enabled' : 'disabled'
 s_install_kresd_conf = install_kresd_conf ? 'enabled' : 'disabled'
 s_sendmmsg = sendmmsg ? 'enabled': 'disabled'
+s_xdp = xdp ? 'enabled': 'disabled'
 s_openssl = openssl.found() ? 'present': 'missing'
 s_capng = capng.found() ? 'enabled': 'disabled'
 s_doh2 = nghttp2.found() ? 'enabled': 'disabled'
@@ -323,6 +328,7 @@ message('''
     group:              @0@'''.format(group) + '''
     install_kresd_conf: @0@'''.format(s_install_kresd_conf) + '''
     sendmmsg:           @0@'''.format(s_sendmmsg) + '''
+    XDP (in libknot):   @0@'''.format(s_xdp) + '''
     openssl debug:      @0@'''.format(s_openssl) + '''
     capng:              @0@'''.format(s_capng) + '''
     doh2:               @0@'''.format(s_doh2) + '''
index af26ca1c23bec7a53c349366a7efc4022e4e427c..1da310f109ae5d93d056db21eaba84e401aa822e 100644 (file)
@@ -44,6 +44,9 @@ Built-in counters keep track of number of queries and answers matching specific
 | request.doh      | external requests received over              |
 |                  | DNS-over-HTTP (:rfc:`8484`)                  |
 +------------------+----------------------------------------------+
+| request.xdp      | external requests received over plain UDP    |
+|                  | via an AF_XDP socket                         |
++------------------+----------------------------------------------+
 
 +----------------------------------------------------+
 | **Global answer counters**                         |
index 6997cbb0d00898b1976c1d40e72dcca05f72779e..a83ead891d2a24bb2462caf8f4c6785d903e5388 100644 (file)
@@ -44,7 +44,7 @@
        X(answer,aa) X(answer,tc) X(answer,rd) X(answer,ra) X(answer, ad) X(answer,cd) \
        X(answer,edns0) X(answer,do) \
        X(query,edns) X(query,dnssec) \
-       X(request,total) X(request,udp) X(request,tcp) \
+       X(request,total) X(request,udp) X(request,tcp) X(request,xdp) \
        X(request,dot) X(request,doh) X(request,internal) \
        X(const,end)
 
@@ -186,7 +186,7 @@ static int collect_transport(kr_layer_t *ctx)
 
        /**
         * Count each transport only once,
-        * i.e. DoT does not count as TCP.
+        * i.e. DoT does not count as TCP and XDP does not count as UDP.
         */
        if (req->qsource.flags.http)
                stat_const_add(data, metric_request_doh, 1);
@@ -194,6 +194,8 @@ static int collect_transport(kr_layer_t *ctx)
                stat_const_add(data, metric_request_dot, 1);
        else if (req->qsource.flags.tcp)
                stat_const_add(data, metric_request_tcp, 1);
+       else if (req->qsource.flags.xdp)
+               stat_const_add(data, metric_request_xdp, 1);
        else
                stat_const_add(data, metric_request_udp, 1);
        return ctx->state;