]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: channel udp_queue through protolayers
authorOto Šťáva <oto.stava@nic.cz>
Thu, 11 Aug 2022 07:25:41 +0000 (09:25 +0200)
committerOto Šťáva <oto.stava@nic.cz>
Thu, 26 Jan 2023 11:56:07 +0000 (12:56 +0100)
daemon/session2.c
daemon/udp_queue.c
daemon/udp_queue.h
daemon/worker.c

index 81ba5ce610b89fa68984c8194b28cec22a7e7cf8..dc1aeba254e567ecd537e8f7a45bd6fddd65ea9f 100644 (file)
@@ -8,13 +8,14 @@
 #include "lib/log.h"
 #include "lib/utils.h"
 #include "daemon/io.h"
+#include "daemon/udp_queue.h"
 #include "daemon/worker.h"
 
 #include "daemon/session2.h"
 
 
 static int session2_transport_pushv(struct session2 *s,
-                                    const struct iovec *iov, int iovcnt,
+                                    struct iovec *iov, int iovcnt,
                                     const void *target,
                                     protolayer_finished_cb cb, void *baton);
 static inline int session2_transport_push(struct session2 *s,
@@ -913,7 +914,7 @@ void session2_event(struct session2 *s, enum protolayer_event_type event, void *
 }
 
 
-struct parent_pushv_ctx {
+struct session2_pushv_ctx {
        struct session2 *session;
        protolayer_finished_cb cb;
        const void *target;
@@ -928,26 +929,25 @@ static void session2_transport_parent_pushv_finished(int status,
                                                      const void *target,
                                                      void *baton)
 {
-       struct parent_pushv_ctx *ctx = baton;
+       struct session2_pushv_ctx *ctx = baton;
        if (ctx->cb)
                ctx->cb(status, ctx->session, target, ctx->baton);
        free(ctx->buf);
        free(ctx);
 }
 
-static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status)
+static void session2_transport_udp_queue_pushv_finished(int status, void *baton)
 {
-       struct parent_pushv_ctx *ctx = req->data;
+       struct session2_pushv_ctx *ctx = baton;
        if (ctx->cb)
                ctx->cb(status, ctx->session, ctx->target, ctx->baton);
        free(ctx->buf);
        free(ctx);
-       free(req);
 }
 
-static void session2_transport_stream_pushv_finished(uv_write_t *req, int status)
+static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status)
 {
-       struct parent_pushv_ctx *ctx = req->data;
+       struct session2_pushv_ctx *ctx = req->data;
        if (ctx->cb)
                ctx->cb(status, ctx->session, ctx->target, ctx->baton);
        free(ctx->buf);
@@ -955,52 +955,27 @@ static void session2_transport_stream_pushv_finished(uv_write_t *req, int status
        free(req);
 }
 
-static int concat_iovs(const struct iovec *iov, int iovcnt, char **buf, size_t *buf_len)
+static void session2_transport_stream_pushv_finished(uv_write_t *req, int status)
 {
-       if (!iov || iovcnt <= 0)
-               return kr_error(ENODATA);
-
-       size_t len = 0;
-       for (int i = 0; i < iovcnt; i++) {
-               size_t old_len = len;
-               len += iov[i].iov_len;
-               if (kr_fails_assert(len >= old_len)) {
-                       *buf = NULL;
-                       return kr_error(EFBIG);
-               }
-       }
-
-       *buf_len = len;
-       if (len == 0) {
-               *buf = NULL;
-               return kr_ok();
-       }
-
-       *buf = malloc(len);
-       kr_require(*buf);
-
-       char *c = *buf;
-       for (int i = 0; i < iovcnt; i++) {
-               if (iov[i].iov_len == 0)
-                       continue;
-               memcpy(c, iov[i].iov_base, iov[i].iov_len);
-               c += iov[i].iov_len;
-       }
-
-       return kr_ok();
+       struct session2_pushv_ctx *ctx = req->data;
+       if (ctx->cb)
+               ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+       free(ctx->buf);
+       free(ctx);
+       free(req);
 }
 
 static int session2_transport_pushv(struct session2 *s,
-                                    const struct iovec *iov, int iovcnt,
+                                    struct iovec *iov, int iovcnt,
                                     const void *target,
                                     protolayer_finished_cb cb, void *baton)
 {
        if (kr_fails_assert(s))
                return kr_error(EINVAL);
 
-       struct parent_pushv_ctx *ctx = malloc(sizeof(*ctx));
+       struct session2_pushv_ctx *ctx = malloc(sizeof(*ctx));
        kr_require(ctx);
-       *ctx = (struct parent_pushv_ctx){
+       *ctx = (struct session2_pushv_ctx){
                .session = s,
                .cb = cb,
                .baton = baton,
@@ -1015,13 +990,30 @@ static int session2_transport_pushv(struct session2 *s,
                        return kr_error(EINVAL);
                }
 
+               /* TODO: XDP */
                if (handle->type == UV_UDP) {
-                       uv_udp_send_t *req = malloc(sizeof(*req));
-                       req->data = ctx;
-                       uv_udp_send(req, (uv_udp_t *)handle,
-                                       (uv_buf_t *)iov, iovcnt, target,
-                                       session2_transport_udp_pushv_finished);
-                       return kr_ok();
+                       if (ENABLE_SENDMMSG && !s->outgoing) {
+                               int fd;
+                               int ret = uv_fileno(handle, &fd);
+                               if (kr_fails_assert(!ret))
+                                       return kr_error(EIO);
+
+                               /* TODO: support multiple iovecs properly? */
+                               if (kr_fails_assert(iovcnt == 1))
+                                       return kr_error(EINVAL);
+
+                               udp_queue_push(fd, target, iov->iov_base, iov->iov_len,
+                                               session2_transport_udp_queue_pushv_finished,
+                                               ctx);
+                               return kr_ok();
+                       } else {
+                               uv_udp_send_t *req = malloc(sizeof(*req));
+                               req->data = ctx;
+                               uv_udp_send(req, (uv_udp_t *)handle,
+                                               (uv_buf_t *)iov, iovcnt, target,
+                                               session2_transport_udp_pushv_finished);
+                               return kr_ok();
+                       }
                } else if (handle->type == UV_TCP) {
                        uv_write_t *req = malloc(sizeof(*req));
                        req->data = ctx;
@@ -1040,15 +1032,10 @@ static int session2_transport_pushv(struct session2 *s,
                        free(ctx);
                        return kr_error(EINVAL);
                }
-               int ret = concat_iovs(iov, iovcnt, &ctx->buf, &ctx->buf_len);
-               if (ret) {
-                       free(ctx);
-                       return ret;
-               }
-               session2_wrap(parent, protolayer_buffer(ctx->buf, ctx->buf_len),
+               int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt),
                                target, session2_transport_parent_pushv_finished,
                                ctx);
-               return kr_ok();
+               return (ret < 0) ? ret : kr_ok();
 
        default:
                kr_assert(false && "Invalid transport");
index 7460e041cf6ef0d3564fc5e20444aab63cdc86b4..d41fd640acacaa8fa00ef7ea92ec7a39dfbba8bd 100644 (file)
@@ -5,7 +5,7 @@
 #include "kresconfig.h"
 #include "daemon/udp_queue.h"
 
-#include "daemon/worker.h"
+#include "daemon/session2.h"
 #include "lib/generic/array.h"
 #include "lib/utils.h"
 
@@ -20,7 +20,8 @@ int udp_queue_init_global(uv_loop_t *loop)
        return 0;
 }
 /* Appease the linker in case this unused call isn't optimized out. */
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+void udp_queue_push(int fd, const struct sockaddr *sa, char *buf, size_t buf_len,
+                    udp_queue_cb cb, void *baton)
 {
        abort();
 }
@@ -35,7 +36,8 @@ typedef struct {
        int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
        struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg() */
        struct {
-               struct qr_task *task; /**< Links for completion callbacks. */
+               udp_queue_cb cb;
+               void *cb_baton;
                struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov */
        } items[UDP_QUEUE_LEN];
 } udp_queue_t;
@@ -77,8 +79,8 @@ static void udp_queue_send(int fd)
        /* ATM we don't really do anything about failures. */
        int err = sent_len < 0 ? errno : EAGAIN /* unknown error, really */;
        for (int i = 0; i < q->len; ++i) {
-               qr_task_on_send(q->items[i].task, NULL, i < sent_len ? 0 : err);
-               worker_task_unref(q->items[i].task);
+               if (q->items[i].cb)
+                       q->items[i].cb(i < sent_len ? 0 : err, q->items[i].cb_baton);
        }
        q->len = 0;
 }
@@ -99,13 +101,13 @@ int udp_queue_init_global(uv_loop_t *loop)
        return ret;
 }
 
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+void udp_queue_push(int fd, const struct sockaddr *sa, char *buf, size_t buf_len,
+                    udp_queue_cb cb, void *baton)
 {
        if (fd < 0) {
                kr_log_error(SYSTEM, "ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
                abort();
        }
-       worker_task_ref(task);
        /* Get a valid correct queue. */
        if (fd >= state.udp_queues_len) {
                const int new_len = fd + 1;
@@ -121,13 +123,13 @@ void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
        udp_queue_t *const q = state.udp_queues[fd];
 
        /* Append to the queue */
-       struct sockaddr *sa = (struct sockaddr *)/*const-cast*/req->qsource.comm_addr;
-       q->msgvec[q->len].msg_hdr.msg_name = sa;
+       q->msgvec[q->len].msg_hdr.msg_name = (void *)sa;
        q->msgvec[q->len].msg_hdr.msg_namelen = kr_sockaddr_len(sa);
-       q->items[q->len].task = task;
+       q->items[q->len].cb = cb;
+       q->items[q->len].cb_baton = baton;
        q->items[q->len].msg_iov[0] = (struct iovec){
-               .iov_base = req->answer->wire,
-               .iov_len  = req->answer->size,
+               .iov_base = buf,
+               .iov_len  = buf_len,
        };
        if (q->len == 0)
                array_push(state.waiting_fds, fd);
index f4a1ae1ec0f01e322d484230083bcc6c6d4aefe4..ed0a32699d37b838908624529a05aef4d4043c6e 100644 (file)
@@ -8,9 +8,12 @@
 struct kr_request;
 struct qr_task;
 
+typedef void (*udp_queue_cb)(int status, void *baton);
+
 /** Initialize the global state for udp_queue. */
 int udp_queue_init_global(uv_loop_t *loop);
 
 /** Send req->answer via UDP, possibly not immediately. */
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task);
+void udp_queue_push(int fd, const struct sockaddr *sa, char *buf, size_t buf_len,
+                    udp_queue_cb cb, void *baton);
 
index 2a04f41e86398955930fda1237782abe4bd195c6..023efdd48b31f7bfb24481d90bd17c732a95f560 100644 (file)
@@ -31,7 +31,6 @@
 #include "daemon/session2.h"
 #include "daemon/tls.h"
 #include "daemon/http.h"
-#include "daemon/udp_queue.h"
 #include "lib/layer.h"
 #include "lib/utils.h"
 
@@ -1256,24 +1255,8 @@ static int qr_task_finalize(struct qr_task *task, int state)
        qr_task_ref(task);
 
        /* Send back answer */
-       int ret;
-       const uv_handle_t *src_handle = session2_get_handle(source_session);
-       /* TODO: this should probably just be a _wrap? */
-       if (kr_fails_assert(src_handle->type == UV_UDP || src_handle->type == UV_TCP
-                      || src_handle->type == UV_POLL)) {
-               ret = kr_error(EINVAL);
-       } else if (src_handle->type == UV_POLL) {
-               ret = xdp_push(task, src_handle);
-       } else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
-               int fd;
-               ret = uv_fileno(src_handle, &fd);
-               if (ret == 0)
-                       udp_queue_push(fd, &ctx->req, task);
-               else
-                       kr_assert(false);
-       } else {
-               ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
-       }
+       /* TODO: xdp */
+       int ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
 
        if (ret != kr_ok()) {
                (void) qr_task_on_send(task, NULL, kr_error(EIO));