]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: logic around struct session was relocated to separate module; input data...
authorGrigorii Demidov <grigorii.demidov@nic.cz>
Thu, 13 Sep 2018 15:28:23 +0000 (17:28 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Fri, 12 Oct 2018 15:36:40 +0000 (17:36 +0200)
daemon/daemon.mk
daemon/io.c
daemon/io.h
daemon/session.c [new file with mode: 0644]
daemon/session.h [new file with mode: 0644]
daemon/tls.c
daemon/tls.h
daemon/worker.c
daemon/worker.h
lib/defines.h

index c3c15075ef8dfc552a129d844671367a302969c1..eef6e8e5b2da092e008e66f7ec4cfab594881894 100644 (file)
@@ -9,6 +9,7 @@ kresd_SOURCES := \
        daemon/tls_ephemeral_credentials.c \
        daemon/tls_session_ticket-srv.c \
        daemon/zimport.c     \
+       daemon/session.c     \
        daemon/main.c
 
 kresd_DIST := daemon/lua/kres.lua daemon/lua/kres-gen.lua \
index e5b8a139a81566b464ee9863d9fa503109e92e44..09eaed48aab092e0f0a8d36563d0d45673684ed9 100644 (file)
@@ -24,6 +24,7 @@
 #include "daemon/network.h"
 #include "daemon/worker.h"
 #include "daemon/tls.h"
+#include "daemon/session.h"
 
 #define negotiate_bufsize(func, handle, bufsize_want) do { \
     int bufsize = 0; func(handle, &bufsize); \
@@ -48,86 +49,35 @@ static void check_bufsize(uv_handle_t* handle)
 
 #undef negotiate_bufsize
 
-static void session_clear(struct session *s)
+static uv_stream_t *handle_borrow(uv_loop_t *loop)
 {
-       assert(s->tasks.len == 0 && s->waiting.len == 0);
-       array_clear(s->tasks);
-       array_clear(s->waiting);
-       tls_free(s->tls_ctx);
-       tls_client_ctx_free(s->tls_client_ctx);
-       memset(s, 0, sizeof(*s));
-}
-
-void session_free(struct session *s)
-{
-       if (s) {
-               assert(s->tasks.len == 0 && s->waiting.len == 0);
-               session_clear(s);
-               free(s);
+       struct worker_ctx *worker = loop->data;
+       void *req = worker_iohandle_borrow(worker);
+       if (!req) {
+               return NULL;
        }
-}
-
-struct session *session_new(void)
-{
-       return calloc(1, sizeof(struct session));
-}
 
-static struct session *session_borrow(struct worker_ctx *worker)
-{
-       struct session *s = NULL;
-       if (worker->pool_sessions.len > 0) {
-               s = array_tail(worker->pool_sessions);
-               array_pop(worker->pool_sessions);
-               kr_asan_unpoison(s, sizeof(*s));
-       } else {
-               s = session_new();
-       }
-       return s;
-}
-
-static void session_release(struct worker_ctx *worker, uv_handle_t *handle)
-{
-       if (!worker || !handle) {
-               return;
-       }
-       struct session *s = handle->data;
-       if (!s) {
-               return;
-       }
-       assert(s->waiting.len == 0 && s->tasks.len == 0);
-       assert(s->buffering == NULL);
-       if (!s->outgoing && handle->type == UV_TCP) {
-               worker_end_tcp(worker, handle); /* to free the buffering task */
-       }
-       if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
-               session_clear(s);
-               array_push(worker->pool_sessions, s);
-               kr_asan_poison(s, sizeof(*s));
-       } else {
-               session_free(s);
-       }
+       return (uv_stream_t *)req;
 }
 
 static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
 {
-       /* Worker has single buffer which is reused for all incoming
-        * datagrams / stream reads, the content of the buffer is
+       /* UDP sessions use worker buffer for wire data,
+        * TCP sessions use session buffer for wire data
+        * (see session_set_handle()).
+        * TLS sessions use buffer from TLS context.
+        * The content of the worker buffer is
         * guaranteed to be unchanged only for the duration of
         * udp_read() and tcp_read().
         */
        struct session *session = handle->data;
-       uv_loop_t *loop = handle->loop;
-       struct worker_ctx *worker = loop->data;
-       buf->base = (char *)worker->wire_buf;
-       /* Limit TCP stream buffer size to 4K for granularity in batches of incoming queries. */
-       if (handle->type == UV_TCP) {
-               buf->len = MIN(suggested_size, 4096);
-       /* Regular buffer size for subrequests. */
-       } else if (session->outgoing) {
-               buf->len = suggested_size;
-       /* Use recvmmsg() on master sockets if possible. */
+       if (!session_has_tls(session)) {
+               buf->base = (char *) session_wirebuf_get_free_start(session);
+               buf->len = session_wirebuf_get_free_size(session);
        } else {
-               buf->len = sizeof(worker->wire_buf);
+               struct tls_common_ctx *ctx = session_tls_get_common_ctx(session);
+               buf->base = (char *) ctx->recv_buf;
+               buf->len = sizeof(ctx->recv_buf);
        }
 }
 
@@ -137,29 +87,30 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
        uv_loop_t *loop = handle->loop;
        struct worker_ctx *worker = loop->data;
        struct session *s = handle->data;
-       if (s->closing) {
+       if (session_is_closing(s)) {
                return;
        }
        if (nread <= 0) {
                if (nread < 0) { /* Error response, notify resolver */
-                       worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
+                       worker_submit(s, NULL);
                } /* nread == 0 is for freeing buffers, we don't need to do this */
                return;
        }
        if (addr->sa_family == AF_UNSPEC) {
                return;
        }
-       if (s->outgoing) {
-               assert(s->peer.ip.sa_family != AF_UNSPEC);
-               if (kr_sockaddr_cmp(&s->peer.ip, addr) != 0) {
+       struct sockaddr *peer = session_get_peer(s);
+       if (session_is_outgoing(s)) {
+               assert(peer->sa_family != AF_UNSPEC);
+               if (kr_sockaddr_cmp(peer, addr) != 0) {
                        return;
                }
+       } else {
+               memcpy(peer, addr, kr_sockaddr_len(addr));
        }
-       knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
-       if (query) {
-               query->max_size = KNOT_WIRE_MAX_PKTSIZE;
-               worker_submit(worker, (uv_handle_t *)handle, query, addr);
-       }
+       ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread);
+       assert(consumed == nread);
+       session_wirebuf_process(s);
        mp_flush(worker->pkt_pool.ctx);
 }
 
@@ -167,11 +118,10 @@ static int udp_bind_finalize(uv_handle_t *handle)
 {
        check_bufsize(handle);
        /* Handle is already created, just create context. */
-       struct session *session = session_new();
-       assert(session);
-       session->outgoing = false;
-       session->handle = handle;
-       handle->data = session;
+       struct session *s = session_new();
+       assert(s);
+       session_set_outgoing(s, false);
+       session_set_handle(s, handle);
        return io_start_read(handle);
 }
 
@@ -203,14 +153,14 @@ int udp_bindfd(uv_udp_t *handle, int fd)
 
 static void tcp_timeout_trigger(uv_timer_t *timer)
 {
-       struct session *session = timer->data;
+       struct session *s = timer->data;
 
-       assert(session->outgoing == false);
-       if (session->tasks.len > 0) {
+       assert(session_is_outgoing(s) == false);
+       if (!session_tasklist_is_empty(s)) {
                uv_timer_again(timer);
-       } else if (!session->closing) {
+       } else if (!session_is_closing(s)) {
                uv_timer_stop(timer);
-               worker_session_close(session);
+               session_close(s);
        }
 }
 
@@ -218,50 +168,78 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
 {
        uv_loop_t *loop = handle->loop;
        struct session *s = handle->data;
-       if (s->closing) {
+
+       assert(s && session_get_handle(s) == (uv_handle_t *)handle &&
+              handle->type == UV_TCP); 
+
+       if (session_is_closing(s)) {
                return;
        }
+
        /* nread might be 0, which does not indicate an error or EOF.
         * This is equivalent to EAGAIN or EWOULDBLOCK under read(2). */
        if (nread == 0) {
                return;
        }
-       if (nread == UV_EOF) {
-               nread = 0;
-       }
+
        struct worker_ctx *worker = loop->data;
-       /* TCP pipelining is rather complicated and requires cooperation from the worker
-        * so the whole message reassembly and demuxing logic is inside worker */
-       int ret = 0;
-       if (s->has_tls) {
-               ret = tls_process(worker, handle, (const uint8_t *)buf->base, nread);
-       } else {
-               ret = worker_process_tcp(worker, handle, (const uint8_t *)buf->base, nread);
+
+       if (nread < 0 || !buf->base) {
+               if (kr_verbose_status) {
+                       struct sockaddr *peer = session_get_peer(s);
+                       char peer_str[INET6_ADDRSTRLEN];
+                       inet_ntop(peer->sa_family, kr_inaddr(peer),
+                                 peer_str, sizeof(peer_str));
+                       kr_log_verbose("[io] => connection to '%s' closed by peer (%s)\n", peer_str,
+                                      uv_strerror(nread));
+               }
+               worker_end_tcp(s);
+               return;
        }
+
+       ssize_t consumed = 0;
+       const uint8_t *data = (const uint8_t *)buf->base;
+       ssize_t data_len = nread;
+       if (session_has_tls(s)) {
+               /* buf->base points to start of the tls receive buffer.
+                  Decode data free space in session wire buffer. */
+               consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread);
+               data = session_wirebuf_get_free_start(s);
+               data_len = consumed;
+       } 
+
+       /* data points to start of the free space in session wire buffer.
+          Simple increase internal counter. */
+       consumed = session_wirebuf_consume(s, data, data_len);
+       assert(consumed == data_len);
+
+       int ret = session_wirebuf_process(s);
        if (ret < 0) {
-               worker_end_tcp(worker, (uv_handle_t *)handle);
+               worker_end_tcp(s);
                /* Exceeded per-connection quota for outstanding requests
                 * stop reading from stream and close after last message is processed. */
-               if (!s->outgoing && !uv_is_closing((uv_handle_t *)&s->timeout)) {
-                       uv_timer_stop(&s->timeout);
-                       if (s->tasks.len == 0) {
-                               worker_session_close(s);
+               uv_timer_t *t = session_get_timer(s);
+               if (!session_is_outgoing(s) && !uv_is_closing((uv_handle_t *)t)) {
+                       uv_timer_stop(t);
+                       if (session_tasklist_is_empty(s)) {
+                               session_close(s);
                        } else { /* If there are tasks running, defer until they finish. */
-                               uv_timer_start(&s->timeout, tcp_timeout_trigger,
+                               uv_timer_start(t, tcp_timeout_trigger,
                                               MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
                        }
                }
        /* Connection spawned at least one request, reset its deadline for next query.
         * https://tools.ietf.org/html/rfc7766#section-6.2.3 */
-       } else if (ret > 0 && !s->outgoing && !s->closing) {
-               uv_timer_again(&s->timeout);
+       } else if (ret > 0 && !session_is_outgoing(s) && !session_is_closing(s)) {
+               uv_timer_t *t = session_get_timer(s);
+               uv_timer_again(t);
        }
        mp_flush(worker->pkt_pool.ctx);
 }
 
 static void _tcp_accept(uv_stream_t *master, int status, bool tls)
 {
-       if (status != 0) {
+       if (status != 0) {
                return;
        }
 
@@ -298,37 +276,40 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
        /* Set deadlines for TCP connection and start reading.
         * It will re-check every half of a request time limit if the connection
         * is idle and should be terminated, this is an educated guess. */
-
-       struct sockaddr *addr = &(session->peer.ip);
-       int addr_len = sizeof(union inaddr);
-       int ret = uv_tcp_getpeername((uv_tcp_t *)client, addr, &addr_len);
-       if (ret || addr->sa_family == AF_UNSPEC) {
-               /* close session, close underlying uv handles and
-                * deallocate (or return to memory pool) memory. */
-               worker_session_close(session);
+       struct session *s = client->data;
+       assert(session_is_outgoing(s) == false);
+
+       struct sockaddr *peer = session_get_peer(s);
+       int peer_len = sizeof(union inaddr);
+       int ret = uv_tcp_getpeername((uv_tcp_t *)client, peer, &peer_len);
+       if (ret || peer->sa_family == AF_UNSPEC) {
+               session_close(s);
                return;
        }
 
+       struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
        const struct engine *engine = worker->engine;
        const struct network *net = &engine->net;
        uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
 
        uint64_t timeout = KR_CONN_RTT_MAX / 2;
-       session->has_tls = tls;
+       session_set_has_tls(s, tls);
        if (tls) {
                timeout += TLS_MAX_HANDSHAKE_TIME;
-               if (!session->tls_ctx) {
-                       session->tls_ctx = tls_new(master->loop->data);
-                       if (!session->tls_ctx) {
-                               worker_session_close(session);
+               struct tls_ctx_t *ctx = session_tls_get_server_ctx(s);
+               if (!ctx) {
+                       ctx = tls_new(worker);
+                       if (!ctx) {
+                               session_close(s);
                                return;
                        }
-                       session->tls_ctx->c.session = session;
-                       session->tls_ctx->c.handshake_state = TLS_HS_IN_PROGRESS;
+                       ctx->c.session = s;
+                       ctx->c.handshake_state = TLS_HS_IN_PROGRESS;
+                       session_tls_set_server_ctx(s, ctx);
                }
        }
-       uv_timer_t *timer = &session->timeout;
-       uv_timer_start(timer, tcp_timeout_trigger, timeout, idle_in_timeout);
+       uv_timer_t *t = session_get_timer(s);
+       uv_timer_start(t, tcp_timeout_trigger, timeout, idle_in_timeout);
        io_start_read((uv_handle_t *)client);
 }
 
@@ -444,13 +425,12 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family)
                return ret;
        }
        struct worker_ctx *worker = loop->data;
-       struct session *session = session_borrow(worker);
-       assert(session);
-       session->handle = handle;
-       handle->data = session;
-       session->timeout.data = session;
-       uv_timer_init(worker->loop, &session->timeout);
-       return ret;
+       struct session *s = worker_session_borrow(worker);
+       assert(s);
+       session_set_handle(s, handle);
+       uv_timer_t *t = session_get_timer(s);
+       t->data = s;
+       uv_timer_init(worker->loop, t);
 }
 
 void io_deinit(uv_handle_t *handle)
@@ -461,7 +441,7 @@ void io_deinit(uv_handle_t *handle)
        uv_loop_t *loop = handle->loop;
        if (loop && loop->data) {
                struct worker_ctx *worker = loop->data;
-               session_release(worker, handle);
+               worker_session_release(worker, handle);
        } else {
                session_free(handle->data);
        }
index 428cc62a392bb3c7d866bcf6f86a7141e615d77d..1b5e5791d32506b46ff2abeea99aed0b317d417b 100644 (file)
 struct tls_ctx_t;
 struct tls_client_ctx_t;
 
-/* Per-session (TCP or UDP) persistent structure,
- * that exists between remote counterpart and a local socket.
- */
-struct session {
-       bool outgoing; /**< True: to upstream; false: from a client. */
-       bool throttled;
-       bool has_tls;
-       bool connected;
-       bool closing;
-       union inaddr peer;
-       uv_handle_t *handle;
-       uv_timer_t timeout;
-       struct qr_task *buffering; /**< Worker buffers the incomplete TCP query here. */
-       struct tls_ctx_t *tls_ctx;
-       struct tls_client_ctx_t *tls_client_ctx;
-
-       uint8_t msg_hdr[4];  /**< Buffer for DNS message header. */
-       ssize_t msg_hdr_idx; /**< The number of bytes in msg_hdr filled so far. */
-
-       qr_tasklist_t tasks;
-       qr_tasklist_t waiting;
-       ssize_t bytes_to_skip;
-};
-
-void session_free(struct session *s);
-struct session *session_new(void);
-
 int udp_bind(uv_udp_t *handle, struct sockaddr *addr);
 int udp_bindfd(uv_udp_t *handle, int fd);
 int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr);
diff --git a/daemon/session.c b/daemon/session.c
new file mode 100644 (file)
index 0000000..7133baf
--- /dev/null
@@ -0,0 +1,623 @@
+#include <assert.h>
+
+#include <libknot/packet/pkt.h>
+
+#include "lib/defines.h"
+#include "daemon/session.h"
+#include "daemon/engine.h"
+#include "daemon/tls.h"
+#include "daemon/worker.h"
+#include "daemon/io.h"
+
+/** List of tasks. */
+typedef array_t(struct qr_task *) session_tasklist_t;
+
+struct session_flags {
+       bool outgoing : 1;      /**< True: to upstream; false: from a client. */
+       bool throttled : 1;     /**< True: data reading from peer is temporarily stopped. */
+       bool has_tls : 1;       /**< True: given session uses TLS. */
+       bool connected : 1;     /**< True: TCP connection is established. */
+       bool closing : 1;       /**< True: session close sequence is in progress. */
+       bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */
+};
+
+
+/* Per-session (TCP or UDP) persistent structure,
+ * that exists between remote counterpart and a local socket.
+ */
+struct session {
+       struct session_flags sflags; /**< miscellaneous flags. */
+       union inaddr peer;           /**< address of peer; is not set for client's UDP sessions. */
+       uv_handle_t *handle;         /**< libuv handle for IO operations. */
+       uv_timer_t timeout;          /**< libuv handle for timer. */
+
+       struct tls_ctx_t *tls_ctx;   /**< server side tls-related data. */
+       struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */
+
+       session_tasklist_t tasks;    /**< list of tasks which assotiated with given session. */
+       session_tasklist_t waiting;  /**< list of tasks been waiting for IO (subset of taska). */
+
+       uint8_t *wire_buf;           /**< Buffer for DNS message. */
+       ssize_t wire_buf_size;       /**< Buffer size. */
+       ssize_t wire_buf_idx;        /**< The number of bytes in wire_buf filled so far. */
+};
+
+static void on_session_close(uv_handle_t *handle)
+{
+       uv_loop_t *loop = handle->loop;
+       struct worker_ctx *worker = loop->data;
+       struct session *session = handle->data;
+       assert(session->handle == handle);
+       io_deinit(handle);
+       worker_iohandle_release(worker, handle);
+}
+
+static void on_session_timer_close(uv_handle_t *timer)
+{
+       struct session *session = timer->data;
+       uv_handle_t *handle = session->handle;
+       assert(handle && handle->data == session);
+       assert (session->sflags.outgoing || handle->type == UV_TCP);
+       if (!uv_is_closing(handle)) {
+               uv_close(handle, on_session_close);
+       }
+}
+
+void session_free(struct session *s)
+{
+       if (s) {
+               assert(s->tasks.len == 0 && s->waiting.len == 0);
+               session_clear(s);
+               free(s);
+       }
+}
+
+void session_clear(struct session *s)
+{
+       assert(s->tasks.len == 0 && s->waiting.len == 0);
+       if (s->handle && s->handle->type == UV_TCP) {
+               free(s->wire_buf);
+       }
+       array_clear(s->tasks);
+       array_clear(s->waiting);
+       tls_free(s->tls_ctx);
+       tls_client_ctx_free(s->tls_client_ctx);
+       memset(s, 0, sizeof(*s));
+}
+
+struct session *session_new(void)
+{
+       return calloc(1, sizeof(struct session));
+}
+
+void session_close(struct session *session)
+{
+       assert(session->tasks.len == 0 && session->waiting.len == 0);
+
+       if (session->sflags.closing) {
+               return;
+       }
+
+       uv_handle_t *handle = session->handle;
+       io_stop_read(handle);
+       session->sflags.closing = true;
+       if (session->sflags.outgoing &&
+           session->peer.ip.sa_family != AF_UNSPEC) {
+               struct worker_ctx *worker = handle->loop->data;
+               struct sockaddr *peer = &session->peer.ip;
+               worker_del_tcp_connected(worker, peer);
+               session->sflags.connected = false;
+       }
+
+       if (!uv_is_closing((uv_handle_t *)&session->timeout)) {
+               uv_timer_stop(&session->timeout);
+               if (session->tls_client_ctx) {
+                       tls_close(&session->tls_client_ctx->c);
+               }
+               if (session->tls_ctx) {
+                       tls_close(&session->tls_ctx->c);
+               }
+
+               session->timeout.data = session;
+               uv_close((uv_handle_t *)&session->timeout, on_session_timer_close);
+       }
+}
+
+int session_start_read(struct session *session)
+{
+       return io_start_read(session->handle);
+}
+
+int session_waitinglist_add(struct session *session, struct qr_task *task)
+{
+       for (int i = 0; i < session->waiting.len; ++i) {
+               if (session->waiting.at[i] == task) {
+                       return i;
+               }
+       }
+       int ret = array_push(session->waiting, task);
+       if (ret >= 0) {
+               worker_task_ref(task);
+       }
+       return ret;
+}
+
+int session_waitinglist_del(struct session *session, struct qr_task *task)
+{
+       int ret = kr_error(ENOENT);
+       for (int i = 0; i < session->waiting.len; ++i) {
+               if (session->waiting.at[i] == task) {
+                       array_del(session->waiting, i);
+                       worker_task_unref(task);
+                       ret = kr_ok();
+                       break;
+               }
+       }
+       return ret;
+}
+
+int session_waitinglist_del_index(struct session *session, int index)
+{
+       int ret = kr_error(ENOENT);
+       if (index < session->waiting.len) {
+               struct qr_task *task = session->waiting.at[index];
+               array_del(session->waiting, index);
+               worker_task_unref(task);
+               ret = kr_ok();
+       }
+       return ret;
+}
+
+int session_tasklist_add(struct session *session, struct qr_task *task)
+{
+       for (int i = 0; i < session->tasks.len; ++i) {
+               if (session->tasks.at[i] == task) {
+                       return i;
+               }
+       }
+       int ret = array_push(session->tasks, task);
+       if (ret >= 0) {
+               worker_task_ref(task);
+       }
+       return ret;
+}
+
+int session_tasklist_del(struct session *session, struct qr_task *task)
+{
+       int ret = kr_error(ENOENT);
+       for (int i = 0; i < session->tasks.len; ++i) {
+               if (session->tasks.at[i] == task) {
+                       array_del(session->tasks, i);
+                       worker_task_unref(task);
+                       ret = kr_ok();
+                       break;
+               }
+       }
+       return ret;
+}
+
+int session_tasklist_del_index(struct session *session, int index)
+{
+       int ret = kr_error(ENOENT);
+       if (index < session->tasks.len) {
+               struct qr_task *task = session->tasks.at[index];
+               array_del(session->tasks, index);
+               worker_task_unref(task);
+               ret = kr_ok();
+       }
+       return ret;
+}
+
+struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id)
+{
+       struct qr_task *ret = NULL;
+       const session_tasklist_t *tasklist = &session->tasks;
+       for (size_t i = 0; i < tasklist->len; ++i) {
+               struct qr_task *task = tasklist->at[i];
+               knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+               uint16_t task_msg_id = knot_wire_get_id(pktbuf->wire);
+               if (task_msg_id == msg_id) {
+                       ret = task;
+                       break;
+               }
+       }
+       return ret;
+}
+
+bool session_is_outgoing(const struct session *session)
+{
+       return session->sflags.outgoing;
+}
+
+void session_set_outgoing(struct session *session, bool outgoing)
+{
+       session->sflags.outgoing = outgoing;
+}
+
+bool session_is_closing(const struct session *session)
+{
+       return session->sflags.closing;
+}
+
+void session_set_closing(struct session *session, bool closing)
+{
+       session->sflags.closing = closing;
+}
+
+bool session_is_connected(const struct session *session)
+{
+       return session->sflags.connected;
+}
+
+void session_set_connected(struct session *session, bool connected)
+{
+       session->sflags.connected = connected;
+}
+
+bool session_is_throttled(const struct session *session)
+{
+       return session->sflags.throttled;
+}
+
+void session_set_throttled(struct session *session, bool throttled)
+{
+       session->sflags.throttled = throttled;
+}
+
+struct sockaddr *session_get_peer(struct session *session)
+{
+       return &session->peer.ip;
+}
+
+struct tls_ctx_t *session_tls_get_server_ctx(const struct session *session)
+{
+       return session->tls_ctx;
+}
+
+void session_tls_set_server_ctx(struct session *session, struct tls_ctx_t *ctx)
+{
+       session->tls_ctx = ctx;
+}
+
+struct tls_client_ctx_t *session_tls_get_client_ctx(const struct session *session)
+{
+       return session->tls_client_ctx;
+}
+
+void session_tls_set_client_ctx(struct session *session, struct tls_client_ctx_t *ctx)
+{
+       session->tls_client_ctx = ctx;
+}
+
+struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session)
+{
+       struct tls_common_ctx *tls_ctx = session->sflags.outgoing ? &session->tls_client_ctx->c :
+                                                                   &session->tls_ctx->c;
+       return tls_ctx;
+}
+
+uv_handle_t *session_get_handle(struct session *session)
+{
+       return session->handle;
+}
+
+int session_set_handle(struct session *session, uv_handle_t *h)
+{
+       if (!h) {
+               return kr_error(EINVAL);
+       }
+
+       if (h->type == UV_TCP) {
+               uint8_t *wire_buf = malloc(KNOT_WIRE_MAX_PKTSIZE);
+               if (!wire_buf) {
+                       return kr_error(ENOMEM);
+               }
+               session->wire_buf = wire_buf;
+               session->wire_buf_size = KNOT_WIRE_MAX_PKTSIZE;
+       } else if (h->type == UV_UDP) {
+               assert(h->loop->data);
+               struct worker_ctx *worker = h->loop->data;
+               session->wire_buf = worker->wire_buf;
+               session->wire_buf_size = sizeof(worker->wire_buf);
+       }
+       
+       session->handle = h;
+       h->data = session;
+       return kr_ok();
+}
+
+uv_timer_t *session_get_timer(struct session *session)
+{
+       return &session->timeout;
+}
+
+size_t session_tasklist_get_len(const struct session *session)
+{
+       return session->tasks.len;
+}
+
+size_t session_waitinglist_get_len(const struct session *session)
+{
+       return session->waiting.len;
+}
+
+bool session_tasklist_is_empty(const struct session *session)
+{
+       return session_tasklist_get_len(session) == 0;
+}
+
+bool session_waitinglist_is_empty(const struct session *session)
+{
+       return session_waitinglist_get_len(session) == 0;
+}
+
+bool session_is_empty(const struct session *session)
+{
+       return session_tasklist_is_empty(session) &&
+              session_waitinglist_is_empty(session);
+}
+
+bool session_has_tls(const struct session *session)
+{
+       return session->sflags.has_tls;
+}
+
+void session_set_has_tls(struct session *session, bool has_tls)
+{
+       session->sflags.has_tls = has_tls;
+}
+
+struct qr_task *session_waitinglist_get_first(const struct session *session)
+{
+       struct qr_task *t = NULL;
+       if (session->waiting.len > 0) {
+               t = session->waiting.at[0];
+       }
+       return t;
+}
+
+struct qr_task *session_tasklist_get_first(const struct session *session)
+{
+       struct qr_task *t = NULL;
+       if (session->tasks.len > 0) {
+               t = session->tasks.at[0];
+       }
+       return t;
+}
+
+void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt)
+{
+       while (session->waiting.len > 0) {
+               struct qr_task *task = session->waiting.at[0];
+               session_tasklist_del(session, task);
+               array_del(session->waiting, 0);
+               assert(worker_task_numrefs(task) > 1);
+               if (increase_timeout_cnt) {
+                       worker_task_timeout_inc(task);
+               }
+               worker_task_unref(task);
+               worker_task_step(task, NULL, NULL);
+       }
+}
+
+void session_waitinglist_finalize(struct session *session, int status)
+{
+       while (session->waiting.len > 0) {
+               struct qr_task *t = session->waiting.at[0];
+               array_del(session->waiting, 0);
+               session_tasklist_del(session, t);
+               if (session->sflags.outgoing) {
+                       worker_task_finalize(t, status);
+               } else {
+                       struct request_ctx *ctx = worker_task_get_request(t);
+                       assert(worker_request_get_source_session(ctx) == session);
+                       worker_request_set_source_session(ctx, NULL);
+               }
+               worker_task_unref(t);
+       }
+}
+
+void session_tasklist_finalize(struct session *session, int status)
+{
+       while (session->tasks.len > 0) {
+               struct qr_task *t = session->tasks.at[0];
+               array_del(session->tasks, 0);
+               if (session->sflags.outgoing) {
+                       worker_task_finalize(t, status);
+               } else {
+                       struct request_ctx *ctx = worker_task_get_request(t);
+                       assert(worker_request_get_source_session(ctx) == session);
+                       worker_request_set_source_session(ctx, NULL);
+               }
+               worker_task_unref(t);
+       }
+}
+
+void session_tasks_finalize(struct session *session, int status)
+{
+       session_waitinglist_finalize(session, status);
+       session_tasklist_finalize(session, status);
+}
+
+int session_timer_start(struct session *session, uv_timer_cb cb,
+                       uint64_t timeout, uint64_t repeat)
+{
+       uv_timer_t *timer = &session->timeout;
+       assert(timer->data == session);
+       int ret = uv_timer_start(timer, cb, timeout, repeat);
+       if (ret != 0) {
+               uv_timer_stop(timer);
+               return kr_error(ENOMEM);
+       }
+       return 0;
+}
+
+int session_timer_restart(struct session *session)
+{
+       return uv_timer_again(&session->timeout);
+}
+
+int session_timer_stop(struct session *session)
+{
+       return uv_timer_stop(&session->timeout);
+}
+
+ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len)
+{
+       if (data != &session->wire_buf[session->wire_buf_idx]) {
+               /* shouldn't happen */
+               return kr_error(EINVAL);
+       }
+
+       if (session->wire_buf_idx + len > session->wire_buf_size) {
+               /* shouldn't happen */
+               return kr_error(EINVAL);
+       }
+
+       session->wire_buf_idx += len;
+       return len;
+}
+
+knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
+{
+       if (session->wire_buf_idx == 0) {
+               session->sflags.wirebuf_error = false;
+               return NULL;
+       }
+       
+       const uv_handle_t *handle = session->handle;
+       uint8_t *msg_start = session->wire_buf;
+       uint16_t msg_size = session->wire_buf_idx;
+       
+       session->sflags.wirebuf_error = true;
+       if (!handle) {
+               return NULL;
+       } else if (handle->type == UV_TCP) {
+               if (session->wire_buf_idx < 2) {
+                       session->sflags.wirebuf_error = false;
+                       return NULL;
+               }
+               msg_size = knot_wire_read_u16(session->wire_buf);
+               if (msg_size + 2 > session->wire_buf_idx) {
+                       session->sflags.wirebuf_error = false;
+                       return NULL;
+               }
+               msg_start += 2;
+       }
+
+       knot_pkt_t *pkt = knot_pkt_new(msg_start, msg_size, mm);
+       if (pkt) {
+               session->sflags.wirebuf_error = false;
+       }
+       return pkt;
+}
+
+int session_discard_packet(struct session *session, const knot_pkt_t *pkt)
+{
+       uv_handle_t *handle = session->handle;
+       uint8_t *wirebuf_data_start = session->wire_buf;
+       size_t wirebuf_msg_data_size = session->wire_buf_idx;
+       uint8_t *wirebuf_msg_start = session->wire_buf;
+       size_t wirebuf_msg_size = session->wire_buf_idx;
+       uint8_t *pkt_msg_start = pkt->wire;
+       size_t pkt_msg_size = pkt->size;
+
+       session->sflags.wirebuf_error = true;
+       if (!handle) {
+               return kr_error(EINVAL);
+       } else if (handle->type == UV_TCP) {
+               if (session->wire_buf_idx < 2) {
+                       return kr_error(EINVAL);
+               }
+               wirebuf_msg_size = knot_wire_read_u16(wirebuf_data_start);
+               wirebuf_msg_start += 2;
+               wirebuf_msg_data_size = wirebuf_msg_size + 2;
+       }
+
+       if (wirebuf_msg_start != pkt_msg_start || wirebuf_msg_size != pkt_msg_size) {
+               return kr_error(EINVAL);
+       }
+
+       if (wirebuf_msg_data_size > session->wire_buf_idx) {
+               return kr_error(EINVAL);
+       }
+       
+       uint16_t wirebuf_data_amount = session->wire_buf_idx - wirebuf_msg_data_size;
+       if (wirebuf_data_amount) {
+               if (wirebuf_msg_data_size < wirebuf_data_amount) {
+                       memmove(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
+                               wirebuf_data_amount);
+               } else {
+                       memcpy(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
+                              wirebuf_data_amount);
+               }
+       }
+
+       session->wire_buf_idx = wirebuf_data_amount;
+       session->sflags.wirebuf_error = false;
+
+       return kr_ok();
+}
+
+bool session_wirebuf_error(struct session *session)
+{
+       return session->sflags.wirebuf_error;
+}
+
+uint8_t *session_wirebuf_get_start(struct session *session)
+{
+       return session->wire_buf;
+}
+
+size_t session_wirebuf_get_len(struct session *session)
+{
+       return session->wire_buf_idx;
+}
+
+size_t session_wirebuf_get_size(struct session *session)
+{
+       return sizeof(session->wire_buf);
+}
+
+uint8_t *session_wirebuf_get_free_start(struct session *session)
+{
+       return &session->wire_buf[session->wire_buf_idx];
+}
+
+size_t session_wirebuf_get_free_size(struct session *session)
+{
+       return session->wire_buf_size - session->wire_buf_idx;
+}
+
+void session_poison(struct session *session)
+{
+       kr_asan_poison(session, sizeof(*session));
+}
+
+void session_unpoison(struct session *session)
+{
+       kr_asan_unpoison(session, sizeof(*session));
+}
+
+int session_wirebuf_process(struct session *session)
+{
+       int ret = 0;
+       if (session->wire_buf_idx == 0) {
+               return ret;
+       }
+       struct worker_ctx *worker = session_get_handle(session)->loop->data;
+       knot_pkt_t *query = NULL;
+       while (((query = session_produce_packet(session, &worker->pkt_pool)) != NULL) && (ret < 100)) {
+               worker_submit(session, query);
+               if (session_discard_packet(session, query) < 0) {
+                       break;
+               }
+               ret += 1;
+       }
+       assert(ret < 100);
+       if (session_wirebuf_error(session)) {
+               ret = -1;
+       }
+       return ret;
+}
+
diff --git a/daemon/session.h b/daemon/session.h
new file mode 100644 (file)
index 0000000..93f8add
--- /dev/null
@@ -0,0 +1,142 @@
+/*  Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <stdbool.h>
+#include <uv.h>
+#include "lib/generic/array.h"
+
+struct qr_task;
+struct worker_ctx;
+struct session;
+
+/* Allocate new session. */
+struct session *session_new(void);
+/* Clear and free given session. */
+void session_free(struct session *s);
+/* Clear session. */
+void session_clear(struct session *s);
+/** Close session. */
+void session_close(struct session *session);
+/** Start reading from underlying libuv IO handle. */
+int session_start_read(struct session *session);
+
+/** List of tasks been waiting for IO. */
+/** Check if list is empty. */
+bool session_waitinglist_is_empty(const struct session *session);
+/** Get the first element. */
+struct qr_task *session_waitinglist_get_first(const struct session *session);
+/** Get the list length. */
+size_t session_waitinglist_get_len(const struct session *session);
+/** Add task to the list. */
+int session_waitinglist_add(struct session *session, struct qr_task *task);
+/** Remove task from the list. */
+int session_waitinglist_del(struct session *session, struct qr_task *task);
+/** Remove task from the list by index. */
+int session_waitinglist_del_index(struct session *session, int index);
+/** Retry resolution for each task in the list. */
+void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt);
+/** Finalize all tasks in the list. */
+void session_waitinglist_finalize(struct session *session, int status);
+
+/** List of tasks associated with session. */
+/** Check if list is empty. */
+bool session_tasklist_is_empty(const struct session *session);
+/** Get the first element. */
+struct qr_task *session_tasklist_get_first(const struct session *session);
+/** Get the list length. */
+size_t session_tasklist_get_len(const struct session *session);
+/** Add task to the list. */
+int session_tasklist_add(struct session *session, struct qr_task *task);
+/** Remove task from the list. */
+int session_tasklist_del(struct session *session, struct qr_task *task);
+/** Remove task from the list by index. */
+int session_tasklist_del_index(struct session *session, int index);
+/** Find task with given msg_id */
+struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id);
+/** Finalize all tasks in the list. */
+void session_tasklist_finalize(struct session *session, int status);
+
+/** Both of task lists (associated & waiting). */
+/** Check if empty. */
+bool session_is_empty(const struct session *session);
+/** Finalize all tasks. */
+void session_tasks_finalize(struct session *session, int status);
+
+/** Operations with flags */
+bool session_is_outgoing(const struct session *session);
+void session_set_outgoing(struct session *session, bool outgoing);
+bool session_is_closing(const struct session *session);
+void session_set_closing(struct session *session, bool closing);
+bool session_is_connected(const struct session *session);
+void session_set_connected(struct session *session, bool connected);
+bool session_is_throttled(const struct session *session);
+void session_set_throttled(struct session *session, bool throttled);
+bool session_has_tls(const struct session *session);
+void session_set_has_tls(struct session *session, bool has_tls);
+bool session_wirebuf_error(struct session *session);
+
+/** Get peer address. */
+struct sockaddr *session_get_peer(struct session *session);
+/** Get pointer to server-side tls-related data. */
+struct tls_ctx_t *session_tls_get_server_ctx(const struct session *session);
+/** Set pointer to server-side tls-related data. */
+void session_tls_set_server_ctx(struct session *session, struct tls_ctx_t *ctx);
+/** Get pointer to client-side tls-related data. */
+struct tls_client_ctx_t *session_tls_get_client_ctx(const struct session *session);
+/** Set pointer to client-side tls-related data. */
+void session_tls_set_client_ctx(struct session *session, struct tls_client_ctx_t *ctx);
+/** Get pointer to that part of tls-related data which has common structure for 
+ *  server and client. */
+struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session);
+
+/** Get pointer to underlying libuv handle for IO operations. */
+uv_handle_t *session_get_handle(struct session *session);
+/** Set pointer to libuv handle for IO operations. */
+int session_set_handle(struct session *session, uv_handle_t *handle);
+
+/** Get pointer to session timer handle. */
+uv_timer_t *session_get_timer(struct session *session);
+/** Start session timer. */
+int session_timer_start(struct session *session, uv_timer_cb cb,
+                       uint64_t timeout, uint64_t repeat);
+/** Restart session timer without changing it parameters. */
+int session_timer_restart(struct session *session);
+/** Stop session timer. */
+int session_timer_stop(struct session *session);
+
+/** Get start of session buffer for wire data. */
+uint8_t *session_wirebuf_get_start(struct session *session);
+/** Get size of session wirebuffer. */
+size_t session_wirebuf_get_size(struct session *session);
+/** Get length of data in the session wirebuffer. */
+size_t session_wirebuf_get_len(struct session *session);
+/** Get start of free space in session wirebuffer. */
+uint8_t *session_wirebuf_get_free_start(struct session *session);
+/** Get amount of free space in session wirebuffer. */
+size_t session_wirebuf_get_free_size(struct session *session);
+int session_wirebuf_process(struct session *session);
+ssize_t session_wirebuf_consume(struct session *session,
+                               const uint8_t *data, ssize_t len);
+
+/** poison session structure with ASAN. */
+void session_poison(struct session *session);
+/** unpoison session structure with ASAN. */
+void session_unpoison(struct session *session);
+
+knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
+int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
index 0fdc6f9e164ce08c11bc3a4cb9902121b4ec91a2..65393a4e45b95855c57edc4c4c84703e4b780845 100644 (file)
@@ -34,6 +34,7 @@
 #include "daemon/io.h"
 #include "daemon/tls.h"
 #include "daemon/worker.h"
+#include "daemon/session.h"
 
 #define EPHEMERAL_CERT_EXPIRATION_SECONDS_RENEW_BEFORE 60*60*24*7
 #define GNUTLS_PIN_MIN_VERSION  0x030400
@@ -354,9 +355,10 @@ void tls_close(struct tls_common_ctx *ctx)
        assert(ctx->session);
 
        if (ctx->handshake_state == TLS_HS_DONE) {
+               const struct sockaddr *peer = session_get_peer(ctx->session);
                kr_log_verbose("[%s] closing tls connection to `%s`\n",
                               ctx->client_side ? "tls_client" : "tls",
-                              kr_straddr(&ctx->session->peer.ip));
+                              kr_straddr(peer));
                ctx->handshake_state = TLS_HS_CLOSING;
                gnutls_bye(ctx->tls_session, GNUTLS_SHUT_RDWR);
        }
@@ -384,12 +386,11 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb
                return kr_error(EINVAL);
        }
 
