git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: XDP with protolayers
author Oto Šťáva <oto.stava@nic.cz>
Thu, 20 Oct 2022 10:10:15 +0000 (12:10 +0200)
committer Oto Šťáva <oto.stava@nic.cz>
Thu, 26 Jan 2023 11:56:08 +0000 (12:56 +0100)
daemon/http.c
daemon/io.c
daemon/io.h
daemon/network.c
daemon/session2.c
daemon/session2.h
daemon/tls.c
daemon/worker.c

index ee067aa456496071953a3e4b8123eb2465686b55..e88e908a197cce371b45ecb5179062dd817e5099 100644 (file)
@@ -368,7 +368,7 @@ static int http_send_response_rst_stream(struct pl_http_sess_data *ctx, int32_t
 }
 
 static void callback_finished_free_baton(int status, struct session2 *session,
-                                         const void *target, void *baton)
+                                         const struct comm_info *comm, void *baton)
 {
        free(baton);
 }
index 2503c81da70e20a299c563c4cf88f90379c483c9..13dde0ae08f5629cbf565ae8ae4cde0b0fec3802 100644 (file)
@@ -63,7 +63,7 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t*
 }
 
 static void udp_on_unwrapped(int status, struct session2 *session,
-                             const void *target, void *baton)
+                             const struct comm_info *comm, void *baton)
 {
        wire_buf_reset(&session->wire_buf);
 }
@@ -92,7 +92,11 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
                return;
        }
 
-       session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), comm_addr,
+       struct comm_info in_comm = {
+               .comm_addr = comm_addr,
+               .src_addr = comm_addr
+       };
+       session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), &in_comm,
                        udp_on_unwrapped, NULL);
 }
 
