#include <libknot/errcode.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
+#include <assert.h>
#include "daemon/io.h"
#include "daemon/network.h"
#undef negotiate_bufsize
-static void *handle_alloc(uv_loop_t *loop, size_t size)
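+/* Reset a session for reuse; only subrequest sessions may still have tasks attached when cleared. */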
+static void session_clear(struct session *s)
{
- return malloc(size);
+ assert(s->is_subreq || s->tasks.len == 0);
+ array_clear(s->tasks);
+ memset(s, 0, sizeof(*s));
}
-static void handle_free(uv_handle_t *handle)
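+/* Clear and free a heap-allocated session. */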
+void session_free(struct session *s)
{
- free(handle);
+ session_clear(s);
+ free(s);
+}
+
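+/* Allocate a new zero-initialised session. */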
+struct session *session_new(void)
+{
+ return calloc(1, sizeof(struct session));
+}
+
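+/* Take a session from the per-worker freelist, or allocate a new one. */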
+static struct session *session_borrow(struct worker_ctx *worker)
+{
+ struct session *s = NULL;
+ if (worker->pool_sessions.len > 0) {
+ s = array_tail(worker->pool_sessions);
+ array_pop(worker->pool_sessions);
+ kr_asan_unpoison(s, sizeof(*s));
+ } else {
+ s = session_new();
+ }
+ return s;
+}
+
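+/* Return a cleared session to the freelist, or free it if the freelist is full. */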
+static void session_release(struct worker_ctx *worker, struct session *s)
+{
+ if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
+ session_clear(s);
+ array_push(worker->pool_sessions, s);
+ kr_asan_poison(s, sizeof(*s));
+ } else {
+ session_free(s);
+ }
+}
+
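+/* Allocate a stream handle for an accepted client connection. */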
+static uv_stream_t *handle_alloc(uv_loop_t *loop)
+{
+ uv_stream_t *handle = calloc(1, sizeof(*handle));
+ if (!handle) {
+ return NULL;
+ }
+
+ return handle;
}
static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
* guaranteed to be unchanged only for the duration of
* udp_read() and tcp_read().
*/
+ struct session *session = handle->data;
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
buf->base = (char *)worker->wire_buf;
- /* Use recvmmsg() on master sockets if possible. */
- if (handle->data)
+	/* Limit the TCP stream buffer size to 4K for finer granularity when batching incoming queries. */
+ if (handle->type == UV_TCP) {
+ buf->len = MIN(suggested_size, 4096);
+ /* Regular buffer size for subrequests. */
+ } else if (session->is_subreq) {
buf->len = suggested_size;
- else
+ /* Use recvmmsg() on master sockets if possible. */
+ } else {
buf->len = sizeof(worker->wire_buf);
+ }
}
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
struct worker_ctx *worker = loop->data;
if (nread <= 0) {
if (nread < 0) { /* Error response, notify resolver */
- worker_exec(worker, (uv_handle_t *)handle, NULL, addr);
+ worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
} /* nread == 0 is for freeing buffers, we don't need to do this */
return;
}
knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
if (query) {
query->max_size = KNOT_WIRE_MAX_PKTSIZE;
- worker_exec(worker, (uv_handle_t *)handle, query, addr);
+ worker_submit(worker, (uv_handle_t *)handle, query, addr);
}
mp_flush(worker->pkt_pool.ctx);
}
if (ret != 0) {
return ret;
}
- handle->data = NULL;
check_bufsize((uv_handle_t *)handle);
+ /* Handle is already created, just create context. */
+ handle->data = session_new();
+ assert(handle->data);
return io_start_read((uv_handle_t *)handle);
}
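+/* Close callback for the idle timer: close the connection handle it guarded. */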
+static void tcp_timeout(uv_handle_t *timer)
+{
+ uv_handle_t *handle = timer->data;
+ uv_close(handle, io_free);
+}
+
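+/* Idle timer callback: close the connection if no tasks are outstanding, otherwise keep checking. */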
+static void tcp_timeout_trigger(uv_timer_t *timer)
+{
+ uv_handle_t *handle = timer->data;
+ struct session *session = handle->data;
+ if (session->tasks.len > 0) {
+ uv_timer_again(timer);
+ } else {
+ uv_close((uv_handle_t *)timer, tcp_timeout);
+ }
+}
+
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
uv_loop_t *loop = handle->loop;
+ struct session *s = handle->data;
struct worker_ctx *worker = loop->data;
-
- /* Check for originator connection close. */
- if (nread <= 0) {
- if (handle->data) {
- worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
- }
- if (!uv_is_closing((uv_handle_t *)handle)) {
- uv_close((uv_handle_t *)handle, handle_free);
- }
- return;
- }
-
+	/* TCP pipelining is rather complicated and requires cooperation from the worker,
+	 * so the whole message reassembly and demuxing logic lives inside the worker. */
int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread);
- if (ret == 0) {
- /* Push - pull, stop reading from this handle until
- * the task is finished. Since the handle has no track of the
- * pending tasks, it might be freed before the task finishes
- * leading various errors. */
- uv_unref((uv_handle_t *)handle);
- io_stop_read((uv_handle_t *)handle);
+ if (ret < 0) {
+ worker_end_tcp(worker, (uv_handle_t *)handle);
+		/* Exceeded the per-connection quota for outstanding requests:
+		 * stop reading from the stream and close it after the last message is processed. */
+ if (!s->is_subreq && !uv_is_closing((uv_handle_t *)&s->timeout)) {
+ uv_timer_stop(&s->timeout);
+ if (s->tasks.len == 0) {
+ uv_close((uv_handle_t *)&s->timeout, tcp_timeout);
+ } else { /* If there are tasks running, defer until they finish. */
+ uv_timer_start(&s->timeout, tcp_timeout_trigger, 1, KR_CONN_RTT_MAX/2);
+ }
+ }
+	/* Connection spawned more than one request; reset its deadline for the next query. */
+ } else if (ret > 0 && !s->is_subreq) {
+ uv_timer_again(&s->timeout);
}
mp_flush(worker->pkt_pool.ctx);
}
if (status != 0) {
return;
}
-
- uv_stream_t *client = handle_alloc(master->loop, sizeof(*client));
+ uv_stream_t *client = handle_alloc(master->loop);
if (!client) {
return;
}
memset(client, 0, sizeof(*client));
io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
if (uv_accept(master, client) != 0) {
- handle_free((uv_handle_t *)client);
+ io_free((uv_handle_t *)client);
return;
}
+	/* Set a deadline for the TCP connection and start reading.
+	 * The timer re-checks every half of the request time limit whether the
+	 * connection is idle and should be terminated; this is an educated guess. */
+ struct session *session = client->data;
+ uv_timer_t *timer = &session->timeout;
+ uv_timer_init(master->loop, timer);
+ timer->data = client;
+ uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
io_start_read((uv_handle_t *)client);
}
-int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
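+/* Set a TCP-level socket option on the handle's underlying file descriptor (best effort). */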
+static int set_tcp_option(uv_tcp_t *handle, int option, int val)
{
- unsigned flags = UV_UDP_REUSEADDR;
+ uv_os_fd_t fd = 0;
+ if (uv_fileno((uv_handle_t *)handle, &fd) == 0) {
+ return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
+ }
+ return 0; /* N/A */
+}
+
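+/* Bind a TCP socket, apply optional socket options and start listening with the given connection callback. */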
+static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection)
+{
+ unsigned flags = 0;
if (addr->sa_family == AF_INET6) {
- flags |= UV_UDP_IPV6ONLY;
+ flags |= UV_TCP_IPV6ONLY;
}
+
int ret = uv_tcp_bind(handle, addr, flags);
if (ret != 0) {
return ret;
}
- ret = uv_listen((uv_stream_t *)handle, 16, tcp_accept);
+	/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data (value is in seconds). */
+#ifdef TCP_DEFER_ACCEPT
+ if (set_tcp_option(handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
+ kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
+ }
+#endif
+
+ ret = uv_listen((uv_stream_t *)handle, 16, connection);
if (ret != 0) {
return ret;
}
+	/* TCP_FASTOPEN allows data to be carried in the SYN on repeat connections, saving one RTT. */
+#ifdef TCP_FASTOPEN
+# ifdef __linux__
+	(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Value is a queue length hint */
+# else
+	(void) set_tcp_option(handle, TCP_FASTOPEN, 1);  /* Value is an on/off switch */
+# endif
+#endif
+
handle->data = NULL;
return 0;
}
+int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
+{
+ return _tcp_bind(handle, addr, tcp_accept);
+}
+
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
{
if (type == SOCK_DGRAM) {
uv_tcp_init(loop, (uv_tcp_t *)handle);
uv_tcp_nodelay((uv_tcp_t *)handle, 1);
}
+
+ struct worker_ctx *worker = loop->data;
+ handle->data = session_borrow(worker);
+ assert(handle->data);
+}
+
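+/* Return the handle's session to the worker freelist, or free it if the loop is already gone. */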
+void io_deinit(uv_handle_t *handle)
+{
+ if (!handle) {
+ return;
+ }
+ uv_loop_t *loop = handle->loop;
+ if (loop && loop->data) {
+ struct worker_ctx *worker = loop->data;
+ session_release(worker, handle->data);
+ } else {
+ session_free(handle->data);
+ }
+ handle->data = NULL;
+}
+
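+/* Close callback: release the session and free the handle itself. */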
+void io_free(uv_handle_t *handle)
+{
+ if (!handle) {
+ return;
+ }
+ io_deinit(handle);
+ free(handle);
}
int io_start_read(uv_handle_t *handle)
#include "daemon/engine.h"
#include "daemon/io.h"
-/* @internal IO request entry. */
-struct ioreq
+/* @internal Union of various libuv objects for freelist. */
+struct req
{
union {
+ /* Socket handles, these have session as their `handle->data` and own it. */
uv_udp_t udp;
uv_tcp_t tcp;
+ /* I/O events, these have only a reference to the task they're operating on. */
uv_udp_send_t send;
uv_write_t write;
uv_connect_t connect;
+ /* Timer events */
+ uv_timer_t timer;
} as;
};
-/** @internal Number of request within timeout window. */
-#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))
-
/** @internal Debugging facility. */
#ifdef DEBUG
#define DEBUG_MSG(fmt...) printf("[daem] " fmt)
#define DEBUG_MSG(fmt...)
#endif
-/** @internal Query resolution task. */
-struct qr_task
-{
- struct kr_request req;
- struct worker_ctx *worker;
- knot_pkt_t *pktbuf;
- array_t(struct qr_task *) waiting;
- uv_handle_t *pending[MAX_PENDING];
- uint16_t pending_count;
- uint16_t addrlist_count;
- uint16_t addrlist_turn;
- struct sockaddr *addrlist;
- uv_timer_t retry, timeout;
- worker_cb_t on_complete;
- void *baton;
- struct {
- union {
- struct sockaddr_in ip4;
- struct sockaddr_in6 ip6;
- } addr;
- uv_handle_t *handle;
- } source;
- uint16_t iter_count;
- uint16_t refs;
- uint16_t bytes_remaining;
- bool finished;
- bool leading;
-};
-
/* Convenience macros */
#define qr_task_ref(task) \
do { ++(task)->refs; } while(0)
(!uv_is_closing((checked)) || (task)->source.handle == (checked))
/* Forward decls */
+static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);
/** @internal Get singleton worker. */
return uv_default_loop()->data;
}
-static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
+static inline struct req *req_borrow(struct worker_ctx *worker)
{
- struct ioreq *req = NULL;
- if (worker->ioreqs.len > 0) {
- req = array_tail(worker->ioreqs);
- array_pop(worker->ioreqs);
+ struct req *req = NULL;
+ if (worker->pool_ioreq.len > 0) {
+ req = array_tail(worker->pool_ioreq);
+ array_pop(worker->pool_ioreq);
+ kr_asan_unpoison(req, sizeof(*req));
} else {
req = malloc(sizeof(*req));
}
- kr_asan_unpoison(req, sizeof(*req));
return req;
}
-static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
+static inline void req_release(struct worker_ctx *worker, struct req *req)
{
- kr_asan_poison(req, sizeof(*req));
- if (!req || worker->ioreqs.len < 4 * MP_FREELIST_SIZE) {
- array_push(worker->ioreqs, req);
+	if (!req) {
+		return;
+	}
+	if (worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) {
+		array_push(worker->pool_ioreq, req);
+		kr_asan_poison(req, sizeof(*req));
} else {
free(req);
}
}
+/*! @internal Create a UDP/TCP handle for an outbound subrequest */
static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
{
if (task->pending_count >= MAX_PENDING) {
return NULL;
}
/* Create connection for iterative query */
- uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker);
- if (!req) {
+ uv_handle_t *handle = (uv_handle_t *)req_borrow(task->worker);
+ if (!handle) {
+ return NULL;
+ }
+ io_create(task->worker->loop, handle, socktype);
+ /* Set current handle as a subrequest type. */
+ struct session *session = handle->data;
+ session->is_subreq = true;
+ int ret = array_push(session->tasks, task);
+ if (ret != 0) {
+ io_deinit(handle);
+ req_release(task->worker, (struct req *)handle);
return NULL;
}
- io_create(task->worker->loop, req, socktype);
- req->data = task;
+ qr_task_ref(task);
/* Connect or issue query datagram */
- task->pending[task->pending_count] = req;
+ task->pending[task->pending_count] = handle;
task->pending_count += 1;
- return req;
+ return handle;
}
static void ioreq_on_close(uv_handle_t *handle)
{
struct worker_ctx *worker = get_worker();
- ioreq_release(worker, (struct ioreq *)handle);
+ /* Handle-type events own a session, must close it. */
+ struct session *session = handle->data;
+ struct qr_task *task = session->tasks.at[0];
+ io_deinit(handle);
+ qr_task_unref(task);
+ req_release(worker, (struct req *)handle);
}
static void ioreq_kill(uv_handle_t *req)
{
assert(req);
if (!uv_is_closing(req)) {
- io_stop_read(req);
uv_close(req, ioreq_on_close);
}
}
#endif
/** @endcond */
-static inline struct mempool *pool_take(struct worker_ctx *worker)
+static inline struct mempool *pool_borrow(struct worker_ctx *worker)
{
/* Recycle available mempool if possible */
struct mempool *mp = NULL;
- if (worker->pools.len > 0) {
- mp = array_tail(worker->pools);
- array_pop(worker->pools);
+ if (worker->pool_mp.len > 0) {
+ mp = array_tail(worker->pool_mp);
+ array_pop(worker->pool_mp);
+ mp_poison(mp, 0);
} else { /* No mempool on the freelist, create new one */
mp = mp_new (4 * CPU_PAGE_SIZE);
}
- mp_poison(mp, 0);
return mp;
}
static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
/* Return mempool to ring or free it if it's full */
- if (worker->pools.len < MP_FREELIST_SIZE) {
+ if (worker->pool_mp.len < MP_FREELIST_SIZE) {
mp_flush(mp);
- array_push(worker->pools, mp);
+ array_push(worker->pool_mp, mp);
mp_poison(mp, 1);
} else {
mp_delete(mp);
}
}
-static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
+static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
{
/* How much can client handle? */
struct engine *engine = worker->engine;
- size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
size_t pktbuf_max = KR_EDNS_PAYLOAD;
if (engine->resolver.opt_rr) {
pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr), pktbuf_max);
}
- if (!addr && handle) { /* TCP */
- answer_max = KNOT_WIRE_MAX_PKTSIZE;
- pktbuf_max = KNOT_WIRE_MAX_PKTSIZE;
- } else if (knot_pkt_has_edns(query)) { /* EDNS */
- answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE);
- }
/* Recycle available mempool if possible */
knot_mm_t pool = {
- .ctx = pool_take(worker),
+ .ctx = pool_borrow(worker),
.alloc = (knot_mm_alloc_t) mp_alloc
};
/* Create packet buffers for answer and subrequests */
task->req.pool = pool;
knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool);
- knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool);
- if (!pktbuf || !answer) {
+ if (!pktbuf) {
mp_delete(pool.ctx);
return NULL;
}
- task->req.answer = answer;
+ pktbuf->size = 0;
+ task->req.answer = NULL;
task->pktbuf = pktbuf;
array_init(task->waiting);
task->addrlist = NULL;
task->pending_count = 0;
task->bytes_remaining = 0;
task->iter_count = 0;
+ task->timeouts = 0;
task->refs = 1;
task->finished = false;
task->leading = false;
task->worker = worker;
+ task->session = NULL;
task->source.handle = handle;
- uv_timer_init(worker->loop, &task->retry);
- uv_timer_init(worker->loop, &task->timeout);
- task->retry.data = task;
- task->timeout.data = task;
+ task->timeout = NULL;
task->on_complete = NULL;
task->req.qsource.key = NULL;
task->req.qsource.addr = NULL;
} else {
task->source.addr.ip4.sin_family = AF_UNSPEC;
}
- /* Remember query source TSIG key */
- if (query->tsig_rr) {
- task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool);
- }
-
- /* Start resolution */
- kr_resolve_begin(&task->req, &engine->resolver, answer);
worker->stats.concurrent += 1;
- worker->stats.queries += 1;
- /* Throttle outbound queries only when high pressure */
- if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
- task->req.options |= QUERY_NO_THROTTLE;
- }
return task;
}
/* This is called when the task refcount is zero, free memory. */
static void qr_task_free(struct qr_task *task)
{
- /* Return mempool to ring or free it if it's full */
+ struct session *session = task->session;
+ if (session) {
+ /* Walk the session task list and remove itself. */
+ for (size_t i = 0; i < session->tasks.len; ++i) {
+ if (session->tasks.at[i] == task) {
+ array_del(session->tasks, i);
+ break;
+ }
+ }
+ /* Start reading again if the session is throttled and
+		 * the number of outstanding requests is below the watermark. */
+ uv_handle_t *handle = task->source.handle;
+ if (handle && session->tasks.len < task->worker->tcp_pipeline_max/2) {
+ if (!uv_is_closing(handle) && session->throttled) {
+ io_start_read(handle);
+ session->throttled = false;
+ }
+ }
+ }
+ /* Update stats */
struct worker_ctx *worker = task->worker;
+ worker->stats.concurrent -= 1;
+ /* Return mempool to ring or free it if it's full */
pool_release(worker, task->req.pool.ctx);
/* @note The 'task' is invalidated from now on. */
/* Decommit memory every once in a while */
}
}
-/* This is called when retry timer closes */
-static void retransmit_close(uv_handle_t *handle)
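+/* Size the answer buffer for the client and begin resolution of the parsed query. */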
+static int qr_task_start(struct qr_task *task, knot_pkt_t *query)
{
- struct qr_task *task = handle->data;
- qr_task_unref(task);
+ assert(task && query);
+ size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
+ if (!task->source.handle || task->source.handle->type == UV_TCP) {
+ answer_max = KNOT_WIRE_MAX_PKTSIZE;
+ } else if (knot_pkt_has_edns(query)) { /* EDNS */
+ answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE);
+ }
+
+ knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool);
+ if (!answer) {
+ return kr_error(ENOMEM);
+ }
+ task->req.answer = answer;
+
+ /* Remember query source TSIG key */
+ if (query->tsig_rr) {
+ task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool);
+ }
+
+ /* Start resolution */
+ struct worker_ctx *worker = task->worker;
+ struct engine *engine = worker->engine;
+ kr_resolve_begin(&task->req, &engine->resolver, answer);
+ worker->stats.queries += 1;
+ /* Throttle outbound queries only when high pressure */
+ if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
+ task->req.options |= QUERY_NO_THROTTLE;
+ }
+ return 0;
}
-/* This is called when task completes and timeout timer is closed. */
-static void qr_task_complete(uv_handle_t *handle)
+/** @internal Register a qr_task within a session. */
+static int qr_task_register(struct qr_task *task, struct session *session)
+{
+ int ret = array_reserve(session->tasks, session->tasks.len + 1);
+ if (ret != 0) {
+ return kr_error(ENOMEM);
+ }
+ array_push(session->tasks, task);
+ task->session = session;
+	/* Soft limit on parallel queries: there is no "slow down" RCODE
+	 * that we could use to signal the client, but we can stop reading
+	 * and in effect shrink the TCP window size. For more precise throttling
+	 * we would need to copy the remainder of the unread buffer and reassemble
+	 * it when resuming reading. This is NYI. */
+ if (session->tasks.len >= task->worker->tcp_pipeline_max) {
+ uv_handle_t *handle = task->source.handle;
+ if (handle && !session->throttled && !uv_is_closing(handle)) {
+ io_stop_read(handle);
+ session->throttled = true;
+ }
+ }
+ return 0;
+}
+
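+/* Called when the task finishes: kill outstanding subrequests, run the completion callback and drop the primary reference. */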
+static void qr_task_complete(struct qr_task *task)
{
- struct qr_task *task = handle->data;
struct worker_ctx *worker = task->worker;
/* Kill pending I/O requests */
ioreq_killall(task);
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
}
- /* Return handle to the event loop in case
- * it was exclusively taken by this task. */
- if (task->source.handle && !uv_has_ref(task->source.handle)) {
- uv_ref(task->source.handle);
- io_start_read(task->source.handle);
- }
- /* Release task */
+ /* Release primary reference to task. */
qr_task_unref(task);
- /* Update stats */
- worker->stats.concurrent -= 1;
-}
-
-/* This is called when I/O timeouts */
-static void on_timeout(uv_timer_t *req)
-{
- struct qr_task *task = req->data;
- uv_handle_t *handle = (uv_handle_t *)req;
-#ifdef DEBUG
- char qname_str[KNOT_DNAME_MAXLEN] = {'\0'}, type_str[16] = {'\0'};
- knot_dname_to_str(qname_str, knot_pkt_qname(task->pktbuf), sizeof(qname_str));
- knot_rrtype_to_string(knot_pkt_qtype(task->pktbuf), type_str, sizeof(type_str));
- DEBUG_MSG("ioreq timeout %s %s %p\n", qname_str, type_str, req);
-#endif
- /* Ignore if this timeout is being terminated. */
- if (uv_is_closing(handle)) {
- return;
- }
- /* Penalize all tried nameservers with a timeout. */
- struct worker_ctx *worker = task->worker;
- if (task->leading && task->pending_count > 0) {
- struct kr_query *qry = array_tail(task->req.rplan.pending);
- struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
- for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
- struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
- WITH_DEBUG {
- char addr_str[INET6_ADDRSTRLEN];
- inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
- QRDEBUG(qry, "wrkr", "=> server: '%s' flagged as 'bad'\n", addr_str);
- }
- kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_TIMEOUT, worker->engine->resolver.cache_rtt);
- }
- }
- /* Interrupt current pending request. */
- worker->stats.timeout += 1;
- qr_task_step(task, NULL, NULL);
}
/* This is called when we send subrequest / answer */
if (!task->finished) {
if (status == 0 && handle) {
io_start_read(handle); /* Start reading new query */
- } else {
- DEBUG_MSG("ioreq send_done %p => %d, %s\n", handle, status, uv_strerror(status));
}
} else {
- /* Close retry timer (borrows task) */
- qr_task_ref(task);
- uv_close((uv_handle_t *)&task->retry, retransmit_close);
- /* Close timeout timer (finishes task) */
- uv_close((uv_handle_t *)&task->timeout, qr_task_complete);
+ assert(task->timeout == NULL);
+ qr_task_complete(task);
}
return status;
}
qr_task_on_send(task, (uv_handle_t *)req->handle, status);
}
qr_task_unref(task);
- ioreq_release(worker, (struct ioreq *)req);
+ req_release(worker, (struct req *)req);
}
static void on_write(uv_write_t *req, int status)
qr_task_on_send(task, (uv_handle_t *)req->handle, status);
}
qr_task_unref(task);
- ioreq_release(worker, (struct ioreq *)req);
+ req_release(worker, (struct req *)req);
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
if (!handle) {
return qr_task_on_send(task, handle, kr_error(EIO));
}
- struct ioreq *send_req = ioreq_take(task->worker);
+ struct req *send_req = req_borrow(task->worker);
if (!send_req) {
return qr_task_on_send(task, handle, kr_error(ENOMEM));
}
if (ret == 0) {
qr_task_ref(task); /* Pending ioreq on current task */
} else {
- DEBUG_MSG("ioreq send_start %p => %d, %s\n", send_req, ret, uv_strerror(ret));
- ioreq_release(task->worker, send_req);
+ req_release(task->worker, send_req);
}
/* Update statistics */
if (status == 0) {
qr_task_send(task, (uv_handle_t *)handle, NULL, task->pktbuf);
} else {
- DEBUG_MSG("ioreq conn_done %p => %d, %s\n", req, status, uv_strerror(status));
qr_task_step(task, task->addrlist, NULL);
}
}
qr_task_unref(task);
- ioreq_release(worker, (struct ioreq *)req);
+ req_release(worker, (struct req *)req);
+}
+
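+/* Timer handles borrow the task; return the handle to the freelist and drop the reference on close. */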
+static void on_timer_close(uv_handle_t *handle)
+{
+ struct qr_task *task = handle->data;
+ req_release(task->worker, (struct req *)handle);
+ qr_task_unref(task);
+}
+
+/* This is called when an I/O request times out */
+static void on_timeout(uv_timer_t *req)
+{
+ struct qr_task *task = req->data;
+
+ /* Penalize all tried nameservers with a timeout. */
+ struct worker_ctx *worker = task->worker;
+ if (task->leading && task->pending_count > 0) {
+ struct kr_query *qry = array_tail(task->req.rplan.pending);
+ struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
+ for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
+ struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
+ WITH_DEBUG {
+ char addr_str[INET6_ADDRSTRLEN];
+ inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
+ QRDEBUG(qry, "wrkr", "=> server: '%s' flagged as 'bad'\n", addr_str);
+ }
+ kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_TIMEOUT, worker->engine->resolver.cache_rtt);
+ }
+ }
+ /* Release timer handle */
+ task->timeout = NULL;
+ req_release(worker, (struct req *)req);
+ /* Interrupt current pending request. */
+ task->timeouts += 1;
+ worker->stats.timeout += 1;
+ qr_task_step(task, NULL, NULL);
+ qr_task_unref(task); /* Return borrowed task */
}
static bool retransmit(struct qr_task *task)
{
- if (task && task->addrlist) {
+ if (task && task->addrlist && task->addrlist_count > 0) {
uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM);
if (subreq) { /* Create connection for iterative query */
struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
static void on_retransmit(uv_timer_t *req)
{
- if (uv_is_closing((uv_handle_t *)req))
- return;
+ uv_timer_stop(req);
+ struct qr_task *task = req->data;
if (!retransmit(req->data)) {
- uv_timer_stop(req); /* Not possible to spawn request, stop trying */
+		/* Not possible to spawn a request; start the timeout timer with the remaining deadline. */
+ uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
+ uv_timer_start(req, on_timeout, timeout, 0);
+ } else {
+ uv_timer_start(req, on_retransmit, KR_CONN_RETRY, 0);
}
}
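+/* Start a timer bound to the task; the timer holds a task reference until it fires or is closed. */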
+static int timer_start(struct qr_task *task, uv_timer_cb cb, uint64_t timeout, uint64_t repeat)
+{
+ assert(task->timeout == NULL);
+ struct worker_ctx *worker = task->worker;
+ uv_timer_t *timer = (uv_timer_t *)req_borrow(worker);
+ if (!timer) {
+ return kr_error(ENOMEM);
+ }
+ uv_timer_init(worker->loop, timer);
+ int ret = uv_timer_start(timer, cb, timeout, repeat);
+ if (ret != 0) {
+ uv_timer_stop(timer);
+ req_release(worker, (struct req *)timer);
+ return kr_error(ENOMEM);
+ }
+ timer->data = task;
+ qr_task_ref(task);
+ task->timeout = timer;
+ return 0;
+}
+
/** @internal Get key from current outstanding subrequest. */
static int subreq_key(char *dst, struct qr_task *task)
{
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
- /* Close pending I/O requests */
- if (uv_is_active((uv_handle_t *)&task->retry))
- uv_timer_stop(&task->retry);
- if (uv_is_active((uv_handle_t *)&task->timeout))
- uv_timer_stop(&task->timeout);
+ /* Close pending timer */
+ if (task->timeout) {
+		/* The timer was running, so it holds a reference to the task; make sure the timer
+		 * event never fires and release the reference when the timer is closed instead. */
+ uv_timer_stop(task->timeout);
+ uv_close((uv_handle_t *)task->timeout, on_timer_close);
+ task->timeout = NULL;
+ }
ioreq_killall(task);
/* Clear from outstanding table. */
if (!task->leading)
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
{
/* No more steps after we're finished. */
- if (task->finished) {
+ if (!task || task->finished) {
return kr_error(ESTALE);
}
/* Close pending I/O requests */
int state = kr_resolve_consume(&task->req, packet_source, packet);
while (state == KNOT_STATE_PRODUCE) {
state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf);
- if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
- DEBUG_MSG("task iter_limit %p\n", task);
+ if (unlikely(++task->iter_count > KR_ITER_LIMIT || task->timeouts >= KR_TIMEOUT_LIMIT)) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
}
}
/* Start fast retransmit with UDP, otherwise connect. */
+ int ret = 0;
if (sock_type == SOCK_DGRAM) {
/* If such subrequest is outstanding, enqueue to it. */
if (subreq_enqueue(task)) {
}
/* Start transmitting */
if (retransmit(task)) {
- uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
+ ret = timer_start(task, on_retransmit, KR_CONN_RETRY, 0);
} else {
return qr_task_step(task, NULL, NULL);
}
*/
subreq_lead(task);
} else {
- struct ioreq *conn = ioreq_take(task->worker);
+ uv_connect_t *conn = (uv_connect_t *)req_borrow(task->worker);
if (!conn) {
return qr_task_step(task, NULL, NULL);
}
uv_handle_t *client = ioreq_spawn(task, sock_type);
if (!client) {
- ioreq_release(task->worker, conn);
+ req_release(task->worker, (struct req *)conn);
return qr_task_step(task, NULL, NULL);
}
- conn->as.connect.data = task;
- if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
- ioreq_release(task->worker, conn);
+ conn->data = task;
+ if (uv_tcp_connect(conn, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
+ req_release(task->worker, (struct req *)conn);
return qr_task_step(task, NULL, NULL);
}
- /* Connect request borrows task */
- qr_task_ref(task);
+ qr_task_ref(task); /* Connect request borrows task */
+ ret = timer_start(task, on_timeout, KR_CONN_RTT_MAX, 0);
}
/* Start next step with timeout, fatal if can't start a timer. */
- int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
if (ret != 0) {
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
-
- return ret;
+ return 0;
}
static int parse_packet(knot_pkt_t *query)
{
- if (!query)
+	if (!query) {
return kr_error(EINVAL);
+ }
/* Parse query packet. */
int ret = knot_pkt_parse(query, 0);
return kr_ok();
}
-int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
+int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *msg, const struct sockaddr* addr)
{
if (!worker || !handle) {
return kr_error(EINVAL);
}
+ struct session *session = handle->data;
+ assert(session);
+
/* Parse packet */
- int ret = parse_packet(query);
+ int ret = parse_packet(msg);
- /* Start new task on master sockets, or resume existing */
- struct qr_task *task = handle->data;
- bool is_master_socket = (!task);
- if (is_master_socket) {
+	/* Start a new task on listening sockets, or resume if this is a subrequest */
+ struct qr_task *task = NULL;
+ if (!session->is_subreq) {
/* Ignore badly formed queries or responses. */
- if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
- DEBUG_MSG("task bad_query %p => %d, %s\n", task, ret, kr_strerror(ret));
- worker->stats.dropped += 1;
+ if (!msg || ret != 0 || knot_wire_get_qr(msg->wire)) {
+ if (msg) worker->stats.dropped += 1;
return kr_error(EINVAL); /* Ignore. */
}
- task = qr_task_create(worker, handle, query, addr);
+ task = qr_task_create(worker, handle, addr);
if (!task) {
return kr_error(ENOMEM);
}
+ ret = qr_task_start(task, msg);
+ if (ret != 0) {
+ qr_task_free(task);
+ return kr_error(ENOMEM);
+ }
+ } else {
+ task = session->tasks.len > 0 ? array_tail(session->tasks) : NULL;
}
- /* Consume input and produce next query */
- return qr_task_step(task, addr, query);
+ /* Consume input and produce next message */
+ return qr_task_step(task, addr, msg);
}
/* Return DNS/TCP message size. */
-static int msg_size(const uint8_t *msg, size_t len)
+static int msg_size(const uint8_t *msg)
{
- if (len < 2) {
- return kr_error(EMSGSIZE);
- }
return wire_read_u16(msg);
}
-int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len)
+/* If buffering, close last task as it isn't live yet. */
+static void discard_buffered(struct session *session)
+{
+ if (session->buffering) {
+ qr_task_free(session->buffering);
+ session->buffering = NULL;
+ }
+}
+
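+/* Handle the end of a TCP stream: notify the parent task for subrequests, or discard a partially buffered query. */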
+int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle)
{
- if (!worker || !handle || !msg) {
+ if (!worker || !handle) {
return kr_error(EINVAL);
}
+	/* If this is a subrequest, notify the parent task with empty input,
+	 * because in this case the session doesn't own the tasks, it has just
+	 * borrowed the task from the parent session. */
+ struct session *session = handle->data;
+ if (session->is_subreq) {
+ worker_submit(worker, (uv_handle_t *)handle, NULL, NULL);
+ } else {
+ discard_buffered(session);
+ }
+ return 0;
+}
- int nbytes = msg_size(msg, len);
- struct qr_task *task = handle->data;
- const bool start_assembly = (task && task->bytes_remaining == 0);
+int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, ssize_t len)
+{
+ if (!worker || !handle) {
+ return kr_error(EINVAL);
+ }
+ /* Connection error or forced disconnect */
+ struct session *session = handle->data;
+ if (len <= 0 || !msg) {
+		/* If we have pending tasks, we must dissociate them from the
+		 * connection so they don't try to access the closed and freed handle. */
+ for (size_t i = 0; i < session->tasks.len; ++i) {
+ struct qr_task *task = session->tasks.at[i];
+ task->session = NULL;
+ task->source.handle = NULL;
+ }
+ session->tasks.len = 0;
+ return kr_error(ECONNRESET);
+ }
+
+ int submitted = 0;
+ ssize_t nbytes = 0;
+ struct qr_task *task = session->buffering;
- /* Message is a query (we have no context to buffer it) or complete. */
- if (!task || (start_assembly && nbytes == len - 2)) {
- if (nbytes <= 0) {
- return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
+	/* If this is a new query, create a new task that we can use
+	 * to buffer the incoming message until it's complete. */
+ if (!session->is_subreq) {
+ if (!task) {
+ task = qr_task_create(worker, handle, NULL);
+ if (!task) {
+ return kr_error(ENOMEM);
+ }
+ session->buffering = task;
}
- knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool);
- return worker_exec(worker, handle, pkt_nocopy, NULL);
+ } else {
+ assert(session->tasks.len > 0);
+ task = array_tail(session->tasks);
}
- /* Starting a new message assembly */
+	/* At this point the session has either created a new task or one is already assigned. */
+ assert(task);
+ assert(len > 0);
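+	/* DNS over TCP prefixes each message with a two-byte length field in network byte order (RFC 1035). */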
+ /* Start reading DNS/TCP message length */
knot_pkt_t *pkt_buf = task->pktbuf;
- if (start_assembly) {
- if (nbytes <= 0) {
- return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
- }
+ if (task->bytes_remaining == 0 && pkt_buf->size == 0) {
knot_pkt_clear(pkt_buf);
+		/* Read only one byte, as a TCP segment may end at a one-byte boundary,
+		 * which would otherwise lead to an out-of-bounds read or a wrong reassembled length. */
+ pkt_buf->size = 1;
+ pkt_buf->wire[0] = msg[0];
+ len -= 1;
+ msg += 1;
+ if (len == 0) {
+ return 0;
+ }
+ }
+ /* Finish reading DNS/TCP message length. */
+ if (task->bytes_remaining == 0 && pkt_buf->size == 1) {
+ pkt_buf->wire[1] = msg[0];
+ nbytes = msg_size(pkt_buf->wire);
+ len -= 1;
+ msg += 1;
+		/* Cut off the length prefix and start reading the DNS message. */
pkt_buf->size = 0;
- /* Cut off message length */
task->bytes_remaining = nbytes;
- len -= 2;
- msg += 2;
}
/* Message is too long, can't process it. */
- if (len > pkt_buf->max_size - pkt_buf->size) {
+ ssize_t to_read = MIN(len, task->bytes_remaining);
+ if (to_read > (ssize_t)(pkt_buf->max_size - pkt_buf->size)) {
+ pkt_buf->size = 0;
task->bytes_remaining = 0;
- return worker_exec(worker, handle, NULL, NULL);
+ return kr_error(EMSGSIZE);
}
/* Buffer message and check if it's complete */
- memcpy(pkt_buf->wire + pkt_buf->size, msg, len);
- pkt_buf->size += len;
- if (len >= task->bytes_remaining) {
+ memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
+ pkt_buf->size += to_read;
+ if (to_read >= task->bytes_remaining) {
task->bytes_remaining = 0;
- return worker_exec(worker, handle, pkt_buf, NULL);
+ /* Parse the packet and start resolving complete query */
+ int ret = parse_packet(pkt_buf);
+ if (ret == 0 && !session->is_subreq) {
+ ret = qr_task_start(task, pkt_buf);
+ if (ret != 0) {
+ return ret;
+ }
+ ret = qr_task_register(task, session);
+ if (ret != 0) {
+ return ret;
+ }
+ /* Task is now registered in session, clear temporary. */
+ session->buffering = NULL;
+ submitted += 1;
+ }
+ /* Start only new queries, not subrequests that are already pending */
+ if (ret == 0) {
+ ret = qr_task_step(task, NULL, pkt_buf);
+ }
+ /* Process next message part in the stream if no error so far */
+ if (ret != 0) {
+ return ret;
+ }
+ if (len - to_read > 0 && !session->is_subreq) {
+ ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read);
+ if (ret < 0) {
+ return ret;
+ }
+ submitted += ret;
+ }
+ } else {
+ task->bytes_remaining -= to_read;
}
- /* Return number of bytes remaining to receive. */
- task->bytes_remaining -= len;
- return task->bytes_remaining;
+ return submitted;
}
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
}
/* Create task */
- struct qr_task *task = qr_task_create(worker, NULL, query, NULL);
+ struct qr_task *task = qr_task_create(worker, NULL, NULL);
if (!task) {
return kr_error(ENOMEM);
}
task->baton = baton;
task->on_complete = on_complete;
task->req.options |= options;
+ /* Start task */
+ int ret = qr_task_start(task, query);
+ if (ret != 0) {
+ qr_task_unref(task);
+ return ret;
+ }
return qr_task_step(task, NULL, query);
}
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
- array_init(worker->pools);
- array_init(worker->ioreqs);
- if (array_reserve(worker->pools, ring_maxlen) || array_reserve(worker->ioreqs, ring_maxlen))
+ array_init(worker->pool_mp);
+ array_init(worker->pool_ioreq);
+ array_init(worker->pool_sessions);
+ if (array_reserve(worker->pool_mp, ring_maxlen) ||
+ array_reserve(worker->pool_ioreq, ring_maxlen) ||
+ array_reserve(worker->pool_sessions, ring_maxlen))
return kr_error(ENOMEM);
memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
worker->outstanding = map_make();
+ worker->tcp_pipeline_max = MAX_PIPELINED;
return kr_ok();
}
void worker_reclaim(struct worker_ctx *worker)
{
- reclaim_freelist(worker->pools, struct mempool, mp_delete);
- reclaim_freelist(worker->ioreqs, struct ioreq, free);
+ reclaim_freelist(worker->pool_mp, struct mempool, mp_delete);
+ reclaim_freelist(worker->pool_ioreq, struct req, free);
+ reclaim_freelist(worker->pool_sessions, struct session, session_free);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
map_clear(&worker->outstanding);