-       struct session *session = handle->data;
-       struct tls_common_ctx *tls_ctx = session->outgoing ? &session->tls_client_ctx->c :
-                                                            &session->tls_ctx->c;
+       struct session *s = handle->data;
+       struct tls_common_ctx *tls_ctx = session_tls_get_common_ctx(s);
 
        assert (tls_ctx);
-       assert (session->outgoing == tls_ctx->client_side);
+       assert (session_is_outgoing(s) == tls_ctx->client_side);
 
        const uint16_t pkt_size = htons(pkt->size);
        const char *logstring = tls_ctx->client_side ? client_logstring : server_logstring;
@@ -426,17 +427,16 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb
        return kr_ok();
 }
 
-int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *buf, ssize_t nread)
+ssize_t tls_process_input_data(struct session *s, const uint8_t *buf, ssize_t nread)
 {
-       struct session *session = handle->data;
-       struct tls_common_ctx *tls_p = session->outgoing ? &session->tls_client_ctx->c :
-                                                          &session->tls_ctx->c;
+       struct tls_common_ctx *tls_p = session_tls_get_common_ctx(s);
        if (!tls_p) {
                return kr_error(ENOSYS);
        }
 
-       assert(tls_p->session == session);
-
+       assert(tls_p->session == s);
+       assert(tls_p->recv_buf == buf && nread <= sizeof(tls_p->recv_buf));
+       
        const char *logstring = tls_p->client_side ? client_logstring : server_logstring;
 
        tls_p->buf = buf;
@@ -455,9 +455,13 @@ int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *b
        }
 
        /* See https://gnutls.org/manual/html_node/Data-transfer-and-termination.html#Data-transfer-and-termination */