@@ -134,19 +138,9 @@ static int family_to_freebind_option(sa_family_t sa_family, int *level, int *nam
 struct pl_udp_iter_data {
        PROTOLAYER_DATA_HEADER();
        struct proxy_result proxy;
-       struct comm_info comm;
        bool has_proxy;
 };
 
-static int pl_udp_iter_init(struct protolayer_manager *manager,
-                            struct protolayer_iter_ctx *ctx,
-                            void *iter_data)
-{
-       struct pl_udp_iter_data *udp = iter_data;
-       ctx->comm = &udp->comm;
-       return kr_ok();
-}
-
 static enum protolayer_iter_cb_result pl_udp_unwrap(
                void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
 {
@@ -161,9 +155,7 @@ static enum protolayer_iter_cb_result pl_udp_unwrap(
 
        char *data = ctx->payload.buffer.buf;
        ssize_t data_len = ctx->payload.buffer.len;
-       struct comm_info *comm = ctx->comm;
-       comm->comm_addr = ctx->target;
-       comm->src_addr = ctx->target;
+       struct comm_info *comm = &ctx->comm;
        if (!s->outgoing && proxy_header_present(data, data_len)) {
                if (!proxy_allowed(comm->comm_addr)) {
                        kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n",
@@ -234,6 +226,18 @@ struct pl_tcp_sess_data {
        bool has_proxy : 1;
 };
 
+static int pl_tcp_sess_init(struct protolayer_manager *manager,
+                            void *data,
+                            void *param)
+{
+       struct sockaddr *peer = session2_get_peer(manager->session);
+       manager->session->comm = (struct comm_info) {
+               .comm_addr = peer,
+               .src_addr = peer
+       };
+       return 0;
+}
+
 static int pl_tcp_sess_deinit(struct protolayer_manager *manager, void *sess_data)
 {
        struct pl_tcp_sess_data *tcp = sess_data;
@@ -284,10 +288,7 @@ static enum protolayer_iter_cb_result pl_tcp_unwrap(
 
        char *data = wire_buf_data(ctx->payload.wire_buf); /* layer's or session's wirebuf */
        ssize_t data_len = wire_buf_data_length(ctx->payload.wire_buf);
-       struct comm_info *comm = ctx->comm;
-       comm->src_addr = peer;
-       comm->comm_addr = peer;
-       comm->dst_addr = NULL;
+       struct comm_info *comm = &ctx->manager->session->comm;
        if (!s->outgoing && !tcp->had_data && proxy_header_present(data, data_len)) {
                if (!proxy_allowed(comm->src_addr)) {
                        if (kr_log_is_debug(IO, NULL)) {
@@ -335,7 +336,7 @@ static enum protolayer_iter_cb_result pl_tcp_unwrap(
        }
 
        tcp->had_data = true;
-
+       ctx->comm = ctx->manager->session->comm;
        return protolayer_continue(ctx);
 }
 
@@ -359,13 +360,13 @@ void io_protolayers_init(void)
 {
        protolayer_globals[PROTOLAYER_UDP] = (struct protolayer_globals){
                .iter_size = sizeof(struct pl_udp_iter_data),
-               .iter_init = pl_udp_iter_init,
                .unwrap = pl_udp_unwrap,
                .event_wrap = pl_udp_event_wrap,
        };
 
        protolayer_globals[PROTOLAYER_TCP] = (struct protolayer_globals){
                .sess_size = sizeof(struct pl_tcp_sess_data),
+               .sess_init = pl_tcp_sess_init,
                .sess_deinit = pl_tcp_sess_deinit,
                .unwrap = pl_tcp_unwrap,
                .event_wrap = pl_tcp_event_wrap,
@@ -882,149 +883,147 @@ int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd)
        return 0;
 }
 
-/* TODO: xdp */
-//#if ENABLE_XDP
-//static void xdp_rx(uv_poll_t* handle, int status, int events)
-//{
-//     const int XDP_RX_BATCH_SIZE = 64;
-//     if (status < 0) {
-//             kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status));
-//             return;
-//     }
-//     if (events != UV_READABLE) {
-//             kr_log_error(XDP, "poll unexpected events: %d\n", events);
-//             return;
-//     }
-//
-//     xdp_handle_data_t *xhd = handle->data;
-//     kr_require(xhd && xhd->session && xhd->socket);
-//     uint32_t rcvd;
-//     knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
-//     int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd
-//                     #if KNOT_VERSION_HEX >= 0x030100
-//                     , NULL
-//                     #endif
-//                     );
-//
-//     if (kr_fails_assert(ret == KNOT_EOK)) {
-//             /* ATM other error codes can only be returned when called incorrectly */
-//             kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
-//             return;
-//     }
-//     kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd);
-//     kr_require(rcvd <= XDP_RX_BATCH_SIZE);
-//     for (int i = 0; i < rcvd; ++i) {
-//             const knot_xdp_msg_t *msg = &msgs[i];
-//             kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
-//             knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len,
-//                                             &the_worker->pkt_pool);
-//             if (kpkt == NULL) {
-//                     ret = kr_error(ENOMEM);
-//             } else {
-//                     struct io_comm_data comm = {
-//                             .src_addr = (const struct sockaddr *)&msg->ip_from,
-//                             .comm_addr = (const struct sockaddr *)&msg->ip_from,
-//                             .dst_addr = (const struct sockaddr *)&msg->ip_to
-//                     };
-//                     ret = worker_submit(xhd->session, &comm,
-//                                     msg->eth_from, msg->eth_to, kpkt);
-//             }
-//             if (ret)
-//                     kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
-//             mp_flush(the_worker->pkt_pool.ctx);
-//     }
-//     knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
-//}
-///// Warn if the XDP program is running in emulated mode (XDP_SKB)
-//static void xdp_warn_mode(const char *ifname)
-//{
-//     if (kr_fails_assert(ifname))
-//             return;
-//
-//     const unsigned if_index = if_nametoindex(ifname);
-//     if (!if_index) {
-//             kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n",
-//                             ifname, strerror(errno));
-//             return;
-//     }
-//
-//     const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index);
-//     switch (mode) {
-//     case KNOT_XDP_MODE_FULL:
-//             return;
-//     case KNOT_XDP_MODE_EMUL:
-//             kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n",
-//                             ifname);
-//             return;
-//     case KNOT_XDP_MODE_NONE: // enum warnings from compiler
-//             break;
-//     }
-//     kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n",
-//                     ifname, (int)mode);
-//}
-//int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
-//{
-//     if (!ep || !ep->handle) {
-//             return kr_error(EINVAL);
-//     }
-//
-//     // RLIMIT_MEMLOCK often needs raising when operating on BPF
-//     static int ret_limit = 1;
-//     if (ret_limit == 1) {
-//             struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
-//             ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
-//                     ? kr_error(errno) : 0;
-//     }
-//     if (ret_limit) return ret_limit;
-//
-//     xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
-//     if (!xhd) return kr_error(ENOMEM);
-//
-//     xhd->socket = NULL; // needed for some reason
-//
-//     // This call is a libknot version hell, unfortunately.
-//     int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue,
-//             #if KNOT_VERSION_HEX < 0x030100
-//                     ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL,
-//                     KNOT_XDP_LOAD_BPF_MAYBE
-//             #elif KNOT_VERSION_HEX < 0x030200
-//                     ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0),
-//                     KNOT_XDP_LOAD_BPF_MAYBE
-//             #else
-//                     KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS),
-//                     ep->port, 0/*quic_port*/,
-//                     KNOT_XDP_LOAD_BPF_MAYBE,
-//                     NULL/*xdp_config*/
-//             #endif
-//             );
-//
-//     if (!ret) xdp_warn_mode(ifname);
-//
-//     if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
-//     if (ret || kr_fails_assert(xhd->socket)) {
-//             free(xhd);
-//             return ret == 0 ? kr_error(EINVAL) : kr_error(ret);
-//     }
-//     xhd->tx_waker.data = xhd->socket;
-//
-//     ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
-//     ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
-//     if (ret) {
-//             knot_xdp_deinit(xhd->socket);
-//             free(xhd);
-//             return kr_error(ret);
-//     }
-//
-//     // beware: this sets poll_handle->data
-//     xhd->session = session_new(ep->handle, false, false);
-//     kr_require(!session_flags(xhd->session)->outgoing);
-//     session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there
-//
-//     ep->handle->data = xhd;
-//     ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
-//     return ret;
-//}
-//#endif
+#if ENABLE_XDP
+static void xdp_rx(uv_poll_t* handle, int status, int events)
+{
+       const int XDP_RX_BATCH_SIZE = 64;
+       if (status < 0) {
+               kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status));
+               return;
+       }
+       if (events != UV_READABLE) {
+               kr_log_error(XDP, "poll unexpected events: %d\n", events);
+               return;
+       }
+
+       xdp_handle_data_t *xhd = handle->data;
+       kr_require(xhd && xhd->session && xhd->socket);
+       uint32_t rcvd;
+       knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
+       int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd
+                       #if KNOT_VERSION_HEX >= 0x030100
+                       , NULL
+                       #endif
+                       );
+
+       if (kr_fails_assert(ret == KNOT_EOK)) {
+               /* ATM other error codes can only be returned when called incorrectly */
+               kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
+               return;
+       }
+       kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd);
+       kr_require(rcvd <= XDP_RX_BATCH_SIZE);
+       for (int i = 0; i < rcvd; ++i) {
+               knot_xdp_msg_t *msg = &msgs[i];
+               kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
+               struct comm_info comm = {
+                       .src_addr = (const struct sockaddr *)&msg->ip_from,
+                       .comm_addr = (const struct sockaddr *)&msg->ip_from,
+                       .dst_addr = (const struct sockaddr *)&msg->ip_to,
+                       .xdp = true
+               };
+               memcpy(comm.eth_from, msg->eth_from, sizeof(comm.eth_from));
+               memcpy(comm.eth_to, msg->eth_to, sizeof(comm.eth_to));
+               session2_unwrap(xhd->session,
+                               protolayer_buffer(msg->payload.iov_base, msg->payload.iov_len),
+                               &comm, NULL, NULL);
+               if (ret)
+                       kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
+               mp_flush(the_worker->pkt_pool.ctx);
+       }
+       knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
+}
+/// Warn if the XDP program is running in emulated mode (XDP_SKB)
+static void xdp_warn_mode(const char *ifname)
+{
+       if (kr_fails_assert(ifname))
+               return;
+
+       const unsigned if_index = if_nametoindex(ifname);
+       if (!if_index) {
+               kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n",
+                               ifname, strerror(errno));
+               return;
+       }
+
+       const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index);
+       switch (mode) {
+       case KNOT_XDP_MODE_FULL:
+               return;
+       case KNOT_XDP_MODE_EMUL:
+               kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n",
+                               ifname);
+               return;
+       case KNOT_XDP_MODE_NONE: // enum warnings from compiler
+               break;
+       }
+       kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n",
+                       ifname, (int)mode);
+}
+int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
+{
+       if (!ep || !ep->handle) {
+               return kr_error(EINVAL);
+       }
+
+       // RLIMIT_MEMLOCK often needs raising when operating on BPF
+       static int ret_limit = 1;
+       if (ret_limit == 1) {
+               struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
+               ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
+                       ? kr_error(errno) : 0;
+       }
+       if (ret_limit) return ret_limit;
+
+       xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
+       if (!xhd) return kr_error(ENOMEM);
+
+       xhd->socket = NULL; // needed for some reason
+       queue_init(xhd->tx_waker_queue);
+
+       // This call is a libknot version hell, unfortunately.
+       int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue,
+               #if KNOT_VERSION_HEX < 0x030100
+                       ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL,
+                       KNOT_XDP_LOAD_BPF_MAYBE
+               #elif KNOT_VERSION_HEX < 0x030200
+                       ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0),
+                       KNOT_XDP_LOAD_BPF_MAYBE
+               #else
+                       KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS),
+                       ep->port, 0/*quic_port*/,
+                       KNOT_XDP_LOAD_BPF_MAYBE,
+                       NULL/*xdp_config*/
+               #endif
+               );
+
+       if (!ret) xdp_warn_mode(ifname);
+
+       if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
+       if (ret || kr_fails_assert(xhd->socket)) {
+               free(xhd);
+               return ret == 0 ? kr_error(EINVAL) : kr_error(ret);
+       }
+       xhd->tx_waker.data = xhd;
+
+       ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
+       ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
+       if (ret) {
+               knot_xdp_deinit(xhd->socket);
+               free(xhd);
+               return kr_error(ret);
+       }
+
+       xhd->session = session2_new_io(ep->handle, PROTOLAYER_GRP_DOUDP,
+                       NULL, 0, false);
+       kr_require(xhd->session);
+       session2_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there
+
+       ep->handle->data = xhd;
+       ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
+       return ret;
+}
+#endif
 
 int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family,
                enum protolayer_grp grp,
