From: Vladimír Čunát Date: Sun, 9 Jun 2024 09:01:59 +0000 (+0200) Subject: daemon: remove user-space transmit buffering X-Git-Tag: v5.7.4^2~3^2~3 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=refs%2Fenvironments%2Fdocs-develop-tmp-xva6ir%2Fdeployments%2F4626;p=thirdparty%2Fknot-resolver.git daemon: remove user-space transmit buffering --- diff --git a/daemon/session.c b/daemon/session.c index 91d3c39e1..4abb74091 100644 --- a/daemon/session.c +++ b/daemon/session.c @@ -814,6 +814,8 @@ int session_wirebuf_process(struct session *session, struct io_comm_data *comm) * something went wrong, normally should not happen. */ break; } + if (session->sflags.closing) + break; /* Submitting into a closing session could cause trouble. */ } /* worker_submit() may cause the session to close (e.g. due to IO diff --git a/daemon/tls.c b/daemon/tls.c index e821ff964..0c06fba8f 100644 --- a/daemon/tls.c +++ b/daemon/tls.c @@ -90,24 +90,6 @@ static ssize_t kres_gnutls_pull(gnutls_transport_ptr_t h, void *buf, size_t len) return transfer; } -static void on_write_complete(uv_write_t *req, int status) -{ - if (kr_fails_assert(req->data)) - return; - struct async_write_ctx *async_ctx = (struct async_write_ctx *)req->data; - struct tls_common_ctx *t = async_ctx->t; - if (t->write_queue_size) - t->write_queue_size -= 1; - else - kr_assert(false); - free(req->data); -} - -static bool stream_queue_is_empty(struct tls_common_ctx *t) -{ - return (t->write_queue_size == 0); -} - static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * iov, int iovcnt) { struct tls_common_ctx *t = (struct tls_common_ctx *)h; @@ -130,15 +112,6 @@ static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * i return -1; } - /* - * This is a little bit complicated. There are two different writes: - * 1. Immediate, these don't need to own the buffered data and return immediately - * 2. Asynchronous, these need to own the buffers until the write completes - * In order to avoid copying the buffer, an immediate write is tried first if possible. - * If it isn't possible to write the data without queueing, an asynchronous write - * is created (with copied buffered data). - */ - size_t total_len = 0; uv_buf_t uv_buf[iovcnt]; for (int i = 0; i < iovcnt; ++i) { @@ -147,9 +120,8 @@ static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * i total_len += iov[i].iov_len; } - /* Try to perform the immediate write first to avoid copy */ - int ret = 0; - if (stream_queue_is_empty(t)) { + int ret; + { // indentation kept to reduce diff: ret = uv_try_write(handle, uv_buf, iovcnt); DEBUG_MSG("[%s] push %zu <%p> = %d\n", t->client_side ? "tls_client" : "tls", total_len, h, ret); @@ -173,73 +145,8 @@ static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * i errno = EIO; return ret; } - /* Since we are here expression below is true - * (ret != total_len) && (ret >= 0 || ret == UV_EAGAIN) - * or the same - * (ret != total_len && ret >= 0) || (ret != total_len && ret == UV_EAGAIN) - * i.e. either occurs partial write or UV_EAGAIN. - * Proceed and copy data amount to owned memory and perform async write. - */ - if (ret == UV_EAGAIN) { - /* No data were buffered, so we must buffer all the data. 
*/ - ret = 0; - } - } - - /* Fallback when the queue is full, and it's not possible to do an immediate write */ - char *p = malloc(sizeof(struct async_write_ctx) + total_len - ret); - if (p != NULL) { - struct async_write_ctx *async_ctx = (struct async_write_ctx *)p; - /* Save pointer to session tls context */ - async_ctx->t = t; - char *buf = async_ctx->buf; - /* Skip data written in the partial write */ - size_t to_skip = ret; - /* Copy the buffer into owned memory */ - size_t off = 0; - for (int i = 0; i < iovcnt; ++i) { - if (to_skip > 0) { - /* Ignore current buffer if it's all skipped */ - if (to_skip >= uv_buf[i].len) { - to_skip -= uv_buf[i].len; - continue; - } - /* Skip only part of the buffer */ - uv_buf[i].base += to_skip; - uv_buf[i].len -= to_skip; - to_skip = 0; - } - memcpy(buf + off, uv_buf[i].base, uv_buf[i].len); - off += uv_buf[i].len; - } - uv_buf[0].base = buf; - uv_buf[0].len = off; - - /* Create an asynchronous write request */ - uv_write_t *write_req = &async_ctx->write_req; - memset(write_req, 0, sizeof(uv_write_t)); - write_req->data = p; - - /* Perform an asynchronous write with a callback */ - if (uv_write(write_req, handle, uv_buf, 1, on_write_complete) == 0) { - ret = total_len; - t->write_queue_size += 1; - } else { - free(p); - VERBOSE_MSG(t->client_side, "uv_write error: %s\n", - uv_strerror(ret)); - errno = EIO; - ret = -1; - } - } else { - errno = ENOMEM; - ret = -1; + return kr_error(ENOBUFS); } - - DEBUG_MSG("[%s] queued %zu <%p> = %d\n", - t->client_side ? "tls_client" : "tls", total_len, h, ret); - - return ret; } /** Perform TLS handshake and handle error codes according to the documentation. @@ -413,7 +320,7 @@ void tls_free(struct tls_ctx *tls) free(tls); } -int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb cb) +int tls_write(uv_handle_t *handle, knot_pkt_t *pkt) { if (!pkt || !handle || !handle->data) { return kr_error(EINVAL); @@ -456,9 +363,6 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb } /* The data is now accepted in gnutls internal buffers, the message can be treated as sent */ - req->handle = (uv_stream_t *)handle; - cb(req, 0); - return kr_ok(); } diff --git a/daemon/tls.h b/daemon/tls.h index c30444bea..8ddb1e5f2 100644 --- a/daemon/tls.h +++ b/daemon/tls.h @@ -123,7 +123,6 @@ struct tls_common_ctx { uint8_t recv_buf[16384]; tls_handshake_cb handshake_cb; struct worker_ctx *worker; - size_t write_queue_size; }; struct tls_ctx { @@ -160,7 +159,7 @@ void tls_client_close(struct tls_client_ctx *ctx); void tls_free(struct tls_ctx* tls); /*! Push new data to TLS context for sending */ -int tls_write(uv_write_t *req, uv_handle_t* handle, knot_pkt_t * pkt, uv_write_cb cb); +int tls_write(uv_handle_t* handle, knot_pkt_t * pkt); /*! Unwrap incoming data from a TLS stream and pass them to TCP session. 
* @return the number of newly-completed requests (>=0) or an error code diff --git a/daemon/worker.c b/daemon/worker.c index 8ba7015dc..46744c123 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -635,15 +635,6 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status) return status; } -static void on_send(uv_udp_send_t *req, int status) -{ - struct qr_task *task = req->data; - uv_handle_t *h = (uv_handle_t *)req->handle; - qr_task_on_send(task, h, status); - qr_task_unref(task); - free(req); -} - static void on_write(uv_write_t *req, int status) { struct qr_task *task = req->data; @@ -696,59 +687,53 @@ static int qr_task_send(struct qr_task *task, struct session *session, if (kr_fails_assert(!session_flags(session)->closing)) return qr_task_on_send(task, NULL, kr_error(EIO)); - uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t)); - if (!ioreq) - return qr_task_on_send(task, handle, kr_error(ENOMEM)); - /* Pending ioreq on current task */ qr_task_ref(task); if (session_flags(session)->has_http) { #if ENABLE_DOH2 - uv_write_t *write_req = (uv_write_t *)ioreq; + uv_write_t *write_req = malloc(sizeof(*write_req)); + if (!write_req) + return qr_task_on_send(task, handle, kr_error(ENOMEM)); write_req->data = task; ret = http_write(write_req, handle, pkt, ctx->req.qsource.stream_id, &on_write); + if (ret != 0) + free(write_req); #else ret = kr_error(ENOPROTOOPT); #endif } else if (session_flags(session)->has_tls) { - uv_write_t *write_req = (uv_write_t *)ioreq; - write_req->data = task; - ret = tls_write(write_req, handle, pkt, &on_write); + ret = tls_write(handle, pkt); } else if (handle->type == UV_UDP) { - uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq; uv_buf_t buf = { (char *)pkt->wire, pkt->size }; - send_req->data = task; - ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send); - } else if (handle->type == UV_TCP) { - uv_write_t *write_req = (uv_write_t *)ioreq; - /* We need to write message length in native byte order, - * but we don't have a convenient place to store those bytes. - * The problem is that all memory referenced from buf[] MUST retain - * its contents at least until on_write() is called, and I currently - * can't see any convenient place outside the `pkt` structure. - * So we use directly the *individual* bytes in pkt->size. - * The call to htonl() and the condition will probably be inlinable. 
*/ - int lsbi, slsbi; /* (second) least significant byte index */ - if (htonl(1) == 1) { /* big endian */ - lsbi = sizeof(pkt->size) - 1; - slsbi = sizeof(pkt->size) - 2; - } else { - lsbi = 0; - slsbi = 1; + ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr); + if (ret >= 0 && ret == pkt->size) { + ret = 0; // this is the success + } else if (ret == UV_EAGAIN || ret >= 0) { + ret = kr_error(ENOBUFS); } - uv_buf_t buf[3] = { - { (char *)&pkt->size + slsbi, 1 }, - { (char *)&pkt->size + lsbi, 1 }, + } else if (handle->type == UV_TCP) { + uint8_t size_buf[2] = { pkt->size / 256, pkt->size % 256 }; + uv_buf_t buf[2] = { + { (char *)&size_buf, 2 }, { (char *)pkt->wire, pkt->size }, }; - write_req->data = task; - ret = uv_write(write_req, (uv_stream_t *)handle, buf, 3, &on_write); + ret = uv_try_write((uv_stream_t *)handle, buf, 2); + if (ret >= 0 && ret == pkt->size + 2) { + ret = 0; // this is the success + } else if (ret == UV_EAGAIN || ret >= 0) { + ret = kr_error(ENOBUFS); + } } else { kr_assert(false); + ret = kr_error(EINVAL); } if (ret == 0) { + if (!session_flags(session)->has_http) { // instead of completion callback + qr_task_on_send(task, handle, ret); + qr_task_unref(task); + } session_touch(session); if (session_flags(session)->outgoing) { session_tasklist_add(session, task); @@ -759,7 +744,6 @@ static int qr_task_send(struct qr_task *task, struct session *session, worker->too_many_open = false; } } else { - free(ioreq); qr_task_unref(task); if (ret == UV_EMFILE) { worker->too_many_open = true;
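
For readers less familiar with the libuv calls this commit switches to, here is a small standalone C sketch (not part of the patch) of the resulting send path. It assumes libuv 1.x; the helper names try_send_dns_tcp() and try_send_dns_udp() are invented for illustration — in the real sources this logic lives inline in qr_task_send() (and, for the TLS push, in kres_gnutls_vec_push() above). The key point is that uv_try_write() and uv_udp_try_send() either hand the bytes to the kernel immediately or fail, so no data has to be copied into an owned buffer that outlives the call, and a full socket buffer (UV_EAGAIN or a partial write) is reported as ENOBUFS instead of being queued in user space.

#include <uv.h>
#include <errno.h>
#include <stdint.h>
#include <stddef.h>

/*
 * Send one DNS message over TCP without user-space buffering.
 * DNS-over-TCP frames each message with a 16-bit big-endian length prefix;
 * because uv_try_write() either accepts the bytes now or not at all, the
 * prefix can live on the stack and nothing needs to survive this call
 * (which is what lets the patch drop async_write_ctx and on_write_complete).
 *
 * Returns 0 when the whole frame was accepted, -ENOBUFS on UV_EAGAIN or a
 * partial write, or the negative libuv error code otherwise.
 */
static int try_send_dns_tcp(uv_stream_t *stream, uint8_t *wire, size_t len)
{
	if (len > UINT16_MAX)
		return -EINVAL;

	uint8_t size_buf[2] = { (uint8_t)(len / 256), (uint8_t)(len % 256) };
	uv_buf_t bufs[2] = {
		{ .base = (char *)size_buf, .len = sizeof(size_buf) },
		{ .base = (char *)wire,     .len = len },
	};

	int ret = uv_try_write(stream, bufs, 2);
	if (ret >= 0 && (size_t)ret == len + sizeof(size_buf))
		return 0;         /* whole frame handed to the kernel */
	if (ret == UV_EAGAIN || ret >= 0)
		return -ENOBUFS;  /* would block, or only part of the frame went out */
	return ret;               /* some other libuv error, e.g. UV_EPIPE */
}

/* Same idea for UDP: uv_udp_try_send() either sends the whole datagram
 * immediately or not, so a full socket buffer again maps to -ENOBUFS. */
static int try_send_dns_udp(uv_udp_t *udp, uint8_t *wire, size_t len,
			    const struct sockaddr *addr)
{
	uv_buf_t buf = { .base = (char *)wire, .len = len };
	int ret = uv_udp_try_send(udp, &buf, 1, addr);
	if (ret >= 0 && (size_t)ret == len)
		return 0;
	if (ret == UV_EAGAIN || ret >= 0)
		return -ENOBUFS;
	return ret;
}

With this mapping, a full kernel buffer means the response is simply not transmitted and the send is reported as failed to the caller; that is the trade-off the commit accepts in exchange for removing the per-write allocation, the copy into async_write_ctx, and the write_queue_size bookkeeping.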