From ddc67e7b2d3e8ea655184db2a6a8865b7c1a3465 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Vladim=C3=ADr=20=C4=8Cun=C3=A1t?= Date: Wed, 21 Oct 2020 17:25:18 +0200 Subject: [PATCH] XDP: add backend parts Logging strings: I originally wanted to have four chars inside [], but it doesn't really matter in these cases where logs don't happen within a request, so "[xdp]" won due to uniformity and simplicity. --- daemon/io.c | 127 +++++++++++++++++++++++++++-- daemon/io.h | 12 +++ daemon/lua/kres-gen.lua | 6 ++ daemon/lua/kres-gen.sh | 1 + daemon/network.h | 1 + daemon/session.c | 13 ++- daemon/session.h | 8 +- daemon/worker.c | 168 +++++++++++++++++++++++++++++++++++---- daemon/worker.h | 13 +-- lib/resolve.c | 14 +++- lib/resolve.h | 18 ++++- meson.build | 6 ++ modules/stats/README.rst | 3 + modules/stats/stats.c | 6 +- 14 files changed, 355 insertions(+), 41 deletions(-) diff --git a/daemon/io.c b/daemon/io.c index ff732c5de..f02e6568d 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -2,13 +2,19 @@ * SPDX-License-Identifier: GPL-3.0-or-later */ -#include -#include +#include "daemon/io.h" + +#include #include #include -#include +#include +#include +#include + +#if ENABLE_XDP + #include +#endif -#include "daemon/io.h" #include "daemon/network.h" #include "daemon/worker.h" #include "daemon/tls.h" @@ -790,6 +796,103 @@ int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd) return 0; } +#if ENABLE_XDP +static void xdp_rx(uv_poll_t* handle, int status, int events) +{ + const int XDP_RX_BATCH_SIZE = 64; + if (status < 0) { + kr_log_error("[xdp] poll status %d: %s\n", status, uv_strerror(status)); + return; + } + if (events != UV_READABLE) { + kr_log_error("[xdp] poll unexpected events: %d\n", events); + return; + } + + xdp_handle_data_t *xhd = handle->data; + assert(xhd && xhd->session && xhd->socket); + uint32_t rcvd; + knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE]; + int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd); + if (ret == KNOT_EOK) { + kr_log_verbose("[xdp] poll triggered, processing a batch of %d packets\n", + (int)rcvd); + } else { + kr_log_error("[xdp] knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret)); + assert(false); // ATM it can only be returned when called incorrectly + return; + } + assert(rcvd <= XDP_RX_BATCH_SIZE); + for (int i = 0; i < rcvd; ++i) { + const knot_xdp_msg_t *msg = &msgs[i]; + assert(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE); + knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len, + &the_worker->pkt_pool); + if (kpkt == NULL) { + ret = kr_error(ENOMEM); + } else { + ret = worker_submit(xhd->session, + (const struct sockaddr *)&msg->ip_from, + (const struct sockaddr *)&msg->ip_to, + msg->eth_from, msg->eth_to, kpkt); + } + if (ret) + kr_log_verbose("[xdp] worker_submit() == %d: %s\n", ret, kr_strerror(ret)); + mp_flush(the_worker->pkt_pool.ctx); + } + knot_xdp_recv_finish(xhd->socket, msgs, rcvd); +} +int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname) +{ + if (!ep || !ep->handle) { + return kr_error(EINVAL); + } + + // RLIMIT_MEMLOCK often needs raising when operating on BPF + static int ret_limit = 1; + if (ret_limit == 1) { + struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY }; + ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit) + ? kr_error(errno) : 0; + } + if (ret_limit) return ret_limit; + + xdp_handle_data_t *xhd = malloc(sizeof(*xhd)); + if (!xhd) return kr_error(ENOMEM); + + const int port = ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL; + xhd->socket = NULL; // needed for some reason + int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue, port, + KNOT_XDP_LOAD_BPF_MAYBE); + + if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker); + if (ret) { + free(xhd); + return kr_error(ret); + } + assert(xhd->socket); + xhd->tx_waker.data = xhd->socket; + + ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful + ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd); + if (ret) { + knot_xdp_deinit(xhd->socket); + free(xhd); + return kr_error(ret); + } + + // beware: this sets poll_handle->data + xhd->session = session_new(ep->handle, false, false); + assert(!session_flags(xhd->session)->outgoing); + session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there + + ep->handle->data = xhd; + ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx); + return ret; +} +#endif + + int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls, bool has_http) { int ret = -1; @@ -811,12 +914,22 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, b void io_deinit(uv_handle_t *handle) { - if (!handle) { + if (!handle || !handle->data) { return; } - if (handle->data) { + if (handle->type != UV_POLL) { session_free(handle->data); - handle->data = NULL; + } else { + #if ENABLE_XDP + xdp_handle_data_t *xhd = handle->data; + uv_idle_stop(&xhd->tx_waker); + uv_close((uv_handle_t *)&xhd->tx_waker, NULL); + session_free(xhd->session); + knot_xdp_deinit(xhd->socket); + free(xhd); + #else + assert(false); + #endif } } diff --git a/daemon/io.h b/daemon/io.h index 541cd2ae6..c83eabd62 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -24,6 +24,9 @@ int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd); int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bool has_tls, bool has_http); /** Initialize a pipe handle and start listening. */ int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd); +/** Initialize a poll handle (ep->handle) and start listening over AF_XDP on ifname. + * Sets ep->session. */ +int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname); /** Control socket / TTY - related functions. */ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); @@ -44,3 +47,12 @@ void io_free(uv_handle_t *handle); int io_start_read(uv_handle_t *handle); int io_stop_read(uv_handle_t *handle); + +/** When uv_handle_t::type == UV_POLL, ::data points to this malloc-ed helper. + * (Other cases store a direct struct session pointer in ::data.) */ +typedef struct { + struct knot_xdp_socket *socket; + struct session *session; + uv_idle_t tx_waker; +} xdp_handle_data_t; + diff --git a/daemon/lua/kres-gen.lua b/daemon/lua/kres-gen.lua index 2eb58737a..084f7bd0f 100644 --- a/daemon/lua/kres-gen.lua +++ b/daemon/lua/kres-gen.lua @@ -16,6 +16,7 @@ typedef void *(*map_alloc_f)(void *, size_t); typedef void (*map_free_f)(void *baton, void *ptr); typedef void (*trace_log_f) (const struct kr_request *, const char *); typedef void (*trace_callback_f)(struct kr_request *); +typedef uint8_t * (*alloc_wire_f)(struct kr_request *req, uint16_t *maxlen); typedef enum {KNOT_ANSWER, KNOT_AUTHORITY, KNOT_ADDITIONAL} knot_section_t; typedef struct { uint16_t pos; @@ -160,6 +161,7 @@ struct kr_request_qsource_flags { _Bool tcp : 1; _Bool tls : 1; _Bool http : 1; + _Bool xdp : 1; }; struct kr_request { struct kr_context *ctx; @@ -193,6 +195,7 @@ struct kr_request { unsigned int uid; unsigned int count_no_nsaddr; unsigned int count_fail_row; + alloc_wire_f alloc_wire_cb; }; enum kr_rank {KR_RANK_INITIAL, KR_RANK_OMIT, KR_RANK_TRY, KR_RANK_INDET = 4, KR_RANK_BOGUS, KR_RANK_MISMATCH, KR_RANK_MISSING, KR_RANK_INSECURE, KR_RANK_AUTH = 16, KR_RANK_SECURE = 32}; typedef struct kr_cdb * kr_cdb_pt; @@ -437,11 +440,14 @@ struct endpoint { int fd; int family; uint16_t port; + int16_t nic_queue; _Bool engaged; endpoint_flags_t flags; }; struct request_ctx { struct kr_request req; + struct worker_ctx *worker; + struct qr_task *task; /* beware: hidden stub, to avoid hardcoding sockaddr lengths */ }; struct qr_task { diff --git a/daemon/lua/kres-gen.sh b/daemon/lua/kres-gen.sh index 7bc6c7374..ad58d238d 100755 --- a/daemon/lua/kres-gen.sh +++ b/daemon/lua/kres-gen.sh @@ -70,6 +70,7 @@ typedef void *(*map_alloc_f)(void *, size_t); typedef void (*map_free_f)(void *baton, void *ptr); typedef void (*trace_log_f) (const struct kr_request *, const char *); typedef void (*trace_callback_f)(struct kr_request *); +typedef uint8_t * (*alloc_wire_f)(struct kr_request *req, uint16_t *maxlen); " ${CDEFS} ${LIBKRES} types <<-EOF diff --git a/daemon/network.h b/daemon/network.h index a960effa4..66d257801 100644 --- a/daemon/network.h +++ b/daemon/network.h @@ -46,6 +46,7 @@ struct endpoint { int fd; /**< POSIX file-descriptor; always used. */ int family; /**< AF_INET or AF_INET6 or AF_UNIX */ uint16_t port; /**< TCP/UDP port. Meaningless with AF_UNIX. */ + int16_t nic_queue; /**< -1 or queue number of the interface for AF_XDP use. */ bool engaged; /**< to some module or internally */ endpoint_flags_t flags; }; diff --git a/daemon/session.c b/daemon/session.c index 9aa53f214..8f97b6818 100644 --- a/daemon/session.c +++ b/daemon/session.c @@ -23,8 +23,10 @@ /* Per-socket (TCP or UDP) persistent structure. * * In particular note that for UDP clients it's just one session (per socket) - * shared for all clients. For TCP/TLS it's for the connection-specific socket, + * shared for all clients. For TCP/TLS it's also for the connection-specific socket, * i.e one session per connection. + * + * LATER(optim.): the memory here is used a bit wastefully. */ struct session { struct session_flags sflags; /**< miscellaneous flags. */ @@ -43,7 +45,7 @@ struct session { trie_t *tasks; /**< list of tasks assotiated with given session. */ queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */ - uint8_t *wire_buf; /**< Buffer for DNS message. */ + uint8_t *wire_buf; /**< Buffer for DNS message, except for XDP. */ ssize_t wire_buf_size; /**< Buffer size. */ ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */ ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */ @@ -364,6 +366,11 @@ struct session *session_new(uv_handle_t *handle, bool has_tls, bool has_http) assert(handle->loop->data); session->wire_buf = the_worker->wire_buf; session->wire_buf_size = sizeof(the_worker->wire_buf); + } else { + assert(handle->type == UV_POLL/*XDP*/); + /* - wire_buf* are left zeroed, as they make no sense + * - timer is unused but OK for simplicity (server-side sessions are few) + */ } uv_timer_init(handle->loop, &session->timeout); @@ -749,7 +756,7 @@ int session_wirebuf_process(struct session *session, const struct sockaddr *peer while (((pkt = session_produce_packet(session, &the_worker->pkt_pool)) != NULL) && (ret < max_iterations)) { assert (!session_wirebuf_error(session)); - int res = worker_submit(session, peer, pkt); + int res = worker_submit(session, peer, NULL, NULL, NULL, pkt); /* Errors from worker_submit() are intetionally *not* handled in order to * ensure the entire wire buffer is processed. */ if (res == kr_ok()) diff --git a/daemon/session.h b/daemon/session.h index 773180e7e..7c4bae4ce 100644 --- a/daemon/session.h +++ b/daemon/session.h @@ -23,12 +23,12 @@ struct session_flags { bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */ }; -/* Allocate new session for a libuv handle. - * If handle->tyoe is UV_UDP, tls parameter will be ignored. */ +/** Allocate new session for a libuv handle. + * If handle->type isn't UV_TCP, has_* parameters will be ignored. */ struct session *session_new(uv_handle_t *handle, bool has_tls, bool has_http); -/* Clear and free given session. */ +/** Clear and free given session. */ void session_free(struct session *session); -/* Clear session. */ +/** Clear session. */ void session_clear(struct session *session); /** Close session. */ void session_close(struct session *session); diff --git a/daemon/worker.c b/daemon/worker.c index c420350dd..2b13ef24f 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -22,6 +22,10 @@ #include #include +#if ENABLE_XDP + #include +#endif + #include "daemon/bindings/api.h" #include "daemon/engine.h" #include "daemon/io.h" @@ -56,15 +60,19 @@ struct request_ctx { struct kr_request req; + struct worker_ctx *worker; + struct qr_task *task; struct { - /** Requestor's address; separate because of UDP session "sharing". */ - union inaddr addr; /** NULL if the request didn't come over network. */ struct session *session; + /** Requestor's address; separate because of UDP session "sharing". */ + union inaddr addr; + /** Local address. For AF_XDP we couldn't use session's, + * as the address might be different every time. */ + union inaddr dst_addr; + /** MAC addresses - ours [0] and router's [1], in case of AF_XDP socket. */ + uint8_t eth_addrs[2][6]; } source; - - struct worker_ctx *worker; - struct qr_task *task; }; /** Query resolution task. */ @@ -258,14 +266,63 @@ static int subreq_key(char *dst, knot_pkt_t *pkt) knot_pkt_qtype(pkt), knot_pkt_qtype(pkt)); } +#if ENABLE_XDP +static uint8_t *alloc_wire_cb(struct kr_request *req, uint16_t *maxlen) +{ + assert(maxlen); + struct request_ctx *ctx = (struct request_ctx *)req; + /* We know it's an AF_XDP socket; otherwise this CB isn't assigned. */ + uv_handle_t *handle = session_get_handle(ctx->source.session); + assert(handle->type == UV_POLL); + xdp_handle_data_t *xhd = handle->data; + knot_xdp_msg_t out; + int ret = knot_xdp_send_alloc(xhd->socket, ctx->source.addr.ip.sa_family == AF_INET6, + &out, NULL); + if (ret != KNOT_EOK) { + assert(ret == KNOT_ENOMEM); + *maxlen = 0; + return NULL; + } + *maxlen = MIN(*maxlen, out.payload.iov_len); + /* It's most convenient to fill the MAC addresses at this point. */ + memcpy(out.eth_from, &ctx->source.eth_addrs[0], 6); + memcpy(out.eth_to, &ctx->source.eth_addrs[1], 6); + return out.payload.iov_base; +} +static void free_wire(const struct request_ctx *ctx) +{ + assert(ctx->req.alloc_wire_cb == alloc_wire_cb); + knot_pkt_t *ans = ctx->req.answer; + if (unlikely(ans == NULL)) /* dropped */ + return; + if (likely(ans->wire == NULL)) /* sent most likely */ + return; + /* We know it's an AF_XDP socket; otherwise alloc_wire_cb isn't assigned. */ + uv_handle_t *handle = session_get_handle(ctx->source.session); + assert(handle->type == UV_POLL); + xdp_handle_data_t *xhd = handle->data; + /* Freeing is done by sending an empty packet (the API won't really send it). */ + knot_xdp_msg_t out; + out.payload.iov_base = ans->wire; + out.payload.iov_len = 0; + uint32_t sent; + int ret = knot_xdp_send(xhd->socket, &out, 1, &sent); + assert(ret == KNOT_EOK && sent == 0); (void)ret; + kr_log_verbose("[xdp] freed unsent buffer, ret = %d\n", ret); +} +#endif + /** Create and initialize a request_ctx (on a fresh mempool). * - * handle and addr point to the source of the request, and they are NULL + * session and addr point to the source of the request, and they are NULL * in case the request didn't come from network. */ static struct request_ctx *request_create(struct worker_ctx *worker, struct session *session, - const struct sockaddr *peer, + const struct sockaddr *addr, + const struct sockaddr *dst_addr, + const uint8_t *eth_from, + const uint8_t *eth_to, uint32_t uid) { knot_mm_t pool = { @@ -288,14 +345,26 @@ static struct request_ctx *request_create(struct worker_ctx *worker, assert(session_flags(session)->outgoing == false); } ctx->source.session = session; + assert(!!eth_to == !!eth_from); + const bool is_xdp = eth_to != NULL; + if (is_xdp) { + #if ENABLE_XDP + assert(session); + memcpy(&ctx->source.eth_addrs[0], eth_to, sizeof(ctx->source.eth_addrs[0])); + memcpy(&ctx->source.eth_addrs[1], eth_from, sizeof(ctx->source.eth_addrs[1])); + ctx->req.alloc_wire_cb = alloc_wire_cb; + #else + assert(!EINVAL); + return NULL; + #endif + } struct kr_request *req = &ctx->req; req->pool = pool; req->vars_ref = LUA_NOREF; req->uid = uid; + req->qsource.flags.xdp = is_xdp; if (session) { - /* We assume the session will be alive during the whole life of the request. */ - req->qsource.dst_addr = session_get_sockname(session); req->qsource.flags.tcp = session_get_handle(session)->type == UV_TCP; req->qsource.flags.tls = session_flags(session)->has_tls; req->qsource.flags.http = session_flags(session)->has_http; @@ -307,8 +376,12 @@ static struct request_ctx *request_create(struct worker_ctx *worker, } #endif /* We need to store a copy of peer address. */ - memcpy(&ctx->source.addr.ip, peer, kr_sockaddr_len(peer)); + memcpy(&ctx->source.addr.ip, addr, kr_sockaddr_len(addr)); req->qsource.addr = &ctx->source.addr.ip; + if (!dst_addr) /* We wouldn't have to copy in this case, but for consistency. */ + dst_addr = session_get_sockname(session); + memcpy(&ctx->source.dst_addr.ip, dst_addr, kr_sockaddr_len(dst_addr)); + req->qsource.dst_addr = &ctx->source.dst_addr.ip; } worker->stats.rconcurrent += 1; @@ -367,6 +440,14 @@ static void request_free(struct request_ctx *ctx) lua_pop(L, 1); ctx->req.vars_ref = LUA_NOREF; } + /* Make sure to free XDP buffer in case it wasn't sent. */ + if (ctx->req.alloc_wire_cb) { + #if ENABLE_XDP + free_wire(ctx); + #else + assert(!EINVAL); + #endif + } /* Return mempool to ring or free it if it's full */ pool_release(worker, ctx->req.pool.ctx); /* @note The 'task' is invalidated from now on. */ @@ -477,7 +558,7 @@ static void qr_task_complete(struct qr_task *task) } /* This is called when we send subrequest / answer */ -int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status) +int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status) { if (task->finished) { @@ -1164,6 +1245,52 @@ static bool subreq_enqueue(struct qr_task *task) return true; } +#if ENABLE_XDP +static void xdp_tx_waker(uv_idle_t *handle) +{ + int ret = knot_xdp_send_finish(handle->data); + if (ret != KNOT_EAGAIN && ret != KNOT_EOK) + kr_log_error("[xdp] check: ret = %d, %s\n", ret, knot_strerror(ret)); + /* Apparently some drivers need many explicit wake-up calls + * even if we push no additional packets (in case they accumulated a lot) */ + if (ret != KNOT_EAGAIN) + uv_idle_stop(handle); + knot_xdp_send_prepare(handle->data); + /* LATER(opt.): it _might_ be better for performance to do these two steps + * at different points in time */ +} +#endif +/** Send an answer packet over XDP. */ +static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle) +{ +#if ENABLE_XDP + struct request_ctx *ctx = task->ctx; + if (unlikely(ctx->req.answer == NULL)) /* meant to be dropped */ + return kr_ok(); + knot_xdp_msg_t msg; + const struct sockaddr *ip_from = &ctx->source.dst_addr.ip; + const struct sockaddr *ip_to = &ctx->source.addr.ip; + memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from)); + memcpy(&msg.ip_to, ip_to, kr_sockaddr_len(ip_to)); + msg.payload.iov_base = ctx->req.answer->wire; + msg.payload.iov_len = ctx->req.answer->size; + + xdp_handle_data_t *xhd = src_handle->data; + assert(xhd && xhd->socket && xhd->session == ctx->source.session); + uint32_t sent; + int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent); + ctx->req.answer->wire = NULL; /* it's been freed */ + + uv_idle_start(&xhd->tx_waker, xdp_tx_waker); + kr_log_verbose("[xdp] pushed a packet, ret = %d\n", ret); + + return qr_task_on_send(task, src_handle, ret); +#else + assert(!EINVAL); + return kr_error(EINVAL); +#endif +} + static int qr_task_finalize(struct qr_task *task, int state) { assert(task && task->leading == false); @@ -1190,9 +1317,14 @@ static int qr_task_finalize(struct qr_task *task, int state) /* Send back answer */ int ret; const uv_handle_t *src_handle = session_get_handle(source_session); - if (src_handle->type != UV_UDP && src_handle->type != UV_TCP) { + if (src_handle->type != UV_UDP && src_handle->type != UV_TCP + && src_handle->type != UV_POLL) { assert(false); ret = kr_error(EINVAL); + + } else if (src_handle->type == UV_POLL) { + ret = xdp_push(task, src_handle); + } else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) { int fd; ret = uv_fileno(src_handle, &fd); @@ -1584,7 +1716,9 @@ static int parse_packet(knot_pkt_t *query) return ret; } -int worker_submit(struct session *session, const struct sockaddr *peer, knot_pkt_t *pkt) +int worker_submit(struct session *session, + const struct sockaddr *peer, const struct sockaddr *dst_addr, + const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt) { if (!session || !pkt) return kr_error(EINVAL); @@ -1622,8 +1756,9 @@ int worker_submit(struct session *session, const struct sockaddr *peer, knot_pkt struct qr_task *task = NULL; const struct sockaddr *addr = NULL; if (!is_outgoing) { /* request from a client */ - struct request_ctx *ctx = request_create(the_worker, session, peer, - knot_wire_get_id(pkt->wire)); + struct request_ctx *ctx = + request_create(the_worker, session, peer, dst_addr, + eth_from, eth_to, knot_wire_get_id(pkt->wire)); if (http_ctx) queue_pop(http_ctx->streams); if (!ctx) @@ -1863,7 +1998,8 @@ struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options } - struct request_ctx *ctx = request_create(worker, NULL, NULL, worker->next_request_uid); + struct request_ctx *ctx = request_create(worker, NULL, NULL, NULL, NULL, NULL, + worker->next_request_uid); if (!ctx) { return NULL; } diff --git a/daemon/worker.h b/daemon/worker.h index 2e93617df..0e3e27580 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -31,12 +31,15 @@ void worker_deinit(void); /** * Process an incoming packet (query from a client or answer from upstream). * - * @param session session the packet came from - * @param peer address the packet came from - * @param pkt the packet, or NULL on an error from the transport layer + * @param session session the packet came from, or NULL (not from network) + * @param peer address the packet came from, or NULL (not from network) + * @param eth_* MAC addresses or NULL (they're useful for XDP) + * @param pkt the packet, or NULL (an error from the transport layer) * @return 0 or an error code */ -int worker_submit(struct session *session, const struct sockaddr *peer, knot_pkt_t *pkt); +int worker_submit(struct session *session, + const struct sockaddr *peer, const struct sockaddr *dst_addr, + const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt); /** * End current DNS/TCP session, this disassociates pending tasks from this session @@ -108,7 +111,7 @@ void worker_task_subreq_finalize(struct qr_task *task); bool worker_task_finished(struct qr_task *task); /** To be called after sending a DNS message. It mainly deals with cleanups. */ -int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status); +int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status); /** Various worker statistics. Sync with wrk_stats() */ struct worker_stats { diff --git a/lib/resolve.c b/lib/resolve.c index ba199477d..79438b26e 100644 --- a/lib/resolve.c +++ b/lib/resolve.c @@ -784,7 +784,7 @@ knot_pkt_t * kr_request_ensure_answer(struct kr_request *request) const knot_pkt_t *qs_pkt = request->qsource.packet; assert(qs_pkt); // Find answer_max: limit on DNS wire length. - size_t answer_max; + uint16_t answer_max; const struct kr_request_qsource_flags *qs_flags = &request->qsource.flags; assert((qs_flags->tls || qs_flags->http) ? qs_flags->tcp : true); if (!request->qsource.addr || qs_flags->tcp) { @@ -801,14 +801,22 @@ knot_pkt_t * kr_request_ensure_answer(struct kr_request *request) } // Allocate the packet. + uint8_t *wire = NULL; + if (request->alloc_wire_cb) { + wire = request->alloc_wire_cb(request, &answer_max); + if (!wire) + goto enomem; + } knot_pkt_t *answer = request->answer = - knot_pkt_new(NULL, answer_max, &request->pool); + knot_pkt_new(wire, answer_max, &request->pool); if (!answer || knot_pkt_init_response(answer, qs_pkt) != 0) { assert(!answer); // otherwise we messed something up goto enomem; } + if (!wire) + wire = answer->wire; - uint8_t *wire = answer->wire; // much was done by knot_pkt_init_response() + // Much was done by knot_pkt_init_response() knot_wire_set_ra(wire); knot_wire_set_rcode(wire, KNOT_RCODE_NOERROR); if (knot_wire_get_cd(qs_pkt->wire)) { diff --git a/lib/resolve.h b/lib/resolve.h index 7d9df64ef..d18bd6479 100644 --- a/lib/resolve.h +++ b/lib/resolve.h @@ -61,6 +61,20 @@ */ +struct kr_request; +/** Allocate buffer for answer's wire (*maxlen may get lowered). + * + * Motivation: XDP wire allocation is an overlap of library and daemon: + * - it needs to be called from the library + * - it needs to rely on some daemon's internals + * - the library (currently) isn't allowed to directly use symbols from daemon + * (contrary to modules), e.g. some of our lib-using tests run without daemon + * + * Note: after we obtain the wire, we're obliged to send it out. + * (So far there's no use case to allow cancelling at that point.) + */ +typedef uint8_t * (*alloc_wire_f)(struct kr_request *req, uint16_t *maxlen); + /** * RRset rank - for cache and ranked_rr_*. * @@ -165,6 +179,7 @@ struct kr_request_qsource_flags { bool tcp:1; /**< true if the request is not on UDP; only meaningful if (dst_addr). */ bool tls:1; /**< true if the request is encrypted; only meaningful if (dst_addr). */ bool http:1; /**< true if the request is on HTTP; only meaningful if (dst_addr). */ + bool xdp:1; /**< true if the request is on AF_XDP; only meaningful if (dst_addr). */ }; /** @@ -219,9 +234,10 @@ struct kr_request { trace_callback_f trace_finish; /**< Request finish tracepoint */ int vars_ref; /**< Reference to per-request variable table. LUA_NOREF if not set. */ knot_mm_t pool; - unsigned int uid; /** for logging purposes only */ + unsigned int uid; /**< for logging purposes only */ unsigned int count_no_nsaddr; unsigned int count_fail_row; + alloc_wire_f alloc_wire_cb; /**< CB to allocate answer wire (can be NULL). */ }; /** Initializer for an array of *_selected. */ diff --git a/meson.build b/meson.build index 1c431b9d6..df16fe033 100644 --- a/meson.build +++ b/meson.build @@ -110,6 +110,9 @@ else sendmmsg = get_option('sendmmsg') == 'enabled' endif +### XDP: not configurable - we just check if libknot supports it +xdp = meson.get_compiler('c').has_header('libknot/xdp/xdp.h') + ### Systemd systemd_files = get_option('systemd_files') libsystemd = dependency('libsystemd', required: systemd_files == 'enabled') @@ -171,6 +174,7 @@ conf_data.set_quoted('libknot_SONAME', conf_data.set('ENABLE_LIBSYSTEMD', libsystemd.found().to_int()) conf_data.set('NOVERBOSELOG', not verbose_log) conf_data.set('ENABLE_SENDMMSG', sendmmsg.to_int()) +conf_data.set('ENABLE_XDP', xdp.to_int()) conf_data.set('ENABLE_CAP_NG', capng.found().to_int()) conf_data.set('ENABLE_DOH2', nghttp2.found().to_int()) @@ -286,6 +290,7 @@ s_build_config_tests = build_config_tests ? 'enabled' : 'disabled' s_build_extra_tests = build_extra_tests ? 'enabled' : 'disabled' s_install_kresd_conf = install_kresd_conf ? 'enabled' : 'disabled' s_sendmmsg = sendmmsg ? 'enabled': 'disabled' +s_xdp = xdp ? 'enabled': 'disabled' s_openssl = openssl.found() ? 'present': 'missing' s_capng = capng.found() ? 'enabled': 'disabled' s_doh2 = nghttp2.found() ? 'enabled': 'disabled' @@ -323,6 +328,7 @@ message(''' group: @0@'''.format(group) + ''' install_kresd_conf: @0@'''.format(s_install_kresd_conf) + ''' sendmmsg: @0@'''.format(s_sendmmsg) + ''' + XDP (in libknot): @0@'''.format(s_xdp) + ''' openssl debug: @0@'''.format(s_openssl) + ''' capng: @0@'''.format(s_capng) + ''' doh2: @0@'''.format(s_doh2) + ''' diff --git a/modules/stats/README.rst b/modules/stats/README.rst index af26ca1c2..1da310f10 100644 --- a/modules/stats/README.rst +++ b/modules/stats/README.rst @@ -44,6 +44,9 @@ Built-in counters keep track of number of queries and answers matching specific | request.doh | external requests received over | | | DNS-over-HTTP (:rfc:`8484`) | +------------------+----------------------------------------------+ +| request.xdp | external requests received over plain UDP | +| | via an AF_XDP socket | ++------------------+----------------------------------------------+ +----------------------------------------------------+ | **Global answer counters** | diff --git a/modules/stats/stats.c b/modules/stats/stats.c index 6997cbb0d..a83ead891 100644 --- a/modules/stats/stats.c +++ b/modules/stats/stats.c @@ -44,7 +44,7 @@ X(answer,aa) X(answer,tc) X(answer,rd) X(answer,ra) X(answer, ad) X(answer,cd) \ X(answer,edns0) X(answer,do) \ X(query,edns) X(query,dnssec) \ - X(request,total) X(request,udp) X(request,tcp) \ + X(request,total) X(request,udp) X(request,tcp) X(request,xdp) \ X(request,dot) X(request,doh) X(request,internal) \ X(const,end) @@ -186,7 +186,7 @@ static int collect_transport(kr_layer_t *ctx) /** * Count each transport only once, - * i.e. DoT does not count as TCP. + * i.e. DoT does not count as TCP and XDP does not count as UDP. */ if (req->qsource.flags.http) stat_const_add(data, metric_request_doh, 1); @@ -194,6 +194,8 @@ static int collect_transport(kr_layer_t *ctx) stat_const_add(data, metric_request_dot, 1); else if (req->qsource.flags.tcp) stat_const_add(data, metric_request_tcp, 1); + else if (req->qsource.flags.xdp) + stat_const_add(data, metric_request_xdp, 1); else stat_const_add(data, metric_request_udp, 1); return ctx->state; -- 2.47.2