@@ -1064,6 +1063,7 @@ static void io_deinit(uv_handle_t *handle)
                uv_close((uv_handle_t *)&xhd->tx_waker, NULL);
                session2_free(xhd->session);
                knot_xdp_deinit(xhd->socket);
+               queue_deinit(xhd->tx_waker_queue);
                free(xhd);
        #else
                kr_assert(false);
index 232098de4ec25fde0ce88875512729607cbf54ed..6733cd954a271a83a06b7f57e9609869efefb422 100644 (file)
@@ -60,5 +60,6 @@ typedef struct {
        struct knot_xdp_socket *socket;
        struct session2 *session;
        uv_idle_t tx_waker;
+       queue_t(void *) tx_waker_queue;
 } xdp_handle_data_t;
 
index 17c2bf95cf0cc371233092d6ae1c987ef86f6beb..504a2545dda1bfe1bf616f4c710eebdefc2e191a 100644 (file)
@@ -417,14 +417,14 @@ static int open_endpoint(const char *addr_str,
        }
 
        if (is_xdp) {
-//     #if ENABLE_XDP
-//             uv_poll_t *ep_handle = malloc(sizeof(uv_poll_t));
-//             ep->handle = (uv_handle_t *)ep_handle;
-//             ret = !ep->handle ? ENOMEM
-//                     : io_listen_xdp(the_network->loop, ep, addr_str);
-//     #else
+       #if ENABLE_XDP
+               uv_poll_t *ep_handle = malloc(sizeof(uv_poll_t));
+               ep->handle = (uv_handle_t *)ep_handle;
+               ret = !ep->handle ? ENOMEM
+                       : io_listen_xdp(the_network->loop, ep, addr_str);
+       #else
                ret = ESOCKTNOSUPPORT;
-//     #endif
+       #endif
                goto finish_ret;
        } /* else */
 
index 903c992c76c2fbd03373c5cd5edeb87edbcbfcc6..d9b5ba159fd202e1d2473f092ed89a9788d981b1 100644 (file)
@@ -2,9 +2,15 @@
  *  SPDX-License-Identifier: GPL-3.0-or-later
  */
 
+#include "kresconfig.h"
+
 #include <ucw/lib.h>
 #include <sys/socket.h>
 
+#if ENABLE_XDP
+       #include <libknot/xdp/xdp.h>
+#endif
+
 #include "lib/log.h"
 #include "lib/utils.h"
 #include "daemon/io.h"
@@ -101,11 +107,11 @@ const char *protolayer_payload_names[PROTOLAYER_PAYLOAD_COUNT] = {
 /* Forward decls. */
 static int session2_transport_pushv(struct session2 *s,
                                     struct iovec *iov, int iovcnt,
-                                    const void *target,
+                                    const struct comm_info *comm,
                                     protolayer_finished_cb cb, void *baton);
 static inline int session2_transport_push(struct session2 *s,
                                           char *buf, size_t buf_len,
-                                          const void *target,
+                                          const struct comm_info *comm,
                                           protolayer_finished_cb cb, void *baton);
 static int session2_transport_event(struct session2 *s,
                                     enum protolayer_event_type event,
@@ -306,7 +312,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret)
                                ctx->layer_ix, layer_name_ctx(ctx), ctx->status);
 
        if (ctx->finished_cb)
-               ctx->finished_cb(ret, session, ctx->finished_cb_target,
+               ctx->finished_cb(ret, session, &ctx->comm,
                                ctx->finished_cb_baton);
 
        free(ctx);
@@ -314,7 +320,7 @@ static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret)
        return ret;
 }
 
