#include <assert.h>
#include <sys/types.h>
#include <unistd.h>
+#include <gnutls/gnutls.h>
#include "lib/utils.h"
#include "lib/layer.h"
#include "daemon/worker.h"
#define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt)
+/** Client request state. */
+struct request_ctx
+{
+ struct kr_request req;
+ struct {
+ union inaddr addr;
+ union inaddr dst_addr;
+ /* uv_handle_t *handle; */
+
+		/** NULL if the request didn't come over the network. */
+ struct session *session;
+ } source;
+ struct worker_ctx *worker;
+ qr_tasklist_t tasks;
+};
+
+/** Query resolution task. */
+struct qr_task
+{
+ struct request_ctx *ctx;
+ knot_pkt_t *pktbuf;
+ qr_tasklist_t waiting;
+ uv_handle_t *pending[MAX_PENDING];
+ uint16_t pending_count;
+ uint16_t addrlist_count;
+ uint16_t addrlist_turn;
+ uint16_t timeouts;
+ uint16_t iter_count;
+ uint16_t bytes_remaining;
+ struct sockaddr *addrlist;
+ worker_cb_t on_complete;
+ void *baton;
+ uint32_t refs;
+ bool finished : 1;
+ bool leading : 1;
+};
+
+
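+/* Counters of outgoing TCP sessions: established vs. still waiting for the connection. */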
+int32_t tcp_connected = 0;
+int32_t tcp_waiting = 0;
+
/* @internal Union of various libuv objects for freelist. */
struct req
{
#define qr_task_unref(task) \
do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
- (!uv_is_closing((checked)) || (task)->source.handle == (checked))
+ (!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))
+
+/** @internal Get the key for the TCP session map.
+ * @note kr_straddr() returns a pointer to a static string.
+ */
+#define tcpsess_key(addr) kr_straddr(addr)
/* Forward decls */
static void qr_task_free(struct qr_task *task);
-static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);
+static int qr_task_step(struct qr_task *task,
+ const struct sockaddr *packet_source,
+ knot_pkt_t *packet);
+static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
+ struct sockaddr *addr, knot_pkt_t *pkt);
+static int qr_task_finalize(struct qr_task *task, int state);
+static void qr_task_complete(struct qr_task *task);
+static int worker_add_tcp_connected(struct worker_ctx *worker,
+ const struct sockaddr *addr,
+ struct session *session);
+static int worker_del_tcp_connected(struct worker_ctx *worker,
+ const struct sockaddr *addr);
+static struct session* worker_find_tcp_connected(struct worker_ctx *worker,
+ const struct sockaddr *srv);
+static int worker_add_tcp_waiting(struct worker_ctx *worker,
+ const struct sockaddr *addr,
+ struct session *session);
+static int worker_del_tcp_waiting(struct worker_ctx *worker,
+ const struct sockaddr *addr);
+static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
+ const struct sockaddr *srv);
+static int session_add_waiting(struct session *session, struct qr_task *task);
+static int session_del_waiting(struct session *session, struct qr_task *task);
+static int session_add_tasks(struct session *session, struct qr_task *task);
+static int session_del_tasks(struct session *session, struct qr_task *task);
+static void session_close(struct session *session);
+static void on_session_idle_timeout(uv_timer_t *timer);
+static int timer_start(struct session *session, uv_timer_cb cb,
+ uint64_t timeout, uint64_t repeat);
+static void on_tcp_connect_timeout(uv_timer_t *timer);
+static void on_tcp_watchdog_timeout(uv_timer_t *timer);
/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
return NULL;
}
/* Create connection for iterative query */
- uv_handle_t *handle = (uv_handle_t *)req_borrow(task->worker);
+ struct worker_ctx *worker = task->ctx->worker;
+ uv_handle_t *handle = (uv_handle_t *)req_borrow(worker);
if (!handle) {
return NULL;
}
- io_create(task->worker->loop, handle, socktype);
+ io_create(worker->loop, handle, socktype);
/* Bind to outgoing address, according to IP v4/v6. */
union inaddr *addr;
if (family == AF_INET) {
- addr = (union inaddr *)&task->worker->out_addr4;
+ addr = (union inaddr *)&worker->out_addr4;
} else {
- addr = (union inaddr *)&task->worker->out_addr6;
+ addr = (union inaddr *)&worker->out_addr6;
}
int ret = 0;
if (addr->ip.sa_family != AF_UNSPEC) {
struct session *session = handle->data;
if (ret == 0) {
session->outgoing = true;
- ret = array_push(session->tasks, task);
+ ret = session_add_tasks(session, task);
}
if (ret < 0) {
io_deinit(handle);
- req_release(task->worker, (struct req *)handle);
+ req_release(worker, (struct req *)handle);
return NULL;
}
- qr_task_ref(task);
/* Connect or issue query datagram */
task->pending[task->pending_count] = handle;
task->pending_count += 1;
return handle;
}
-static void ioreq_on_close(uv_handle_t *handle)
+static void on_session_close(uv_handle_t *handle)
{
struct worker_ctx *worker = get_worker();
- /* Handle-type events own a session, must close it. */
struct session *session = handle->data;
- struct qr_task *task = session->tasks.at[0];
+ if (!session->outgoing) {
+ assert(session->handle->type == UV_TCP);
+ }
+ bool free_handle = false;
+ if (!session->outgoing && session->handle->type == UV_TCP) {
+ free_handle = true;
+ }
io_deinit(handle);
- qr_task_unref(task);
- req_release(worker, (struct req *)handle);
+ if (free_handle) {
+ free(handle);
+ } else {
+ req_release(worker, (struct req *)handle);
+ }
+}
+
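+/** Timer close callback: once the timer is gone, close the session's I/O handle too. */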
+static void on_session_timer_close(uv_handle_t *timer)
+{
+ struct session *session = timer->data;
+ uv_handle_t *handle = session->handle;
+ if (!uv_is_closing(handle)) {
+ uv_close(handle, on_session_close);
+ }
}
-static void ioreq_kill(uv_handle_t *req)
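+/** Abort a pending UDP subrequest: stop its timer, detach the task and close the session. */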
+static void ioreq_kill_udp(uv_handle_t *req, struct qr_task *task)
{
assert(req);
- if (!uv_is_closing(req)) {
- uv_close(req, ioreq_on_close);
+ struct session *session = req->data;
+ assert(session->outgoing);
+ if (session->closing) {
+ return;
+ }
+ uv_timer_stop(&session->timeout);
+ session_del_tasks(session, task);
+ assert(session->tasks.len == 0);
+ session_close(session);
+}
+
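+/** Detach a task from a pending TCP subrequest.  If the connection becomes idle and is
+ * reusable, arm an idle timeout instead of closing it right away. */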
+static void ioreq_kill_tcp(uv_handle_t *req, struct qr_task *task)
+{
+ assert(req);
+ struct session *session = req->data;
+ assert(session->outgoing);
+ if (session->closing) {
+ return;
+ }
+
+ session_del_waiting(session, task);
+ session_del_tasks(session, task);
+
+ int res = 0;
+
+ if (session->outgoing && session->peer.ip.sa_family != AF_UNSPEC &&
+ session->tasks.len == 0 && session->waiting.len == 0 &&
+ session->connected && !session->closing) {
+ assert(session->peer.ip.sa_family == AF_INET ||
+ session->peer.ip.sa_family == AF_INET6);
+		/* This is an outbound TCP connection that can be reused.
+		 * Close it after a timeout. */
+ uv_timer_t *timer = &session->timeout;
+ timer->data = session;
+ uv_timer_stop(timer);
+ res = uv_timer_start(timer, on_session_idle_timeout,
+ KR_CONN_RTT_MAX, 0);
+ }
+
+ if (res != 0) {
+		/* If anything failed, close the session immediately. */
+ session_close(session);
}
}
-static void ioreq_killall(struct qr_task *task)
+static void ioreq_kill_pending(struct qr_task *task)
{
- for (size_t i = 0; i < task->pending_count; ++i) {
- ioreq_kill(task->pending[i]);
+ for (uint16_t i = 0; i < task->pending_count; ++i) {
+ if (task->pending[i]->type == UV_UDP) {
+ ioreq_kill_udp(task->pending[i], task);
+ } else if (task->pending[i]->type == UV_TCP) {
+ ioreq_kill_tcp(task->pending[i], task);
+ } else {
+ assert(false);
+ }
}
task->pending_count = 0;
}
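+/** Close a session: abort any buffered task, unregister an outgoing connection from
+ * the connected map and tear down the timer (the I/O handle is closed from the timer
+ * close callback). */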
+static void session_close(struct session *session)
+{
+ assert(session->tasks.len == 0 && session->waiting.len == 0);
+
+ if (session->closing) {
+ return;
+ }
+
+ if (session->buffering != NULL) {
+ qr_task_complete(session->buffering);
+ session->buffering = NULL;
+ }
+
+ session->closing = true;
+ if (session->outgoing &&
+ session->peer.ip.sa_family != AF_UNSPEC) {
+ struct worker_ctx *worker = get_worker();
+ struct sockaddr *peer = &session->peer.ip;
+ worker_del_tcp_connected(worker, peer);
+ session->connected = false;
+ }
+
+ if (!uv_is_closing((uv_handle_t *)&session->timeout)) {
+ uv_timer_stop(&session->timeout);
+ if (session->tls_client_ctx) {
+ tls_client_close(session->tls_client_ctx);
+ }
+ session->timeout.data = session;
+ uv_close((uv_handle_t *)&session->timeout, on_session_timer_close);
+ }
+}
+
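+/* The following helpers add/remove a task to/from a session's waiting and tasks lists,
+ * maintaining the task reference counts. */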
+static int session_add_waiting(struct session *session, struct qr_task *task)
+{
+ for (int i = 0; i < session->waiting.len; ++i) {
+ if (session->waiting.at[i] == task) {
+ return i;
+ }
+ }
+ int ret = array_push(session->waiting, task);
+ if (ret >= 0) {
+ qr_task_ref(task);
+ }
+ return ret;
+}
+
+static int session_del_waiting(struct session *session, struct qr_task *task)
+{
+ int ret = kr_error(ENOENT);
+ for (int i = 0; i < session->waiting.len; ++i) {
+ if (session->waiting.at[i] == task) {
+ array_del(session->waiting, i);
+ qr_task_unref(task);
+ ret = kr_ok();
+ break;
+ }
+ }
+ return ret;
+}
+
+static int session_add_tasks(struct session *session, struct qr_task *task)
+{
+ for (int i = 0; i < session->tasks.len; ++i) {
+ if (session->tasks.at[i] == task) {
+ return i;
+ }
+ }
+ int ret = array_push(session->tasks, task);
+ if (ret >= 0) {
+ qr_task_ref(task);
+ }
+ return ret;
+}
+
+static int session_del_tasks(struct session *session, struct qr_task *task)
+{
+ int ret = kr_error(ENOENT);
+ for (int i = 0; i < session->tasks.len; ++i) {
+ if (session->tasks.at[i] == task) {
+ array_del(session->tasks, i);
+ qr_task_unref(task);
+ ret = kr_ok();
+ break;
+ }
+ }
+ return ret;
+}
+
/** @cond This memory layout is internal to mempool.c, use only for debugging. */
#if defined(__SANITIZE_ADDRESS__)
struct mempool_chunk {
#endif
/** @endcond */
+/** Get a mempool. (Recycle if possible.) */
static inline struct mempool *pool_borrow(struct worker_ctx *worker)
{
- /* Recycle available mempool if possible */
struct mempool *mp = NULL;
if (worker->pool_mp.len > 0) {
mp = array_tail(worker->pool_mp);
return mp;
}
+/** Return a mempool. (Cache them up to some count.) */
static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
- /* Return mempool to ring or free it if it's full */
if (worker->pool_mp.len < MP_FREELIST_SIZE) {
mp_flush(mp);
array_push(worker->pool_mp, mp);
return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
}
-static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
+/** Create and initialize a request_ctx (on a fresh mempool).
+ *
+ * handle and addr point to the source of the request; they are NULL
+ * if the request didn't come over the network.
+ */
+static struct request_ctx *request_create(struct worker_ctx *worker,
+ uv_handle_t *handle,
+ const struct sockaddr *addr)
{
- /* How much can client handle? */
- struct engine *engine = worker->engine;
- size_t pktbuf_max = KR_EDNS_PAYLOAD;
- if (engine->resolver.opt_rr) {
- pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr), pktbuf_max);
- }
-
- /* Recycle available mempool if possible */
knot_mm_t pool = {
.ctx = pool_borrow(worker),
.alloc = (knot_mm_alloc_t) mp_alloc
};
- /* Create resolution task */
- struct qr_task *task = mm_alloc(&pool, sizeof(*task));
- if (!task) {
- mp_delete(pool.ctx);
+ /* Create request context */
+ struct request_ctx *ctx = mm_alloc(&pool, sizeof(*ctx));
+ if (!ctx) {
+ pool_release(worker, pool.ctx);
return NULL;
}
- /* Create packet buffers for answer and subrequests */
- task->req.pool = pool;
- knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool);
- if (!pktbuf) {
- mp_delete(pool.ctx);
- return NULL;
- }
- pktbuf->size = 0;
- task->req.answer = NULL;
- task->pktbuf = pktbuf;
- array_init(task->waiting);
- task->addrlist = NULL;
- task->pending_count = 0;
- task->bytes_remaining = 0;
- task->iter_count = 0;
- task->timeouts = 0;
- task->refs = 1;
- task->finished = false;
- task->leading = false;
- task->worker = worker;
- task->session = NULL;
- task->source.handle = handle;
- task->timeout = NULL;
- task->on_complete = NULL;
- task->req.qsource.key = NULL;
- task->req.qsource.addr = NULL;
- task->req.qsource.dst_addr = NULL;
- task->req.qsource.packet = NULL;
- task->req.qsource.opt = NULL;
- task->req.qsource.size = 0;
+ memset(ctx, 0, sizeof(*ctx));
+
+ /* TODO Relocate pool to struct request */
+ ctx->worker = worker;
+ array_init(ctx->tasks);
+ ctx->source.session = handle ? handle->data : NULL;
+
+ struct kr_request *req = &ctx->req;
+ req->pool = pool;
+
/* Remember query source addr */
- if (addr) {
+ if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
+ ctx->source.addr.ip.sa_family = AF_UNSPEC;
+ } else {
size_t addr_len = sizeof(struct sockaddr_in);
if (addr->sa_family == AF_INET6)
addr_len = sizeof(struct sockaddr_in6);
- memcpy(&task->source.addr, addr, addr_len);
- task->req.qsource.addr = (const struct sockaddr *)&task->source.addr;
- } else {
- task->source.addr.ip4.sin_family = AF_UNSPEC;
+ memcpy(&ctx->source.addr.ip, addr, addr_len);
+ ctx->req.qsource.addr = &ctx->source.addr.ip;
}
+
+ worker->stats.rconcurrent += 1;
+
+ if (!handle) {
+ return ctx;
+ }
+
/* Remember the destination address. */
- if (handle) {
- int addr_len = sizeof(task->source.dst_addr);
- struct sockaddr *dst_addr = (struct sockaddr *)&task->source.dst_addr;
- task->source.dst_addr.ip4.sin_family = AF_UNSPEC;
- if (handle->type == UV_UDP) {
- if (uv_udp_getsockname((uv_udp_t *)handle, dst_addr, &addr_len) == 0) {
- task->req.qsource.dst_addr = dst_addr;
- }
- task->req.qsource.tcp = false;
- } else if (handle->type == UV_TCP) {
- if (uv_tcp_getsockname((uv_tcp_t *)handle, dst_addr, &addr_len) == 0) {
- task->req.qsource.dst_addr = dst_addr;
- }
- task->req.qsource.tcp = true;
+ int addr_len = sizeof(ctx->source.dst_addr);
+ struct sockaddr *dst_addr = &ctx->source.dst_addr.ip;
+ ctx->source.dst_addr.ip.sa_family = AF_UNSPEC;
+ if (handle->type == UV_UDP) {
+ if (uv_udp_getsockname((uv_udp_t *)handle, dst_addr, &addr_len) == 0) {
+ req->qsource.dst_addr = dst_addr;
+ }
+ req->qsource.tcp = false;
+ } else if (handle->type == UV_TCP) {
+ if (uv_tcp_getsockname((uv_tcp_t *)handle, dst_addr, &addr_len) == 0) {
+ req->qsource.dst_addr = dst_addr;
}
+ req->qsource.tcp = true;
}
- worker->stats.concurrent += 1;
- return task;
+
+ return ctx;
}
-/* This is called when the task refcount is zero, free memory. */
-static void qr_task_free(struct qr_task *task)
+/** More initialization, related to the particular incoming query/packet. */
+static int request_start(struct request_ctx *ctx, knot_pkt_t *query)
{
- struct session *session = task->session;
- if (session) {
- /* Walk the session task list and remove itself. */
- for (size_t i = 0; i < session->tasks.len; ++i) {
- if (session->tasks.at[i] == task) {
- array_del(session->tasks, i);
- break;
- }
- }
- /* Start reading again if the session is throttled and
- * the number of outgoing requests is below watermark. */
- uv_handle_t *handle = task->source.handle;
- if (handle && session->tasks.len < task->worker->tcp_pipeline_max/2) {
- if (!uv_is_closing(handle) && session->throttled) {
- io_start_read(handle);
- session->throttled = false;
- }
- }
+ assert(query && ctx);
+ size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
+ struct kr_request *req = &ctx->req;
+
+	/* source.session can be NULL if the request was generated by kresd itself. */
+ if (!ctx->source.session ||
+ ctx->source.session->handle->type == UV_TCP) {
+ answer_max = KNOT_WIRE_MAX_PKTSIZE;
+ } else if (knot_pkt_has_edns(query)) { /* EDNS */
+ answer_max = MAX(knot_edns_get_payload(query->opt_rr),
+ KNOT_WIRE_MIN_PKTSIZE);
}
- /* Update stats */
- struct worker_ctx *worker = task->worker;
- worker->stats.concurrent -= 1;
+ req->qsource.size = query->size;
+
+ req->answer = knot_pkt_new(NULL, answer_max, &req->pool);
+ if (!req->answer) {
+ return kr_error(ENOMEM);
+ }
+
+ /* Remember query source TSIG key */
+ if (query->tsig_rr) {
+ req->qsource.key = knot_rrset_copy(query->tsig_rr, &req->pool);
+ }
+
+ /* Remember query source EDNS data */
+ if (query->opt_rr) {
+ req->qsource.opt = knot_rrset_copy(query->opt_rr, &req->pool);
+ }
+ /* Start resolution */
+ struct worker_ctx *worker = ctx->worker;
+ struct engine *engine = worker->engine;
+ kr_resolve_begin(req, &engine->resolver, req->answer);
+ worker->stats.queries += 1;
+	/* Throttle outbound queries only when under high pressure */
+ if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
+ req->options.NO_THROTTLE = true;
+ }
+ return kr_ok();
+}
+
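+/** Release a request context: return its mempool and update the stats. */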
+static void request_free(struct request_ctx *ctx)
+{
+ struct worker_ctx *worker = ctx->worker;
/* Return mempool to ring or free it if it's full */
- pool_release(worker, task->req.pool.ctx);
+ pool_release(worker, ctx->req.pool.ctx);
	/* @note The 'ctx' is invalidated from now on. */
/* Decommit memory every once in a while */
static int mp_delete_count = 0;
#endif
mp_delete_count = 0;
}
+ worker->stats.rconcurrent -= 1;
}
-static int qr_task_start(struct qr_task *task, knot_pkt_t *query)
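+/** Add a task to the request's task list and take a reference (no-op if already present). */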
+static int request_add_tasks(struct request_ctx *ctx, struct qr_task *task)
{
- assert(task && query);
- size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
- if (!task->source.handle || task->source.handle->type == UV_TCP) {
- answer_max = KNOT_WIRE_MAX_PKTSIZE;
- } else if (knot_pkt_has_edns(query)) { /* EDNS */
- answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE);
+ for (int i = 0; i < ctx->tasks.len; ++i) {
+ if (ctx->tasks.at[i] == task) {
+ return i;
+ }
+ }
+ int ret = array_push(ctx->tasks, task);
+ if (ret >= 0) {
+ qr_task_ref(task);
}
+ return ret;
+}
- /* Remember query packet size */
- task->req.qsource.size = query->size;
+static int request_del_tasks(struct request_ctx *ctx, struct qr_task *task)
+{
+ int ret = kr_error(ENOENT);
+ for (int i = 0; i < ctx->tasks.len; ++i) {
+ if (ctx->tasks.at[i] == task) {
+ array_del(ctx->tasks, i);
+ qr_task_unref(task);
+ ret = kr_ok();
+ break;
+ }
+ }
+ return ret;
+}
- knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool);
- if (!answer) {
- return kr_error(ENOMEM);
+
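+/** Create a new resolution task bound to the given request context. */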
+static struct qr_task *qr_task_create(struct request_ctx *ctx)
+{
+ /* How much can client handle? */
+ struct engine *engine = ctx->worker->engine;
+ size_t pktbuf_max = KR_EDNS_PAYLOAD;
+ if (engine->resolver.opt_rr) {
+ pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr),
+ pktbuf_max);
}
- task->req.answer = answer;
- /* Remember query source TSIG key */
- if (query->tsig_rr) {
- task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool);
+ /* Create resolution task */
+ struct qr_task *task = mm_alloc(&ctx->req.pool, sizeof(*task));
+ if (!task) {
+ return NULL;
}
+	memset(task, 0, sizeof(*task)); /* avoid accidentally uninitialized fields */
- /* Remember query source EDNS data */
- if (query->opt_rr) {
- task->req.qsource.opt = knot_rrset_copy(query->opt_rr, &task->req.pool);
+ /* Create packet buffers for answer and subrequests */
+ knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
+ if (!pktbuf) {
+ mm_free(&ctx->req.pool, task);
+ return NULL;
}
+ pktbuf->size = 0;
- /* Start resolution */
- struct worker_ctx *worker = task->worker;
- struct engine *engine = worker->engine;
- kr_resolve_begin(&task->req, &engine->resolver, answer);
- worker->stats.queries += 1;
- /* Throttle outbound queries only when high pressure */
- if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
- task->req.options.NO_THROTTLE = true;
+ task->ctx = ctx;
+ task->pktbuf = pktbuf;
+ array_init(task->waiting);
+ task->refs = 0;
+ int ret = request_add_tasks(ctx, task);
+ if (ret < 0) {
+ mm_free(&ctx->req.pool, task);
+ mm_free(&ctx->req.pool, pktbuf);
+ return NULL;
}
- return 0;
+ ctx->worker->stats.concurrent += 1;
+ return task;
}
-/*@ Register qr_task within session. */
+/* This is called when the task refcount is zero, free memory. */
+static void qr_task_free(struct qr_task *task)
+{
+ struct request_ctx *ctx = task->ctx;
+
+ assert(ctx);
+
+ /* Process outbound session. */
+ struct session *source_session = ctx->source.session;
+ struct worker_ctx *worker = ctx->worker;
+
+ /* Process source session. */
+ if (source_session) {
+ /* Walk the session task list and remove itself. */
+ session_del_tasks(source_session, task);
+ /* Start reading again if the session is throttled and
+ * the number of outgoing requests is below watermark. */
+ uv_handle_t *handle = source_session->handle;
+ if (handle && source_session->tasks.len < worker->tcp_pipeline_max/2) {
+ if (!uv_is_closing(handle) && source_session->throttled) {
+ io_start_read(handle);
+ source_session->throttled = false;
+ }
+ }
+ }
+
+ if (ctx->tasks.len == 0) {
+ array_clear(ctx->tasks);
+ request_free(ctx);
+ }
+
+ /* Update stats */
+ worker->stats.concurrent -= 1;
+}
+
+/** Register a new qr_task within the session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
+ assert(session->outgoing == false);
+
int ret = array_reserve(session->tasks, session->tasks.len + 1);
if (ret != 0) {
return kr_error(ENOMEM);
}
- array_push(session->tasks, task);
- task->session = session;
+
+ session_add_tasks(session, task);
+
+ struct request_ctx *ctx = task->ctx;
+ assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
+ ctx->source.session = session;
/* Soft-limit on parallel queries, there is no "slow down" RCODE
	 * that we could use to signal to the client, but we can stop reading,
	 * and in effect shrink the TCP window size. To get more precise throttling,
* we would need to copy remainder of the unread buffer and reassemble
* when resuming reading. This is NYI. */
- if (session->tasks.len >= task->worker->tcp_pipeline_max) {
- uv_handle_t *handle = task->source.handle;
+ if (session->tasks.len >= task->ctx->worker->tcp_pipeline_max) {
+ uv_handle_t *handle = session->handle;
if (handle && !session->throttled && !uv_is_closing(handle)) {
io_stop_read(handle);
session->throttled = true;
}
}
+
return 0;
}
static void qr_task_complete(struct qr_task *task)
{
- struct worker_ctx *worker = task->worker;
+ struct request_ctx *ctx = task->ctx;
+ struct worker_ctx *worker = ctx->worker;
/* Kill pending I/O requests */
- ioreq_killall(task);
+ ioreq_kill_pending(task);
assert(task->waiting.len == 0);
assert(task->leading == false);
/* Run the completion callback. */
if (task->on_complete) {
- task->on_complete(worker, &task->req, task->baton);
+ task->on_complete(worker, &ctx->req, task->baton);
}
/* Release primary reference to task. */
- qr_task_unref(task);
+ request_del_tasks(ctx, task);
}
/* This is called when we send subrequest / answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
- if (!task->finished) {
- if (status == 0 && handle) {
- /* For TCP we can be sure there will be no retransmit, so we flush
- * the packet buffer so it can be reused again for reassembly. */
- if (handle->type == UV_TCP) {
- knot_pkt_t *pktbuf = task->pktbuf;
- knot_pkt_clear(pktbuf);
- pktbuf->size = 0;
+ if (task->finished) {
+ assert(task->leading == false);
+ qr_task_complete(task);
+ if (!handle || handle->type != UV_TCP) {
+ return status;
+ }
+ struct session* session = handle->data;
+ if (!session->outgoing ||
+ session->waiting.len == 0) {
+ return status;
+ }
+ }
+
+ if (status == 0 && handle) {
+ struct session* session = handle->data;
+ if (handle->type == UV_TCP && session->outgoing &&
+ session->waiting.len > 0) {
+ session_del_waiting(session, task);
+ if (session->waiting.len > 0) {
+ struct qr_task *t = session->waiting.at[0];
+ int ret = qr_task_send(t, (uv_handle_t *)handle,
+ &session->peer.ip, t->pktbuf);
+ if (ret != kr_ok()) {
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ array_del(session->waiting, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ while (session->tasks.len > 0) {
+ struct qr_task *task = session->tasks.at[0];
+ array_del(session->tasks, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ session_close(session);
+ return status;
+ }
}
- io_start_read(handle); /* Start reading new query */
}
- } else {
- assert(task->timeout == NULL);
- qr_task_complete(task);
+ io_start_read(handle); /* Start reading new query */
}
return status;
}
/* Synchronous push to TLS context, bypassing event loop. */
struct session *session = handle->data;
if (session->has_tls) {
- int ret = tls_push(task, handle, pkt);
+ struct kr_request *req = &task->ctx->req;
+ int ret = kr_ok();
+ if (!session->outgoing) {
+ ret = tls_push(task, handle, pkt);
+ } else {
+ ret = kr_resolve_checkout(req, NULL, addr,
+ SOCK_STREAM, pkt);
+ if (ret != kr_ok()) {
+ return ret;
+ }
+ ret = tls_client_push(task, handle, pkt);
+ }
return qr_task_on_send(task, handle, ret);
}
int ret = 0;
- struct req *send_req = req_borrow(task->worker);
+ struct request_ctx *ctx = task->ctx;
+ struct worker_ctx *worker = ctx->worker;
+ struct kr_request *req = &ctx->req;
+ struct req *send_req = req_borrow(worker);
if (!send_req) {
return qr_task_on_send(task, handle, kr_error(ENOMEM));
}
* @note -- A solution might be opening a separate socket and
* trying to obtain the IP address from it.
*/
- ret = kr_resolve_checkout(&task->req, NULL, addr,
+ ret = kr_resolve_checkout(req, NULL, addr,
handle->type == UV_UDP ? SOCK_DGRAM : SOCK_STREAM,
pkt);
if (ret != kr_ok()) {
- req_release(task->worker, send_req);
+ req_release(worker, send_req);
return ret;
}
}
}
if (ret == 0) {
qr_task_ref(task); /* Pending ioreq on current task */
+ if (worker->too_many_open &&
+ worker->stats.rconcurrent <
+ worker->rconcurrent_highwatermark - (worker->rconcurrent_highwatermark / 4)) {
+ worker->too_many_open = false;
+ }
} else {
- req_release(task->worker, send_req);
+ req_release(worker, send_req);
+ if (ret == UV_EMFILE) {
+ worker->too_many_open = true;
+ worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
+ }
}
/* Update statistics */
- if (handle != task->source.handle && addr) {
+ if (ctx->source.session &&
+ handle != ctx->source.session->handle &&
+ addr) {
if (handle->type == UV_UDP)
- task->worker->stats.udp += 1;
+ worker->stats.udp += 1;
else
- task->worker->stats.tcp += 1;
+ worker->stats.tcp += 1;
if (addr->sa_family == AF_INET6)
- task->worker->stats.ipv6 += 1;
+ worker->stats.ipv6 += 1;
else
- task->worker->stats.ipv4 += 1;
+ worker->stats.ipv4 += 1;
}
return ret;
}
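+/** Send the query of the first waiting task over the session and arm the TCP inactivity watchdog. */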
+static int session_next_waiting_send(struct session *session)
+{
+ union inaddr *peer = &session->peer;
+ int ret = kr_ok();
+ if (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf);
+ if (ret == kr_ok()) {
+ session->timeout.data = session;
+ timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+ }
+ }
+ return ret;
+}
+
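+/** TLS handshake completion callback: on success, flush the first waiting query
+ * and register the session in the connected map. */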
+static int session_tls_hs_cb(struct session *session, int status)
+{
+ VERBOSE_MSG(NULL, "=> server: '%s' TLS handshake has %s\n",
+ kr_straddr(&session->peer.ip), status ? "failed" : "completed");
+ if (status == 0) {
+ int ret = session_next_waiting_send(session);
+ if (ret == kr_ok()) {
+ struct worker_ctx *worker = get_worker();
+ union inaddr *peer = &session->peer;
+ int ret = worker_add_tcp_connected(worker, &peer->ip, session);
+ assert(ret == 0);
+ }
+ }
+ return kr_ok();
+}
+
static void on_connect(uv_connect_t *req, int status)
{
struct worker_ctx *worker = get_worker();
- struct qr_task *task = req->data;
uv_stream_t *handle = req->handle;
- if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
- if (status == 0) {
- struct sockaddr_storage addr;
- int addr_len = sizeof(addr);
- uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addr_len);
- qr_task_send(task, (uv_handle_t *)handle, (struct sockaddr *)&addr, task->pktbuf);
- } else {
- qr_task_step(task, task->addrlist, NULL);
+ struct session *session = handle->data;
+
+ union inaddr *peer = &session->peer;
+ uv_timer_stop((uv_timer_t *)&session->timeout);
+
+ if (status == UV_ECANCELED) {
+ worker_del_tcp_waiting(worker, &peer->ip);
+ assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0);
+ req_release(worker, (struct req *)req);
+ return;
+ }
+
+ if (session->closing) {
+ worker_del_tcp_waiting(worker, &peer->ip);
+ assert(session->waiting.len == 0 && session->tasks.len == 0);
+ req_release(worker, (struct req *)req);
+ return;
+ }
+
+ if (worker_del_tcp_waiting(worker, &peer->ip) != 0) {
+		/* The session isn't in the list of waiting connections;
+		 * something went wrong. */
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
}
+ assert(session->tasks.len == 0);
+ req_release(worker, (struct req *)req);
+ session_close(session);
+ return;
}
- qr_task_unref(task);
+
+ if (status != 0) {
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
+ }
+ assert(session->tasks.len == 0);
+ req_release(worker, (struct req *)req);
+ session_close(session);
+ return;
+ }
+
+ session->connected = true;
+ session->handle = (uv_handle_t *)handle;
+
+ int ret = kr_ok();
+ if (session->has_tls) {
+ ret = tls_client_connect_start(session->tls_client_ctx,
+ session, session_tls_hs_cb);
+ if (ret == kr_error(EAGAIN)) {
+ req_release(worker, (struct req *)req);
+ io_start_read(session->handle);
+ return;
+ }
+ }
+
+ if (ret == kr_ok()) {
+ ret = session_next_waiting_send(session);
+ if (ret == kr_ok()) {
+ worker_add_tcp_connected(worker, &session->peer.ip, session);
+ req_release(worker, (struct req *)req);
+ return;
+ }
+ }
+
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
+ }
+
+ assert(session->tasks.len == 0);
+
req_release(worker, (struct req *)req);
+ session_close(session);
}
-static void on_timer_close(uv_handle_t *handle)
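+/** An outgoing TCP connection attempt took too long: fail all waiting tasks and close the session. */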
+static void on_tcp_connect_timeout(uv_timer_t *timer)
{
- struct qr_task *task = handle->data;
- req_release(task->worker, (struct req *)handle);
- qr_task_unref(task);
+ struct session *session = timer->data;
+
+ uv_timer_stop(timer);
+ struct worker_ctx *worker = get_worker();
+
+ assert (session->waiting.len == session->tasks.len);
+
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ struct request_ctx *ctx = task->ctx;
+ task->timeouts += 1;
+ worker->stats.timeout += 1;
+ session_del_tasks(session, task);
+ array_del(session->waiting, 0);
+ qr_task_unref(task);
+ assert(task->refs == 1);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
+ assert (session->tasks.len == 0);
+ session_close(session);
+}
+
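+/** An established outgoing TCP connection has been inactive for too long:
+ * time out all of its tasks and close it. */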
+static void on_tcp_watchdog_timeout(uv_timer_t *timer)
+{
+ struct session *session = timer->data;
+
+ assert(session->outgoing);
+ uv_timer_stop(timer);
+ struct worker_ctx *worker = get_worker();
+
+ worker_del_tcp_connected(worker, &session->peer.ip);
+
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ task->timeouts += 1;
+ worker->stats.timeout += 1;
+ array_del(session->waiting, 0);
+ qr_task_unref(task);
+ session_del_tasks(session, task);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
+ while (session->tasks.len > 0) {
+ struct qr_task *task = session->tasks.at[0];
+ task->timeouts += 1;
+ worker->stats.timeout += 1;
+ assert(task->refs > 1);
+ array_del(session->tasks, 0);
+ qr_task_unref(task);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
+ session_close(session);
}
/* This is called when an I/O request times out */
-static void on_timeout(uv_timer_t *req)
+static void on_udp_timeout(uv_timer_t *timer)
{
- struct qr_task *task = req->data;
+ struct session *session = timer->data;
+ uv_timer_stop(timer);
+
+ assert(session->tasks.len == 1);
+ assert(session->waiting.len == 0);
/* Penalize all tried nameservers with a timeout. */
- struct worker_ctx *worker = task->worker;
+ struct qr_task *task = session->tasks.at[0];
+ struct worker_ctx *worker = task->ctx->worker;
if (task->leading && task->pending_count > 0) {
- struct kr_query *qry = array_tail(task->req.rplan.pending);
+ struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
worker->engine->resolver.cache_rtt, KR_NS_UPDATE);
}
}
- /* Release timer handle */
- task->timeout = NULL;
- uv_close((uv_handle_t *)req, on_timer_close); /* Return borrowed task here */
- /* Interrupt current pending request. */
task->timeouts += 1;
worker->stats.timeout += 1;
qr_task_step(task, NULL, NULL);
}
-static bool retransmit(struct qr_task *task)
+static void on_session_idle_timeout(uv_timer_t *timer)
+{
+ struct session *s = timer->data;
+ assert(s && s->outgoing);
+ uv_timer_stop(timer);
+ if (s->closing) {
+ return;
+ }
+	/* The session was not used before the idle timer expired;
+	 * remove it from the connection list and close it.
+ */
+ assert(s->tasks.len == 0 && s->waiting.len == 0);
+ session_close(s);
+}
+
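+/** (Re)send the task's query over UDP to the next address in its address list;
+ * returns the spawned UDP handle, or NULL if none could be created. */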
+static uv_handle_t *retransmit(struct qr_task *task)
{
+ uv_handle_t *ret = NULL;
if (task && task->addrlist && task->addrlist_count > 0) {
struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
- uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
- if (subreq) { /* Create connection for iterative query */
- if (qr_task_send(task, subreq, (struct sockaddr *)choice, task->pktbuf) == 0) {
- task->addrlist_turn = (task->addrlist_turn + 1) % task->addrlist_count; /* Round robin */
- return true;
- }
+ ret = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
+ if (ret &&
+ qr_task_send(task, ret, (struct sockaddr *)choice,
+ task->pktbuf) == 0) {
+ task->addrlist_turn = (task->addrlist_turn + 1) %
+ task->addrlist_count; /* Round robin */
}
}
- return false;
+ return ret;
}
static void on_retransmit(uv_timer_t *req)
{
- struct qr_task *task = req->data;
- assert(task->finished == false);
- assert(task->timeout != NULL);
+ struct session *session = req->data;
+ assert(session->tasks.len == 1);
uv_timer_stop(req);
- if (!retransmit(req->data)) {
+ struct qr_task *task = session->tasks.at[0];
+ if (retransmit(task) == NULL) {
/* Not possible to spawn request, start timeout timer with remaining deadline. */
uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
- uv_timer_start(req, on_timeout, timeout, 0);
+ uv_timer_start(req, on_udp_timeout, timeout, 0);
} else {
uv_timer_start(req, on_retransmit, KR_CONN_RETRY, 0);
}
}
-static int timer_start(struct qr_task *task, uv_timer_cb cb, uint64_t timeout, uint64_t repeat)
+static int timer_start(struct session *session, uv_timer_cb cb,
+ uint64_t timeout, uint64_t repeat)
{
- assert(task->timeout == NULL);
- struct worker_ctx *worker = task->worker;
- uv_timer_t *timer = (uv_timer_t *)req_borrow(worker);
- if (!timer) {
- return kr_error(ENOMEM);
- }
- uv_timer_init(worker->loop, timer);
+ uv_timer_t *timer = (uv_timer_t *)&session->timeout;
+ assert(timer->data == session);
int ret = uv_timer_start(timer, cb, timeout, repeat);
if (ret != 0) {
uv_timer_stop(timer);
- req_release(worker, (struct req *)timer);
return kr_error(ENOMEM);
}
- timer->data = task;
- qr_task_ref(task);
- task->timeout = timer;
return 0;
}
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
/* Close pending timer */
- if (task->timeout) {
- /* Timer was running so it holds reference to task, make sure the timer event
- * never fires and release the reference on timer close instead. */
- uv_timer_stop(task->timeout);
- uv_close((uv_handle_t *)task->timeout, on_timer_close);
- task->timeout = NULL;
- }
- ioreq_killall(task);
+ ioreq_kill_pending(task);
/* Clear from outgoing table. */
if (!task->leading)
return;
char key[KR_RRKEY_LEN];
int ret = subreq_key(key, task->pktbuf);
if (ret > 0) {
- assert(map_get(&task->worker->outgoing, key) == task);
- map_del(&task->worker->outgoing, key);
+ assert(map_get(&task->ctx->worker->outgoing, key) == task);
+ map_del(&task->ctx->worker->outgoing, key);
}
/* Notify waiting tasks. */
- struct kr_query *leader_qry = array_tail(task->req.rplan.pending);
+ struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
for (size_t i = task->waiting.len; i > 0; i--) {
struct qr_task *follower = task->waiting.at[i - 1];
/* Reuse MSGID and 0x20 secret */
- if (follower->req.rplan.pending.len > 0) {
- struct kr_query *qry = array_tail(follower->req.rplan.pending);
+ if (follower->ctx->req.rplan.pending.len > 0) {
+ struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
qry->id = leader_qry->id;
qry->secret = leader_qry->secret;
leader_qry->secret = 0; /* Next will be already decoded */
assert(task);
char key[KR_RRKEY_LEN];
if (subreq_key(key, task->pktbuf) > 0) {
- assert(map_contains(&task->worker->outgoing, key) == false);
- map_set(&task->worker->outgoing, key, task);
+ assert(map_contains(&task->ctx->worker->outgoing, key) == false);
+ map_set(&task->ctx->worker->outgoing, key, task);
task->leading = true;
}
}
assert(task);
char key[KR_RRKEY_LEN];
if (subreq_key(key, task->pktbuf) > 0) {
- struct qr_task *leader = map_get(&task->worker->outgoing, key);
+ struct qr_task *leader = map_get(&task->ctx->worker->outgoing, key);
if (leader) {
/* Enqueue itself to leader for this subrequest. */
- int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, kr_memreserve, &leader->req.pool);
+ int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1,
+ kr_memreserve, &leader->ctx->req.pool);
if (ret == 0) {
array_push(leader->waiting, task);
qr_task_ref(task);
static int qr_task_finalize(struct qr_task *task, int state)
{
assert(task && task->leading == false);
- kr_resolve_finish(&task->req, state);
+ struct request_ctx *ctx = task->ctx;
+ kr_resolve_finish(&ctx->req, state);
task->finished = true;
/* Send back answer */
- (void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
+ if (ctx->source.session != NULL) {
+ (void) qr_task_send(task, ctx->source.session->handle,
+ (struct sockaddr *)&ctx->source.addr,
+ ctx->req.answer);
+ } else {
+ (void) qr_task_on_send(task, NULL, kr_error(EIO));
+ }
return state == KR_STATE_DONE ? 0 : kr_error(EIO);
}
-static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
+static int qr_task_step(struct qr_task *task,
+ const struct sockaddr *packet_source, knot_pkt_t *packet)
{
/* No more steps after we're finished. */
if (!task || task->finished) {
return kr_error(ESTALE);
}
+
/* Close pending I/O requests */
subreq_finalize(task, packet_source, packet);
/* Consume input and produce next query */
+ struct request_ctx *ctx = task->ctx;
+ struct kr_request *req = &ctx->req;
+ struct worker_ctx *worker = ctx->worker;
int sock_type = -1;
task->addrlist = NULL;
task->addrlist_count = 0;
task->addrlist_turn = 0;
- task->req.has_tls = (task->session && task->session->has_tls);
- int state = kr_resolve_consume(&task->req, packet_source, packet);
+ req->has_tls = (ctx->source.session && ctx->source.session->has_tls);
+
+ if (worker->too_many_open) {
+ struct kr_rplan *rplan = &req->rplan;
+ if (worker->stats.rconcurrent <
+ worker->rconcurrent_highwatermark - (worker->rconcurrent_highwatermark / 4)) {
+ worker->too_many_open = false;
+ } else if (packet && kr_rplan_empty(rplan)) {
+ /* new query; TODO - make this detection more obvious */
+ kr_resolve_consume(req, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ }
+
+ int state = kr_resolve_consume(req, packet_source, packet);
while (state == KR_STATE_PRODUCE) {
- state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf);
- if (unlikely(++task->iter_count > KR_ITER_LIMIT || task->timeouts >= KR_TIMEOUT_LIMIT)) {
+ state = kr_resolve_produce(req, &task->addrlist,
+ &sock_type, task->pktbuf);
+ if (unlikely(++task->iter_count > KR_ITER_LIMIT ||
+ task->timeouts >= KR_TIMEOUT_LIMIT)) {
return qr_task_finalize(task, KR_STATE_FAIL);
}
}
if (subreq_enqueue(task)) {
return kr_ok(); /* Will be notified when outgoing query finishes. */
}
- /* Check current query NSLIST */
- struct kr_query *qry = array_tail(task->req.rplan.pending);
/* Start transmitting */
- if (retransmit(task)) {
- assert(qry != NULL);
- /* Retransmit at default interval, or more frequently if the mean
- * RTT of the server is better. If the server is glued, use default rate. */
- size_t timeout = qry->ns.score;
- if (timeout > KR_NS_GLUED) {
- /* We don't have information about variance in RTT, expect +10ms */
- timeout = MIN(qry->ns.score + 10, KR_CONN_RETRY);
- } else {
- timeout = KR_CONN_RETRY;
- }
- ret = timer_start(task, on_retransmit, timeout, 0);
- } else {
+ uv_handle_t *handle = retransmit(task);
+ if (handle == NULL) {
return qr_task_step(task, NULL, NULL);
}
+ /* Check current query NSLIST */
+ struct kr_query *qry = array_tail(req->rplan.pending);
+ assert(qry != NULL);
+ /* Retransmit at default interval, or more frequently if the mean
+ * RTT of the server is better. If the server is glued, use default rate. */
+ size_t timeout = qry->ns.score;
+ if (timeout > KR_NS_GLUED) {
+ /* We don't have information about variance in RTT, expect +10ms */
+ timeout = MIN(qry->ns.score + 10, KR_CONN_RETRY);
+ } else {
+ timeout = KR_CONN_RETRY;
+ }
/* Announce and start subrequest.
* @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
*/
subreq_lead(task);
- } else {
- uv_connect_t *conn = (uv_connect_t *)req_borrow(task->worker);
- if (!conn) {
- return qr_task_step(task, NULL, NULL);
+ struct session *session = handle->data;
+ ret = timer_start(session, on_retransmit, timeout, 0);
+ /* Start next step with timeout, fatal if can't start a timer. */
+ if (ret != 0) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
}
+ } else {
+ assert (sock_type == SOCK_STREAM);
const struct sockaddr *addr =
packet_source ? packet_source : task->addrlist;
- uv_handle_t *client = ioreq_spawn(task, sock_type, addr->sa_family);
- if (!client) {
- req_release(task->worker, (struct req *)conn);
- return qr_task_step(task, NULL, NULL);
- }
- conn->data = task;
- if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect) != 0) {
- req_release(task->worker, (struct req *)conn);
- return qr_task_step(task, NULL, NULL);
+ if (addr->sa_family == AF_UNSPEC) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
}
- qr_task_ref(task); /* Connect request borrows task */
- ret = timer_start(task, on_timeout, KR_CONN_RTT_MAX, 0);
- }
+ struct session* session = NULL;
+ if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
+ if (session->closing) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+			/* There are already tasks waiting on this session, which means
+			 * a connection attempt or data transmission is in progress.
+			 * The task will be notified in on_connect() or qr_task_on_send(). */
+ ret = session_add_waiting(session, task);
+ if (ret < 0) {
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ ret = session_add_tasks(session, task);
+ if (ret < 0) {
+ session_del_waiting(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
+ /* Connection has been already established */
+ assert(session->outgoing);
+ if (session->closing) {
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
- /* Start next step with timeout, fatal if can't start a timer. */
- if (ret != 0) {
- subreq_finalize(task, packet_source, packet);
- return qr_task_finalize(task, KR_STATE_FAIL);
+ if (session->tasks.len >= worker->tcp_pipeline_max) {
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
+ /* will be removed in qr_task_on_send() */
+ ret = session_add_waiting(session, task);
+ if (ret < 0) {
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ ret = session_add_tasks(session, task);
+ if (ret < 0) {
+ session_del_waiting(session, task);
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ if (session->waiting.len == 1) {
+ ret = qr_task_send(task, session->handle,
+ &session->peer.ip, task->pktbuf);
+ if (ret < 0) {
+ session_del_waiting(session, task);
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ ret = timer_start(session, on_tcp_watchdog_timeout,
+ KR_CONN_RTT_MAX, 0);
+ if (ret < 0) {
+ assert(false);
+ session_del_waiting(session, task);
+ session_del_tasks(session, task);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ }
+ task->pending[task->pending_count] = session->handle;
+ task->pending_count += 1;
+ } else {
+ /* Make connection */
+ uv_connect_t *conn = (uv_connect_t *)req_borrow(ctx->worker);
+ if (!conn) {
+ return qr_task_step(task, NULL, NULL);
+ }
+ uv_handle_t *client = ioreq_spawn(task, sock_type,
+ addr->sa_family);
+ if (!client) {
+ req_release(ctx->worker, (struct req *)conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ session = client->data;
+ ret = worker_add_tcp_waiting(ctx->worker, addr, session);
+ if (ret < 0) {
+ session_del_tasks(session, task);
+ req_release(ctx->worker, (struct req *)conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ /* will be removed in qr_task_on_send() */
+ ret = session_add_waiting(session, task);
+ if (ret < 0) {
+ session_del_tasks(session, task);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ req_release(ctx->worker, (struct req *)conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
+ /* Check if there must be TLS */
+ struct engine *engine = ctx->worker->engine;
+ struct network *net = &engine->net;
+ const char *key = tcpsess_key(addr);
+ struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
+ if (entry) {
+ assert(session->tls_client_ctx == NULL);
+ struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry);
+ if (!tls_ctx) {
+ session_del_tasks(session, task);
+ session_del_waiting(session, task);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ req_release(ctx->worker, (struct req *)conn);
+ return qr_task_step(task, NULL, NULL);
+ }
+ tls_client_ctx_set_params(tls_ctx, entry);
+ session->tls_client_ctx = tls_ctx;
+ session->has_tls = true;
+ }
+
+ conn->data = session;
+ memcpy(&session->peer, addr, sizeof(session->peer));
+
+ ret = timer_start(session, on_tcp_connect_timeout,
+ KR_CONN_RTT_MAX, 0);
+ if (ret != 0) {
+ session_del_tasks(session, task);
+ session_del_waiting(session, task);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ req_release(ctx->worker, (struct req *)conn);
+ subreq_finalize(task, packet_source, packet);
+ return qr_task_finalize(task, KR_STATE_FAIL);
+ }
+
+ if (uv_tcp_connect(conn, (uv_tcp_t *)client,
+				   addr, on_connect) != 0) {
+ session_del_tasks(session, task);
+ session_del_waiting(session, task);
+ worker_del_tcp_waiting(ctx->worker, addr);
+ req_release(ctx->worker, (struct req *)conn);
+ return qr_task_step(task, NULL, NULL);
+ }
+ }
}
- return 0;
+ return kr_ok();
}
static int parse_packet(knot_pkt_t *query)
return kr_ok();
}
-int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *msg, const struct sockaddr* addr)
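+/** Find the task in the session's task list whose query carries the given message ID. */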
+static struct qr_task* find_task(const struct session *session, uint16_t msg_id)
{
- if (!worker || !handle) {
+ struct qr_task *ret = NULL;
+ const qr_tasklist_t *tasklist = &session->tasks;
+ for (size_t i = 0; i < tasklist->len; ++i) {
+ struct qr_task *task = tasklist->at[i];
+ uint16_t task_msg_id = knot_wire_get_id(task->pktbuf->wire);
+ if (task_msg_id == msg_id) {
+ ret = task;
+ break;
+ }
+ }
+ return ret;
+}
+
+int worker_submit(struct worker_ctx *worker, uv_handle_t *handle,
+ knot_pkt_t *msg, const struct sockaddr* addr)
+{
+ bool OK = worker && handle && handle->data;
+ if (!OK) {
+ assert(false);
return kr_error(EINVAL);
}
struct session *session = handle->data;
- assert(session);
/* Parse packet */
int ret = parse_packet(msg);
- /* Start new task on listening sockets, or resume if this is subrequest */
+	/* Start a new task on listening sockets,
+	 * or resume if this is a subrequest. */
struct qr_task *task = NULL;
- if (!session->outgoing) {
+ if (!session->outgoing) { /* request from a client */
/* Ignore badly formed queries or responses. */
if (!msg || ret != 0 || knot_wire_get_qr(msg->wire)) {
if (msg) worker->stats.dropped += 1;
- return kr_error(EINVAL); /* Ignore. */
+ return kr_error(EILSEQ);
}
- task = qr_task_create(worker, handle, addr);
- if (!task) {
+ struct request_ctx *ctx = request_create(worker, handle, addr);
+ if (!ctx) {
return kr_error(ENOMEM);
}
- ret = qr_task_start(task, msg);
+
+ ret = request_start(ctx, msg);
if (ret != 0) {
- qr_task_free(task);
+ request_free(ctx);
return kr_error(ENOMEM);
}
- } else {
- task = session->tasks.len > 0 ? array_tail(session->tasks) : NULL;
+
+ task = qr_task_create(ctx);
+ if (!task) {
+ request_free(ctx);
+ return kr_error(ENOMEM);
+ }
+ } else if (msg) { /* response from upstream */
+ task = find_task(session, knot_wire_get_id(msg->wire));
}
/* Consume input and produce next message */
- return qr_task_step(task, addr, msg);
+ return qr_task_step(task, NULL, msg);
+}
+
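+/* Helpers for the worker's per-peer TCP session maps, keyed by the textual peer
+ * address (see tcpsess_key()). */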
+static int map_add_tcp_session(map_t *map, const struct sockaddr* addr,
+ struct session *session)
+{
+ assert(map && addr);
+ const char *key = tcpsess_key(addr);
+ assert(key);
+ assert(map_contains(map, key) == 0);
+ int ret = map_set(map, key, session);
+ return ret ? kr_error(EINVAL) : kr_ok();
+}
+
+static int map_del_tcp_session(map_t *map, const struct sockaddr* addr)
+{
+ assert(map && addr);
+ const char *key = tcpsess_key(addr);
+ assert(key);
+ int ret = map_del(map, key);
+ return ret ? kr_error(ENOENT) : kr_ok();
+}
+
+static struct session* map_find_tcp_session(map_t *map,
+ const struct sockaddr *addr)
+{
+ assert(map && addr);
+ const char *key = tcpsess_key(addr);
+ assert(key);
+ struct session* ret = map_get(map, key);
+ return ret;
+}
+
+static int worker_add_tcp_connected(struct worker_ctx *worker,
+ const struct sockaddr* addr,
+ struct session *session)
+{
+ assert(addr);
+ const char *key = tcpsess_key(addr);
+ assert(key);
+	assert(map_contains(&worker->tcp_connected, key) == 0);
+	int ret = map_add_tcp_session(&worker->tcp_connected, addr, session);
+	if (ret == 0) {
+		tcp_connected += 1;
+	}
+	return ret;
+}
+
+static int worker_del_tcp_connected(struct worker_ctx *worker,
+ const struct sockaddr* addr)
+{
+ assert(addr);
+ const char *key = tcpsess_key(addr);
+ assert(key);
+ int ret = map_del_tcp_session(&worker->tcp_connected, addr);
+ if (ret == 0) {
+ tcp_connected -= 1;
+ }
+ return ret;
+}
+
+static struct session* worker_find_tcp_connected(struct worker_ctx *worker,
+ const struct sockaddr* addr)
+{
+ return map_find_tcp_session(&worker->tcp_connected, addr);
+}
+
+static int worker_add_tcp_waiting(struct worker_ctx *worker,
+ const struct sockaddr* addr,
+ struct session *session)
+{
+ assert(addr);
+ const char *key = tcpsess_key(addr);
+ assert(key);
+ assert(map_contains(&worker->tcp_waiting, key) == 0);
+ int ret = map_add_tcp_session(&worker->tcp_waiting, addr, session);
+ if (ret == 0) {
+ tcp_waiting += 1;
+ }
+ return ret;
+}
+
+static int worker_del_tcp_waiting(struct worker_ctx *worker,
+ const struct sockaddr* addr)
+{
+ assert(addr);
+ const char *key = tcpsess_key(addr);
+ assert(key);
+ int ret = map_del_tcp_session(&worker->tcp_waiting, addr);
+ if (ret == 0) {
+ tcp_waiting -= 1;
+ }
+ return ret;
+}
+
+static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
+ const struct sockaddr* addr)
+{
+ return map_find_tcp_session(&worker->tcp_waiting, addr);
}
/* Return DNS/TCP message size. */
-static int msg_size(const uint8_t *msg)
+static int get_msg_size(const uint8_t *msg)
{
- return wire_read_u16(msg);
+ return wire_read_u16(msg);
}
/* If buffering, close last task as it isn't live yet. */
if (session->buffering) {
qr_task_free(session->buffering);
session->buffering = NULL;
+ session->msg_hdr_idx = 0;
}
}
return 0;
}
-int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *msg, ssize_t len)
+int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
+ const uint8_t *msg, ssize_t len)
{
if (!worker || !handle) {
return kr_error(EINVAL);
}
/* Connection error or forced disconnect */
struct session *session = handle->data;
+ if (session->closing) {
+ return kr_ok();
+ }
if (len <= 0 || !msg) {
/* If we have pending tasks, we must dissociate them from the
* connection so they don't try to access closed and freed handle.
- * @warning Do not modify task if this is outgoing request as it is shared with originator.
+	 * @warning Do not modify the task if this is an outgoing request,
+	 * as it is shared with the originator.
*/
- if (!session->outgoing) {
- for (size_t i = 0; i < session->tasks.len; ++i) {
- struct qr_task *task = session->tasks.at[i];
- task->session = NULL;
- task->source.handle = NULL;
+ uv_timer_t *timer = &session->timeout;
+ uv_timer_stop(timer);
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ if (session->outgoing) {
+ qr_task_finalize(task, KR_STATE_FAIL);
+ }
+ array_del(session->waiting, 0);
+ qr_task_unref(task);
+ session_del_tasks(session, task);
+ }
+ while (session->tasks.len > 0) {
+ struct qr_task *task = session->tasks.at[0];
+ if (session->outgoing) {
+ qr_task_finalize(task, KR_STATE_FAIL);
}
- session->tasks.len = 0;
+ session_del_tasks(session, task);
}
+ session_close(session);
return kr_error(ECONNRESET);
}
+ if (session->outgoing) {
+ uv_timer_stop(&session->timeout);
+ timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
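+		/* Skip the remainder of a message that has previously been dropped
+		 * (e.g. a response with no matching task). */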
+ if (session->bytes_to_skip) {
+ session->buffering = NULL;
+ ssize_t min_len = MIN(session->bytes_to_skip, len);
+ len -= min_len;
+ msg += min_len;
+ session->bytes_to_skip -= min_len;
+ if (len < 0 || session->bytes_to_skip < 0) {
+				/* Something went wrong;
+				 * better to kill the connection. */
+ assert(false);
+ return kr_error(EILSEQ);
+ }
+ if (len == 0) {
+ return kr_ok();
+ }
+ assert(session->bytes_to_skip == 0);
+ }
+ }
+
int submitted = 0;
struct qr_task *task = session->buffering;
+ knot_pkt_t *pkt_buf = NULL;
+ if (task) {
+ pkt_buf = task->pktbuf;
+ } else {
+ /* Update DNS header in session->msg_hdr* */
+ assert(session->msg_hdr_idx <= sizeof(session->msg_hdr));
+ ssize_t hdr_amount = sizeof(session->msg_hdr) -
+ session->msg_hdr_idx;
+ if (hdr_amount > len) {
+ hdr_amount = len;
+ }
+ if (hdr_amount > 0) {
+ memcpy(session->msg_hdr + session->msg_hdr_idx, msg, hdr_amount);
+ session->msg_hdr_idx += hdr_amount;
+ len -= hdr_amount;
+ msg += hdr_amount;
+ }
+ if (len == 0) { /* no data beyond msg_hdr -> not much to do */
+ return kr_ok();
+ }
+ assert(session->msg_hdr_idx == sizeof(session->msg_hdr));
+ session->msg_hdr_idx = 0;
+ uint16_t msg_size = get_msg_size(session->msg_hdr);
+ uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2);
+ if (msg_size < KNOT_WIRE_HEADER_SIZE) {
+ /* better kill the connection; we would probably get out of sync */
+ assert(false);
+ return kr_error(EILSEQ);
+ }
- /* If this is a new query, create a new task that we can use
- * to buffer incoming message until it's complete. */
- if (!session->outgoing) {
- if (!task) {
- /* Get TCP peer name, keep zeroed address if it fails. */
- struct sockaddr_storage addr;
- memset(&addr, 0, sizeof(addr));
- int addr_len = sizeof(addr);
- uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addr_len);
- task = qr_task_create(worker, (uv_handle_t *)handle, (struct sockaddr *)&addr);
+ /* get task */
+ if (!session->outgoing) {
+			/* This is a new query; create a new task that we can use
+			 * to buffer the incoming message until it's complete. */
+ struct sockaddr_storage addr_storage;
+ struct sockaddr *addr = (struct sockaddr *)&addr_storage;
+ int addr_len = sizeof(addr_storage);
+ int ret = uv_tcp_getpeername((uv_tcp_t *)handle, addr, &addr_len);
+ if (ret) {
+ addr = NULL; /* fallback */
+ }
+ struct request_ctx *ctx = request_create(worker,
+ (uv_handle_t *)handle,
+ addr);
+ if (!ctx) {
+ assert(false);
+ return kr_error(ENOMEM);
+ }
+ task = qr_task_create(ctx);
if (!task) {
+ assert(false);
+ request_free(ctx);
return kr_error(ENOMEM);
}
- session->buffering = task;
+ } else {
+ /* Start of response from upstream.
+ * The session task list must contain a task
+ * with the same msg id. */
+ task = find_task(session, msg_id);
+ /* FIXME: on high load over one connection, it's likely
+ * that we will get multiple matches sooner or later (!) */
+ if (task) {
+ knot_pkt_clear(task->pktbuf);
+ } else {
+ /* TODO: only ignore one message without killing connection */
+ session->buffering = NULL;
+ session->bytes_to_skip = msg_size - 2;
+ ssize_t min_len = MIN(session->bytes_to_skip, len);
+ len -= min_len;
+ msg += min_len;
+ session->bytes_to_skip -= min_len;
+ if (len < 0 || session->bytes_to_skip < 0) {
+					/* Something went wrong;
+					 * better to kill the connection. */
+ assert(false);
+ return kr_error(EILSEQ);
+ }
+ if (len == 0) {
+ return kr_ok();
+ }
+ assert(session->bytes_to_skip == 0);
+ int ret = worker_process_tcp(worker, handle, msg, len);
+ submitted += ret;
+ return submitted;
+ }
}
- } else {
- assert(session->tasks.len > 0);
- task = array_tail(session->tasks);
+
+ pkt_buf = task->pktbuf;
+ knot_wire_set_id(pkt_buf->wire, msg_id);
+ pkt_buf->size = 2;
+ task->bytes_remaining = msg_size - 2;
+ session->buffering = task;
}
- /* At this point session must have either created new task or it's already assigned. */
+	/* At this point the session must either have created a new task
+	 * or one was already assigned. */
assert(task);
assert(len > 0);
- /* Start reading DNS/TCP message length */
- knot_pkt_t *pkt_buf = task->pktbuf;
- if (task->bytes_remaining == 0 && pkt_buf->size == 0) {
- knot_pkt_clear(pkt_buf);
- /* Read only one byte as TCP fragment may end at a 1B boundary
- * which would lead to OOB read or improper reassembly length. */
- pkt_buf->size = 1;
- pkt_buf->wire[0] = msg[0];
- len -= 1;
- msg += 1;
- if (len == 0) {
- return 0;
- }
- }
- /* Finish reading DNS/TCP message length. */
- if (task->bytes_remaining == 0 && pkt_buf->size == 1) {
- pkt_buf->wire[1] = msg[0];
- ssize_t nbytes = msg_size(pkt_buf->wire);
- len -= 1;
- msg += 1;
- /* Cut off fragment length and start reading DNS message. */
- pkt_buf->size = 0;
- task->bytes_remaining = nbytes;
- }
/* Message is too long, can't process it. */
ssize_t to_read = MIN(len, task->bytes_remaining);
if (pkt_buf->size + to_read > pkt_buf->max_size) {
pkt_buf->size = 0;
task->bytes_remaining = 0;
+ /* TODO: only ignore one message without killing connection */
+ session->buffering = NULL;
return kr_error(EMSGSIZE);
}
/* Buffer message and check if it's complete */
memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
pkt_buf->size += to_read;
- if (to_read >= task->bytes_remaining) {
- task->bytes_remaining = 0;
+ task->bytes_remaining -= to_read;
+ if (task->bytes_remaining == 0) {
+ /* Message was assembled, clear temporary. */
+ session->buffering = NULL;
+ session->msg_hdr_idx = 0;
+ session_del_tasks(session, task);
/* Parse the packet and start resolving complete query */
int ret = parse_packet(pkt_buf);
if (ret == 0 && !session->outgoing) {
- ret = qr_task_start(task, pkt_buf);
- if (ret != 0) {
- return ret;
+ /* Start only new queries,
+ * not subrequests that are already pending */
+ ret = request_start(task->ctx, pkt_buf);
+ assert(ret == 0);
+ if (ret == 0) {
+ ret = qr_task_register(task, session);
}
- ret = qr_task_register(task, session);
- if (ret != 0) {
- return ret;
+ if (ret == 0) {
+ submitted += 1;
+ }
+ if (task->leading) {
+ assert(false);
}
- /* Task is now registered in session, clear temporary. */
- session->buffering = NULL;
- submitted += 1;
}
- /* Start only new queries, not subrequests that are already pending */
if (ret == 0) {
- ret = qr_task_step(task, NULL, pkt_buf);
+ const struct sockaddr *addr = session->outgoing ? &session->peer.ip : NULL;
+ ret = qr_task_step(task, addr, pkt_buf);
}
- /* Process next message part in the stream if no error so far */
- if (ret != 0) {
- return ret;
- }
- if (len - to_read > 0 && !session->outgoing) {
+ if (len - to_read > 0) {
+ /* TODO: this is simple via iteration; recursion doesn't really help */
ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read);
if (ret < 0) {
return ret;
}
submitted += ret;
}
- } else {
- task->bytes_remaining -= to_read;
}
return submitted;
}
-int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options,
- worker_cb_t on_complete, void *baton)
+int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query,
+ struct kr_qflags options, worker_cb_t on_complete,
+ void *baton)
{
if (!worker || !query) {
+ assert(false);
return kr_error(EINVAL);
}
+ struct request_ctx *ctx = request_create(worker, NULL, NULL);
+ if (!ctx) {
+ return kr_error(ENOMEM);
+ }
+
/* Create task */
- struct qr_task *task = qr_task_create(worker, NULL, NULL);
+ struct qr_task *task = qr_task_create(ctx);
if (!task) {
+ request_free(ctx);
return kr_error(ENOMEM);
}
task->baton = baton;
task->on_complete = on_complete;
/* Start task */
- int ret = qr_task_start(task, query);
+ int ret = request_start(ctx, query);
	/* Set options late, as request_start() -> kr_resolve_begin() rewrites them. */
- kr_qflags_set(&task->req.options, options);
+ kr_qflags_set(&task->ctx->req.options, options);
if (ret != 0) {
+ request_free(ctx);
qr_task_unref(task);
return ret;
}
return qr_task_step(task, NULL, query);
}
+void worker_session_close(struct session *session)
+{
+ session_close(session);
+}
+
/** Reserve worker buffers */
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
array_init(worker->pool_sessions);
if (array_reserve(worker->pool_mp, ring_maxlen) ||
array_reserve(worker->pool_ioreq, ring_maxlen) ||
- array_reserve(worker->pool_sessions, ring_maxlen))
+ array_reserve(worker->pool_sessions, ring_maxlen)) {
return kr_error(ENOMEM);
+ }
memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
worker->outgoing = map_make();
+ worker->tcp_connected = map_make();
+ worker->tcp_waiting = map_make();
worker->tcp_pipeline_max = MAX_PIPELINED;
+ memset(&worker->stats, 0, sizeof(worker->stats));
return kr_ok();
}
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
map_clear(&worker->outgoing);
+ map_clear(&worker->tcp_connected);
+ map_clear(&worker->tcp_waiting);
}
struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,