git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: remove user-space transmit buffering
author Vladimír Čunát <vladimir.cunat@nic.cz>
Sun, 9 Jun 2024 09:01:59 +0000 (11:01 +0200)
committer Vladimír Čunát <vladimir.cunat@nic.cz>
Mon, 22 Jul 2024 10:56:01 +0000 (12:56 +0200)
daemon/session.c
daemon/tls.c
daemon/tls.h
daemon/worker.c

diff --git a/daemon/session.c b/daemon/session.c
index 91d3c39e1946f5239a7b4b0190ce0dd22cb3a25e..4abb7409148f2231a7a86eb0d2ba46d4f809653f 100644
@@ -814,6 +814,8 @@ int session_wirebuf_process(struct session *session, struct io_comm_data *comm)
                         * something went wrong, normally should not happen. */
                        break;
                }
+               if (session->sflags.closing)
+                       break; /* Submitting into a closing session could cause trouble. */
        }
 
        /* worker_submit() may cause the session to close (e.g. due to IO
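
A note on the session.c hunk above: worker_submit() may start closing the session, so the processing loop re-checks the closing flag after every submission. The following is only an illustrative sketch of that pattern; the toy_* types and helpers are hypothetical, not the project's API:

    /* Illustrative sketch only; toy_* names are hypothetical, not knot-resolver's
     * API. The point: re-test a "closing" flag after each submission, because
     * the callee may start tearing the session down. */
    #include <stdbool.h>

    struct toy_session { struct { bool closing; } sflags; };

    /* Pretend submit: may flip s->sflags.closing as a side effect. */
    static int toy_submit(struct toy_session *s, int msg) { (void)s; (void)msg; return 0; }

    static void toy_process_all(struct toy_session *s, const int *msgs, int n)
    {
            for (int i = 0; i < n; ++i) {
                    if (toy_submit(s, msgs[i]) < 0)
                            break; /* something went wrong; normally should not happen */
                    if (s->sflags.closing)
                            break; /* submitting into a closing session could cause trouble */
            }
    }
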
diff --git a/daemon/tls.c b/daemon/tls.c
index e821ff964c4dd85b8c26c77d11c2e00777b95f89..0c06fba8f382d864e65b71f6fd2a0f3aa3750958 100644
@@ -90,24 +90,6 @@ static ssize_t kres_gnutls_pull(gnutls_transport_ptr_t h, void *buf, size_t len)
        return transfer;
 }
 
-static void on_write_complete(uv_write_t *req, int status)
-{
-       if (kr_fails_assert(req->data))
-               return;
-       struct async_write_ctx *async_ctx = (struct async_write_ctx *)req->data;
-       struct tls_common_ctx *t = async_ctx->t;
-       if (t->write_queue_size)
-               t->write_queue_size -= 1;
-       else
-               kr_assert(false);
-       free(req->data);
-}
-
-static bool stream_queue_is_empty(struct tls_common_ctx *t)
-{
-       return (t->write_queue_size == 0);
-}
-
 static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * iov, int iovcnt)
 {
        struct tls_common_ctx *t = (struct tls_common_ctx *)h;
@@ -130,15 +112,6 @@ static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * i
                return -1;
        }
 
-       /*
-        * This is a little bit complicated. There are two different writes:
-        * 1. Immediate, these don't need to own the buffered data and return immediately
-        * 2. Asynchronous, these need to own the buffers until the write completes
-        * In order to avoid copying the buffer, an immediate write is tried first if possible.
-        * If it isn't possible to write the data without queueing, an asynchronous write
-        * is created (with copied buffered data).
-        */
-
        size_t total_len = 0;
        uv_buf_t uv_buf[iovcnt];
        for (int i = 0; i < iovcnt; ++i) {
@@ -147,9 +120,8 @@ static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * i
                total_len += iov[i].iov_len;
        }
 
-       /* Try to perform the immediate write first to avoid copy */
-       int ret = 0;
-       if (stream_queue_is_empty(t)) {
+       int ret;
+       { // indentation kept to reduce diff:
                ret = uv_try_write(handle, uv_buf, iovcnt);
                DEBUG_MSG("[%s] push %zu <%p> = %d\n",
                    t->client_side ? "tls_client" : "tls", total_len, h, ret);
@@ -173,73 +145,8 @@ static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * i
                        errno = EIO;
                        return ret;
                }
-               /* Since we are here expression below is true
-                * (ret != total_len) && (ret >= 0 || ret == UV_EAGAIN)
-                * or the same
-                * (ret != total_len && ret >= 0) || (ret != total_len && ret == UV_EAGAIN)
-                * i.e. either occurs partial write or UV_EAGAIN.
-                * Proceed and copy data amount to owned memory and perform async write.
-                */
-               if (ret == UV_EAGAIN) {
-                       /* No data were buffered, so we must buffer all the data. */
-                       ret = 0;
-               }
-       }
-
-       /* Fallback when the queue is full, and it's not possible to do an immediate write */
-       char *p = malloc(sizeof(struct async_write_ctx) + total_len - ret);
-       if (p != NULL) {
-               struct async_write_ctx *async_ctx = (struct async_write_ctx *)p;
-               /* Save pointer to session tls context */
-               async_ctx->t = t;
-               char *buf = async_ctx->buf;
-               /* Skip data written in the partial write */
-               size_t to_skip = ret;
-               /* Copy the buffer into owned memory */
-               size_t off = 0;
-               for (int i = 0; i < iovcnt; ++i) {
-                       if (to_skip > 0) {
-                               /* Ignore current buffer if it's all skipped */
-                               if (to_skip >= uv_buf[i].len) {
-                                       to_skip -= uv_buf[i].len;
-                                       continue;
-                               }
-                               /* Skip only part of the buffer */
-                               uv_buf[i].base += to_skip;
-                               uv_buf[i].len -= to_skip;
-                               to_skip = 0;
-                       }
-                       memcpy(buf + off, uv_buf[i].base, uv_buf[i].len);
-                       off += uv_buf[i].len;
-               }
-               uv_buf[0].base = buf;
-               uv_buf[0].len = off;
-
-               /* Create an asynchronous write request */
-               uv_write_t *write_req = &async_ctx->write_req;
-               memset(write_req, 0, sizeof(uv_write_t));
-               write_req->data = p;
-
-               /* Perform an asynchronous write with a callback */
-               if (uv_write(write_req, handle, uv_buf, 1, on_write_complete) == 0) {
-                       ret = total_len;
-                       t->write_queue_size += 1;
-               } else {
-                       free(p);
-                       VERBOSE_MSG(t->client_side, "uv_write error: %s\n",
-                                       uv_strerror(ret));
-                       errno = EIO;
-                       ret = -1;
-               }
-       } else {
-               errno = ENOMEM;
-               ret = -1;
+               return kr_error(ENOBUFS);
        }
-
-       DEBUG_MSG("[%s] queued %zu <%p> = %d\n",
-           t->client_side ? "tls_client" : "tls", total_len, h, ret);
-
-       return ret;
 }
 
 /** Perform TLS handshake and handle error codes according to the documentation.
@@ -413,7 +320,7 @@ void tls_free(struct tls_ctx *tls)
        free(tls);
 }
 
-int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb cb)
+int tls_write(uv_handle_t *handle, knot_pkt_t *pkt)
 {
        if (!pkt || !handle || !handle->data) {
                return kr_error(EINVAL);
@@ -456,9 +363,6 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb
        }
 
        /* The data is now accepted in gnutls internal buffers, the message can be treated as sent */
-       req->handle = (uv_stream_t *)handle;
-       cb(req, 0);
-
        return kr_ok();
 }
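
A note on the tls.c hunks above: kres_gnutls_vec_push() now relies solely on uv_try_write() and fails with ENOBUFS on EAGAIN or a partial write, instead of copying the remainder into a user-space queue, and tls_write() drops its uv_write_t/callback pair because data accepted into GnuTLS's internal buffers is already treated as sent. A minimal sketch of that try-write-or-give-up pattern, assuming libuv (the helper name and its error convention are illustrative, not the project's code):

    /* Minimal sketch, not the project's code: either the whole vector fits
     * into kernel buffers right now, or the caller gets an error back. */
    #include <uv.h>
    #include <errno.h>

    static ssize_t try_push(uv_stream_t *stream, uv_buf_t *bufs, unsigned nbufs)
    {
            size_t total = 0;
            for (unsigned i = 0; i < nbufs; ++i)
                    total += bufs[i].len;

            int ret = uv_try_write(stream, bufs, nbufs);
            if (ret >= 0 && (size_t)ret == total)
                    return ret;      /* everything handed to the kernel */
            if (ret == UV_EAGAIN || ret >= 0) {
                    errno = ENOBUFS; /* would block, or only a partial write: no fallback queue */
                    return -1;
            }
            errno = EIO;             /* some other libuv error */
            return -1;
    }
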
 
diff --git a/daemon/tls.h b/daemon/tls.h
index c30444bea1a43752d43f1558070e38986f619ec2..8ddb1e5f24a624ffe4d68403aaa0a72bbac53de9 100644
@@ -123,7 +123,6 @@ struct tls_common_ctx {
        uint8_t recv_buf[16384];
        tls_handshake_cb handshake_cb;
        struct worker_ctx *worker;
-       size_t write_queue_size;
 };
 
 struct tls_ctx {
@@ -160,7 +159,7 @@ void tls_client_close(struct tls_client_ctx *ctx);
 void tls_free(struct tls_ctx* tls);
 
 /*! Push new data to TLS context for sending */
-int tls_write(uv_write_t *req, uv_handle_t* handle, knot_pkt_t * pkt, uv_write_cb cb);
+int tls_write(uv_handle_t* handle, knot_pkt_t * pkt);
 
 /*! Unwrap incoming data from a TLS stream and pass them to TCP session.
  * @return the number of newly-completed requests (>=0) or an error code
diff --git a/daemon/worker.c b/daemon/worker.c
index 8ba7015dc5ab3fd5f40a0b998077fb7755c87555..46744c123644e509d345c11af6494e798886ba6c 100644
@@ -635,15 +635,6 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
        return status;
 }
 
-static void on_send(uv_udp_send_t *req, int status)
-{
-       struct qr_task *task = req->data;
-       uv_handle_t *h = (uv_handle_t *)req->handle;
-       qr_task_on_send(task, h, status);
-       qr_task_unref(task);
-       free(req);
-}
-
 static void on_write(uv_write_t *req, int status)
 {
        struct qr_task *task = req->data;
@@ -696,59 +687,53 @@ static int qr_task_send(struct qr_task *task, struct session *session,
        if (kr_fails_assert(!session_flags(session)->closing))
                return qr_task_on_send(task, NULL, kr_error(EIO));
 
-       uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
-       if (!ioreq)
-               return qr_task_on_send(task, handle, kr_error(ENOMEM));
-
        /* Pending ioreq on current task */
        qr_task_ref(task);
 
        if (session_flags(session)->has_http) {
 #if ENABLE_DOH2
-               uv_write_t *write_req = (uv_write_t *)ioreq;
+               uv_write_t *write_req = malloc(sizeof(*write_req));
+               if (!write_req)
+                       return qr_task_on_send(task, handle, kr_error(ENOMEM));
                write_req->data = task;
                ret = http_write(write_req, handle, pkt, ctx->req.qsource.stream_id, &on_write);
+               if (ret != 0)
+                       free(write_req);
 #else
                ret = kr_error(ENOPROTOOPT);
 #endif
        } else if (session_flags(session)->has_tls) {
-               uv_write_t *write_req = (uv_write_t *)ioreq;
-               write_req->data = task;
-               ret = tls_write(write_req, handle, pkt, &on_write);
+               ret = tls_write(handle, pkt);
        } else if (handle->type == UV_UDP) {
-               uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
                uv_buf_t buf = { (char *)pkt->wire, pkt->size };
-               send_req->data = task;
-               ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
-       } else if (handle->type == UV_TCP) {
-               uv_write_t *write_req = (uv_write_t *)ioreq;
-               /* We need to write message length in native byte order,
-                * but we don't have a convenient place to store those bytes.
-                * The problem is that all memory referenced from buf[] MUST retain
-                * its contents at least until on_write() is called, and I currently
-                * can't see any convenient place outside the `pkt` structure.
-                * So we use directly the *individual* bytes in pkt->size.
-                * The call to htonl() and the condition will probably be inlinable. */
-               int lsbi, slsbi; /* (second) least significant byte index */
-               if (htonl(1) == 1) { /* big endian */
-                       lsbi  = sizeof(pkt->size) - 1;
-                       slsbi = sizeof(pkt->size) - 2;
-               } else {
-                       lsbi  = 0;
-                       slsbi = 1;
+               ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
+               if (ret >= 0 && ret == pkt->size) {
+                       ret = 0; // this is the success
+               } else if (ret == UV_EAGAIN || ret >= 0) {
+                       ret = kr_error(ENOBUFS);
                }
-               uv_buf_t buf[3] = {
-                       { (char *)&pkt->size + slsbi, 1 },
-                       { (char *)&pkt->size + lsbi,  1 },
+       } else if (handle->type == UV_TCP) {
+               uint8_t size_buf[2] = { pkt->size / 256, pkt->size % 256 };
+               uv_buf_t buf[2] = {
+                       { (char *)&size_buf, 2 },
                        { (char *)pkt->wire, pkt->size },
                };
-               write_req->data = task;
-               ret = uv_write(write_req, (uv_stream_t *)handle, buf, 3, &on_write);
+               ret = uv_try_write((uv_stream_t *)handle, buf, 2);
+               if (ret >= 0 && ret == pkt->size + 2) {
+                       ret = 0; // this is the success
+               } else if (ret == UV_EAGAIN || ret >= 0) {
+                       ret = kr_error(ENOBUFS);
+               }
        } else {
                kr_assert(false);
+               ret = kr_error(EINVAL);
        }
 
        if (ret == 0) {
+               if (!session_flags(session)->has_http) { // instead of completion callback
+                       qr_task_on_send(task, handle, ret);
+                       qr_task_unref(task);
+               }
                session_touch(session);
                if (session_flags(session)->outgoing) {
                        session_tasklist_add(session, task);
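
For the TCP branch above, the removed byte-order trick is replaced by composing the two-byte DNS-over-TCP length prefix directly: pkt->size / 256 is the high byte and pkt->size % 256 the low byte, which is exactly the 16-bit length in network byte order. A small self-contained check (illustrative only):

    /* Worked example: the explicit high/low bytes match htons(). */
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdint.h>
    #include <string.h>

    int main(void)
    {
            uint16_t size = 500;                              /* e.g. a 500-byte DNS message */
            uint8_t size_buf[2] = { size / 256, size % 256 }; /* { 0x01, 0xF4 } */

            uint16_t be = htons(size);                        /* network byte order */
            assert(memcmp(size_buf, &be, 2) == 0);
            return 0;
    }
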
@@ -759,7 +744,6 @@ static int qr_task_send(struct qr_task *task, struct session *session,
                        worker->too_many_open = false;
                }
        } else {
-               free(ioreq);
                qr_task_unref(task);
                if (ret == UV_EMFILE) {
                        worker->too_many_open = true;
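
Since the non-HTTP paths no longer schedule libuv write requests, there is no completion callback left to run; qr_task_send() instead calls the on-send handler and drops the task reference immediately after a successful try-send, as shown in the hunk above. A rough sketch of that synchronous flow, with hypothetical names standing in for the task helpers:

    /* Rough sketch only; toy_* names are hypothetical, not knot-resolver's API. */
    #include <errno.h>

    struct toy_task { int refs; };

    static void toy_ref(struct toy_task *t)                 { t->refs++; }
    static void toy_unref(struct toy_task *t)               { t->refs--; }
    static void toy_on_send(struct toy_task *t, int status) { (void)t; (void)status; }

    /* try_send stands in for uv_udp_try_send()/uv_try_write(): the write either
     * completes immediately or fails, so there is no pending request to wait for. */
    static int toy_send(struct toy_task *task, int (*try_send)(void))
    {
            toy_ref(task);                /* pin the task for the duration of the send */
            int ret = try_send();
            if (ret == 0) {
                    toy_on_send(task, 0); /* completion handled inline, no callback */
                    toy_unref(task);
                    return 0;
            }
            toy_unref(task);              /* failure: drop the reference as well */
            return -ENOBUFS;
    }
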