-static void protolayer_push_finished(int status, struct session2 *s, const void *target, void *baton)
+static void protolayer_push_finished(int status, struct session2 *s, const struct comm_info *comm, void *baton)
 {
        struct protolayer_iter_ctx *ctx = baton;
        ctx->status = status;
@@ -338,11 +344,11 @@ static int protolayer_push(struct protolayer_iter_ctx *ctx)
        if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
                session2_transport_push(session,
                                ctx->payload.buffer.buf, ctx->payload.buffer.len,
-                               ctx->target, protolayer_push_finished, ctx);
+                               &ctx->comm, protolayer_push_finished, ctx);
        } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
                session2_transport_pushv(session,
                                ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
-                               ctx->target, protolayer_push_finished, ctx);
+                               &ctx->comm, protolayer_push_finished, ctx);
        } else {
                kr_assert(false && "Invalid payload type");
                return kr_error(EINVAL);
@@ -429,7 +435,7 @@ static int protolayer_step(struct protolayer_iter_ctx *ctx)
 static int protolayer_manager_submit(
                struct protolayer_manager *manager,
                enum protolayer_direction direction, size_t layer_ix,
-               struct protolayer_payload payload, const void *target,
+               struct protolayer_payload payload, const struct comm_info *comm,
                protolayer_finished_cb cb, void *baton)
 {
        struct protolayer_iter_ctx *ctx = malloc(manager->cb_ctx_size);
@@ -444,13 +450,11 @@ static int protolayer_manager_submit(
 
        *ctx = (struct protolayer_iter_ctx) {
                .payload = payload,
-               .target = target,
-               .comm = &manager->session->comm,
+               .comm = (comm) ? *comm : manager->session->comm,
                .direction = direction,
                .layer_ix = layer_ix,
                .manager = manager,
                .finished_cb = cb,
-               .finished_cb_target = target,
                .finished_cb_baton = baton
        };
 
@@ -711,9 +715,8 @@ struct session2 *session2_new(enum session2_transport_type transport_type,
        return s;
 }
 
-static void session2_timer_on_close(uv_handle_t *handle)
+void session2_free(struct session2 *s)
 {
-       struct session2 *s = handle->data;
        protolayer_manager_free(s->layers);
        wire_buf_deinit(&s->wire_buf);
        mm_ctx_delete(&s->pool);
@@ -722,11 +725,6 @@ static void session2_timer_on_close(uv_handle_t *handle)
        free(s);
 }
 
-void session2_free(struct session2 *s)
-{
-       uv_close((uv_handle_t *)&s->timer, session2_timer_on_close);
-}
-
 int session2_start_read(struct session2 *session)
 {
        if (session->transport.type == SESSION2_TRANSPORT_IO)
@@ -1010,40 +1008,44 @@ void session2_waitinglist_finalize(struct session2 *session, int status)
 }
 
 int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
-                    const void *target, protolayer_finished_cb cb, void *baton)
+                    const struct comm_info *comm, protolayer_finished_cb cb,
+                    void *baton)
 {
        return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, 0,
-                       payload, target, cb, baton);
+                       payload, comm, cb, baton);
 }
 
 int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol,
-                         struct protolayer_payload payload, const void *target,
-                         protolayer_finished_cb cb, void *baton)
+                          struct protolayer_payload payload,
+                          const struct comm_info *comm,
+                          protolayer_finished_cb cb, void *baton)
 {
        ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) + 1;
        if (layer_ix < 0)
                return layer_ix;
        return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, layer_ix,
-                       payload, target, cb, baton);
+                       payload, comm, cb, baton);
 }
 
 int session2_wrap(struct session2 *s, struct protolayer_payload payload,
-                  const void *target, protolayer_finished_cb cb, void *baton)
+                  const struct comm_info *comm, protolayer_finished_cb cb,
+                  void *baton)
 {
        return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP,
                        s->layers->num_layers - 1,
-                       payload, target, cb, baton);
+                       payload, comm, cb, baton);
 }
 
 int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol,
-                       struct protolayer_payload payload, const void *target,
-                       protolayer_finished_cb cb, void *baton)
+                        struct protolayer_payload payload,
+                        const struct comm_info *comm,
+                        protolayer_finished_cb cb, void *baton)
 {
        ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) - 1;
        if (layer_ix < 0)
                return layer_ix;
        return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, layer_ix,
-                       payload, target, cb, baton);
+                       payload, comm, cb, baton);
 }
 
 static void session2_event_wrap(struct session2 *s, enum protolayer_event_type event, void *baton)
