return;
}
memset(client, 0, sizeof(*client));
- io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
+ io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM, 0);
if (uv_accept(master, client) != 0) {
uv_close((uv_handle_t *)client, io_release);
return;
}
-void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
+void io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family)
{
int ret = -1;
if (type == SOCK_DGRAM) {
ret = uv_udp_init(loop, (uv_udp_t *)handle);
} else if (type == SOCK_STREAM) {
- ret = uv_tcp_init(loop, (uv_tcp_t *)handle);
+ ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family);
uv_tcp_nodelay((uv_tcp_t *)handle, 1);
}
assert(ret == 0);
int tcp_bindfd_tls(uv_tcp_t *handle, int fd, int tcp_backlog);
/** Initialize the handle, incl. ->data = struct session * instance. type = SOCK_* */
-void io_create(uv_loop_t *loop, uv_handle_t *handle, int type);
+void io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family);
void io_deinit(uv_handle_t *handle);
void io_free(uv_handle_t *handle);
return transfer;
}
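+/* Completion callback for the queued-write fallback below: the write request
+ * owns a private copy of the data in req->data, freed along with the request. */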
+static void on_write_complete(uv_write_t *req, int status)
+{
+ assert(req->data != NULL);
+ free(req->data);
+ free(req);
+}
+
+static bool stream_queue_is_empty(uv_stream_t *handle)
+{
+#if UV_VERSION_HEX >= 0x011300 /* uv_stream_get_write_queue_size() appeared in libuv 1.19.0 */
+	return uv_stream_get_write_queue_size(handle) == 0;
+#else
+	/* The API is not available; assume the best case (an empty write queue) */
+	return true;
+#endif
+}
+
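+/* Push callback for GnuTLS, registered via gnutls_transport_set_vec_push_function()
+ * below: it hands the iovec to the underlying libuv stream, immediately when the
+ * write queue is empty, otherwise through a queued uv_write() on a private copy. */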
+static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * iov, int iovcnt)
+{
+ struct tls_common_ctx *t = (struct tls_common_ctx *)h;
+
+ if (t == NULL) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ if (iovcnt == 0) {
+ return 0;
+ }
+
+ assert(t->session && t->session->handle &&
+ t->session->handle->type == UV_TCP);
+ uv_stream_t *handle = (uv_stream_t *)t->session->handle;
+
+ /*
+	 * This is a little bit complicated. There are two kinds of writes:
+	 * 1. Immediate writes, which don't need to own the buffered data and return at once
+	 * 2. Asynchronous writes, which must own the buffers until the write completes
+	 * To avoid copying the buffer, an immediate write is tried first when possible.
+	 * If the data can't be written without queueing, an asynchronous write
+	 * is created instead (with a private copy of the data).
+ */
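+	/* Note: like writev(), the vec-push callback returns the number of bytes
+	 * accepted (a short count is allowed; GnuTLS retries with the rest),
+	 * or -1 with errno set on failure. */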
+
+ size_t total_len = 0;
+ uv_buf_t uv_buf[iovcnt];
+ for (int i = 0; i < iovcnt; ++i) {
+ uv_buf[i].base = iov[i].iov_base;
+ uv_buf[i].len = iov[i].iov_len;
+ total_len += iov[i].iov_len;
+ }
+
+ /* Try to perform the immediate write first to avoid copy */
+ int ret = 0;
+ if (stream_queue_is_empty(handle)) {
+ ret = uv_try_write(handle, uv_buf, iovcnt);
+ DEBUG_MSG("[%s] push %zu <%p> = %d\n",
+ t->client_side ? "tls_client" : "tls", total_len, h, ret);
+		if (ret >= 0) {
+			/* Complete or partial write; on a partial write GnuTLS
+			 * will retry with the remaining data. */
+			return ret;
+		}
+		if (ret != UV_EAGAIN) {
+			errno = EIO; /* Unrecoverable stream error */
+			return -1;
+		}
+ }
+
+	/* Fall back when the queue is non-empty or an immediate write isn't possible */
+ char *buf = malloc(total_len);
+ if (buf != NULL) {
+ /* Copy the buffer into owned memory */
+ size_t off = 0;
+ for (int i = 0; i < iovcnt; ++i) {
+ memcpy(buf + off, uv_buf[i].base, uv_buf[i].len);
+ off += uv_buf[i].len;
+ }
+ uv_buf[0].base = buf;
+ uv_buf[0].len = total_len;
+
+ /* Create an asynchronous write request */
+ uv_write_t *write_req = calloc(1, sizeof(uv_write_t));
+		if (write_req != NULL) {
+			write_req->data = buf;
+
+			/* Perform an asynchronous write with a callback */
+			if (uv_write(write_req, handle, uv_buf, 1, on_write_complete) == 0) {
+				ret = total_len;
+			} else {
+				free(buf);
+				free(write_req);
+				errno = EIO;
+				ret = -1;
+			}
+		} else {
+			free(buf);
+			errno = ENOMEM;
+			ret = -1;
+		}
+ } else {
+ errno = ENOMEM;
+ ret = -1;
+ }
+
+ DEBUG_MSG("[%s] queued %zu <%p> = %d\n",
+ t->client_side ? "tls_client" : "tls", total_len, h, ret);
+
+ return ret;
+}
+
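+/* The callback is registered wherever the TLS sessions are created, e.g.
+ *   gnutls_transport_set_vec_push_function(tls_session, kres_gnutls_vec_push);
+ *   gnutls_transport_set_ptr(tls_session, ctx);
+ * as done in the server- and client-side constructors below. */
+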
/** Perform the TLS handshake and handle error codes according to the documentation.
 * See https://gnutls.org/manual/html_node/TLS-handshake.html#TLS-handshake
 * The function returns kr_ok() on success or a non-fatal error, kr_error(EAGAIN) on blocking, or kr_error(EIO) on fatal error.
tls->c.client_side = false;
gnutls_transport_set_pull_function(tls->c.tls_session, kres_gnutls_pull);
- gnutls_transport_set_push_function(tls->c.tls_session, worker_gnutls_push);
+ gnutls_transport_set_vec_push_function(tls->c.tls_session, kres_gnutls_vec_push);
gnutls_transport_set_ptr(tls->c.tls_session, tls);
if (net->tls_session_ticket_ctx) {
free(tls);
}
-int tls_push(struct qr_task *task, uv_handle_t *handle, knot_pkt_t *pkt)
+int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb cb)
{
if (!pkt || !handle || !handle->data) {
return kr_error(EINVAL);
const char *logstring = tls_ctx->client_side ? client_logstring : server_logstring;
gnutls_session_t tls_session = tls_ctx->tls_session;
- tls_ctx->task = task;
-
gnutls_record_cork(tls_session);
ssize_t count = 0;
	if ((count = gnutls_record_send(tls_session, &pkt_size, sizeof(pkt_size))) < 0 ||
return kr_error(EIO);
}
- ssize_t submitted = 0;
- ssize_t retries = 0;
- do {
- count = gnutls_record_uncork(tls_session, 0);
- if (count < 0) {
- if (gnutls_error_is_fatal(count)) {
- kr_log_error("[%s] gnutls_record_uncork failed: %s (%zd)\n",
- logstring, gnutls_strerror_name(count), count);
- return kr_error(EIO);
- }
- if (++retries > TLS_MAX_UNCORK_RETRIES) {
- kr_log_error("[%s] gnutls_record_uncork: too many sequential non-fatal errors (%zd), last error is: %s (%zd)\n",
- logstring, retries, gnutls_strerror_name(count), count);
- return kr_error(EIO);
- }
- } else if (count != 0) {
- submitted += count;
- retries = 0;
- } else if (gnutls_record_check_corked(tls_session) != 0) {
- if (++retries > TLS_MAX_UNCORK_RETRIES) {
- kr_log_error("[%s] gnutls_record_uncork: too many retries (%zd)\n",
- logstring, retries);
- return kr_error(EIO);
- }
- } else if (submitted != sizeof(pkt_size) + pkt->size) {
- kr_log_error("[%s] gnutls_record_uncork didn't send all data(%zd of %zd)\n",
- logstring, submitted, sizeof(pkt_size) + pkt->size);
- return kr_error(EIO);
- }
- } while (submitted != sizeof(pkt_size) + pkt->size);
+ const ssize_t submitted = sizeof(pkt_size) + pkt->size;
+
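+	/* GNUTLS_RECORD_WAIT makes the uncork retry on GNUTLS_E_AGAIN/GNUTLS_E_INTERRUPTED,
+	 * so on success it returns the number of bytes flushed to the push callback. */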
+ int ret = gnutls_record_uncork(tls_session, GNUTLS_RECORD_WAIT);
+ if (gnutls_error_is_fatal(ret)) {
+ kr_log_error("[%s] gnutls_record_uncork failed: %s (%d)\n",
+ logstring, gnutls_strerror_name(ret), ret);
+ return kr_error(EIO);
+ }
+
+ if (ret != submitted) {
+ kr_log_error("[%s] gnutls_record_uncork didn't send all data (%d of %zd)\n",
+ logstring, ret, submitted);
+ return kr_error(EIO);
+ }
+
+	/* The data has now been accepted into GnuTLS' internal buffers; the message can be treated as sent */
+ req->handle = (uv_stream_t *)handle;
+ cb(req, 0);
return kr_ok();
}
return NULL;
}
- int ret = gnutls_init(&ctx->c.tls_session, GNUTLS_CLIENT | GNUTLS_NONBLOCK);
+ int ret = gnutls_init(&ctx->c.tls_session, GNUTLS_CLIENT | GNUTLS_NONBLOCK | GNUTLS_ENABLE_FALSE_START);
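+	/* GNUTLS_ENABLE_FALSE_START (GnuTLS >= 3.5) lets the client send application
+	 * data before the handshake fully completes, saving a round trip whenever the
+	 * negotiated parameters permit false start. */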
if (ret != GNUTLS_E_SUCCESS) {
tls_client_ctx_free(ctx);
return NULL;
ctx->c.client_side = true;
gnutls_transport_set_pull_function(ctx->c.tls_session, kres_gnutls_pull);
- gnutls_transport_set_push_function(ctx->c.tls_session, worker_gnutls_push);
+ gnutls_transport_set_vec_push_function(ctx->c.tls_session, kres_gnutls_vec_push);
gnutls_transport_set_ptr(ctx->c.tls_session, ctx);
return ctx;
}
*/
#define TLS_MAX_HANDSHAKE_TIME (KR_CONN_RTT_MAX * 3)
+/** Transport session (opaque). */
+struct session;
+
struct tls_ctx_t;
struct tls_client_ctx_t;
struct tls_credentials {
uint8_t recv_buf[4096];
tls_handshake_cb handshake_cb;
struct worker_ctx *worker;
- struct qr_task *task;
};
struct tls_ctx_t {
void tls_free(struct tls_ctx_t* tls);
/*! Push new data to TLS context for sending */
-int tls_push(struct qr_task *task, uv_handle_t* handle, knot_pkt_t * pkt);
+int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb cb);
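+/*! Note: @a cb fires as soon as the data has been accepted into GnuTLS'
+ *  internal buffers, i.e. before the bytes necessarily reach the wire (see tls.c). */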
/*! Unwrap incoming data from a TLS stream and pass them to TCP session.
* @return the number of newly-completed requests (>=0) or an error code
if (!handle) {
return NULL;
}
- io_create(worker->loop, handle, socktype);
+ io_create(worker->loop, handle, socktype, family);
/* Bind to outgoing address, according to IP v4/v6. */
union inaddr *addr;
iorequest_release(worker, req);
}
-static void on_nontask_write(uv_write_t *req, int status)
-{
- uv_handle_t *handle = (uv_handle_t *)(req->handle);
- uv_loop_t *loop = handle->loop;
- struct worker_ctx *worker = loop->data;
- assert(worker == get_worker());
- iorequest_release(worker, req);
-}
-
-ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len)
-{
- struct tls_common_ctx *t = (struct tls_common_ctx *)h;
- const uv_buf_t uv_buf[1] = {
- { (char *)buf, len }
- };
-
- if (t == NULL) {
- errno = EFAULT;
- return -1;
- }
-
- assert(t->session && t->session->handle &&
- t->session->handle->type == UV_TCP);
-
- VERBOSE_MSG(NULL,"[%s] push %zu <%p>\n",
- t->client_side ? "tls_client" : "tls", len, h);
-
- struct worker_ctx *worker = t->worker;
- assert(worker);
-
- void *ioreq = worker_iohandle_borrow(worker);
- if (!ioreq) {
- errno = EFAULT;
- return -1;
- }
-
- uv_write_t *write_req = (uv_write_t *)ioreq;
-
- struct qr_task *task = t->task;
- uv_write_cb write_cb = on_task_write;
- if (t->handshake_state == TLS_HS_DONE) {
- assert(task);
- } else {
- task = NULL;
- write_cb = on_nontask_write;
- }
-
- write_req->data = task;
-
- ssize_t ret = -1;
- int res = uv_write(write_req, (uv_stream_t *)t->session->handle, uv_buf, 1, write_cb);
- if (res == 0) {
- if (task) {
- qr_task_ref(task); /* Pending ioreq on current task */
- struct request_ctx *ctx = task->ctx;
- if (ctx && ctx->source.session &&
- t->session->handle != ctx->source.session->handle) {
- struct sockaddr *addr = &t->session->peer.ip;
- worker->stats.tls += 1;
- if (addr->sa_family == AF_INET6)
- worker->stats.ipv6 += 1;
- else if (addr->sa_family == AF_INET)
- worker->stats.ipv4 += 1;
- }
- }
- if (worker->too_many_open &&
- worker->stats.rconcurrent <
- worker->rconcurrent_highwatermark - 10) {
- worker->too_many_open = false;
- }
- ret = len;
- } else {
- VERBOSE_MSG(NULL,"[%s] uv_write: %s\n",
- t->client_side ? "tls_client" : "tls", uv_strerror(res));
- iorequest_release(worker, ioreq);
- errno = EIO;
- }
- return ret;
-}
-
static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
struct sockaddr *addr, knot_pkt_t *pkt)
{
return qr_task_on_send(task, handle, kr_error(EIO));
}
- /* Synchronous push to TLS context, bypassing event loop. */
- struct session *session = handle->data;
- assert(session->closing == false);
- if (session->has_tls) {
- return tls_push(task, handle, pkt);
- }
-
int ret = 0;
struct request_ctx *ctx = task->ctx;
struct worker_ctx *worker = ctx->worker;
if (!ioreq) {
return qr_task_on_send(task, handle, kr_error(ENOMEM));
}
+ /* Pending ioreq on current task */
+ qr_task_ref(task);
/* Send using given protocol */
- if (handle->type == UV_UDP) {
+ struct session *session = handle->data;
+ assert(session->closing == false);
+ if (session->has_tls) {
+ uv_write_t *write_req = (uv_write_t *)ioreq;
+ write_req->data = task;
+ ret = tls_write(write_req, handle, pkt, &on_task_write);
+ } else if (handle->type == UV_UDP) {
uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
send_req->data = task;
}
if (ret == 0) {
- qr_task_ref(task); /* Pending ioreq on current task */
if (worker->too_many_open &&
worker->stats.rconcurrent <
worker->rconcurrent_highwatermark - 10) {
}
} else {
iorequest_release(worker, ioreq);
+ qr_task_unref(task);
if (ret == UV_EMFILE) {
worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
if (ctx->source.session &&
handle != ctx->source.session->handle &&
addr) {
- if (handle->type == UV_UDP)
+ if (session->has_tls)
+ worker->stats.tls += 1;
+ else if (handle->type == UV_UDP)
worker->stats.udp += 1;
else
worker->stats.tcp += 1;
+
if (addr->sa_family == AF_INET6)
worker->stats.ipv6 += 1;
else if (addr->sa_family == AF_INET)
worker->stats.ipv4 += 1;
}
+
return ret;
}
return state == KR_STATE_DONE ? 0 : kr_error(EIO);
}
+ /* Reference task as the callback handler can close it */
+ qr_task_ref(task);
+
/* Send back answer */
struct session *source_session = ctx->source.session;
uv_handle_t *handle = source_session->handle;
session_del_tasks(source_session, t);
}
session_close(source_session);
- } else if (handle->type == UV_TCP) {
+ } else if (handle->type == UV_TCP && ctx->source.session) {
/* Don't try to close source session at least
* retry_interval_for_timeout_timer milliseconds */
uv_timer_again(&ctx->source.session->timeout);
}
+ qr_task_unref(task);
+
return state == KR_STATE_DONE ? 0 : kr_error(EIO);
}
/* Upgrade to TLS if the upstream address is configured as DoT capable. */
struct engine *engine = ctx->worker->engine;
struct network *net = &engine->net;
- const struct sockaddr *addr = packet_source ? packet_source : task->addrlist;
struct tls_client_paramlist_entry *tls_entry = NULL;
- if (kr_inaddr_port(task->addrlist) == KR_DNS_PORT) {
- tls_entry = tls_client_try_upgrade(&net->tls_client_params, task->addrlist);
+ /* SOCK_STREAM is likely a retry over TCP, so the resolver should use
+ * the same address that failed over UDP instead of selecting a new one. */
+ const bool retry_address = (packet_source && sock_type == SOCK_STREAM);
+ const struct sockaddr *addr = retry_address ? packet_source : task->addrlist;
+ if (kr_inaddr_port(addr) == KR_DNS_PORT) {
+ tls_entry = tls_client_try_upgrade(&net->tls_client_params, addr);
if (tls_entry != NULL) {
- kr_inaddr_set_port(task->addrlist, KR_DNS_TLS_PORT);
+ kr_inaddr_set_port((struct sockaddr *)addr, KR_DNS_TLS_PORT);
sock_type = SOCK_STREAM;
}
} else if (sock_type == SOCK_STREAM) {
/* Start transmitting */
uv_handle_t *handle = retransmit(task);
if (handle == NULL) {
- return qr_task_step(task, task->addrlist, NULL);
+ return qr_task_step(task, addr, NULL);
}
/* Check current query NSLIST */
struct kr_query *qry = array_tail(req->rplan.pending);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
- /* CHeck that the selected address is still valid */
+
+ /* Check that the selected address is still valid */
if (addr->sa_family != AF_INET && addr->sa_family != AF_INET6) {
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
+
struct session* session = NULL;
if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
assert(session->outgoing);
/* Check if there must be TLS */
if (tls_entry) {
+ assert(kr_inaddr_port(addr) != KR_DNS_PORT);
assert(session->tls_client_ctx == NULL);
struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(tls_entry, worker);
if (!tls_ctx) {
#pragma once
-#include <gnutls/gnutls.h>
-
#include "daemon/engine.h"
#include "lib/generic/array.h"
#include "lib/generic/map.h"
void worker_iohandle_release(struct worker_ctx *worker, void *h);
-ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
-
-ssize_t worker_gnutls_client_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
-
/** Finalize given task */
int worker_task_finalize(struct qr_task *task, int state);