-       int submitted = 0;
+       ssize_t submitted = 0;
+       bool is_retrying = false;
+       uint64_t retrying_start = 0;
+       uint8_t *wire_buf = session_wirebuf_get_free_start(s);
+       size_t wire_buf_size = session_wirebuf_get_free_size(s);
        while (true) {
-               ssize_t count = gnutls_record_recv(tls_p->tls_session, tls_p->recv_buf, sizeof(tls_p->recv_buf));
+               ssize_t count = gnutls_record_recv(tls_p->tls_session, wire_buf, wire_buf_size);
                if (count == GNUTLS_E_AGAIN) {
                        break; /* No data available */
                } else if (count == GNUTLS_E_INTERRUPTED) {
@@ -479,17 +483,15 @@ int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *b
                        kr_log_verbose("[%s] gnutls_record_recv failed: %s (%zd)\n",
                                     logstring, gnutls_strerror_name(count), count);
                        return kr_error(EIO);
-               }
-               DEBUG_MSG("[%s] submitting %zd data to worker\n", logstring, count);
-               int ret = worker_process_tcp(worker, handle, tls_p->recv_buf, count);
-               if (ret < 0) {
-                       return ret;
-               }
-               if (count <= 0) {
+               } else if (count == 0) {
                        break;
                }
-               submitted += ret;
+               DEBUG_MSG("[%s] received %zd data\n", logstring, count);
+               wire_buf += count;
+               wire_buf_size -= count;
+               submitted += count;
        }