@@ -1125,7 +1127,7 @@ void session2_init_request(struct session2 *s, struct kr_request *req)
 struct session2_pushv_ctx {
        struct session2 *session;
        protolayer_finished_cb cb;
-       const void *target;
+       const struct comm_info *comm;
        void *baton;
 
        char *buf;
@@ -1134,12 +1136,12 @@ struct session2_pushv_ctx {
 
 static void session2_transport_parent_pushv_finished(int status,
                                                      struct session2 *session,
-                                                     const void *target,
+                                                     const struct comm_info *comm,
                                                      void *baton)
 {
        struct session2_pushv_ctx *ctx = baton;
        if (ctx->cb)
-               ctx->cb(status, ctx->session, target, ctx->baton);
+               ctx->cb(status, ctx->session, comm, ctx->baton);
        free(ctx->buf);
        free(ctx);
 }
@@ -1148,7 +1150,7 @@ static void session2_transport_udp_queue_pushv_finished(int status, void *baton)
 {
        struct session2_pushv_ctx *ctx = baton;
        if (ctx->cb)
-               ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+               ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
        free(ctx->buf);
        free(ctx);
 }
@@ -1157,7 +1159,7 @@ static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status
 {
        struct session2_pushv_ctx *ctx = req->data;
        if (ctx->cb)
-               ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+               ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
        free(ctx->buf);
        free(ctx);
        free(req);
@@ -1167,15 +1169,39 @@ static void session2_transport_stream_pushv_finished(uv_write_t *req, int status
 {
        struct session2_pushv_ctx *ctx = req->data;
        if (ctx->cb)
-               ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+               ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
        free(ctx->buf);
        free(ctx);
        free(req);
 }
 
+#if ENABLE_XDP
+static void xdp_tx_waker(uv_idle_t *handle)
+{
+       xdp_handle_data_t *xhd = handle->data;
+       int ret = knot_xdp_send_finish(xhd->socket);
+       if (ret != KNOT_EAGAIN && ret != KNOT_EOK)
+               kr_log_error(XDP, "check: ret = %d, %s\n", ret, knot_strerror(ret));
+       /* Apparently some drivers need many explicit wake-up calls
+        * even if we push no additional packets (in case they accumulated a lot) */
+       if (ret != KNOT_EAGAIN)
+               uv_idle_stop(handle);
+       knot_xdp_send_prepare(xhd->socket);
+       /* LATER(opt.): it _might_ be better for performance to do these two steps
+        * at different points in time */
+       while (queue_len(xhd->tx_waker_queue)) {
+               struct session2_pushv_ctx *ctx = queue_head(xhd->tx_waker_queue);
+               if (ctx->cb)
+                       ctx->cb(kr_ok(), ctx->session, ctx->comm, ctx->baton);
+               free(ctx);
+               queue_pop(xhd->tx_waker_queue);
+       }
+}
+#endif
+
 static int session2_transport_pushv(struct session2 *s,
                                     struct iovec *iov, int iovcnt,
-                                    const void *target,
+                                    const struct comm_info *comm,
                                     protolayer_finished_cb cb, void *baton)
 {
        if (kr_fails_assert(s))
@@ -1187,7 +1213,7 @@ static int session2_transport_pushv(struct session2 *s,
                .session = s,
                .cb = cb,
                .baton = baton,
-               .target = target
+               .comm = comm
        };
 
        switch (s->transport.type) {
@@ -1195,12 +1221,11 @@ static int session2_transport_pushv(struct session2 *s,
                uv_handle_t *handle = s->transport.io.handle;
                if (kr_fails_assert(handle)) {
                        if (cb)
-                               cb(kr_error(EINVAL), s, target, baton);
+                               cb(kr_error(EINVAL), s, comm, baton);
                        free(ctx);
                        return kr_error(EINVAL);
                }
 
-               /* TODO: XDP */
                if (handle->type == UV_UDP) {
                        if (ENABLE_SENDMMSG && !s->outgoing) {
                                int fd;
@@ -1212,7 +1237,7 @@ static int session2_transport_pushv(struct session2 *s,
                                if (kr_fails_assert(iovcnt == 1))
                                        return kr_error(EINVAL);
 
-                               udp_queue_push(fd, target, iov->iov_base, iov->iov_len,
+                               udp_queue_push(fd, comm->comm_addr, iov->iov_base, iov->iov_len,
                                                session2_transport_udp_queue_pushv_finished,
                                                ctx);
                                return kr_ok();
@@ -1220,11 +1245,11 @@ static int session2_transport_pushv(struct session2 *s,
                                uv_udp_send_t *req = malloc(sizeof(*req));
                                req->data = ctx;
                                int ret = uv_udp_send(req, (uv_udp_t *)handle,
-                                               (uv_buf_t *)iov, iovcnt, target,
+                                               (uv_buf_t *)iov, iovcnt, comm->comm_addr,
                                                session2_transport_udp_pushv_finished);
                                if (ret) {
                                        if (cb)
-                                               cb(ret, s, target, baton);
+                                               cb(ret, s, comm, baton);
                                        free(req);
                                        free(ctx);
                                }
@@ -1237,15 +1262,50 @@ static int session2_transport_pushv(struct session2 *s,
                                        session2_transport_stream_pushv_finished);
                        if (ret) {
                                if (cb)
-                                       cb(ret, s, target, baton);
+                                       cb(ret, s, comm, baton);
                                free(req);
                                free(ctx);
                        }
                        return ret;
+#if ENABLE_XDP
+               } else if (handle->type == UV_POLL) {
+                       xdp_handle_data_t *xhd = handle->data;
+                       if (kr_fails_assert(xhd && xhd->socket))
+                               return kr_error(EIO);
+
+                       /* TODO: support multiple iovecs properly? */
+                       if (kr_fails_assert(iovcnt == 1))
+                               return kr_error(EINVAL);
+
+                       knot_xdp_msg_t msg;
+#if KNOT_VERSION_HEX >= 0x030100
+                       /* We don't have a nice way of preserving the _msg_t from frame allocation,
+                        * so we manually redo all other parts of knot_xdp_send_alloc() */
+                       memset(&msg, 0, sizeof(msg));
+                       bool ipv6 = comm->comm_addr->sa_family == AF_INET6;
+                       msg.flags = ipv6 ? KNOT_XDP_MSG_IPV6 : 0;
+                       memcpy(msg.eth_from, comm->eth_from, sizeof(comm->eth_from));
+                       memcpy(msg.eth_to,   comm->eth_to,   sizeof(comm->eth_to));
+#endif
+                       const struct sockaddr *ip_from = comm->dst_addr;
+                       const struct sockaddr *ip_to   = comm->comm_addr;
+                       memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from));
+                       memcpy(&msg.ip_to,   ip_to,   kr_sockaddr_len(ip_to));
+                       msg.payload = *iov;
+
+                       uint32_t sent;
+                       int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent);
+
+                       queue_push(xhd->tx_waker_queue, ctx);
+                       uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
+                       kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
+
+                       return kr_ok();
+#endif
                } else {
                        kr_assert(false && "Unsupported handle");
                        if (cb)
-                               cb(kr_error(EINVAL), s, target, baton);
+                               cb(kr_error(EINVAL), s, comm, baton);
                        free(ctx);
                        return kr_error(EINVAL);
                }
@@ -1257,7 +1317,7 @@ static int session2_transport_pushv(struct session2 *s,
                        return kr_error(EINVAL);
                }
                int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt),
-                               target, session2_transport_parent_pushv_finished,
+                               comm, session2_transport_parent_pushv_finished,
                                ctx);
                return (ret < 0) ? ret : kr_ok();
 
@@ -1276,18 +1336,18 @@ struct push_ctx {
 
 static void session2_transport_single_push_finished(int status,
                                                     struct session2 *s,
-                                                    const void *target,
+                                                    const struct comm_info *comm,
                                                     void *baton)
 {
        struct push_ctx *ctx = baton;
        if (ctx->cb)
-               ctx->cb(status, s, target, ctx->baton);
+               ctx->cb(status, s, comm, ctx->baton);
        free(ctx);
 }
 
 static inline int session2_transport_push(struct session2 *s,
                                           char *buf, size_t buf_len,
-                                          const void *target,
+                                          const struct comm_info *comm,
                                           protolayer_finished_cb cb, void *baton)
 {
        struct push_ctx *ctx = malloc(sizeof(*ctx));
@@ -1301,7 +1361,7 @@ static inline int session2_transport_push(struct session2 *s,
                .baton = baton
        };
 
-       return session2_transport_pushv(s, &ctx->iov, 1, target,
+       return session2_transport_pushv(s, &ctx->iov, 1, comm,
                        session2_transport_single_push_finished, ctx);
 }
 
@@ -1315,7 +1375,12 @@ static void on_session2_handle_close(uv_handle_t *handle)
 
 static int session2_handle_close(struct session2 *s, uv_handle_t *handle)
 {
+       if (kr_fails_assert(s->transport.type == SESSION2_TRANSPORT_IO
+                               && s->transport.io.handle == handle))
+               return kr_error(EINVAL);
+
        io_stop_read(handle);
+       uv_close((uv_handle_t *)&s->timer, NULL);
        uv_close(handle, on_session2_handle_close);
        return kr_ok();
 }
index f7f6bb21e846b9b3dd6367b8cf4c8720cd4fc5e0..5b63a27c73dc565e956172e518ace5493b2f7518 100644 (file)
@@ -46,6 +46,9 @@
 struct session2;
 struct protolayer_iter_ctx;
 
+/** A MAC (Ethernet hardware) address, stored as six raw bytes. */
+typedef uint8_t ethaddr_t[6];
+
 /** Information about the transport - addresses and proxy. */
 struct comm_info {
        /** The original address the data came from. May be that of a proxied
@@ -67,6 +70,15 @@ struct comm_info {
        /** Data parsed from a PROXY header. May be `NULL` if the communication
         * did not come through a proxy, or if the PROXYv2 protocol was not used. */
        const struct proxy_result *proxy;
+
+       /** Pointer to protolayer-specific data, e.g. a key used to decide
+        * which sub-session to use. */
+       void *target;
+
+       /* XDP data */
+       ethaddr_t eth_from;
+       ethaddr_t eth_to;
+       bool xdp:1;
 };
 
 /** Protocol layer types - the individual implementations of protocol layers.
@@ -191,7 +203,7 @@ enum protolayer_ret {
  * function.
  * `baton` is the `baton` parameter passed to the `session2_(un)wrap` function. */
 typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
-                                       const void *target, void *baton);
+                                       const struct comm_info *comm, void *baton);
 
 
 #define PROTOLAYER_EVENT_MAP(XX) \
@@ -279,17 +291,12 @@ struct protolayer_iter_ctx {
 /* read-write: */
        /** The payload */
        struct protolayer_payload payload;
-       /** Transport information (e.g. UDP sender address). May be `NULL`. */
-       const void *target;
        /** Communication information. Typically written into by one of the
-        * first layers facilitating transport protocol processing.
-        * Points to session-wide comm info by default, may be changed
-        * by a layer to point elsewhere. */
-       struct comm_info *comm;
+        * first layers facilitating transport protocol processing. */
+       struct comm_info comm;
 
 /* callback for when the layer iteration has ended - read-only: */
        protolayer_finished_cb finished_cb;
-       const void *finished_cb_target;
        void *finished_cb_baton;
 
 /* internal information for the manager - private: */
@@ -793,7 +800,8 @@ static inline struct session2 *session2_new_child(struct session2 *parent,
        return s;
 }
 
-/** De-allocates the session. */
+/** De-allocates the session. Must only be called once the underlying IO handle
+ * and timer are already closed, otherwise may leak resources. */
 void session2_free(struct session2 *s);
 
 /** Start reading from the underlying transport. */
@@ -876,9 +884,11 @@ static inline bool session2_is_empty(const struct session2 *session)
 }
 
 /** Sends the specified `payload` to be processed in the `_UNWRAP` direction by
- * the session's protocol layers. The `target` parameter may contain a pointer
- * to transport-specific data, e.g. for UDP, it shall contain a pointer to the
- * sender's `struct sockaddr_*`.
+ * the session's protocol layers.
+ *
+ * The `comm` parameter may point to communication data for the payload; e.g.
+ * for UDP, it shall carry a pointer to the sender's `struct sockaddr_*`. If
+ * `comm` is `NULL`, session-wide data shall be used.
  *
  * Note that the payload data may be modified by any of the layers, to avoid
  * making copies. Once the payload is passed to this function, the content of
@@ -891,7 +901,8 @@ static inline bool session2_is_empty(const struct session2 *session)
  * Returns one of `enum protolayer_ret` or a negative number
  * indicating an error. */
 int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
-                    const void *target, protolayer_finished_cb cb, void *baton);
+                    const struct comm_info *comm, protolayer_finished_cb cb,
+                   void *baton);
 
 /** Same as `session2_unwrap`, but looks up the specified `protocol` in the
  * session's assigned protocol group and sends the `payload` to the layer that
@@ -900,8 +911,9 @@ int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
  * Layers may use this to generate their own data to send in the sequence, e.g.
  * for protocol-specific ceremony. */
 int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol,
-                         struct protolayer_payload payload, const void *target,
-                         protolayer_finished_cb cb, void *baton);
+                          struct protolayer_payload payload,
+                          const struct comm_info *comm,
+                          protolayer_finished_cb cb, void *baton);
 
 /** Sends the specified `payload` to be processed in the `_WRAP` direction by
  * the session's protocol layers. The `target` parameter may contain a pointer
@@ -918,7 +930,8 @@ int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol,
  * Returns one of `enum protolayer_ret` or a negative number
  * indicating an error. */
 int session2_wrap(struct session2 *s, struct protolayer_payload payload,
-                  const void *target, protolayer_finished_cb cb, void *baton);
+                  const struct comm_info *comm, protolayer_finished_cb cb,
+                  void *baton);
 
 /** Same as `session2_wrap`, but looks up the specified `protocol` in the
  * session's assigned protocol group and sends the `payload` to the layer that
@@ -927,8 +940,9 @@ int session2_wrap(struct session2 *s, struct protolayer_payload payload,
  * Layers may use this to generate their own data to send in the sequence, e.g.
  * for protocol-specific ceremony. */
 int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol,
-                       struct protolayer_payload payload, const void *target,
-                       protolayer_finished_cb cb, void *baton);
+                        struct protolayer_payload payload,
+                        const struct comm_info *comm,
+                        protolayer_finished_cb cb, void *baton);
 
 /** Sends an event to be synchronously processed by the protocol layers of the
  * specified session. The layers are first iterated through in the `_UNWRAP`
index fc43b0e3c64119b854acabf4f10f157e2e10f40d..b1782c86c12d69b16e3c35d066eab1fad8bef88e 100644 (file)
@@ -200,7 +200,7 @@ struct kres_gnutls_push_ctx {
 };
 
 static void kres_gnutls_push_finished(int status, struct session2 *session,
-                                      const void *target, void *baton)
+                                      const struct comm_info *comm, void *baton)
 {
        struct kres_gnutls_push_ctx *push_ctx = baton;
        struct pl_tls_sess_data *tls = push_ctx->sess_data;
index 2353e9feece1628542824438c42c8653b1886a44..2233fc725e7e6b3d74eeeee4967eebb9c645624d 100644 (file)
@@ -59,8 +59,13 @@ struct request_ctx
                /** Local address.  For AF_XDP we couldn't use session's,
                 * as the address might be different every time. */
                union kr_sockaddr dst_addr;
-               /** MAC addresses - ours [0] and router's [1], in case of AF_XDP socket. */
-               uint8_t eth_addrs[2][6];
+
+               /** Router's MAC address for XDP. */
+               ethaddr_t eth_from;
+               /** Our MAC address for XDP. */
+               ethaddr_t eth_to;
+               /** Whether XDP was used. */
+               bool xdp : 1;
        } source;
 };
 
@@ -101,7 +106,7 @@ static int qr_task_step(struct qr_task *task,
                        const struct sockaddr *packet_source,
                        knot_pkt_t *packet);
 static int qr_task_send(struct qr_task *task, struct session2 *session,
-                       const struct sockaddr *addr, knot_pkt_t *pkt);
+                       const struct comm_info *comm, knot_pkt_t *pkt);
 static int qr_task_finalize(struct qr_task *task, int state);
 static void qr_task_complete(struct qr_task *task);
 static int worker_add_tcp_connected(const struct sockaddr* addr, struct session2 *session);
@@ -241,8 +246,8 @@ static uint8_t *alloc_wire_cb(struct kr_request *req, uint16_t *maxlen)
        *maxlen = MIN(*maxlen, out.payload.iov_len);
 #if KNOT_VERSION_HEX < 0x030100
        /* It's most convenient to fill the MAC addresses at this point. */