+       assert(tls_p->consumed == tls_p->nread);
        return submitted;
 }
 
@@ -1127,13 +1129,13 @@ int tls_client_connect_start(struct tls_client_ctx_t *client_ctx,
                return kr_error(EINVAL);
        }
 
-       assert(session->outgoing && session->handle->type == UV_TCP);
+       assert(session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP);
 
        struct tls_common_ctx *ctx = &client_ctx->c;
 
        gnutls_session_set_ptr(ctx->tls_session, client_ctx);
        gnutls_handshake_set_timeout(ctx->tls_session, ctx->worker->engine->net.tcp.tls_handshake_timeout);
-       session->tls_client_ctx = client_ctx;
+       session_tls_set_client_ctx(session, client_ctx);
        ctx->handshake_cb = handshake_cb;
        ctx->handshake_state = TLS_HS_IN_PROGRESS;
        ctx->session = session;
index d208f4cb80e0acc7a1c48e3082e46d4ef53dccbd..1bfa6ef6de512fd3de250e827b46bf45f4f5b18a 100644 (file)
@@ -134,7 +134,7 @@ int tls_write(uv_write_t *req, uv_handle_t* handle, knot_pkt_t * pkt, uv_write_c
 /*! Unwrap incoming data from a TLS stream and pass them to TCP session.
  * @return the number of newly-completed requests (>=0) or an error code
  */
-int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *buf, ssize_t nread);
+ssize_t tls_process_input_data(struct session *s, const uint8_t *buf, ssize_t nread);
 
 /*! Set TLS certificate and key from files. */
 int tls_certificate_set(struct network *net, const char *tls_cert, const char *tls_key);
index 5a6e2e58357a3018fafa68383ac75b99c053fc10..d62633ab3713d419826a3bef6ada5bb85a3f8055 100644 (file)
@@ -36,6 +36,7 @@
 #include "daemon/io.h"
 #include "daemon/tls.h"
 #include "daemon/zimport.h"
+#include "daemon/session.h"
 
 #define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt)
 
@@ -97,11 +98,6 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
                        struct sockaddr *addr, knot_pkt_t *pkt);
 static int qr_task_finalize(struct qr_task *task, int state);
 static void qr_task_complete(struct qr_task *task);
-static int worker_add_tcp_connected(struct worker_ctx *worker,
-                                   const struct sockaddr *addr,
-                                   struct session *session);
-static int worker_del_tcp_connected(struct worker_ctx *worker,
-                                   const struct sockaddr *addr);
 static struct session* worker_find_tcp_connected(struct worker_ctx *worker,
                                                 const struct sockaddr *addr);
 static int worker_add_tcp_waiting(struct worker_ctx *worker,
@@ -111,14 +107,7 @@ static int worker_del_tcp_waiting(struct worker_ctx *worker,
                                  const struct sockaddr *addr);
 static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
                                               const struct sockaddr *addr);
-static int session_add_waiting(struct session *session, struct qr_task *task);
-static int session_del_waiting(struct session *session, struct qr_task *task);
-static int session_add_tasks(struct session *session, struct qr_task *task);
-static int session_del_tasks(struct session *session, struct qr_task *task);
-static void session_close(struct session *session);
 static void on_session_idle_timeout(uv_timer_t *timer);
-static int timer_start(struct session *session, uv_timer_cb cb,
-                      uint64_t timeout, uint64_t repeat);
 static void on_tcp_connect_timeout(uv_timer_t *timer);
 static void on_tcp_watchdog_timeout(uv_timer_t *timer);
 
@@ -248,8 +237,8 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
        /* Set current handle as a subrequest type. */
        struct session *session = handle->data;
        if (ret == 0) {
-               session->outgoing = true;
-               ret = session_add_tasks(session, task);
+               session_set_outgoing(session, true);
+               ret = session_tasklist_add(session, task);
        }
        if (ret < 0) {
                io_deinit(handle);
@@ -262,75 +251,53 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
        return handle;
 }
 