-       memcpy(out.eth_from, &ctx->source.eth_addrs[0], 6);
-       memcpy(out.eth_to,   &ctx->source.eth_addrs[1], 6);
+       memcpy(out.eth_from, &ctx->source.eth_from, 6);
+       memcpy(out.eth_to,   &ctx->source.eth_to, 6);
 #endif
        return out.payload.iov_base;
 }
@@ -255,9 +260,11 @@ static void free_wire(const struct request_ctx *ctx)
                return;
        if (likely(ans->wire == NULL)) /* sent most likely */
                return;
+       if (!ctx->source.session)
+               return;
        /* We know it's an AF_XDP socket; otherwise alloc_wire_cb isn't assigned. */
        uv_handle_t *handle = session2_get_handle(ctx->source.session);
-       if (kr_fails_assert(handle->type == UV_POLL))
+       if (!handle || kr_fails_assert(handle->type == UV_POLL))
                return;
        xdp_handle_data_t *xhd = handle->data;
        /* Freeing is done by sending an empty packet (the API won't really send it). */
@@ -297,8 +304,6 @@ static inline bool is_tcp_waiting(struct sockaddr *address) {
  */
 static struct request_ctx *request_create(struct session2 *session,
                                           struct comm_info *comm,
-                                          const uint8_t *eth_from,
-                                          const uint8_t *eth_to,
                                           uint32_t uid)
 {
        knot_mm_t pool = {
@@ -319,19 +324,14 @@ static struct request_ctx *request_create(struct session2 *session,
                return NULL;
        }
        ctx->source.session = session;
-       if (kr_fails_assert(!!eth_to == !!eth_from)) {
-               pool_release(pool.ctx);
-               return NULL;
-       }
-       const bool is_xdp = eth_to != NULL;
-       if (is_xdp) {
+       if (comm && comm->xdp) {
        #if ENABLE_XDP
                if (kr_fails_assert(session)) {
                        pool_release(pool.ctx);
                        return NULL;
                }
-               memcpy(&ctx->source.eth_addrs[0], eth_to,   sizeof(ctx->source.eth_addrs[0]));
-               memcpy(&ctx->source.eth_addrs[1], eth_from, sizeof(ctx->source.eth_addrs[1]));
+               memcpy(ctx->source.eth_to,   comm->eth_to,   sizeof(ctx->source.eth_to));
+               memcpy(ctx->source.eth_from, comm->eth_from, sizeof(ctx->source.eth_from));
                ctx->req.alloc_wire_cb = alloc_wire_cb;
        #else
                kr_assert(!EINVAL);
@@ -344,7 +344,7 @@ static struct request_ctx *request_create(struct session2 *session,
        req->pool = pool;
        req->vars_ref = LUA_NOREF;
        req->uid = uid;
-       req->qsource.comm_flags.xdp = is_xdp;
+       req->qsource.comm_flags.xdp = comm && comm->xdp;
        kr_request_set_extended_error(req, KNOT_EDNS_EDE_NONE, NULL);
        array_init(req->qsource.headers);
        if (session) {
@@ -623,7 +623,7 @@ int qr_task_on_send(struct qr_task *task, struct session2 *s, int status)
 }
 
 static void qr_task_wrap_finished(int status, struct session2 *session,
-                                  const void *target, void *baton)
+                                  const struct comm_info *comm, void *baton)
 {
        struct qr_task *task = baton;
        qr_task_on_send(task, session, status);
@@ -632,15 +632,15 @@ static void qr_task_wrap_finished(int status, struct session2 *session,
 }
 
 static int qr_task_send(struct qr_task *task, struct session2 *session,
-                       const struct sockaddr *addr, knot_pkt_t *pkt)
+                       const struct comm_info *comm, knot_pkt_t *pkt)
 {
        if (!session)
                return qr_task_on_send(task, NULL, kr_error(EIO));
 
        int ret = 0;
 
-       if (addr == NULL)
-               addr = session2_get_peer(session);
+       if (comm == NULL)
+               comm = &session->comm;
 
        if (pkt == NULL)
                pkt = worker_task_get_pktbuf(task);
@@ -670,7 +670,7 @@ static int qr_task_send(struct qr_task *task, struct session2 *session,
        qr_task_ref(task);
        struct protolayer_payload payload = protolayer_buffer((char *)pkt->wire, pkt->size);
        payload.ttl = packet_ttl(pkt);
-       ret = session2_wrap(session, payload, addr, qr_task_wrap_finished, task);
+       ret = session2_wrap(session, payload, comm, qr_task_wrap_finished, task);
 
        if (ret >= 0) {
                session2_touch(session);
@@ -694,12 +694,12 @@ static int qr_task_send(struct qr_task *task, struct session2 *session,
        }
 
        /* Update outgoing query statistics */
-       if (session->outgoing && addr) {
+       if (session->outgoing && comm) {
                session2_event(session, PROTOLAYER_EVENT_STATS_QRY_OUT, NULL);
 
-               if (addr->sa_family == AF_INET6)
+               if (comm->comm_addr->sa_family == AF_INET6)
                        the_worker->stats.ipv6 += 1;
-               else if (addr->sa_family == AF_INET)
+               else if (comm->comm_addr->sa_family == AF_INET)
                        the_worker->stats.ipv4 += 1;
        }
        return ret;
@@ -854,7 +854,10 @@ static int transmit(struct qr_task *task)
        kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
        memcpy(peer, addr, kr_sockaddr_len(addr));
 
-       ret = qr_task_send(task, session, (struct sockaddr *)choice, task->pktbuf);
+       struct comm_info out_comm = {
+               .comm_addr = (struct sockaddr *)choice
+       };
+       ret = qr_task_send(task, session, &out_comm, task->pktbuf);
        if (ret) {
                session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
                return ret;
@@ -947,62 +950,6 @@ static bool subreq_enqueue(struct qr_task *task)
        return true;
 }
 
-//#if ENABLE_XDP
-//static void xdp_tx_waker(uv_idle_t *handle)
-//{
-//     int ret = knot_xdp_send_finish(handle->data);
-//     if (ret != KNOT_EAGAIN && ret != KNOT_EOK)
-//             kr_log_error(XDP, "check: ret = %d, %s\n", ret, knot_strerror(ret));
-//     /* Apparently some drivers need many explicit wake-up calls
-//      * even if we push no additional packets (in case they accumulated a lot) */
-//     if (ret != KNOT_EAGAIN)
-//             uv_idle_stop(handle);
-//     knot_xdp_send_prepare(handle->data);
-//     /* LATER(opt.): it _might_ be better for performance to do these two steps
-//      * at different points in time */
-//}
-//#endif
-//
-///** Send an answer packet over XDP. */
-//static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
-//{
-//#if ENABLE_XDP
-//     struct request_ctx *ctx = task->ctx;
-//     xdp_handle_data_t *xhd = src_handle->data;
-//     if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session))
-//             return qr_task_on_send(task, NULL, kr_error(EINVAL));
-//
-//     knot_xdp_msg_t msg;
-//#if KNOT_VERSION_HEX >= 0x030100
-//     /* We don't have a nice way of preserving the _msg_t from frame allocation,
-//      * so we manually redo all other parts of knot_xdp_send_alloc() */
-//     memset(&msg, 0, sizeof(msg));
-//     bool ipv6 = ctx->source.addr.ip.sa_family == AF_INET6;
-//     msg.flags = ipv6 ? KNOT_XDP_MSG_IPV6 : 0;
-//     memcpy(msg.eth_from, &ctx->source.eth_addrs[0], 6);
-//     memcpy(msg.eth_to,   &ctx->source.eth_addrs[1], 6);
-//#endif
-//     const struct sockaddr *ip_from = &ctx->source.dst_addr.ip;
-//     const struct sockaddr *ip_to   = &ctx->source.comm_addr.ip;
-//     memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from));
-//     memcpy(&msg.ip_to,   ip_to,   kr_sockaddr_len(ip_to));
-//     msg.payload.iov_base = ctx->req.answer->wire;
-//     msg.payload.iov_len  = ctx->req.answer->size;
-//
-//     uint32_t sent;
-//     int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent);
-//     ctx->req.answer->wire = NULL; /* it's been freed */
-//
-//     uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
-//     kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
-//
-//     return qr_task_on_send(task, xhd->session, ret);
-//#else
-//     kr_assert(!EINVAL);
-//     return kr_error(EINVAL);
-//#endif
-//}
-
 static int qr_task_finalize(struct qr_task *task, int state)
 {
        kr_require(task && task->leading == false);
@@ -1036,8 +983,17 @@ static int qr_task_finalize(struct qr_task *task, int state)
        qr_task_ref(task);
 
        /* Send back answer */
-       /* TODO: xdp */
-       int ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
+       struct comm_info out_comm = {
+               .src_addr = &ctx->source.addr.ip,
+               .dst_addr = &ctx->source.dst_addr.ip,
+               .comm_addr = &ctx->source.comm_addr.ip,
+               .xdp = ctx->source.xdp
+       };
+       if (ctx->source.xdp) {
+               memcpy(out_comm.eth_from, ctx->source.eth_from, sizeof(out_comm.eth_from));
+               memcpy(out_comm.eth_to,   ctx->source.eth_to,   sizeof(out_comm.eth_to));
+       }
+       int ret = qr_task_send(task, source_session, &out_comm, ctx->req.answer);
 
        if (ret != kr_ok()) {
                (void) qr_task_on_send(task, NULL, kr_error(EIO));
@@ -1357,8 +1313,7 @@ static int qr_task_step(struct qr_task *task,
        }
 }
 
-int worker_submit(struct session2 *session, struct comm_info *comm,
-                  const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt)
+static int worker_submit(struct session2 *session, struct comm_info *comm, knot_pkt_t *pkt)
 {
        if (!session || !pkt)
                return kr_error(EINVAL);
@@ -1395,8 +1350,7 @@ int worker_submit(struct session2 *session, struct comm_info *comm,
        const struct sockaddr *addr = NULL;
        if (!is_outgoing) { /* request from a client */
                struct request_ctx *ctx =
-                       request_create(session, comm, eth_from,
-                                      eth_to, knot_wire_get_id(pkt->wire));
+                       request_create(session, comm, knot_wire_get_id(pkt->wire));
                if (!ctx)
                        return kr_error(ENOMEM);
 
@@ -1574,8 +1528,7 @@ struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options
                return NULL;
 
 
-       struct request_ctx *ctx = request_create(NULL, NULL, NULL, NULL,
-                                                the_worker->next_request_uid);
+       struct request_ctx *ctx = request_create(NULL, NULL, the_worker->next_request_uid);
        if (!ctx)
                return NULL;
 
@@ -1778,7 +1731,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
                                break;
                        }
 
-                       ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
+                       ret = worker_submit(session, &ctx->comm, pkt);
                        if (ret)
                                break;
                }
@@ -1792,7 +1745,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
                if (!pkt)
                        return protolayer_break(ctx, KNOT_EMALF);
 
-               int ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
+               int ret = worker_submit(session, &ctx->comm, pkt);
                mp_flush(the_worker->pkt_pool.ctx);
                return protolayer_break(ctx, ret);
        } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
@@ -1802,7 +1755,7 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
                if (!pkt)
                        return protolayer_break(ctx, KNOT_EMALF);
 
-               int ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
+               int ret = worker_submit(session, &ctx->comm, pkt);
                wire_buf_reset(ctx->payload.wire_buf);
                mp_flush(the_worker->pkt_pool.ctx);
                return protolayer_break(ctx, ret);
@@ -2090,7 +2043,7 @@ static enum protolayer_iter_cb_result pl_dns_stream_unwrap(
                if (stream_sess->single && stream_sess->produced) {
                        if (kr_log_is_debug(WORKER, NULL)) {
                                kr_log_debug(WORKER, "Unexpected extra data from %s\n",
-                                               kr_straddr(ctx->comm->src_addr));
+                                               kr_straddr(ctx->comm.src_addr));
                        }
                        worker_end_tcp(session);
                        status = KNOT_EMALF;
@@ -2103,7 +2056,7 @@ static enum protolayer_iter_cb_result pl_dns_stream_unwrap(
                        goto exit;
                }
 
-               int ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
+               int ret = worker_submit(session, &ctx->comm, pkt);
                wire_buf_movestart(wb);
                if (ret == kr_ok()) {
                        iters += 1;