-static void on_session_close(uv_handle_t *handle)
-{
-       uv_loop_t *loop = handle->loop;
-       struct worker_ctx *worker = loop->data;
-       struct session *session = handle->data;
-       assert(session->handle == handle);
-       session->handle = NULL;
-       io_deinit(handle);
-       iohandle_release(worker, handle);
-}
-
-static void on_session_timer_close(uv_handle_t *timer)
-{
-       struct session *session = timer->data;
-       uv_handle_t *handle = session->handle;
-       assert(handle && handle->data == session);
-       assert (session->outgoing || handle->type == UV_TCP);
-       if (!uv_is_closing(handle)) {
-               uv_close(handle, on_session_close);
-       }
-}
-
 static void ioreq_kill_udp(uv_handle_t *req, struct qr_task *task)
 {
        assert(req);
-       struct session *session = req->data;
-       assert(session->outgoing);
-       if (session->closing) {
+       struct session *s = req->data;
+       assert(session_is_outgoing(s));
+       if (session_is_closing(s)) {
                return;
        }
-       uv_timer_stop(&session->timeout);
-       session_del_tasks(session, task);
-       assert(session->tasks.len == 0);
-       session_close(session);
+       uv_timer_t *t = session_get_timer(s);
+       uv_timer_stop(t);
+       session_tasklist_del(s, task);
+       assert(session_tasklist_is_empty(s));
+       session_close(s);
 }
 
 static void ioreq_kill_tcp(uv_handle_t *req, struct qr_task *task)
 {
        assert(req);
-       struct session *session = req->data;
-       assert(session->outgoing);
-       if (session->closing) {
+       struct session *s = req->data;
+       assert(session_is_outgoing(s));
+       if (session_is_closing(s)) {
                return;
        }
 
-       session_del_waiting(session, task);
-       session_del_tasks(session, task);
+       session_waitinglist_del(s, task);
+       session_tasklist_del(s, task);
 
        int res = 0;
 
-       if (session->outgoing && session->peer.ip.sa_family != AF_UNSPEC &&
-           session->tasks.len == 0 && session->waiting.len == 0 && !session->closing) {
-               assert(session->peer.ip.sa_family == AF_INET ||
-                      session->peer.ip.sa_family == AF_INET6);
+       const struct sockaddr *peer = session_get_peer(s);
+       if (peer->sa_family != AF_UNSPEC && session_is_empty(s) && !session_is_closing(s)) {
+               assert(peer->sa_family == AF_INET || peer->sa_family == AF_INET6);
                res = 1;
-               if (session->connected) {
+               if (session_is_connected(s)) {
                        /* This is outbound TCP connection which can be reused.
                        * Close it after timeout */
-                       uv_timer_t *timer = &session->timeout;
-                       timer->data = session;
-                       uv_timer_stop(timer);
-                       res = uv_timer_start(timer, on_session_idle_timeout,
+                       uv_timer_t *t = session_get_timer(s);
+                       t->data = s;
+                       uv_timer_stop(t);
+                       res = uv_timer_start(t, on_session_idle_timeout,
                                             KR_CONN_RTT_MAX, 0);
                }
        }
 
        if (res != 0) {
                /* if any errors, close the session immediately */
-               session_close(session);
+               session_close(s);
        }
 }
 
@@ -348,100 +315,6 @@ static void ioreq_kill_pending(struct qr_task *task)
        task->pending_count = 0;
 }
 
-static void session_close(struct session *session)
-{
-       assert(session->tasks.len == 0 && session->waiting.len == 0);
-
-       if (session->closing) {
-               return;
-       }
-
-       if (!session->outgoing && session->buffering != NULL) {
-               qr_task_complete(session->buffering);
-       }
-       session->buffering = NULL;
-
-       uv_handle_t *handle = session->handle;
-       io_stop_read(handle);
-       session->closing = true;
-       if (session->outgoing &&
-           session->peer.ip.sa_family != AF_UNSPEC) {
-               struct worker_ctx *worker = get_worker();
-               struct sockaddr *peer = &session->peer.ip;
-               worker_del_tcp_connected(worker, peer);
-               session->connected = false;
-       }
-
-       if (!uv_is_closing((uv_handle_t *)&session->timeout)) {
-               uv_timer_stop(&session->timeout);
-               if (session->tls_client_ctx) {
-                       tls_close(&session->tls_client_ctx->c);
-               }
-               if (session->tls_ctx) {
-                       tls_close(&session->tls_ctx->c);
-               }
-
-               session->timeout.data = session;
-               uv_close((uv_handle_t *)&session->timeout, on_session_timer_close);
-       }
-}
-
-static int session_add_waiting(struct session *session, struct qr_task *task)
-{
-       for (int i = 0; i < session->waiting.len; ++i) {
-               if (session->waiting.at[i] == task) {
-                       return i;
-               }
-       }
-       int ret = array_push(session->waiting, task);
-       if (ret >= 0) {
-               qr_task_ref(task);
-       }
-       return ret;
-}
-
-static int session_del_waiting(struct session *session, struct qr_task *task)
-{
-       int ret = kr_error(ENOENT);
-       for (int i = 0; i < session->waiting.len; ++i) {
-               if (session->waiting.at[i] == task) {
-                       array_del(session->waiting, i);
-                       qr_task_unref(task);
-                       ret = kr_ok();
-                       break;
-               }
-       }
-       return ret;
-}
-
-static int session_add_tasks(struct session *session, struct qr_task *task)
-{
-       for (int i = 0; i < session->tasks.len; ++i) {
-               if (session->tasks.at[i] == task) {
-                       return i;
-               }
-       }
-       int ret = array_push(session->tasks, task);
-       if (ret >= 0) {
-               qr_task_ref(task);
-       }
-       return ret;
-}
-
-static int session_del_tasks(struct session *session, struct qr_task *task)
-{
-       int ret = kr_error(ENOENT);
-       for (int i = 0; i < session->tasks.len; ++i) {
-               if (session->tasks.at[i] == task) {
-                       array_del(session->tasks, i);
-                       qr_task_unref(task);
-                       ret = kr_ok();
-                       break;
-               }
-       }
-       return ret;
-}
-
 /** @cond This memory layout is internal to mempool.c, use only for debugging. */
 #if defined(__SANITIZE_ADDRESS__)
 struct mempool_chunk {
@@ -530,11 +403,11 @@ static struct request_ctx *request_create(struct worker_ctx *worker,
        /* TODO Relocate pool to struct request */
        ctx->worker = worker;
        array_init(ctx->tasks);
-       struct session *session = handle ? handle->data : NULL;
-       if (session) {
-               assert(session->outgoing == false);
+       struct session *s = handle ? handle->data : NULL;
+       if (s) {
+               assert(session_is_outgoing(s) == false);
        }
-       ctx->source.session = session;
+       ctx->source.session = s;
 
        struct kr_request *req = &ctx->req;
        req->pool = pool;
@@ -584,8 +457,8 @@ static int request_start(struct request_ctx *ctx, knot_pkt_t *query)
        struct kr_request *req = &ctx->req;
 
        /* source.session can be empty if request was generated by kresd itself */
-       if (!ctx->source.session ||
-            ctx->source.session->handle->type == UV_TCP) {
+       struct session *s = ctx->source.session;
+       if (!s || session_get_handle(s)->type == UV_TCP) {
                answer_max = KNOT_WIRE_MAX_PKTSIZE;
        } else if (knot_pkt_has_edns(query)) { /* EDNS */
                answer_max = MAX(knot_edns_get_payload(query->opt_rr),
@@ -679,7 +552,6 @@ static int request_del_tasks(struct request_ctx *ctx, struct qr_task *task)
        return ret;
 }
 
-
 static struct qr_task *qr_task_create(struct request_ctx *ctx)
 {
        /* How much can client handle? */
@@ -695,7 +567,7 @@ static struct qr_task *qr_task_create(struct request_ctx *ctx)
        if (!task) {
                return NULL;
        }
-       memset(task, 0, sizeof(*task)); /* avoid accidentally unitialized fields */
+       memset(task, 0, sizeof(*task)); /* avoid accidentally unintialized fields */
 
        /* Create packet buffers for answer and subrequests */
        knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
@@ -727,19 +599,18 @@ static void qr_task_free(struct qr_task *task)
        assert(ctx);
 
        /* Process outbound session. */
-       struct session *source_session = ctx->source.session;
+       struct session *s = ctx->source.session;
        struct worker_ctx *worker = ctx->worker;
 
        /* Process source session. */
-       if (source_session &&
-           source_session->tasks.len < worker->tcp_pipeline_max/2 &&
-           !source_session->closing && source_session->throttled) {
-               uv_handle_t *handle = source_session->handle;
+       if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 &&
+           !session_is_closing(s) && !session_is_throttled(s)) {
+               uv_handle_t *handle = session_get_handle(s);
                /* Start reading again if the session is throttled and
                 * the number of outgoing requests is below watermark. */
                if (handle) {
                        io_start_read(handle);
-                       source_session->throttled = false;
+                       session_set_throttled(s, false);
                }
        }
 
@@ -755,15 +626,14 @@ static void qr_task_free(struct qr_task *task)
 /*@ Register new qr_task within session. */
 static int qr_task_register(struct qr_task *task, struct session *session)
 {
-       assert(session->outgoing == false && session->handle->type == UV_TCP);
+       assert(session_is_outgoing(session) == false &&
+              session_get_handle(session)->type == UV_TCP);
 
-       int ret = array_reserve(session->tasks, session->tasks.len + 1);
-       if (ret != 0) {
+       int ret = session_tasklist_add(session, task);
+       if (ret < 0) {
                return kr_error(ENOMEM);
        }
 
-       session_add_tasks(session, task);
-
        struct request_ctx *ctx = task->ctx;
        assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
        ctx->source.session = session;
@@ -772,11 +642,11 @@ static int qr_task_register(struct qr_task *task, struct session *session)
         * an in effect shrink TCP window size. To get more precise throttling,
         * we would need to copy remainder of the unread buffer and reassemble
         * when resuming reading. This is NYI.  */
-       if (session->tasks.len >= task->ctx->worker->tcp_pipeline_max) {
-               uv_handle_t *handle = session->handle;
-               if (handle && !session->throttled && !session->closing) {
+       if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) {
+               uv_handle_t *handle = session_get_handle(session);
+               if (handle && !session_is_throttled(session) && !session_is_closing(session)) {
                        io_stop_read(handle);
-                       session->throttled = true;
+                       session_set_throttled(session, true);
                }
        }
 
@@ -792,11 +662,10 @@ static void qr_task_complete(struct qr_task *task)
        assert(task->waiting.len == 0);
        assert(task->leading == false);
 
-       struct session *source_session = ctx->source.session;
-       if (source_session) {
-               assert(source_session->outgoing == false &&
-                      source_session->waiting.len == 0);
-               session_del_tasks(source_session, task);
+       struct session *s = ctx->source.session;
+       if (s) {
+               assert(!session_is_outgoing(s) && session_waitinglist_is_empty(s));
+               session_tasklist_del(s, task);
        }
 
        /* Release primary reference to task. */
@@ -812,23 +681,25 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
                if (!handle || handle->type != UV_TCP) {
                        return status;
                }
-               struct session* session = handle->data;
-               assert(session);
-               if (!session->outgoing ||
-                   session->waiting.len == 0) {
+               struct session* s = handle->data;
+               assert(s);
+               if (!session_is_outgoing(s) || session_waitinglist_is_empty(s)) {
                        return status;
                }
        }
 
        if (handle) {
-               struct session* session = handle->data;
-               if (!session->outgoing && task->ctx->source.session) {
-                       assert (task->ctx->source.session->handle == handle);
+               struct session* s = handle->data;
+               bool outgoing = session_is_outgoing(s);
+               if (!outgoing) {
+                       struct session* source_s = task->ctx->source.session;
+                       if (source_s) {
+                               assert (session_get_handle(source_s) == handle);
+                       }
                }
-               if (handle->type == UV_TCP && session->outgoing &&
-                   session->waiting.len > 0) {
-                       session_del_waiting(session, task);
-                       if (session->closing) {
+               if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) {
+                       session_waitinglist_del(s, task);
+                       if (session_is_closing(s)) {
                                return status;
                        }
                        /* Finalize the task, if any errors.
@@ -837,46 +708,27 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
                         * (for instance: tls; send->tls_push->too many non-critical errors->
                         * on_send with nonzero status->re-add to waiting->send->etc).*/
                        if (status != 0) {
-                               if (session->outgoing) {
+                               if (outgoing) {
                                        qr_task_finalize(task, KR_STATE_FAIL);
                                } else {
-                                       assert(task->ctx->source.session == session);
+                                       assert(task->ctx->source.session == s);
                                        task->ctx->source.session = NULL;
                                }
-                               session_del_tasks(session, task);
+                               session_tasklist_del(s, task);
                        }
-                       if (session->waiting.len > 0) {
-                               struct qr_task *t = session->waiting.at[0];
-                               int ret = qr_task_send(t, handle, &session->peer.ip, t->pktbuf);
+                       struct qr_task *waiting_task = session_waitinglist_get_first(s);
+                       if (waiting_task) {
+                               struct sockaddr *peer = session_get_peer(s);
+                               knot_pkt_t *pkt = waiting_task->pktbuf;
+                               int ret = qr_task_send(waiting_task, handle, peer, pkt);
                                if (ret != kr_ok()) {
-                                       while (session->waiting.len > 0) {
-                                               struct qr_task *t = session->waiting.at[0];
-                                               if (session->outgoing) {
-                                                       qr_task_finalize(t, KR_STATE_FAIL);
-                                               } else {
-                                                       assert(t->ctx->source.session == session);
-                                                       t->ctx->source.session = NULL;
-                                               }
-                                               array_del(session->waiting, 0);
-                                               session_del_tasks(session, t);
-                                               qr_task_unref(t);
-                                       }
-                                       while (session->tasks.len > 0) {
-                                               struct qr_task *t = session->tasks.at[0];
-                                               if (session->outgoing) {
-                                                       qr_task_finalize(t, KR_STATE_FAIL);
-                                               } else {
-                                                       assert(t->ctx->source.session == session);
-                                                       t->ctx->source.session = NULL;
-                                               }
-                                               session_del_tasks(session, t);
-                                       }
-                                       session_close(session);
+                                       session_tasks_finalize(s, KR_STATE_FAIL);
+                                       session_close(s);
                                        return status;
                                }
                        }
                }
-               if (!session->closing) {
+               if (!session_is_closing(s)) {
                        io_start_read(handle); /* Start reading new query */
                }
        }
@@ -989,7 +841,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
 
        /* Update statistics */
        if (ctx->source.session &&
-           handle != ctx->source.session->handle &&
+           handle != session_get_handle(ctx->source.session) &&
            addr) {
                if (session->has_tls)
                        worker->stats.tls += 1;
@@ -1009,31 +861,35 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
 
 static int session_next_waiting_send(struct session *session)
 {
-       union inaddr *peer = &session->peer;
        int ret = kr_ok();
-       if (session->waiting.len > 0) {
-               struct qr_task *task = session->waiting.at[0];
-               ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf);
+       if (!session_waitinglist_is_empty(session)) {
+               struct sockaddr *peer = session_get_peer(session);
+               struct qr_task *task = session_waitinglist_get_first(session);
+               uv_handle_t *handle = session_get_handle(session);
+               ret = qr_task_send(task, handle, peer, task->pktbuf);
        }
        return ret;
 }
 
 static int session_tls_hs_cb(struct session *session, int status)
 {
-       struct worker_ctx *worker = get_worker();
-       union inaddr *peer = &session->peer;
-       int deletion_res = worker_del_tcp_waiting(worker, &peer->ip);
+       assert(session_is_outgoing(session));
+       uv_handle_t *handle = session_get_handle(session);
+       uv_loop_t *loop = handle->loop;
+       struct worker_ctx *worker = loop->data;
+       struct sockaddr *peer = session_get_peer(session);
+       int deletion_res = worker_del_tcp_waiting(worker, peer);
        int ret = kr_ok();
 
        if (status) {
-               kr_nsrep_update_rtt(NULL, &peer->ip, KR_NS_DEAD,
+               kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
                                    worker->engine->resolver.cache_rtt,
                                    KR_NS_UPDATE_NORESET);
                return ret;
        }
 
        /* handshake was completed successfully */
-       struct tls_client_ctx_t *tls_client_ctx = session->tls_client_ctx;
+       struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
        struct tls_client_paramlist_entry *tls_params = tls_client_ctx->params;
        gnutls_session_t tls_session = tls_client_ctx->c.tls_session;
        if (gnutls_session_is_resumed(tls_session) != 0) {
@@ -1054,7 +910,7 @@ static int session_tls_hs_cb(struct session *session, int status)
                }
        }
 
-       ret = worker_add_tcp_connected(worker, &peer->ip, session);
+       ret = worker_add_tcp_connected(worker, peer, session);
        if (deletion_res == kr_ok() && ret == kr_ok()) {
                ret = session_next_waiting_send(session);
        } else {
@@ -1066,118 +922,99 @@ static int session_tls_hs_cb(struct session *session, int status)
                 * Session isn't in the list of waiting sessions,
                 * or addition to the list of connected sessions failed,
                 * or write to upstream failed. */
-               while (session->waiting.len > 0) {
-                       struct qr_task *task = session->waiting.at[0];
-                       session_del_tasks(session, task);
-                       array_del(session->waiting, 0);
-                       qr_task_finalize(task, KR_STATE_FAIL);
-                       qr_task_unref(task);
-               }
-               worker_del_tcp_connected(worker, &peer->ip);
-               assert(session->tasks.len == 0);
+               session_waitinglist_finalize(session, KR_STATE_FAIL);
+               worker_del_tcp_connected(worker, peer);
+               assert(session_tasklist_is_empty(session));
                session_close(session);
        } else {
-               uv_timer_stop(&session->timeout);
-               session->timeout.data = session;
-               timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+               uv_timer_t *t = session_get_timer(session);
+               uv_timer_stop(t);
+               t->data = session;
+               session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
        }
        return kr_ok();
 }
 
-static struct kr_query *session_current_query(struct session *session)
-{
-       if (session->waiting.len == 0) {
-               return NULL;
-       }
 
-       struct qr_task *task = session->waiting.at[0];
-       if (task->ctx->req.rplan.pending.len == 0) {
+static struct kr_query *task_get_last_pending_query(struct qr_task *task)
+{
+       if (!task || task->ctx->req.rplan.pending.len == 0) {
                return NULL;
        }
 
        return array_tail(task->ctx->req.rplan.pending);
 }
 
+
 static void on_connect(uv_connect_t *req, int status)
 {
        struct worker_ctx *worker = get_worker();
        uv_stream_t *handle = req->handle;
        struct session *session = handle->data;
-       union inaddr *peer = &session->peer;
+       struct sockaddr *peer = session_get_peer(session);
+
+       assert(session_is_outgoing(session));
 
        if (status == UV_ECANCELED) {
-               worker_del_tcp_waiting(worker, &peer->ip);
-               assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0);
+               worker_del_tcp_waiting(worker, peer);
+               assert(session_is_empty(session) && session_is_closing(session));
                iorequest_release(worker, req);
                return;
        }
 
-       if (session->closing) {
-               worker_del_tcp_waiting(worker, &peer->ip);
-               assert(session->waiting.len == 0 && session->tasks.len == 0);
+       if (session_is_closing(session)) {
+               worker_del_tcp_waiting(worker, peer);
+               assert(session_is_empty(session));
                iorequest_release(worker, req);
                return;
        }
 
-       uv_timer_stop(&session->timeout);
+       uv_timer_t *t = session_get_timer(session);
+       uv_timer_stop(t);
 
        if (status != 0) {
-               worker_del_tcp_waiting(worker, &peer->ip);
-               while (session->waiting.len > 0) {
-                       struct qr_task *task = session->waiting.at[0];
-                       session_del_tasks(session, task);
-                       array_del(session->waiting, 0);
-                       assert(task->refs > 1);
-                       qr_task_unref(task);
-                       qr_task_step(task, NULL, NULL);
-               }
-               assert(session->tasks.len == 0);
+               worker_del_tcp_waiting(worker, peer);
+               session_waitinglist_retry(session, false);
+               assert(session_tasklist_is_empty(session));
                iorequest_release(worker, req);
                session_close(session);
                return;
        }
 
-       if (!session->has_tls) {
+       if (!session_has_tls(session)) {
                /* if there is a TLS, session still waiting for handshake,
                 * otherwise remove it from waiting list */
-               if (worker_del_tcp_waiting(worker, &peer->ip) != 0) {
+               if (worker_del_tcp_waiting(worker, peer) != 0) {
                        /* session isn't in list of waiting queries, *
                         * something gone wrong */
-                       while (session->waiting.len > 0) {
-                               struct qr_task *task = session->waiting.at[0];
-                               session_del_tasks(session, task);
-                               array_del(session->waiting, 0);
-                               ioreq_kill_pending(task);
-                               assert(task->pending_count == 0);
-                               qr_task_finalize(task, KR_STATE_FAIL);
-                               qr_task_unref(task);
-                       }
-                       assert(session->tasks.len == 0);
+                       session_waitinglist_finalize(session, KR_STATE_FAIL);
+                       assert(session_tasklist_is_empty(session));
                        iorequest_release(worker, req);
                        session_close(session);
                        return;
                }
        }
 
-       struct kr_query *qry = session_current_query(session);
+       struct qr_task *task = session_waitinglist_get_first(session);
+       struct kr_query *qry = task_get_last_pending_query(task);
        WITH_VERBOSE (qry) {
-               char addr_str[INET6_ADDRSTRLEN];
-               inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
-                         addr_str, sizeof(addr_str));
-               VERBOSE_MSG(qry, "=> connected to '%s'\n", addr_str);
+               struct sockaddr *peer = session_get_peer(session);
+               char peer_str[INET6_ADDRSTRLEN];
+               inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
+               VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str);
        }
 
-       session->connected = true;
-       session->handle = (uv_handle_t *)handle;
+       session_set_connected(session, true);
+       session_set_handle(session,(uv_handle_t *)handle);
 
        int ret = kr_ok();
-       if (session->has_tls) {
-               ret = tls_client_connect_start(session->tls_client_ctx,
-                                              session, session_tls_hs_cb);
+       if (session_has_tls(session)) {
+               struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
+               ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
                if (ret == kr_error(EAGAIN)) {
                        iorequest_release(worker, req);
-                       io_start_read(session->handle);
-                       timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+                       session_start_read(session);
+                       session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
                        return;
                }
        }
@@ -1185,25 +1022,16 @@ static void on_connect(uv_connect_t *req, int status)
        if (ret == kr_ok()) {
                ret = session_next_waiting_send(session);
                if (ret == kr_ok()) {
-                       timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
-                       worker_add_tcp_connected(worker, &session->peer.ip, session);
+                       session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+                       struct sockaddr *peer = session_get_peer(session);
+                       worker_add_tcp_connected(worker, peer, session);
                        iorequest_release(worker, req);
                        return;
                }
        }
 
-       while (session->waiting.len > 0) {
-               struct qr_task *task = session->waiting.at[0];
-               session_del_tasks(session, task);
-               array_del(session->waiting, 0);
-               ioreq_kill_pending(task);
-               assert(task->pending_count == 0);
-               qr_task_finalize(task, KR_STATE_FAIL);
-               qr_task_unref(task);
-       }
-
-       assert(session->tasks.len == 0);
-
+       session_waitinglist_finalize(session, KR_STATE_FAIL);
+       assert(session_tasklist_is_empty(session));
        iorequest_release(worker, req);
        session_close(session);
 }
@@ -1215,76 +1043,48 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
        uv_timer_stop(timer);
        struct worker_ctx *worker = get_worker();
 
-       assert (session->waiting.len == session->tasks.len);
+       assert (session_waitinglist_get_len(session) == session_tasklist_get_len(session));
 
-       union inaddr *peer = &session->peer;
-       worker_del_tcp_waiting(worker, &peer->ip);
+       struct sockaddr *peer = session_get_peer(session);
+       worker_del_tcp_waiting(worker, peer);
 
-       struct kr_query *qry = session_current_query(session);
+       struct qr_task *task = session_waitinglist_get_first(session);
+       struct kr_query *qry = task_get_last_pending_query(task);
        WITH_VERBOSE (qry) {
-               char addr_str[INET6_ADDRSTRLEN];
-               inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str));
-               VERBOSE_MSG(qry, "=> connection to '%s' failed\n", addr_str);
+               char peer_str[INET6_ADDRSTRLEN];
+               inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
+               VERBOSE_MSG(qry, "=> connection to '%s' failed\n", peer_str);
        }
 
-       kr_nsrep_update_rtt(NULL, &peer->ip, KR_NS_DEAD,
+       kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
                            worker->engine->resolver.cache_rtt,
                            KR_NS_UPDATE_NORESET);
 
-       while (session->waiting.len > 0) {
-               struct qr_task *task = session->waiting.at[0];
-               assert(task->ctx);
-               task->timeouts += 1;
-               worker->stats.timeout += 1;
-               session_del_tasks(session, task);
-               array_del(session->waiting, 0);
-               assert(task->refs > 1);
-               qr_task_unref(task);
-               qr_task_step(task, NULL, NULL);
-       }
-
-       assert (session->tasks.len == 0);
+       worker->stats.timeout += session_waitinglist_get_len(session);
+       session_waitinglist_retry(session, true);
+       assert (session_tasklist_is_empty(session));
        session_close(session);
 }
 
 static void on_tcp_watchdog_timeout(uv_timer_t *timer)
 {
        struct session *session = timer->data;
+       struct worker_ctx *worker =  timer->loop->data;
+       struct sockaddr *peer = session_get_peer(session);
+
+       assert(session_is_outgoing(session));
 
-       assert(session->outgoing);
        uv_timer_stop(timer);
-       struct worker_ctx *worker = get_worker();
-       if (session->outgoing) {
-               if (session->has_tls) {
-                       worker_del_tcp_waiting(worker, &session->peer.ip);
-               }
-               worker_del_tcp_connected(worker, &session->peer.ip);
-
-               while (session->waiting.len > 0) {
-                       struct qr_task *task = session->waiting.at[0];
-                       task->timeouts += 1;
-                       worker->stats.timeout += 1;
-                       array_del(session->waiting, 0);
-                       session_del_tasks(session, task);
-                       ioreq_kill_pending(task);
-                       assert(task->pending_count == 0);
-                       qr_task_finalize(task, KR_STATE_FAIL);
-                       qr_task_unref(task);
-               }
-       }
 
-       while (session->tasks.len > 0) {
-               struct qr_task *task = session->tasks.at[0];
-               task->timeouts += 1;
-               worker->stats.timeout += 1;
-               assert(task->refs > 1);
-               array_del(session->tasks, 0);
-               ioreq_kill_pending(task);
-               assert(task->pending_count == 0);
-               qr_task_finalize(task, KR_STATE_FAIL);
-               qr_task_unref(task);
+       if (session_has_tls(session)) {
+               worker_del_tcp_waiting(worker, peer);
        }
 
+       worker_del_tcp_connected(worker, peer);
+       worker->stats.timeout += session_waitinglist_get_len(session);
+       session_waitinglist_finalize(session, KR_STATE_FAIL);
+       worker->stats.timeout += session_tasklist_get_len(session);
+       session_tasklist_finalize(session, KR_STATE_FAIL);
        session_close(session);
 }
 
@@ -1292,14 +1092,14 @@ static void on_tcp_watchdog_timeout(uv_timer_t *timer)
 static void on_udp_timeout(uv_timer_t *timer)
 {
        struct session *session = timer->data;
-       assert(session->handle->data == session);
+       assert(session_get_handle(session)->data == session);
+       assert(session_tasklist_get_len(session) == 1);
+       assert(session_waitinglist_is_empty(session));
 
        uv_timer_stop(timer);
-       assert(session->tasks.len == 1);
-       assert(session->waiting.len == 0);
 
        /* Penalize all tried nameservers with a timeout. */
-       struct qr_task *task = session->tasks.at[0];
+       struct qr_task *task = session_tasklist_get_first(session);
        struct worker_ctx *worker = task->ctx->worker;
        if (task->leading && task->pending_count > 0) {
                struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
@@ -1326,13 +1126,13 @@ static void on_session_idle_timeout(uv_timer_t *timer)
        struct session *s = timer->data;
        assert(s);
        uv_timer_stop(timer);
-       if (s->closing) {
+       if (session_is_closing(s)) {
                return;
        }
        /* session was not in use during timer timeout
         * remove it from connection list and close
         */
-       assert(s->tasks.len == 0 && s->waiting.len == 0);
+       assert(session_is_empty(s));
        session_close(s);
 }
 
@@ -1350,8 +1150,9 @@ static uv_handle_t *retransmit(struct qr_task *task)
                }
                struct sockaddr *addr = (struct sockaddr *)choice;
                struct session *session = ret->data;
-               assert (session->peer.ip.sa_family == AF_UNSPEC && session->outgoing);
-               memcpy(&session->peer, addr, sizeof(session->peer));
+               struct sockaddr *peer = session_get_peer(session);
+               assert (peer->sa_family == AF_UNSPEC && session_is_outgoing(session));
+               memcpy(peer, addr, kr_sockaddr_len(addr));
                if (qr_task_send(task, ret, (struct sockaddr *)choice,
                                 task->pktbuf) == 0) {
                        task->addrlist_turn = (task->addrlist_turn + 1) %
@@ -1364,10 +1165,10 @@ static uv_handle_t *retransmit(struct qr_task *task)
 static void on_retransmit(uv_timer_t *req)
 {
        struct session *session = req->data;
-       assert(session->tasks.len == 1);
+       assert(session_tasklist_get_len(session) == 1);
 
        uv_timer_stop(req);
-       struct qr_task *task = session->tasks.at[0];
+       struct qr_task *task = session_tasklist_get_first(session);
        if (retransmit(task) == NULL) {
                /* Not possible to spawn request, start timeout timer with remaining deadline. */
                uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
@@ -1377,19 +1178,6 @@ static void on_retransmit(uv_timer_t *req)
        }
 }
 
-static int timer_start(struct session *session, uv_timer_cb cb,
-                      uint64_t timeout, uint64_t repeat)
-{
-       uv_timer_t *timer = &session->timeout;
-       assert(timer->data == session);
-       int ret = uv_timer_start(timer, cb, timeout, repeat);
-       if (ret != 0) {
-               uv_timer_stop(timer);
-               return kr_error(ENOMEM);
-       }
-       return 0;
-}
-
 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
        /* Close pending timer */
@@ -1461,7 +1249,6 @@ static bool subreq_enqueue(struct qr_task *task)
        return true;
 }
 
-
 static int qr_task_finalize(struct qr_task *task, int state)
 {
        assert(task && task->leading == false);
@@ -1482,8 +1269,8 @@ static int qr_task_finalize(struct qr_task *task, int state)
 
        /* Send back answer */
        struct session *source_session = ctx->source.session;
-       uv_handle_t *handle = source_session->handle;
-       assert(source_session->closing == false);
+       uv_handle_t *handle = session_get_handle(source_session);
+       assert(!session_is_closing(source_session));
        assert(handle && handle->data == ctx->source.session);
        assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
        int res = qr_task_send(task, handle,
@@ -1492,21 +1279,21 @@ static int qr_task_finalize(struct qr_task *task, int state)
        if (res != kr_ok()) {
                (void) qr_task_on_send(task, NULL, kr_error(EIO));
                /* Since source session is erroneous detach all tasks. */
-               while (source_session->tasks.len > 0) {
-                       struct qr_task *t = source_session->tasks.at[0];
+               while (!session_tasklist_is_empty(source_session)) {
+                       struct qr_task *t = session_tasklist_get_first(source_session);
                        struct request_ctx *c = t->ctx;
                        assert(c->source.session == source_session);
                        c->source.session = NULL;
                        /* Don't finalize them as there can be other tasks
                         * waiting for answer to this particular task.
                         * (ie. task->leading is true) */
-                       session_del_tasks(source_session, t);
+                       session_tasklist_del_index(source_session, 0);
                }
                session_close(source_session);
        } else if (handle->type == UV_TCP && ctx->source.session) {
                /* Don't try to close source session at least
                 * retry_interval_for_timeout_timer milliseconds */
-               uv_timer_again(&ctx->source.session->timeout);
+               session_timer_restart(ctx->source.session);
        }
 
        qr_task_unref(task);
@@ -1533,7 +1320,7 @@ static int qr_task_step(struct qr_task *task,
        task->addrlist = NULL;
        task->addrlist_count = 0;
        task->addrlist_turn = 0;
-       req->has_tls = (ctx->source.session && ctx->source.session->has_tls);
+       req->has_tls = (ctx->source.session && session_has_tls(ctx->source.session));
 
        if (worker->too_many_open) {
                struct kr_rplan *rplan = &req->rplan;
@@ -1600,8 +1387,8 @@ static int qr_task_step(struct qr_task *task,
                 */
                subreq_lead(task);
                struct session *session = handle->data;
-               assert(session->handle->type == UV_UDP);
-               ret = timer_start(session, on_retransmit, timeout, 0);
+               assert(session_get_handle(session) == handle && (handle->type == UV_UDP));
+               ret = session_timer_start(session, on_retransmit, timeout, 0);
                /* Start next step with timeout, fatal if can't start a timer. */
                if (ret != 0) {
                        subreq_finalize(task, packet_source, packet);
@@ -1617,8 +1404,8 @@ static int qr_task_step(struct qr_task *task,
                }
                struct session* session = NULL;
                if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
-                       assert(session->outgoing);
-                       if (session->closing) {
+                       assert(session_is_outgoing(session));
+                       if (session_is_closing(session)) {
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
@@ -1626,84 +1413,76 @@ static int qr_task_step(struct qr_task *task,
                         * It means that connection establishing or data sending
                         * is coming right now. */
                        /* Task will be notified in on_connect() or qr_task_on_send(). */
-                       ret = session_add_waiting(session, task);
+                       ret = session_waitinglist_add(session, task);
                        if (ret < 0) {
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
-                       ret = session_add_tasks(session, task);
+                       ret = session_tasklist_add(session, task);
                        if (ret < 0) {
-                               session_del_waiting(session, task);
+                               session_waitinglist_del(session, task);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
                        assert(task->pending_count == 0);
-                       task->pending[task->pending_count] = session->handle;
+                       task->pending[task->pending_count] = session_get_handle(session);
                        task->pending_count += 1;
                } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
                        /* Connection has been already established */
-                       assert(session->outgoing);
-                       if (session->closing) {
-                               session_del_tasks(session, task);
+                       assert(session_is_outgoing(session));
+                       if (session_is_closing(session)) {
+                               session_tasklist_del(session, task);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
 
-                       if (session->tasks.len >= worker->tcp_pipeline_max) {
-                               session_del_tasks(session, task);
+                       if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
+                               session_tasklist_del(session, task);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
 
                        /* will be removed in qr_task_on_send() */
-                       ret = session_add_waiting(session, task);
+                       ret = session_waitinglist_add(session, task);
                        if (ret < 0) {
-                               session_del_tasks(session, task);
+                               session_tasklist_del(session, task);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
-                       ret = session_add_tasks(session, task);
+                       ret = session_tasklist_add(session, task);
                        if (ret < 0) {
-                               session_del_waiting(session, task);
-                               session_del_tasks(session, task);
+                               session_waitinglist_del(session, task);
+                               session_tasklist_del(session, task);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
-                       if (session->waiting.len == 1) {
-                               ret = qr_task_send(task, session->handle,
-                                                  &session->peer.ip, task->pktbuf);
+                       if (session_waitinglist_get_len(session) == 1) {
+                               ret = qr_task_send(task, session_get_handle(session),
+                                                  session_get_peer(session), task->pktbuf);
                                if (ret < 0) {
-                                       session_del_waiting(session, task);
-                                       session_del_tasks(session, task);
-                                       while (session->tasks.len != 0) {
-                                               struct qr_task *t = session->tasks.at[0];
-                                               qr_task_finalize(t, KR_STATE_FAIL);
-                                               session_del_tasks(session, t);
-                                       }
+                                       session_waitinglist_del(session, task);
+                                       session_tasklist_del(session, task);
+                                       session_tasklist_finalize(session, KR_STATE_FAIL);
                                        subreq_finalize(task, packet_source, packet);
                                        session_close(session);
                                        return qr_task_finalize(task, KR_STATE_FAIL);
                                }
-                               if (session->tasks.len == 1) {
-                                       uv_timer_stop(&session->timeout);
-                                       ret = timer_start(session, on_tcp_watchdog_timeout,
-                                                         MAX_TCP_INACTIVITY, 0);
+                               if (session_tasklist_get_len(session) == 1) {
+                                       session_timer_stop(session);
+                                       ret = session_timer_start(session, on_tcp_watchdog_timeout,
+                                                                 MAX_TCP_INACTIVITY, 0);
                                }
                                if (ret < 0) {
-                                       session_del_waiting(session, task);
-                                       session_del_tasks(session, task);
-                                       while (session->tasks.len != 0) {
-                                               struct qr_task *t = session->tasks.at[0];
-                                               qr_task_finalize(t, KR_STATE_FAIL);
-                                               session_del_tasks(session, t);
-                                       }
+                                       session_waitinglist_del(session, task);
+                                       session_tasklist_del(session, task);
+                                       session_tasklist_finalize(session, KR_STATE_FAIL);
                                        subreq_finalize(task, packet_source, packet);
                                        session_close(session);
                                        return qr_task_finalize(task, KR_STATE_FAIL);
                                }
                        }
                        assert(task->pending_count == 0);
-                       task->pending[task->pending_count] = session->handle;
+                       task->pending[task->pending_count] = session_get_handle(session);
                        task->pending_count += 1;
                } else {
                        /* Make connection */
@@ -1721,15 +1500,15 @@ static int qr_task_step(struct qr_task *task,
                        session = client->data;
                        ret = worker_add_tcp_waiting(ctx->worker, addr, session);
                        if (ret < 0) {
-                               session_del_tasks(session, task);
+                               session_tasklist_del(session, task);
                                iorequest_release(ctx->worker, conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
                        /* will be removed in qr_task_on_send() */
-                       ret = session_add_waiting(session, task);
+                       ret = session_waitinglist_add(session, task);
                        if (ret < 0) {
-                               session_del_tasks(session, task);
+                               session_tasklist_del(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
                                iorequest_release(ctx->worker, conn);
                                subreq_finalize(task, packet_source, packet);
@@ -1742,47 +1521,49 @@ static int qr_task_step(struct qr_task *task,
                        const char *key = tcpsess_key(addr);
                        struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
                        if (entry) {
-                               assert(session->tls_client_ctx == NULL);
+                               assert(session_tls_get_client_ctx(session) == NULL);
                                struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
                                if (!tls_ctx) {
-                                       session_del_tasks(session, task);
-                                       session_del_waiting(session, task);
+                                       session_tasklist_del(session, task);
+                                       session_waitinglist_del(session, task);
                                        worker_del_tcp_waiting(ctx->worker, addr);
                                        iorequest_release(ctx->worker, conn);
                                        subreq_finalize(task, packet_source, packet);
                                        return qr_task_step(task, NULL, NULL);
                                }
                                tls_client_ctx_set_session(tls_ctx, session);
-                               session->tls_client_ctx = tls_ctx;
-                               session->has_tls = true;
+                               session_tls_set_client_ctx(session, tls_ctx);
+                               session_set_has_tls(session, true);
                        }
 
                        conn->data = session;
-                       memcpy(&session->peer, addr, sizeof(session->peer));
+                       struct sockaddr *peer = session_get_peer(session);
+                       memcpy(peer, addr, kr_sockaddr_len(addr));
 
-                       ret = timer_start(session, on_tcp_connect_timeout,
-                                         KR_CONN_RTT_MAX, 0);
+                       ret = session_timer_start(session, on_tcp_connect_timeout,
+                                                 KR_CONN_RTT_MAX, 0);
                        if (ret != 0) {
-                               session_del_tasks(session, task);
-                               session_del_waiting(session, task);
+                               session_tasklist_del(session, task);
+                               session_waitinglist_del(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
                                iorequest_release(ctx->worker, conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
 
-                       struct kr_query *qry = session_current_query(session);
+                       struct qr_task *task = session_waitinglist_get_first(session);
+                       struct kr_query *qry = task_get_last_pending_query(task);
                        WITH_VERBOSE (qry) {
-                               char addr_str[INET6_ADDRSTRLEN];
-                               inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), addr_str, sizeof(addr_str));
-                               VERBOSE_MSG(qry, "=> connecting to: '%s'\n", addr_str);
+                               char peer_str[INET6_ADDRSTRLEN];
+                               inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
+                               VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str);
                        }
 
                        if (uv_tcp_connect(conn, (uv_tcp_t *)client,
                                           addr , on_connect) != 0) {
-                               uv_timer_stop(&session->timeout);
-                               session_del_tasks(session, task);
-                               session_del_waiting(session, task);
+                               session_timer_stop(session);
+                               session_tasklist_del(session, task);
+                               session_waitinglist_del(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
                                iorequest_release(ctx->worker, conn);
                                subreq_finalize(task, packet_source, packet);
@@ -1814,32 +1595,21 @@ static int parse_packet(knot_pkt_t *query)
        return ret;
 }
 
-static struct qr_task* find_task(const struct session *session, uint16_t msg_id)
+int worker_submit(struct session *session, knot_pkt_t *query)
 {
-       struct qr_task *ret = NULL;
-       const qr_tasklist_t *tasklist = &session->tasks;
-       for (size_t i = 0; i < tasklist->len; ++i) {
-               struct qr_task *task = tasklist->at[i];
-               uint16_t task_msg_id = knot_wire_get_id(task->pktbuf->wire);
-               if (task_msg_id == msg_id) {
-                       ret = task;
-                       break;
-               }
+       if (!session) {
+               assert(false);
+               return kr_error(EINVAL);
        }
-       return ret;
-}
 
-
-int worker_submit(struct worker_ctx *worker, uv_handle_t *handle,
-                 knot_pkt_t *query, const struct sockaddr* addr)
-{
-       bool OK = worker && handle && handle->data;
+       uv_handle_t *handle = session_get_handle(session);
+       bool OK = handle && handle->loop->data;
        if (!OK) {
                assert(false);
                return kr_error(EINVAL);
        }
 
-       struct session *session = handle->data;
+       struct worker_ctx *worker = handle->loop->data;
 
        /* Parse packet */
        int ret = parse_packet(query);
@@ -1847,13 +1617,15 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle,
        /* Start new task on listening sockets,
         * or resume if this is subrequest */
        struct qr_task *task = NULL;
-       if (!session->outgoing) { /* request from a client */
+       struct sockaddr *addr = NULL;
+       if (!session_is_outgoing(session)) { /* request from a client */
                /* Ignore badly formed queries. */
                if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
                        if (query) worker->stats.dropped += 1;
                        return kr_error(EILSEQ);
                }
-               struct request_ctx *ctx = request_create(worker, handle, addr);
+               struct request_ctx *ctx = request_create(worker, handle,
+                                                        session_get_peer(session));
                if (!ctx) {
                        return kr_error(ENOMEM);
                }
@@ -1876,13 +1648,13 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle,
                        /* Ignore badly formed responses. */
                        return kr_error(EILSEQ);
                }
-               task = find_task(session, knot_wire_get_id(query->wire));
+               task = session_tasklist_find(session, knot_wire_get_id(query->wire));
                if (task == NULL) {
                        return kr_error(ENOENT);
                }
-               assert(session->closing == false);
+               assert(!session_is_closing(session));
        }
-       assert(uv_is_closing(session->handle) == false);
+       assert(uv_is_closing(session_get_handle(session)) == false);
 
        /* Consume input and produce next message */
        return qr_task_step(task, addr, query);
@@ -1918,7 +1690,7 @@ static struct session* map_find_tcp_session(map_t *map,
        return ret;
 }
 
-static int worker_add_tcp_connected(struct worker_ctx *worker,
+int worker_add_tcp_connected(struct worker_ctx *worker,
                                    const struct sockaddr* addr,
                                    struct session *session)
 {
@@ -1931,7 +1703,7 @@ static int worker_add_tcp_connected(struct worker_ctx *worker,
        return map_add_tcp_session(&worker->tcp_connected, addr, session);
 }
 
-static int worker_del_tcp_connected(struct worker_ctx *worker,
+int worker_del_tcp_connected(struct worker_ctx *worker,
                                    const struct sockaddr* addr)
 {
        assert(addr && tcpsess_key(addr));
@@ -1976,379 +1748,74 @@ static int get_msg_size(const uint8_t *msg)
        return wire_read_u16(msg);
 }
 
-/* If buffering, close last task as it isn't live yet. */
-static void discard_buffered(struct session *session)
-{
-       if (session->buffering) {
-               qr_task_free(session->buffering);
-               session->buffering = NULL;
-               session->msg_hdr_idx = 0;
-       }
-}
-
-int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle)
+int worker_end_tcp(struct session *session)
 {
-       if (!worker || !handle) {
+       if (!session) {
                return kr_error(EINVAL);
        }
-       /* If this is subrequest, notify parent task with empty input
-        * because in this case session doesn't own tasks, it has just
-        * borrowed the task from parent session. */
-       struct session *session = handle->data;
-       if (session->outgoing) {
-               worker_submit(worker, handle, NULL, NULL);
-       } else {
-               discard_buffered(session);
-       }
-       return 0;
-}
 
-int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
-                      const uint8_t *msg, ssize_t len)
+       session_timer_stop(session);
+       
+       uv_handle_t *handle = session_get_handle(session);
+       struct worker_ctx *worker = handle->loop->data;
+       struct sockaddr *peer = session_get_peer(session);
+       worker_del_tcp_connected(worker, peer);
+       session_set_connected(session, false);
 
-{
-       if (!worker || !handle) {
-               return kr_error(EINVAL);
+       struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
+       if (tls_client_ctx) {
+               /* Avoid gnutls_bye() call */
+               tls_set_hs_state(&tls_client_ctx->c, TLS_HS_NOT_STARTED);
        }
-       /* Connection error or forced disconnect */
-       struct session *session = handle->data;
-       assert(session && session->handle == (uv_handle_t *)handle && handle->type == UV_TCP);
-       if (session->closing) {
-               return kr_ok();
-       }
-       if (len <= 0 || !msg) {
-               /* If we have pending tasks, we must dissociate them from the
-                * connection so they don't try to access closed and freed handle.
-                * @warning Do not modify task if this is outgoing request
-                * as it is shared with originator.
-                */
-               struct kr_query *qry = session_current_query(session);
-               WITH_VERBOSE (qry) {
-                       char addr_str[INET6_ADDRSTRLEN];
-                       inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
-                                 addr_str, sizeof(addr_str));
-                       VERBOSE_MSG(qry, "=> connection to '%s' closed by peer\n", addr_str);
-               }
-               uv_timer_t *timer = &session->timeout;
-               uv_timer_stop(timer);
-               struct sockaddr *peer = &session->peer.ip;
-               worker_del_tcp_connected(worker, peer);
-               session->connected = false;
-
-               if (session->tls_client_ctx) {
-                       /* Avoid gnutls_bye() call */
-                       tls_set_hs_state(&session->tls_client_ctx->c,
-                                        TLS_HS_NOT_STARTED);
-               }
 
-               if (session->tls_ctx) {
-                       /* Avoid gnutls_bye() call */
-                       tls_set_hs_state(&session->tls_ctx->c,
-                                        TLS_HS_NOT_STARTED);
-               }
-
-               if (session->outgoing && session->buffering) {
-                       session->buffering = NULL;
-               }
+       struct tls_ctx_t *tls_ctx = session_tls_get_server_ctx(session);
+       if (tls_ctx) {
+               /* Avoid gnutls_bye() call */
+               tls_set_hs_state(&tls_ctx->c, TLS_HS_NOT_STARTED);
+       }
 
-               assert(session->tasks.len >= session->waiting.len);
-               while (session->waiting.len > 0) {
-                       struct qr_task *task = session->waiting.at[0];
-                       array_del(session->waiting, 0);
-                       assert(task->refs > 1);
-                       session_del_tasks(session, task);
-                       if (session->outgoing) {
-                               if (task->ctx->req.options.FORWARD) {
-                                       /* We are in TCP_FORWARD mode.
-                                        * To prevent failing at kr_resolve_consume()
-                                        * qry.flags.TCP must be cleared.
-                                        * TODO - refactoring is needed. */
+       assert(session_tasklist_get_len(session) >= session_waitinglist_get_len(session));
+       while (!session_waitinglist_is_empty(session)) {
+               struct qr_task *task = session_waitinglist_get_first(session);
+               session_waitinglist_del_index(session, 0);
+               assert(task->refs > 1);
+               session_tasklist_del(session, task);
+               if (session_is_outgoing(session)) {
+                       if (task->ctx->req.options.FORWARD) {
+                               /* We are in TCP_FORWARD mode.
+                                * To prevent failing at kr_resolve_consume()
+                                * qry.flags.TCP must be cleared.
+                                * TODO - refactoring is needed. */
                                        struct kr_request *req = &task->ctx->req;
                                        struct kr_rplan *rplan = &req->rplan;
                                        struct kr_query *qry = array_tail(rplan->pending);
                                        qry->flags.TCP = false;
-                               }
-                               qr_task_step(task, NULL, NULL);
-                       } else {
-                               assert(task->ctx->source.session == session);
-                               task->ctx->source.session = NULL;
-                       }
-                       qr_task_unref(task);
-               }
-               while (session->tasks.len > 0) {
-                       struct qr_task *task = session->tasks.at[0];
-                       if (session->outgoing) {
-                               if (task->ctx->req.options.FORWARD) {
-                                       struct kr_request *req = &task->ctx->req;
-                                       struct kr_rplan *rplan = &req->rplan;
-                                       struct kr_query *qry = array_tail(rplan->pending);
-                                       qry->flags.TCP = false;
-                               }
-                               qr_task_step(task, NULL, NULL);
-                       } else {
-                               assert(task->ctx->source.session == session);
-                               task->ctx->source.session = NULL;
-                       }
-                       session_del_tasks(session, task);
-               }
-               session_close(session);
-               return kr_ok();
-       }
-
-       if (session->bytes_to_skip) {
-               assert(session->buffering == NULL);
-               ssize_t min_len = MIN(session->bytes_to_skip, len);
-               len -= min_len;
-               msg += min_len;
-               session->bytes_to_skip -= min_len;
-               if (len < 0 || session->bytes_to_skip < 0) {
-                       /* Something gone wrong.
-                        * Better kill the connection */
-                       return kr_error(EILSEQ);
-               }
-               if (len == 0) {
-                       return kr_ok();
-               }
-               assert(session->bytes_to_skip == 0);
-       }
-
-       int submitted = 0;
-       struct qr_task *task = session->buffering;
-       knot_pkt_t *pkt_buf = NULL;
-       if (task) {
-               pkt_buf = task->pktbuf;
-       } else {
-               /* Update DNS header in session->msg_hdr* */
-               assert(session->msg_hdr_idx <= sizeof(session->msg_hdr));
-               ssize_t hdr_amount = sizeof(session->msg_hdr) -
-                                    session->msg_hdr_idx;
-               if (hdr_amount > len) {
-                       hdr_amount = len;
-               }
-               if (hdr_amount > 0) {
-                       memcpy(session->msg_hdr + session->msg_hdr_idx, msg, hdr_amount);
-                       session->msg_hdr_idx += hdr_amount;
-                       len -= hdr_amount;
-                       msg += hdr_amount;
-               }
-               if (len == 0) { /* no data beyond msg_hdr -> not much to do */
-                       return kr_ok();
-               }
-               assert(session->msg_hdr_idx == sizeof(session->msg_hdr));
-               session->msg_hdr_idx = 0;
-               uint16_t msg_size = get_msg_size(session->msg_hdr);
-               uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2);
-               if (msg_size < KNOT_WIRE_HEADER_SIZE) {
-                       /* better kill the connection; we would probably get out of sync */
-                       uv_timer_t *timer = &session->timeout;
-                       uv_timer_stop(timer);
-                       while (session->waiting.len > 0) {
-                               struct qr_task *task = session->waiting.at[0];
-                               if (session->outgoing) {
-                                       qr_task_finalize(task, KR_STATE_FAIL);
-                               } else {
-                                       assert(task->ctx->source.session == session);
-                                       task->ctx->source.session = NULL;
-                               }
-                               array_del(session->waiting, 0);
-                               session_del_tasks(session, task);
-                               qr_task_unref(task);
-                       }
-                       while (session->tasks.len > 0) {
-                               struct qr_task *task = session->tasks.at[0];
-                               if (session->outgoing) {
-                                       qr_task_finalize(task, KR_STATE_FAIL);
-                               } else {
-                                       assert(task->ctx->source.session == session);
-                                       task->ctx->source.session = NULL;
-                               }
-                               session_del_tasks(session, task);
-                       }
-                       session_close(session);
-
-                       return kr_ok();
-               }
-
-               /* get task */
-               if (!session->outgoing) {
-                       /* This is a new query, create a new task that we can use
-                        * to buffer incoming message until it's complete. */
-                       struct sockaddr *addr = &(session->peer.ip);
-                       assert(addr->sa_family != AF_UNSPEC);
-                       struct request_ctx *ctx = request_create(worker,
-                                                                (uv_handle_t *)handle,
-                                                                addr);
-                       if (!ctx) {
-                               return kr_error(ENOMEM);
-                       }
-                       task = qr_task_create(ctx);
-                       if (!task) {
-                               request_free(ctx);
-                               return kr_error(ENOMEM);
                        }
+                       qr_task_step(task, NULL, NULL);
                } else {
-                       /* Start of response from upstream.
-                        * The session task list must contain a task
-                        * with the same msg id. */
-                       task = find_task(session, msg_id);
-                       /* FIXME: on high load over one connection, it's likely
-                        * that we will get multiple matches sooner or later (!) */
-                       if (task) {
-                               /* Make sure we can process maximum packet sizes over TCP for outbound queries.
-                                * Previous packet is allocated with mempool, so there's no need to free it manually. */
-                               if (task->pktbuf->max_size < KNOT_WIRE_MAX_PKTSIZE) {
-                                               knot_mm_t *pool = &task->pktbuf->mm;
-                                               pkt_buf = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, pool);
-                                               if (!pkt_buf) {
-                                                               return kr_error(ENOMEM);
-                                               }
-                                               task->pktbuf = pkt_buf;
-                               }
-                               knot_pkt_clear(task->pktbuf);
-                               assert(task->leading == false);
-                       } else  {
-                               session->bytes_to_skip = msg_size - 2;
-                               ssize_t min_len = MIN(session->bytes_to_skip, len);
-                               len -= min_len;
-                               msg += min_len;
-                               session->bytes_to_skip -= min_len;
-                               if (len < 0 || session->bytes_to_skip < 0) {
-                                       /* Something gone wrong.
-                                        * Better kill the connection */
-                                       return kr_error(EILSEQ);
-                               }
-                               if (len == 0) {
-                                       return submitted;
-                               }
-                               assert(session->bytes_to_skip == 0);
-                               int ret = worker_process_tcp(worker, handle, msg, len);
-                               if (ret < 0) {
-                                       submitted = ret;
-                               } else {
-                                       submitted += ret;
-                               }
-                               return submitted;
-                       }
+                       assert(task->ctx->source.session == session);
+                       task->ctx->source.session = NULL;
                }
-
-               pkt_buf = task->pktbuf;
-               knot_wire_set_id(pkt_buf->wire, msg_id);
-               pkt_buf->size = 2;
-               task->bytes_remaining = msg_size - 2;
-               assert(session->buffering == NULL);
-               session->buffering = task;
+               qr_task_unref(task);
        }
-       /* At this point session must have either created new task
-        * or it's already assigned. */
-       assert(task);
-       assert(len > 0);
-
-       /* Message is too long, can't process it. */
-       ssize_t to_read = MIN(len, task->bytes_remaining);
-       if (pkt_buf->size + to_read > pkt_buf->max_size) {
-               // TODO reallocate pkt_buf
-               pkt_buf->size = 0;
-               len -= to_read;
-               msg += to_read;
-               session->bytes_to_skip = task->bytes_remaining - to_read;
-               task->bytes_remaining = 0;
-               if (session->buffering) {
-                       if (!session->outgoing) {
-                               qr_task_complete(session->buffering);
-                       }
-                       session->buffering = NULL;
-               }
-               if (len > 0) {
-                       int ret = worker_process_tcp(worker, handle, msg, len);
-                       if (ret < 0) {
-                               submitted = ret;
-                       } else {
-                               submitted += ret;
+       while (!session_tasklist_is_empty(session)) {
+               struct qr_task *task = session_tasklist_get_first(session);
+               session_tasklist_del_index(session, 0);
+               if (session_is_outgoing(session)) {
+                       if (task->ctx->req.options.FORWARD) {
+                               struct kr_request *req = &task->ctx->req;
+                               struct kr_rplan *rplan = &req->rplan;
+                               struct kr_query *qry = array_tail(rplan->pending);
+                               qry->flags.TCP = false;
                        }
-               }
-               return submitted;
-       }
-       /* Buffer message and check if it's complete */
-       memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
-       pkt_buf->size += to_read;
-       task->bytes_remaining -= to_read;
-       len -= to_read;
-       msg += to_read;
-       if (task->bytes_remaining == 0) {
-               /* Message was assembled, clear temporary. */
-               session->buffering = NULL;
-               session->msg_hdr_idx = 0;
-               const struct sockaddr *addr = NULL;
-               knot_pkt_t *pkt = pkt_buf;
-               if (session->outgoing) {
-                       addr = &session->peer.ip;
-                       assert ((task->pending_count == 1) && (task->pending[0] == session->handle));
-                       task->pending_count = 0;
-                       session_del_tasks(session, task);
-               }
-               /* Parse the packet and start resolving complete query */
-               int ret = parse_packet(pkt);
-               if (ret == 0) {
-                       if (session->outgoing) {
-                               /* To prevent slow lorris attack restart watchdog only after
-                               * the whole message was successfully assembled and parsed */
-                               if (session->tasks.len > 0 || session->waiting.len > 0) {
-                                       uv_timer_stop(&session->timeout);
-                                       timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
-                               }
-                       } else {
-                               /* Start only new queries,
-                                * not subrequests that are already pending */
-                               ret = request_start(task->ctx, pkt);
-                               if (ret != 0) {
-                                       /* Allocation of answer buffer has failed.
-                                        * We can't notify client about failure,
-                                        * so just end the task processing. */
-                                       qr_task_complete(task);
-                                       goto next_msg;
-                               }
-
-                               ret = qr_task_register(task, session);
-                               if (ret != 0) {
-                                       /* Answer buffer has been allocated,
-                                        * but task can't be attached to the given
-                                        * session due to memory problems.
-                                        * Finalize the task, otherwise it becomes orphaned. */
-                                       knot_pkt_init_response(task->ctx->req.answer, pkt);
-                                       qr_task_finalize(task, KR_STATE_FAIL);
-                                       goto next_msg;
-                               }
-                               submitted += 1;
-                               if (task->leading) {
-                                       assert(false);
-                               }
-                       }
-               } else if (session->outgoing) {
-                       /* Drop malformed packet and retry resolution */
-                       pkt = NULL;
-                       ret = 0;
+                       qr_task_step(task, NULL, NULL);
                } else {
-                       qr_task_complete(task);
-               }
-               /* Only proceed if the message is valid, or it's an invalid response to
-                * an outbound query which needs to be treated as a timeout. */
-               if (ret == 0) {
-                       /* since there can be next dns message, we must to proceed
-                        * even if qr_task_step() returns error */
-                       qr_task_step(task, addr, pkt);
-               }
-next_msg:
-               if (len > 0) {
-                       /* TODO: this is simple via iteration; recursion doesn't really help */
-                       ret = worker_process_tcp(worker, handle, msg, len);
-                       if (ret < 0) {
-                               return ret;
-                       }
-                       submitted += ret;
+                       assert(task->ctx->source.session == session);
+                       task->ctx->source.session = NULL;
                }
        }
-       assert(submitted >= 0);
-       return submitted;
+       session_close(session);
+       return kr_ok();
 }
 
 struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options)
@@ -2394,6 +1861,11 @@ int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query)
        return qr_task_step(task, NULL, query);
 }
 
+int worker_task_numrefs(const struct qr_task *task)
+{
+       return task->refs;
+}
+
 struct kr_request *worker_task_request(struct qr_task *task)
 {
        if (!task || !task->ctx) {
@@ -2408,9 +1880,82 @@ int worker_task_finalize(struct qr_task *task, int state)
        return qr_task_finalize(task, state);
 }
 
-void worker_session_close(struct session *session)
+ int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
+                     knot_pkt_t *packet)
+ {
+        return qr_task_step(task, packet_source, packet);
+ }
+
+void worker_task_complete(struct qr_task *task)
 {
-       session_close(session);
+       return qr_task_complete(task);
+}
+
+void worker_task_ref(struct qr_task *task)
+{
+       qr_task_ref(task);
+}
+
+void worker_task_unref(struct qr_task *task)
+{
+       qr_task_unref(task);
+}
+
+void worker_task_timeout_inc(struct qr_task *task)
+{
+       task->timeouts += 1;
+}
+
+struct session *worker_session_borrow(struct worker_ctx *worker)
+{
+       struct session *s = NULL;
+       if (worker->pool_sessions.len > 0) {
+               s = array_tail(worker->pool_sessions);
+               array_pop(worker->pool_sessions);
+               kr_asan_custom_unpoison(session, s);
+       } else {
+               s = session_new();
+       }
+       return s;
+}
+
+void worker_session_release(struct worker_ctx *worker, uv_handle_t *handle)
+{
+       if (!worker || !handle) {
+               return;
+       }
+       struct session *s = handle->data;
+       if (!s) {
+               return;
+       }
+       assert(session_is_empty(s));
+       if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
+               session_clear(s);
+               array_push(worker->pool_sessions, s);
+               kr_asan_custom_poison(session, s);
+       } else {
+               session_free(s);
+       }
+}
+
+knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task)
+{
+       return task->pktbuf;
+}
+
+struct request_ctx *worker_task_get_request(struct qr_task *task)
+{
+       return task->ctx;
+}
+
+struct session *worker_request_get_source_session(struct request_ctx *ctx)
+{
+       return ctx->source.session;
+}
+
+void worker_request_set_source_session(struct request_ctx *ctx, struct session *session)
+{
+       ctx->source.session = session;
 }
 
 /** Reserve worker buffers */
@@ -2445,12 +1990,20 @@ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
        } \
        array_clear(list)
 
+#define reclaim_freelist_custom(list, type, cb) \
+       for (unsigned i = 0; i < list.len; ++i) { \
+               void *elm = list.at[i]; \
+               kr_asan_custom_unpoison(type, elm); \
+               cb(elm); \
+       } \
+       array_clear(list)
+
 void worker_reclaim(struct worker_ctx *worker)
 {
        reclaim_freelist(worker->pool_mp, struct mempool, mp_delete);
        reclaim_freelist(worker->pool_ioreqs, uv_reqs_t, free);
        reclaim_freelist(worker->pool_iohandles, uv_handles_t, free);
-       reclaim_freelist(worker->pool_sessions, struct session, session_free);
+       reclaim_freelist_custom(worker->pool_sessions, session, session_free);
        mp_delete(worker->pkt_pool.ctx);
        worker->pkt_pool.ctx = NULL;
        trie_free(worker->subreq_out);
index 3acecfd0eab6721f1f5fcc5a8b1cfcda01f1c00f..8b90bf84cceabfb89f03299f919264e0fdc32d70 100644 (file)
@@ -37,30 +37,17 @@ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
 /**
  * Process an incoming packet (query from a client or answer from upstream).
  *
- * @param worker the singleton worker
- * @param handle socket through which the request came
- * @param query  the packet, or NULL on an error from the transport layer
- * @param addr   the address from which the packet came (or NULL, possibly, on error)
+ * @param session  session the where packet came from
+ * @param query    the packet, or NULL on an error from the transport layer
  * @return 0 or an error code
  */
-int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query,
-               const struct sockaddr* addr);
-
-/**
- * Process incoming DNS message fragment(s) that arrived over a stream (TCP, TLS).
- *
- * If the fragment contains only a partial message, it is buffered.
- * If the fragment contains a complete query or completes current fragment, execute it.
- * @return the number of newly-completed requests (>=0) or an error code
- */
-int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
-               const uint8_t *msg, ssize_t len);
+int worker_submit(struct session *session, knot_pkt_t *query);
 
 /**
  * End current DNS/TCP session, this disassociates pending tasks from this session
  * which may be freely closed afterwards.
  */
-int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle);
+int worker_end_tcp(struct session *s);
 
 /**
  * Start query resolution with given query.
@@ -83,16 +70,48 @@ struct kr_request *worker_task_request(struct qr_task *task);
 /** Collect worker mempools */
 void worker_reclaim(struct worker_ctx *worker);
 
-/** Closes given session */
-void worker_session_close(struct session *session);
+struct session *worker_session_borrow(struct worker_ctx *worker);
+
+void worker_session_release(struct worker_ctx *worker, uv_handle_t *handle);
 
 void *worker_iohandle_borrow(struct worker_ctx *worker);
 
 void worker_iohandle_release(struct worker_ctx *worker, void *h);
 
+ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
+
+ssize_t worker_gnutls_client_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
+
+int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
+                    knot_pkt_t *packet);
+
+int worker_task_numrefs(const struct qr_task *task);
+
 /** Finalize given task */
 int worker_task_finalize(struct qr_task *task, int state);
 
+void worker_task_complete(struct qr_task *task);
+
+void worker_task_ref(struct qr_task *task);
+
+void worker_task_unref(struct qr_task *task);
+
+void worker_task_timeout_inc(struct qr_task *task);
+
+int worker_add_tcp_connected(struct worker_ctx *worker,
+                            const struct sockaddr *addr,
+                            struct session *session);
+int worker_del_tcp_connected(struct worker_ctx *worker,
+                            const struct sockaddr *addr);
+
+knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task);
+
+struct request_ctx *worker_task_get_request(struct qr_task *task);
+
+struct session *worker_request_get_source_session(struct request_ctx *);
+
+void worker_request_set_source_session(struct request_ctx *, struct session *session);
+
 /** @cond internal */
 
 /** Number of request within timeout window. */
@@ -107,9 +126,6 @@ typedef array_t(void *) mp_freelist_t;
 /** List of query resolution tasks. */
 typedef array_t(struct qr_task *) qr_tasklist_t;
 
-/** Session list. */
-typedef array_t(struct session *) qr_sessionlist_t;
-
 /** \details Worker state is meant to persist during the whole life of daemon. */
 struct worker_ctx {
        struct engine *engine;
index 6595588376a651e93b40b80ce4f4bacf05040cec..84da059e3cbcdca87ab141f092265e53c62d1e6f 100644 (file)
@@ -91,8 +91,12 @@ void __asan_poison_memory_region(void const volatile *addr, size_t size);
 void __asan_unpoison_memory_region(void const volatile *addr, size_t size);
 #define kr_asan_poison(addr, size) __asan_poison_memory_region((addr), (size))
 #define kr_asan_unpoison(addr, size) __asan_unpoison_memory_region((addr), (size))
+#define kr_asan_custom_poison(fn, addr) fn ##_poison((addr))
+#define kr_asan_custom_unpoison(fn, addr) fn ##_unpoison((addr))
 #else
 #define kr_asan_poison(addr, size)
 #define kr_asan_unpoison(addr, size)
+#define kr_asan_custom_poison(fn, addr)
+#define kr_asan_custom_unpoison(fn, addr)
 #endif
 /* @endcond */