daemon: out-of-order processing for TCP
author    Marek Vavrusa <marek@vavrusa.com>
          Tue, 3 May 2016 06:56:20 +0000 (23:56 -0700)
committer Marek Vavrusa <marek@vavrusa.com>
          Tue, 3 May 2016 07:24:24 +0000 (00:24 -0700)
* daemon now processes messages over a TCP stream
  out-of-order and concurrently
* support for TCP_DEFER_ACCEPT
* support for TCP Fast Open
* idle/slow TCP streams now have deadlines
  (to prevent slowloris; enables pruning)
* there is now a per-request limit on timeouts
  (each request is allowed 4 timeouts before bailing)
* faster request closing, unified retry/timeout timers
* rare race condition in timer closing fixed
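
A note on what "out-of-order" buys: each DNS/TCP message carries a 2-byte
big-endian length prefix (RFC 1035, section 4.2.2), so a client can write
several framed queries back-to-back on one connection and match answers by
message ID rather than by arrival order. A minimal, hypothetical client-side
sketch of that framing (illustrative only, not part of this commit):

    #include <arpa/inet.h>
    #include <stdint.h>
    #include <sys/types.h>
    #include <sys/uio.h>

    /* Write one length-prefixed DNS message to a connected TCP socket. */
    static ssize_t send_framed(int fd, const uint8_t *msg, uint16_t len)
    {
        uint16_t wire_len = htons(len);  /* 2-byte big-endian prefix */
        struct iovec iov[2] = {
            { .iov_base = &wire_len, .iov_len = sizeof(wire_len) },
            { .iov_base = (void *)msg, .iov_len = len },
        };
        return writev(fd, iov, 2);
    }

    /* A pipelining client can then issue queries without waiting:
     *   send_framed(fd, q1, q1_len);
     *   send_framed(fd, q2, q2_len);
     * and read the answers in whichever order the server produces them. */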

12 files changed:
daemon/README.rst
daemon/bindings.c
daemon/engine.h
daemon/io.c
daemon/io.h
daemon/network.c
daemon/network.h
daemon/worker.c
daemon/worker.h
lib/defines.h
lib/zonecut.c
tests/tests.mk

index 794e7f496c2197d754c7d297e17bd48158d32aef..5b9a812ade6dc892b81c535b24b444cdb7e34961 100644 (file)
@@ -520,6 +520,18 @@ For when listening on ``localhost`` just doesn't cut it.
        > net.bufsize()
        4096
 
+.. function:: net.tcp_pipeline([len])
+
+   Get/set the per-client TCP pipeline limit, i.e. the number of outstanding queries that a single client connection may have in parallel. Default is 100.
+
+   Example output:
+
+   .. code-block:: lua
+
+      > net.tcp_pipeline()
+      100
+      > net.tcp_pipeline(50)
+
 Trust anchors and DNSSEC
 ^^^^^^^^^^^^^^^^^^^^^^^^
 
index 3b07a0a392ea2831039ac5cadbe2bb628598bfa3..489d48bd379ebd3efd140a085d8d146df5ee2b2f 100644 (file)
@@ -48,6 +48,13 @@ static int format_error(lua_State* L, const char *err)
        return 1;
 }
 
+static inline struct worker_ctx *wrk_luaget(lua_State *L) {
+       lua_getglobal(L, "__worker");
+       struct worker_ctx *worker = lua_touserdata(L, -1);
+       lua_pop(L, 1);
+       return worker;
+}
+
 /** List loaded modules */
 static int mod_list(lua_State *L)
 {
@@ -302,14 +309,36 @@ static int net_bufsize(lua_State *L)
        return 0;
 }
 
+/** Set TCP pipelining size. */
+static int net_pipeline(lua_State *L)
+{
+       struct worker_ctx *worker = wrk_luaget(L);
+       if (!worker) {
+               return 0;
+       }
+       if (!lua_isnumber(L, 1)) {
+               lua_pushnumber(L, worker->tcp_pipeline_max);
+               return 1;
+       }
+       int len = lua_tointeger(L, 1);
+       if (len < 0 || len > 4096) {
+               format_error(L, "tcp_pipeline must be within <0, 4096>");
+               lua_error(L);
+       }
+       worker->tcp_pipeline_max = len;
+       lua_pushnumber(L, len);
+       return 1;
+}
+
 int lib_net(lua_State *L)
 {
        static const luaL_Reg lib[] = {
-               { "list",       net_list },
-               { "listen",     net_listen },
-               { "close",      net_close },
-               { "interfaces", net_interfaces },
-               { "bufsize",    net_bufsize },
+               { "list",         net_list },
+               { "listen",       net_listen },
+               { "close",        net_close },
+               { "interfaces",   net_interfaces },
+               { "bufsize",      net_bufsize },
+               { "tcp_pipeline", net_pipeline },
                { NULL, NULL }
        };
        register_lib(L, "net", lib);
@@ -599,13 +628,6 @@ int lib_event(lua_State *L)
        return 1;
 }
 
-static inline struct worker_ctx *wrk_luaget(lua_State *L) {
-       lua_getglobal(L, "__worker");
-       struct worker_ctx *worker = lua_touserdata(L, -1);
-       lua_pop(L, 1);
-       return worker;
-}
-
 /* @internal Call the Lua callback stored in baton. */
 static void resolve_callback(struct worker_ctx *worker, struct kr_request *req, void *baton)
 {
index 1eddda6330c7e124ff625da03a982b45c7d463c6..fde0c2aa93693d1c148c8b511576e963dc546079 100644 (file)
@@ -32,6 +32,9 @@
 #ifndef QUERY_RATE_THRESHOLD
 #define QUERY_RATE_THRESHOLD (2 * MP_FREELIST_SIZE) /**< Nr of parallel queries considered as high rate */
 #endif
+#ifndef MAX_PIPELINED
+#define MAX_PIPELINED 100
+#endif
 
 /*
  * @internal These are forward decls to allow building modules with engine but without Lua.
index 373ac9fdac488166dffc30e20b07f2563d159847..439d2a947afdec76291a2e60d9ad50abecd74c56 100644 (file)
@@ -18,6 +18,7 @@
 #include <libknot/errcode.h>
 #include <contrib/ucw/lib.h>
 #include <contrib/ucw/mempool.h>
+#include <assert.h>
 
 #include "daemon/io.h"
 #include "daemon/network.h"
@@ -44,14 +45,56 @@ static void check_bufsize(uv_handle_t* handle)
 
 #undef negotiate_bufsize
 
-static void *handle_alloc(uv_loop_t *loop, size_t size)
+static void session_clear(struct session *s)
 {
-       return malloc(size);
+       assert(s->is_subreq || s->tasks.len == 0);
+       array_clear(s->tasks);
+       memset(s, 0, sizeof(*s));
 }
 
-static void handle_free(uv_handle_t *handle)
+void session_free(struct session *s)
 {
-       free(handle);
+       session_clear(s);
+       free(s);
+}
+
+struct session *session_new(void)
+{
+       return calloc(1, sizeof(struct session));
+}
+
+static struct session *session_borrow(struct worker_ctx *worker)
+{
+       struct session *s = NULL;
+       if (worker->pool_sessions.len > 0) {
+               s = array_tail(worker->pool_sessions);
+               array_pop(worker->pool_sessions);
+               kr_asan_unpoison(s, sizeof(*s));
+       } else {
+               s = session_new();
+       }
+       return s;
+}
+
+static void session_release(struct worker_ctx *worker, struct session *s)
+{
+       if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
+               session_clear(s);
+               array_push(worker->pool_sessions, s);
+               kr_asan_poison(s, sizeof(*s));
+       } else {
+               session_free(s);
+       }
+}
+
+static uv_stream_t *handle_alloc(uv_loop_t *loop)
+{
+       uv_stream_t *handle = calloc(1, sizeof(*handle));
+       if (!handle) {
+               return NULL;
+       }
+
+       return handle;
 }
 
 static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
@@ -61,14 +104,20 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t*
         * guaranteed to be unchanged only for the duration of
         * udp_read() and tcp_read().
         */
+       struct session *session = handle->data;
        uv_loop_t *loop = handle->loop;
        struct worker_ctx *worker = loop->data;
        buf->base = (char *)worker->wire_buf;
-       /* Use recvmmsg() on master sockets if possible. */
-       if (handle->data)
+       /* Limit TCP stream buffer size to 4K for granularity in batches of incoming queries. */
+       if (handle->type == UV_TCP) {
+               buf->len = MIN(suggested_size, 4096);
+       /* Regular buffer size for subrequests. */
+       } else if (session->is_subreq) {
                buf->len = suggested_size;
-       else
+       /* Use recvmmsg() on master sockets if possible. */
+       } else {
                buf->len = sizeof(worker->wire_buf);
+       }
 }
 
 void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
@@ -78,7 +127,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
        struct worker_ctx *worker = loop->data;
        if (nread <= 0) {
                if (nread < 0) { /* Error response, notify resolver */
-                       worker_exec(worker, (uv_handle_t *)handle, NULL, addr);
+                       worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
                } /* nread == 0 is for freeing buffers, we don't need to do this */
                return;
        }
@@ -86,7 +135,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
        knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
        if (query) {
                query->max_size = KNOT_WIRE_MAX_PKTSIZE;
-               worker_exec(worker, (uv_handle_t *)handle, query, addr);
+               worker_submit(worker, (uv_handle_t *)handle, query, addr);
        }
        mp_flush(worker->pkt_pool.ctx);
 }
@@ -101,35 +150,53 @@ int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
        if (ret != 0) {
                return ret;
        }
-       handle->data = NULL;
        check_bufsize((uv_handle_t *)handle);
+       /* Handle is already created, just create context. */
+       handle->data = session_new();
+       assert(handle->data);
        return io_start_read((uv_handle_t *)handle);
 }
 
+static void tcp_timeout(uv_handle_t *timer)
+{
+       uv_handle_t *handle = timer->data;
+       uv_close(handle, io_free);
+}
+
+static void tcp_timeout_trigger(uv_timer_t *timer)
+{
+       uv_handle_t *handle = timer->data;
+       struct session *session = handle->data;
+       if (session->tasks.len > 0) {
+               uv_timer_again(timer);
+       } else {
+               uv_close((uv_handle_t *)timer, tcp_timeout);
+       }
+}
+
 static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
 {
        uv_loop_t *loop = handle->loop;
+       struct session *s = handle->data;
        struct worker_ctx *worker = loop->data;
-
-       /* Check for originator connection close. */
-       if (nread <= 0) {
-               if (handle->data) {
-                       worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
-               }
-               if (!uv_is_closing((uv_handle_t *)handle)) {
-                       uv_close((uv_handle_t *)handle, handle_free);
-               }
-               return;
-       }
-       
+       /* TCP pipelining is rather complicated and requires cooperation from the worker,
+        * so the whole message reassembly and demuxing logic lives inside the worker. */
        int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread);
-       if (ret == 0) {
-               /* Push - pull, stop reading from this handle until
-                * the task is finished. Since the handle has no track of the
-                * pending tasks, it might be freed before the task finishes
-                * leading various errors. */
-               uv_unref((uv_handle_t *)handle);
-               io_stop_read((uv_handle_t *)handle);
+       if (ret < 0) {
+               worker_end_tcp(worker, (uv_handle_t *)handle);
+               /* Exceeded per-connection quota for outstanding requests:
+                * stop reading from the stream and close it after the last message is processed. */
+               if (!s->is_subreq && !uv_is_closing((uv_handle_t *)&s->timeout)) {
+                       uv_timer_stop(&s->timeout);
+                       if (s->tasks.len == 0) {
+                               uv_close((uv_handle_t *)&s->timeout, tcp_timeout);
+                       } else { /* If there are tasks running, defer until they finish. */
+                               uv_timer_start(&s->timeout, tcp_timeout_trigger, 1, KR_CONN_RTT_MAX/2);
+                       }
+               }
+       /* Connection spawned more than one request, reset its deadline for next query. */
+       } else if (ret > 0 && !s->is_subreq) {
+               uv_timer_again(&s->timeout);
        }
        mp_flush(worker->pkt_pool.ctx);
 }
@@ -139,41 +206,79 @@ static void tcp_accept(uv_stream_t *master, int status)
        if (status != 0) {
                return;
        }
-
-       uv_stream_t *client = handle_alloc(master->loop, sizeof(*client));
+       uv_stream_t *client = handle_alloc(master->loop);
        if (!client) {
                return;
        }
        memset(client, 0, sizeof(*client));
        io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
        if (uv_accept(master, client) != 0) {
-               handle_free((uv_handle_t *)client);
+               io_free((uv_handle_t *)client);
                return;
        }
 
+       /* Set deadlines for TCP connection and start reading.
+        * Every half of the request time limit it re-checks whether the connection
+        * is idle and should be terminated; this interval is an educated guess. */
+       struct session *session = client->data;
+       uv_timer_t *timer = &session->timeout;
+       uv_timer_init(master->loop, timer);
+       timer->data = client;
+       uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
        io_start_read((uv_handle_t *)client);
 }
 
-int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
+static int set_tcp_option(uv_tcp_t *handle, int option, int val)
 {
-       unsigned flags = UV_UDP_REUSEADDR;
+       uv_os_fd_t fd = 0;
+       if (uv_fileno((uv_handle_t *)handle, &fd) == 0) {
+               return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
+       }
+       return 0; /* N/A */
+}
+
+static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection)
+{
+       unsigned flags = 0;
        if (addr->sa_family == AF_INET6) {
-               flags |= UV_UDP_IPV6ONLY;
+               flags |= UV_TCP_IPV6ONLY;
        }
+
        int ret = uv_tcp_bind(handle, addr, flags);
        if (ret != 0) {
                return ret;
        }
 
-       ret = uv_listen((uv_stream_t *)handle, 16, tcp_accept);
+       /* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
+#ifdef TCP_DEFER_ACCEPT
+       if (set_tcp_option(handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
+               kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
+       }
+#endif
+
+       ret = uv_listen((uv_stream_t *)handle, 16, connection);
        if (ret != 0) {
                return ret;
        }
 
+       /* TCP_FASTOPEN allows data in the SYN, saving one RTT on connection setup. */
+#ifdef TCP_FASTOPEN
+# ifdef __linux__
+       (void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Accepts queue length hint */
+# else
+       (void) set_tcp_option(handle, TCP_FASTOPEN, 1);  /* Accepts on/off */
+# endif
+#endif
+
        handle->data = NULL;
        return 0;
 }
 
+int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
+{
+       return _tcp_bind(handle, addr, tcp_accept);
+}
+
 void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
 {
        if (type == SOCK_DGRAM) {
@@ -182,6 +287,34 @@ void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
                uv_tcp_init(loop, (uv_tcp_t *)handle);
                uv_tcp_nodelay((uv_tcp_t *)handle, 1);
        }
+
+       struct worker_ctx *worker = loop->data;
+       handle->data = session_borrow(worker);
+       assert(handle->data);
+}
+
+void io_deinit(uv_handle_t *handle)
+{
+       if (!handle) {
+               return;
+       }
+       uv_loop_t *loop = handle->loop;
+       if (loop && loop->data) {
+               struct worker_ctx *worker = loop->data;
+               session_release(worker, handle->data);
+       } else {
+               session_free(handle->data);
+       }
+       handle->data = NULL;
+}
+
+void io_free(uv_handle_t *handle)
+{
+       if (!handle) {
+               return;
+       }
+       io_deinit(handle);
+       free(handle);
 }
 
 int io_start_read(uv_handle_t *handle)
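
The per-connection deadline above is a standard libuv idiom: a repeating
uv_timer_t whose callback re-arms itself while the connection still has
outstanding work and closes the connection once it has gone idle, with
uv_timer_again() pushing the deadline forward on activity. A reduced sketch
of the pattern (illustrative names, not the commit's code):

    #include <uv.h>

    struct conn {
        uv_tcp_t handle;
        uv_timer_t deadline;
        unsigned active;  /* outstanding requests on this connection */
    };

    static void conn_free(uv_handle_t *h)
    {
        /* A full implementation would also close &conn->deadline
         * before freeing the enclosing struct. */
    }

    static void deadline_cb(uv_timer_t *t)
    {
        struct conn *c = t->data;
        if (c->active > 0) {
            uv_timer_again(t);  /* still busy, grant another period */
        } else {
            uv_close((uv_handle_t *)&c->handle, conn_free);  /* idle, prune */
        }
    }

    /* Arm the deadline; a non-zero repeat makes uv_timer_again() valid. */
    static void conn_watch(uv_loop_t *loop, struct conn *c, uint64_t period_ms)
    {
        uv_timer_init(loop, &c->deadline);
        c->deadline.data = c;
        uv_timer_start(&c->deadline, deadline_cb, period_ms, period_ms);
    }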
index b47805a6035a16b6721cef4c6975158d6a0b7fa6..5cdd2de761aa1acdab8480fd68202919614d038c 100644 (file)
 
 #include <uv.h>
 #include <libknot/packet/pkt.h>
+#include "lib/generic/array.h"
+
+struct qr_task;
+
+/* Per-session (TCP or UDP) persistent structure
+ * that exists between a remote counterpart and a local socket.
+ */
+struct session {
+       bool is_subreq;
+       bool throttled;
+       uv_timer_t timeout;
+       struct qr_task *buffering;
+       array_t(struct qr_task *) tasks;
+};
+
+void session_free(struct session *s);
+struct session *session_new(void);
 
 int udp_bind(uv_udp_t *handle, struct sockaddr *addr);
 int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr);
+
 void io_create(uv_loop_t *loop, uv_handle_t *handle, int type);
+void io_deinit(uv_handle_t *handle);
+void io_free(uv_handle_t *handle);
+
 int io_start_read(uv_handle_t *handle);
-int io_stop_read(uv_handle_t *handle);
\ No newline at end of file
+int io_stop_read(uv_handle_t *handle);
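
struct session keeps its outstanding tasks in an array_t, the macro-based
dynamic array from lib/generic/array.h that this commit drives with
array_push(), array_tail() and array_del(). A small usage sketch, assuming
the documented semantics (array_push() returns the new index or a negative
error, and array_del() is not guaranteed to preserve order):

    #include "lib/generic/array.h"

    typedef array_t(int) int_array_t;

    static int array_demo(void)
    {
        int_array_t a;
        array_init(a);                /* at = NULL, len = cap = 0 */
        if (array_push(a, 42) < 0) {  /* grows storage on demand */
            return -1;
        }
        array_push(a, 7);
        int last = array_tail(a);     /* peek at the last element (7) */
        array_del(a, 0);              /* drop element at index 0 */
        array_clear(a);               /* release storage */
        return last;
    }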
index 334edcf55fe585593f4f6d94120963ef37abcc59..26bd72fd8235f988b83e7f1a06c5d68ad31b2273 100644 (file)
@@ -21,7 +21,8 @@
 #include "daemon/io.h"
 
 /* libuv 1.7.0+ is able to support SO_REUSEPORT for loadbalancing */
-#if (defined(ENABLE_REUSEPORT) || defined(UV_VERSION_HEX)) && (__linux__ && SO_REUSEPORT)
+#if defined(UV_VERSION_HEX)
+#if (__linux__ && SO_REUSEPORT)
   #define handle_init(type, loop, handle, family) do { \
        uv_ ## type ## _init_ex((loop), (handle), (family)); \
        uv_os_fd_t fd = 0; \
                setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)); \
        } \
   } while (0)
+/* libuv 1.7.0+ is able to assign fd immediately */
+#else
+  #define handle_init(type, loop, handle, family) do { \
+       uv_ ## type ## _init_ex((loop), (handle), (family)); \
+  } while (0)
+#endif
 #else
   #define handle_init(type, loop, handle, family) \
        uv_ ## type ## _init((loop), (handle))
index 77db270dedac444b1c514bdefe95b39e8b2e3299..622927a6256085db068a5b59e8e6ac08c385133f 100644 (file)
@@ -24,7 +24,7 @@
 enum endpoint_flag {
     NET_DOWN = 0 << 0,
     NET_UDP  = 1 << 0,
-    NET_TCP  = 1 << 1
+    NET_TCP  = 1 << 1,
 };
 
 struct endpoint {
index 5bdd7c2a6eae84174b510458d3fb42d3f35dcfee..7ed015a96db24f43a52b0da1e2c2d20187e4c1da 100644 (file)
 #include "daemon/engine.h"
 #include "daemon/io.h"
 
-/* @internal IO request entry. */
-struct ioreq
+/* @internal Union of various libuv objects for freelist. */
+struct req
 {
        union {
+               /* Socket handles, these have session as their `handle->data` and own it. */
                uv_udp_t      udp;
                uv_tcp_t      tcp;
+               /* I/O events, these have only a reference to the task they're operating on. */
                uv_udp_send_t send;
                uv_write_t    write;
                uv_connect_t  connect;
+               /* Timer events */
+               uv_timer_t    timer;
        } as;
 };
 
-/** @internal Number of request within timeout window. */
-#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))
-
 /** @internal Debugging facility. */
 #ifdef DEBUG
 #define DEBUG_MSG(fmt...) printf("[daem] " fmt)
@@ -53,35 +54,6 @@ struct ioreq
 #define DEBUG_MSG(fmt...)
 #endif
 
-/** @internal Query resolution task. */
-struct qr_task
-{
-       struct kr_request req;
-       struct worker_ctx *worker;
-       knot_pkt_t *pktbuf;
-       array_t(struct qr_task *) waiting;
-       uv_handle_t *pending[MAX_PENDING];
-       uint16_t pending_count;
-       uint16_t addrlist_count;
-       uint16_t addrlist_turn;
-       struct sockaddr *addrlist;
-       uv_timer_t retry, timeout;
-       worker_cb_t on_complete;
-       void *baton;
-       struct {
-               union {
-                       struct sockaddr_in ip4;
-                       struct sockaddr_in6 ip6;
-               } addr;
-               uv_handle_t *handle;
-       } source;
-       uint16_t iter_count;
-       uint16_t refs;
-       uint16_t bytes_remaining;
-       bool finished;
-       bool leading;
-};
-
 /* Convenience macros */
 #define qr_task_ref(task) \
        do { ++(task)->refs; } while(0)
@@ -91,6 +63,7 @@ struct qr_task
        (!uv_is_closing((checked)) || (task)->source.handle == (checked))
 
 /* Forward decls */
+static void qr_task_free(struct qr_task *task);
 static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);
 
 /** @internal Get singleton worker. */
@@ -99,58 +72,72 @@ static inline struct worker_ctx *get_worker(void)
        return uv_default_loop()->data;
 }
 
-static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
+static inline struct req *req_borrow(struct worker_ctx *worker)
 {
-       struct ioreq *req = NULL;
-       if (worker->ioreqs.len > 0) {
-               req = array_tail(worker->ioreqs);
-               array_pop(worker->ioreqs);
+       struct req *req = NULL;
+       if (worker->pool_ioreq.len > 0) {
+               req = array_tail(worker->pool_ioreq);
+               array_pop(worker->pool_ioreq);
+               kr_asan_unpoison(req, sizeof(*req));
        } else {
                req = malloc(sizeof(*req));
        }
-       kr_asan_unpoison(req, sizeof(*req));
        return req;
 }
 
-static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
+static inline void req_release(struct worker_ctx *worker, struct req *req)
 {
-       kr_asan_poison(req, sizeof(*req));
-       if (!req || worker->ioreqs.len < 4 * MP_FREELIST_SIZE) {
-               array_push(worker->ioreqs, req);
+       if (!req || worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) {
+               array_push(worker->pool_ioreq, req);
+               kr_asan_poison(req, sizeof(*req));
        } else {
                free(req);
        }
 }
 
+/*! @internal Create a UDP/TCP handle */
 static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
 {
        if (task->pending_count >= MAX_PENDING) {
                return NULL;
        }
        /* Create connection for iterative query */
-       uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker);
-       if (!req) {
+       uv_handle_t *handle = (uv_handle_t *)req_borrow(task->worker);
+       if (!handle) {
+               return NULL;
+       }
+       io_create(task->worker->loop, handle, socktype);
+       /* Set current handle as a subrequest type. */
+       struct session *session = handle->data;
+       session->is_subreq = true;
+       int ret = array_push(session->tasks, task);
+       if (ret != 0) {
+               io_deinit(handle);
+               req_release(task->worker, (struct req *)handle);
                return NULL;
        }
-       io_create(task->worker->loop, req, socktype);
-       req->data = task;
+       qr_task_ref(task);
        /* Connect or issue query datagram */
-       task->pending[task->pending_count] = req;
+       task->pending[task->pending_count] = handle;
        task->pending_count += 1;
-       return req;
+       return handle;
 }
 
 static void ioreq_on_close(uv_handle_t *handle)
 {
        struct worker_ctx *worker = get_worker();
-       ioreq_release(worker, (struct ioreq *)handle);
+       /* Handle-type events own a session, must close it. */
+       struct session *session = handle->data;
+       struct qr_task *task = session->tasks.at[0];
+       io_deinit(handle);
+       qr_task_unref(task);
+       req_release(worker, (struct req *)handle);
 }
 
 static void ioreq_kill(uv_handle_t *req)
 {
        assert(req);
        if (!uv_is_closing(req)) {
-               io_stop_read(req);
                uv_close(req, ioreq_on_close);
        }
 }
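
req_borrow()/req_release() above (like session_borrow()/session_release()
in io.c) park recycled objects on a freelist and poison them while parked,
so an AddressSanitizer build traps any use-after-release of pooled memory;
the kr_asan_poison()/kr_asan_unpoison() helpers are assumed to compile away
without ASan. A generic sketch of the same idea:

    #include <stdlib.h>

    #if defined(__has_feature)
    # if __has_feature(address_sanitizer)
    #  include <sanitizer/asan_interface.h>
    #  define MEM_POISON(p, n)   ASAN_POISON_MEMORY_REGION((p), (n))
    #  define MEM_UNPOISON(p, n) ASAN_UNPOISON_MEMORY_REGION((p), (n))
    # endif
    #endif
    #ifndef MEM_POISON
    # define MEM_POISON(p, n)    ((void)0)  /* no-op without ASan */
    # define MEM_UNPOISON(p, n)  ((void)0)
    #endif

    #define POOL_MAX 256
    struct pool { void *items[POOL_MAX]; size_t len; };

    static void *pool_borrow(struct pool *p, size_t size)
    {
        if (p->len > 0) {
            void *obj = p->items[--p->len];
            MEM_UNPOISON(obj, size);  /* make the region usable again */
            return obj;
        }
        return malloc(size);
    }

    static void pool_release(struct pool *p, void *obj, size_t size)
    {
        if (p->len < POOL_MAX) {
            p->items[p->len++] = obj;
            MEM_POISON(obj, size);    /* any touch now aborts under ASan */
        } else {
            free(obj);
        }
    }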
@@ -187,51 +174,44 @@ static void mp_poison(struct mempool *mp, bool poison)
 #endif
 /** @endcond */
 
-static inline struct mempool *pool_take(struct worker_ctx *worker)
+static inline struct mempool *pool_borrow(struct worker_ctx *worker)
 {
        /* Recycle available mempool if possible */
        struct mempool *mp = NULL;
-       if (worker->pools.len > 0) {
-               mp = array_tail(worker->pools);
-               array_pop(worker->pools);
+       if (worker->pool_mp.len > 0) {
+               mp = array_tail(worker->pool_mp);
+               array_pop(worker->pool_mp);
+               mp_poison(mp, 0);
        } else { /* No mempool on the freelist, create new one */
                mp = mp_new (4 * CPU_PAGE_SIZE);
        }
-       mp_poison(mp, 0);
        return mp;
 }
 
 static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
 {
        /* Return mempool to ring or free it if it's full */
-       if (worker->pools.len < MP_FREELIST_SIZE) {
+       if (worker->pool_mp.len < MP_FREELIST_SIZE) {
                mp_flush(mp);
-               array_push(worker->pools, mp);
+               array_push(worker->pool_mp, mp);
                mp_poison(mp, 1);
        } else {
                mp_delete(mp);
        }
 }
 
-static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
+static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
 {
        /* How much can client handle? */
        struct engine *engine = worker->engine;
-       size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
        size_t pktbuf_max = KR_EDNS_PAYLOAD;
        if (engine->resolver.opt_rr) {
                pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr), pktbuf_max);
        }
-       if (!addr && handle) { /* TCP */
-               answer_max = KNOT_WIRE_MAX_PKTSIZE;
-               pktbuf_max = KNOT_WIRE_MAX_PKTSIZE;
-       } else if (knot_pkt_has_edns(query)) { /* EDNS */
-               answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE);
-       }
 
        /* Recycle available mempool if possible */
        knot_mm_t pool = {
-               .ctx = pool_take(worker),
+               .ctx = pool_borrow(worker),
                .alloc = (knot_mm_alloc_t) mp_alloc
        };
 
@@ -244,27 +224,26 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
        /* Create packet buffers for answer and subrequests */
        task->req.pool = pool;
        knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool);
-       knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool);
-       if (!pktbuf || !answer) {
+       if (!pktbuf) {
                mp_delete(pool.ctx);
                return NULL;
        }
-       task->req.answer = answer;
+       pktbuf->size = 0;
+       task->req.answer = NULL;
        task->pktbuf = pktbuf;
        array_init(task->waiting);
        task->addrlist = NULL;
        task->pending_count = 0;
        task->bytes_remaining = 0;
        task->iter_count = 0;
+       task->timeouts = 0;
        task->refs = 1;
        task->finished = false;
        task->leading = false;
        task->worker = worker;
+       task->session = NULL;
        task->source.handle = handle;
-       uv_timer_init(worker->loop, &task->retry);
-       uv_timer_init(worker->loop, &task->timeout);
-       task->retry.data = task;
-       task->timeout.data = task;
+       task->timeout = NULL;
        task->on_complete = NULL;
        task->req.qsource.key = NULL;
        task->req.qsource.addr = NULL;
@@ -278,27 +257,36 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
        } else {
                task->source.addr.ip4.sin_family = AF_UNSPEC;
        }
-       /* Remember query source TSIG key */
-       if (query->tsig_rr) {
-               task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool);
-       }
-
-       /* Start resolution */
-       kr_resolve_begin(&task->req, &engine->resolver, answer);
        worker->stats.concurrent += 1;
-       worker->stats.queries += 1;
-       /* Throttle outbound queries only when high pressure */
-       if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
-               task->req.options |= QUERY_NO_THROTTLE;
-       }
        return task;
 }
 
 /* This is called when the task refcount is zero, free memory. */
 static void qr_task_free(struct qr_task *task)
 {
-       /* Return mempool to ring or free it if it's full */
+       struct session *session = task->session;
+       if (session) {
+               /* Walk the session task list and remove itself. */
+               for (size_t i = 0; i < session->tasks.len; ++i) {
+                       if (session->tasks.at[i] == task) {
+                               array_del(session->tasks, i);
+                               break;
+                       }
+               }
+               /* Start reading again if the session is throttled and
+                * the number of outstanding requests is below watermark. */
+               uv_handle_t *handle = task->source.handle;
+               if (handle && session->tasks.len < task->worker->tcp_pipeline_max/2) {
+                       if (!uv_is_closing(handle) && session->throttled) {
+                               io_start_read(handle);
+                               session->throttled = false;
+                       }
+               }
+       }
+       /* Update stats */
        struct worker_ctx *worker = task->worker;
+       worker->stats.concurrent -= 1;
+       /* Return mempool to ring or free it if it's full */
        pool_release(worker, task->req.pool.ctx);
        /* @note The 'task' is invalidated from now on. */
        /* Decommit memory every once in a while */
@@ -312,17 +300,65 @@ static void qr_task_free(struct qr_task *task)
        }
 }
 
-/* This is called when retry timer closes */
-static void retransmit_close(uv_handle_t *handle)
+static int qr_task_start(struct qr_task *task, knot_pkt_t *query)
 {
-       struct qr_task *task = handle->data;
-       qr_task_unref(task);
+       assert(task && query);
+       size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
+       if (!task->source.handle || task->source.handle->type == UV_TCP) {
+               answer_max = KNOT_WIRE_MAX_PKTSIZE;
+       } else if (knot_pkt_has_edns(query)) { /* EDNS */
+               answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE);
+       }
+
+       knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool);
+       if (!answer) {
+               return kr_error(ENOMEM);
+       }
+       task->req.answer = answer;
+
+       /* Remember query source TSIG key */
+       if (query->tsig_rr) {
+               task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool);
+       }
+
+       /* Start resolution */
+       struct worker_ctx *worker = task->worker;
+       struct engine *engine = worker->engine;
+       kr_resolve_begin(&task->req, &engine->resolver, answer);
+       worker->stats.queries += 1;
+       /* Throttle outbound queries only when high pressure */
+       if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
+               task->req.options |= QUERY_NO_THROTTLE;
+       }
+       return 0;
 }
 
-/* This is called when task completes and timeout timer is closed. */
-static void qr_task_complete(uv_handle_t *handle)
+/** @internal Register qr_task within session. */
+static int qr_task_register(struct qr_task *task, struct session *session)
+{
+       int ret = array_reserve(session->tasks, session->tasks.len + 1);
+       if (ret != 0) {
+               return kr_error(ENOMEM);
+       }
+       array_push(session->tasks, task);
+       task->session = session;
+       /* Soft limit on parallel queries: there is no "slow down" RCODE
+        * that we could use to signal the client, but we can stop reading,
+        * and in effect shrink the TCP window size. To get more precise throttling,
+        * we would need to copy the remainder of the unread buffer and reassemble
+        * it when resuming reading. This is NYI. */
+       if (session->tasks.len >= task->worker->tcp_pipeline_max) {
+               uv_handle_t *handle = task->source.handle;
+               if (handle && !session->throttled && !uv_is_closing(handle)) {
+                       io_stop_read(handle);
+                       session->throttled = true;
+               }
+       }
+       return 0;
+}
+
+static void qr_task_complete(struct qr_task *task)
 {
-       struct qr_task *task = handle->data;
        struct worker_ctx *worker = task->worker;
        /* Kill pending I/O requests */
        ioreq_killall(task);
@@ -332,51 +368,8 @@ static void qr_task_complete(uv_handle_t *handle)
        if (task->on_complete) {
                task->on_complete(worker, &task->req, task->baton);
        }
-       /* Return handle to the event loop in case
-        * it was exclusively taken by this task. */
-       if (task->source.handle && !uv_has_ref(task->source.handle)) {
-               uv_ref(task->source.handle);
-               io_start_read(task->source.handle);
-       }
-       /* Release task */
+       /* Release primary reference to task. */
        qr_task_unref(task);
-       /* Update stats */
-       worker->stats.concurrent -= 1;
-}
-
-/* This is called when I/O timeouts */
-static void on_timeout(uv_timer_t *req)
-{
-       struct qr_task *task = req->data;
-       uv_handle_t *handle = (uv_handle_t *)req;
-#ifdef DEBUG
-       char qname_str[KNOT_DNAME_MAXLEN] = {'\0'}, type_str[16] = {'\0'};
-       knot_dname_to_str(qname_str, knot_pkt_qname(task->pktbuf), sizeof(qname_str));
-       knot_rrtype_to_string(knot_pkt_qtype(task->pktbuf), type_str, sizeof(type_str));
-       DEBUG_MSG("ioreq timeout %s %s %p\n", qname_str, type_str, req);
-#endif
-       /* Ignore if this timeout is being terminated. */
-       if (uv_is_closing(handle)) {
-               return;
-       }
-       /* Penalize all tried nameservers with a timeout. */
-       struct worker_ctx *worker = task->worker;
-       if (task->leading && task->pending_count > 0) {
-               struct kr_query *qry = array_tail(task->req.rplan.pending);
-               struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
-               for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
-                       struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
-                       WITH_DEBUG {
-                               char addr_str[INET6_ADDRSTRLEN];
-                               inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
-                               QRDEBUG(qry, "wrkr", "=> server: '%s' flagged as 'bad'\n", addr_str);
-                       }
-                       kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_TIMEOUT, worker->engine->resolver.cache_rtt);
-               }
-       }
-       /* Interrupt current pending request. */
-       worker->stats.timeout += 1;
-       qr_task_step(task, NULL, NULL);
 }
 
 /* This is called when we send subrequest / answer */
@@ -385,15 +378,10 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
        if (!task->finished) {
                if (status == 0 && handle) {
                        io_start_read(handle); /* Start reading new query */
-               } else {
-                       DEBUG_MSG("ioreq send_done %p => %d, %s\n", handle, status, uv_strerror(status));
                }
        } else {
-               /* Close retry timer (borrows task) */
-               qr_task_ref(task);
-               uv_close((uv_handle_t *)&task->retry, retransmit_close);
-               /* Close timeout timer (finishes task) */
-               uv_close((uv_handle_t *)&task->timeout, qr_task_complete);
+               assert(task->timeout == NULL);
+               qr_task_complete(task);
        }
        return status;
 }
@@ -406,7 +394,7 @@ static void on_send(uv_udp_send_t *req, int status)
                qr_task_on_send(task, (uv_handle_t *)req->handle, status);
        }
        qr_task_unref(task);
-       ioreq_release(worker, (struct ioreq *)req);
+       req_release(worker, (struct req *)req);
 }
 
 static void on_write(uv_write_t *req, int status)
@@ -417,7 +405,7 @@ static void on_write(uv_write_t *req, int status)
                qr_task_on_send(task, (uv_handle_t *)req->handle, status);
        }
        qr_task_unref(task);
-       ioreq_release(worker, (struct ioreq *)req);
+       req_release(worker, (struct req *)req);
 }
 
 static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
@@ -425,7 +413,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
        if (!handle) {
                return qr_task_on_send(task, handle, kr_error(EIO));
        }
-       struct ioreq *send_req = ioreq_take(task->worker);
+       struct req *send_req = req_borrow(task->worker);
        if (!send_req) {
                return qr_task_on_send(task, handle, kr_error(ENOMEM));
        }
@@ -448,8 +436,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
        if (ret == 0) {
                qr_task_ref(task); /* Pending ioreq on current task */
        } else {
-               DEBUG_MSG("ioreq send_start %p => %d, %s\n", send_req, ret, uv_strerror(ret));
-               ioreq_release(task->worker, send_req);
+               req_release(task->worker, send_req);
        }
 
        /* Update statistics */
@@ -475,17 +462,53 @@ static void on_connect(uv_connect_t *req, int status)
                if (status == 0) {
                        qr_task_send(task, (uv_handle_t *)handle, NULL, task->pktbuf);
                } else {
-                       DEBUG_MSG("ioreq conn_done %p => %d, %s\n", req, status, uv_strerror(status));
                        qr_task_step(task, task->addrlist, NULL);
                }
        }
        qr_task_unref(task);
-       ioreq_release(worker, (struct ioreq *)req);
+       req_release(worker, (struct req *)req);
+}
+
+static void on_timer_close(uv_handle_t *handle)
+{
+       struct qr_task *task = handle->data;
+       req_release(task->worker, (struct req *)handle);
+       qr_task_unref(task);
+}
+
+/* This is called when I/O times out */
+static void on_timeout(uv_timer_t *req)
+{
+       struct qr_task *task = req->data;
+
+       /* Penalize all tried nameservers with a timeout. */
+       struct worker_ctx *worker = task->worker;
+       if (task->leading && task->pending_count > 0) {
+               struct kr_query *qry = array_tail(task->req.rplan.pending);
+               struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
+               for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
+                       struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
+                       WITH_DEBUG {
+                               char addr_str[INET6_ADDRSTRLEN];
+                               inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
+                               QRDEBUG(qry, "wrkr", "=> server: '%s' flagged as 'bad'\n", addr_str);
+                       }
+                       kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_TIMEOUT, worker->engine->resolver.cache_rtt);
+               }
+       }
+       /* Release timer handle */
+       task->timeout = NULL;
+       req_release(worker, (struct req *)req);
+       /* Interrupt current pending request. */
+       task->timeouts += 1;
+       worker->stats.timeout += 1;
+       qr_task_step(task, NULL, NULL);
+       qr_task_unref(task); /* Return borrowed task */
 }
 
 static bool retransmit(struct qr_task *task)
 {
-       if (task && task->addrlist) {
+       if (task && task->addrlist && task->addrlist_count > 0) {
                uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM);
                if (subreq) { /* Create connection for iterative query */
                        struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
@@ -500,13 +523,38 @@ static bool retransmit(struct qr_task *task)
 
 static void on_retransmit(uv_timer_t *req)
 {
-       if (uv_is_closing((uv_handle_t *)req))
-               return;
+       uv_timer_stop(req);
+       struct qr_task *task = req->data;
        if (!retransmit(req->data)) {
-               uv_timer_stop(req); /* Not possible to spawn request, stop trying */
+               /* Not possible to spawn request, start timeout timer with remaining deadline. */
+               uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
+               uv_timer_start(req, on_timeout, timeout, 0);
+       } else {
+               uv_timer_start(req, on_retransmit, KR_CONN_RETRY, 0);
        }
 }
 
+static int timer_start(struct qr_task *task, uv_timer_cb cb, uint64_t timeout, uint64_t repeat)
+{
+       assert(task->timeout == NULL);
+       struct worker_ctx *worker = task->worker;
+       uv_timer_t *timer = (uv_timer_t *)req_borrow(worker);
+       if (!timer) {
+               return kr_error(ENOMEM);
+       }
+       uv_timer_init(worker->loop, timer);
+       int ret = uv_timer_start(timer, cb, timeout, repeat);
+       if (ret != 0) {
+               uv_timer_stop(timer);
+               req_release(worker, (struct req *)timer);
+               return kr_error(ENOMEM);
+       }
+       timer->data = task;
+       qr_task_ref(task);
+       task->timeout = timer;
+       return 0;
+}
+
 /** @internal Get key from current outstanding subrequest. */
 static int subreq_key(char *dst, struct qr_task *task)
 {
@@ -518,11 +566,14 @@ static int subreq_key(char *dst, struct qr_task *task)
 
 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
-       /* Close pending I/O requests */
-       if (uv_is_active((uv_handle_t *)&task->retry))
-               uv_timer_stop(&task->retry);
-       if (uv_is_active((uv_handle_t *)&task->timeout))
-               uv_timer_stop(&task->timeout);
+       /* Close pending timer */
+       if (task->timeout) {
+               /* Timer was running so it holds reference to task, make sure the timer event
+                * never fires and release the reference on timer close instead. */
+               uv_timer_stop(task->timeout);
+               uv_close((uv_handle_t *)task->timeout, on_timer_close);
+               task->timeout = NULL;
+       }
        ioreq_killall(task);
        /* Clear from outstanding table. */
        if (!task->leading)
@@ -594,7 +645,7 @@ static int qr_task_finalize(struct qr_task *task, int state)
 static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
 {
        /* No more steps after we're finished. */
-       if (task->finished) {
+       if (!task || task->finished) {
                return kr_error(ESTALE);
        }
        /* Close pending I/O requests */
@@ -607,8 +658,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
        int state = kr_resolve_consume(&task->req, packet_source, packet);
        while (state == KNOT_STATE_PRODUCE) {
                state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf);
-               if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
-                       DEBUG_MSG("task iter_limit %p\n", task);
+               if (unlikely(++task->iter_count > KR_ITER_LIMIT || task->timeouts >= KR_TIMEOUT_LIMIT)) {
                        return qr_task_finalize(task, KNOT_STATE_FAIL);
                }
        }
@@ -628,6 +678,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
        }
 
        /* Start fast retransmit with UDP, otherwise connect. */
+       int ret = 0;
        if (sock_type == SOCK_DGRAM) {
                /* If such subrequest is outstanding, enqueue to it. */
                if (subreq_enqueue(task)) {
@@ -635,7 +686,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
                }
                /* Start transmitting */
                if (retransmit(task)) {
-                       uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
+                       ret = timer_start(task, on_retransmit, KR_CONN_RETRY, 0);
                } else {
                        return qr_task_step(task, NULL, NULL);
                }
@@ -644,38 +695,37 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
                 */
                subreq_lead(task);
        } else {
-               struct ioreq *conn = ioreq_take(task->worker);
+               uv_connect_t *conn = (uv_connect_t *)req_borrow(task->worker);
                if (!conn) {
                        return qr_task_step(task, NULL, NULL);
                }
                uv_handle_t *client = ioreq_spawn(task, sock_type);
                if (!client) {
-                       ioreq_release(task->worker, conn);
+                       req_release(task->worker, (struct req *)conn);
                        return qr_task_step(task, NULL, NULL);
                }
-               conn->as.connect.data = task;
-               if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
-                       ioreq_release(task->worker, conn);
+               conn->data = task;
+               if (uv_tcp_connect(conn, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
+                       req_release(task->worker, (struct req *)conn);
                        return qr_task_step(task, NULL, NULL);
                }
-               /* Connect request borrows task */
-               qr_task_ref(task);
+               qr_task_ref(task); /* Connect request borrows task */
+               ret = timer_start(task, on_timeout, KR_CONN_RTT_MAX, 0);
        }
 
        /* Start next step with timeout, fatal if can't start a timer. */
-       int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
        if (ret != 0) {
                subreq_finalize(task, packet_source, packet);
                return qr_task_finalize(task, KNOT_STATE_FAIL);
        }
-
-       return ret;
+       return 0;
 }
 
 static int parse_packet(knot_pkt_t *query)
 {
-       if (!query)
+       if (!query) {
                return kr_error(EINVAL);
+       }
 
        /* Parse query packet. */
        int ret = knot_pkt_parse(query, 0);
@@ -691,90 +741,185 @@ static int parse_packet(knot_pkt_t *query)
        return kr_ok();
 }
 
-int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
+int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *msg, const struct sockaddr* addr)
 {
        if (!worker || !handle) {
                return kr_error(EINVAL);
        }
 
+       struct session *session = handle->data;
+       assert(session);
+
        /* Parse packet */
-       int ret = parse_packet(query);
+       int ret = parse_packet(msg);
 
-       /* Start new task on master sockets, or resume existing */
-       struct qr_task *task = handle->data;
-       bool is_master_socket = (!task);
-       if (is_master_socket) {
+       /* Start new task on listening sockets, or resume if this is subrequest */
+       struct qr_task *task = NULL;
+       if (!session->is_subreq) {
                /* Ignore badly formed queries or responses. */
-               if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
-                       DEBUG_MSG("task bad_query %p => %d, %s\n", task, ret, kr_strerror(ret));
-                       worker->stats.dropped += 1;
+               if (!msg || ret != 0 || knot_wire_get_qr(msg->wire)) {
+                       if (msg) worker->stats.dropped += 1;
                        return kr_error(EINVAL); /* Ignore. */
                }
-               task = qr_task_create(worker, handle, query, addr);
+               task = qr_task_create(worker, handle, addr);
                if (!task) {
                        return kr_error(ENOMEM);
                }
+               ret = qr_task_start(task, msg);
+               if (ret != 0) {
+                       qr_task_free(task);
+                       return kr_error(ENOMEM);
+               }
+       } else {
+               task = session->tasks.len > 0 ? array_tail(session->tasks) : NULL;
        }
 
-       /* Consume input and produce next query */
-       return qr_task_step(task, addr, query);
+       /* Consume input and produce next message */
+       return qr_task_step(task, addr, msg);
 }
 
 /* Return DNS/TCP message size. */
-static int msg_size(const uint8_t *msg, size_t len)
+static int msg_size(const uint8_t *msg)
 {
-               if (len < 2) {
-                       return kr_error(EMSGSIZE);
-               }
                return wire_read_u16(msg);
 }
 
-int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len)
+/* If buffering, close last task as it isn't live yet. */
+static void discard_buffered(struct session *session)
+{
+       if (session->buffering) {
+               qr_task_free(session->buffering);
+               session->buffering = NULL;
+       }
+}
+
+int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle)
 {
-       if (!worker || !handle || !msg) {
+       if (!worker || !handle) {
                return kr_error(EINVAL);
        }
+       /* If this is a subrequest, notify the parent task with empty input,
+        * because in this case the session doesn't own the tasks; it has just
+        * borrowed the task from the parent session. */
+       struct session *session = handle->data;
+       if (session->is_subreq) {
+               worker_submit(worker, (uv_handle_t *)handle, NULL, NULL);
+       } else {
+               discard_buffered(session);
+       }
+       return 0;
+}
 
-       int nbytes = msg_size(msg, len);
-       struct qr_task *task = handle->data;
-       const bool start_assembly = (task && task->bytes_remaining == 0);
+int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, ssize_t len)
+{
+       if (!worker || !handle) {
+               return kr_error(EINVAL);
+       }
+       /* Connection error or forced disconnect */
+       struct session *session = handle->data;
+       if (len <= 0 || !msg) {
+               /* If we have pending tasks, we must dissociate them from the
+                * connection so they don't try to access closed and freed handle */
+               for (size_t i = 0; i < session->tasks.len; ++i) {
+                       struct qr_task *task = session->tasks.at[i];
+                       task->session = NULL;
+                       task->source.handle = NULL;
+               }
+               session->tasks.len = 0;
+               return kr_error(ECONNRESET);
+       }
+
+       int submitted = 0;
+       ssize_t nbytes = 0;
+       struct qr_task *task = session->buffering;
 
-       /* Message is a query (we have no context to buffer it) or complete. */
-       if (!task || (start_assembly && nbytes == len - 2)) {
-               if (nbytes <= 0) {
-                       return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);  
+       /* If this is a new query, create a new task that we can use
+        * to buffer the incoming message until it is complete. */
+       if (!session->is_subreq) {
+               if (!task) {
+                       task = qr_task_create(worker, handle, NULL);
+                       if (!task) {
+                               return kr_error(ENOMEM);
+                       }
+                       session->buffering = task;
                }
-               knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool);
-               return worker_exec(worker, handle, pkt_nocopy, NULL);
+       } else {
+               assert(session->tasks.len > 0);
+               task = array_tail(session->tasks);
        }
-       /* Starting a new message assembly */
+       /* At this point session must have either created new task or it's already assigned. */
+       assert(task);
+       assert(len > 0);
+       /* Start reading DNS/TCP message length */
        knot_pkt_t *pkt_buf = task->pktbuf;
-       if (start_assembly) {
-               if (nbytes <= 0) {
-                       return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);  
-               }       
+       if (task->bytes_remaining == 0 && pkt_buf->size == 0) {
                knot_pkt_clear(pkt_buf);
+               /* Read only one byte, as the TCP fragment may end at a 1-byte boundary,
+                * which would otherwise lead to an OOB read or a wrong reassembled length. */
+               pkt_buf->size = 1;
+               pkt_buf->wire[0] = msg[0];
+               len -= 1;
+               msg += 1;
+               if (len == 0) {
+                       return 0;
+               }
+       }
+       /* Finish reading DNS/TCP message length. */
+       if (task->bytes_remaining == 0 && pkt_buf->size == 1) {
+               pkt_buf->wire[1] = msg[0];
+               nbytes = msg_size(pkt_buf->wire);
+               len -= 1;
+               msg += 1;
+               /* Cut off fragment length and start reading DNS message. */
                pkt_buf->size = 0;
-               /* Cut off message length */
                task->bytes_remaining = nbytes;
-               len -= 2;
-               msg += 2;
        }
        /* Message is too long, can't process it. */
-       if (len > pkt_buf->max_size - pkt_buf->size) {
+       ssize_t to_read = MIN(len, task->bytes_remaining);
+       if (to_read > (ssize_t)(pkt_buf->max_size - pkt_buf->size)) {
+               pkt_buf->size = 0;
                task->bytes_remaining = 0;
-               return worker_exec(worker, handle, NULL, NULL);
+               return kr_error(EMSGSIZE);
        }
        /* Buffer message and check if it's complete */
-       memcpy(pkt_buf->wire + pkt_buf->size, msg, len);
-       pkt_buf->size += len;
-       if (len >= task->bytes_remaining) {
+       memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
+       pkt_buf->size += to_read;
+       if (to_read >= task->bytes_remaining) {
                task->bytes_remaining = 0;
-               return worker_exec(worker, handle, pkt_buf, NULL);
+               /* Parse the packet and start resolving complete query */
+               int ret = parse_packet(pkt_buf);
+               if (ret == 0 && !session->is_subreq) {
+                       ret = qr_task_start(task, pkt_buf);
+                       if (ret != 0) {
+                               return ret;
+                       }
+                       ret = qr_task_register(task, session);
+                       if (ret != 0) {
+                               return ret;
+                       }
+                       /* Task is now registered in session, clear temporary. */
+                       session->buffering = NULL;
+                       submitted += 1;
+               }
+               /* Start only new queries, not subrequests that are already pending */
+               if (ret == 0) {
+                       ret = qr_task_step(task, NULL, pkt_buf);
+               }
+               /* Process next message part in the stream if no error so far */
+               if (ret != 0) {
+                       return ret;
+               }
+               if (len - to_read > 0 && !session->is_subreq) {
+                       ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read);
+                       if (ret < 0) {
+                               return ret;
+                       }
+                       submitted += ret;
+               }
+       } else {
+               task->bytes_remaining -= to_read;
        }
-       /* Return number of bytes remaining to receive. */
-       task->bytes_remaining -= len;
-       return task->bytes_remaining;
+       return submitted;
 }
 
 int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
@@ -784,26 +929,36 @@ int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned option
        }
 
        /* Create task */
-       struct qr_task *task = qr_task_create(worker, NULL, query, NULL);
+       struct qr_task *task = qr_task_create(worker, NULL, NULL);
        if (!task) {
                return kr_error(ENOMEM);
        }
        task->baton = baton;
        task->on_complete = on_complete;
        task->req.options |= options;
+       /* Start task */
+       int ret = qr_task_start(task, query);
+       if (ret != 0) {
+               qr_task_unref(task);
+               return ret;
+       }
        return qr_task_step(task, NULL, query);
 }
 
 int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 {
-       array_init(worker->pools);
-       array_init(worker->ioreqs);
-       if (array_reserve(worker->pools, ring_maxlen) || array_reserve(worker->ioreqs, ring_maxlen))
+       array_init(worker->pool_mp);
+       array_init(worker->pool_ioreq);
+       array_init(worker->pool_sessions);
+       if (array_reserve(worker->pool_mp, ring_maxlen) ||
+               array_reserve(worker->pool_ioreq, ring_maxlen) ||
+               array_reserve(worker->pool_sessions, ring_maxlen))
                return kr_error(ENOMEM);
        memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
        worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
        worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
        worker->outstanding = map_make();
+       worker->tcp_pipeline_max = MAX_PIPELINED;
        return kr_ok();
 }
 
@@ -817,8 +972,9 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 
 void worker_reclaim(struct worker_ctx *worker)
 {
-       reclaim_freelist(worker->pools, struct mempool, mp_delete);
-       reclaim_freelist(worker->ioreqs, struct ioreq, free);
+       reclaim_freelist(worker->pool_mp, struct mempool, mp_delete);
+       reclaim_freelist(worker->pool_ioreq, struct req, free);
+       reclaim_freelist(worker->pool_sessions, struct session, session_free);
        mp_delete(worker->pkt_pool.ctx);
        worker->pkt_pool.ctx = NULL;
        map_clear(&worker->outstanding);
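
The subtlest part of the new worker_process_tcp() is that TCP may deliver
the 2-byte length prefix split across reads, which is why it consumes the
prefix one byte at a time before buffering the payload. A condensed,
standalone model of that reassembly (hypothetical names, no libuv or
session state):

    #include <stddef.h>
    #include <stdint.h>
    #include <string.h>

    struct reasm {
        uint8_t hdr[2];    /* partially read 2-byte length prefix */
        size_t  hdr_have;  /* 0, 1 or 2 prefix bytes seen so far */
        uint8_t msg[UINT16_MAX];
        size_t  msg_have, msg_want;
    };

    /* Feed one TCP fragment into a zero-initialized struct reasm; calls
     * on_msg() for every completed message. Returns the completed count. */
    static int reasm_feed(struct reasm *r, const uint8_t *buf, size_t len,
                          void (*on_msg)(const uint8_t *, size_t))
    {
        int done = 0;
        while (len > 0) {
            if (r->hdr_have < 2) {  /* the prefix may span two reads */
                r->hdr[r->hdr_have++] = *buf++;
                len -= 1;
                if (r->hdr_have == 2) {
                    r->msg_want = ((size_t)r->hdr[0] << 8) | r->hdr[1];
                    r->msg_have = 0;
                }
                continue;
            }
            size_t left = r->msg_want - r->msg_have;
            size_t take = len < left ? len : left;
            memcpy(r->msg + r->msg_have, buf, take);
            r->msg_have += take;
            buf += take;
            len -= take;
            if (r->msg_have == r->msg_want) {  /* message complete */
                on_msg(r->msg, r->msg_have);
                r->hdr_have = 0;  /* ready for the next prefix */
                done += 1;
            }
        }
        return done;
    }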
index c598c7ba6cb8b8bb222877a41e6de92e4b8cb375..aeda6a68b9c5a760f03a5421783eedf35fbe18b9 100644 (file)
 #include "lib/generic/array.h"
 #include "lib/generic/map.h"
 
+/** @internal Number of requests within the timeout window. */
+#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))
+
 /** @cond internal Freelist of available mempools. */
 typedef array_t(void *) mp_freelist_t;
-/* @endcond */
 
 /**
  * Query resolution worker.
@@ -32,6 +34,7 @@ struct worker_ctx {
        uv_loop_t *loop;
        int id;
        int count;
+       unsigned tcp_pipeline_max;
 #if __linux__
        uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
 #else
@@ -48,27 +51,67 @@ struct worker_ctx {
                size_t timeout;
        } stats;
        map_t outstanding;
-       mp_freelist_t pools;
-       mp_freelist_t ioreqs;
+       mp_freelist_t pool_mp;
+       mp_freelist_t pool_ioreq;
+       mp_freelist_t pool_sessions;
        knot_mm_t pkt_pool;
 };
 
 /* Worker callback */
 typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
 
+/** @internal Query resolution task. */
+struct qr_task
+{
+       struct kr_request req;
+       struct worker_ctx *worker;
+       struct session *session;
+       knot_pkt_t *pktbuf;
+       array_t(struct qr_task *) waiting;
+       uv_handle_t *pending[MAX_PENDING];
+       uint16_t pending_count;
+       uint16_t addrlist_count;
+       uint16_t addrlist_turn;
+       uint16_t timeouts;
+       uint16_t iter_count;
+       uint16_t bytes_remaining;
+       struct sockaddr *addrlist;
+       uv_timer_t *timeout;
+       worker_cb_t on_complete;
+       void *baton;
+       struct {
+               union {
+                       struct sockaddr_in ip4;
+                       struct sockaddr_in6 ip6;
+               } addr;
+               uv_handle_t *handle;
+       } source;
+       uint32_t refs;
+       bool finished : 1;
+       bool leading  : 1;
+};
+/* @endcond */
+
 /**
  * Process incoming packet (query or answer to subrequest).
  * @return 0 or an error code
  */
-int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
+int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
 
 /**
- * Process incoming DNS/TCP message fragment.
+ * Process incoming DNS/TCP message fragment(s).
  * If the fragment contains only a partial message, it is buffered.
  * If the fragment contains a complete query or completes current fragment, execute it.
- * @return 0, number of bytes remaining to assemble, or an error code
+ * @return the number of newly started queries, or an error code
  */
-int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len);
+int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, ssize_t len);
+
+/**
+ * End the current DNS/TCP session; this disassociates pending tasks from the session,
+ * which may then be safely closed.
+ */
+int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle);
+
 
 /**
  * Schedule query for resolution.
index aeda472b959d4c7d378eb6f83db18ddf8fd6661c..8d9796c80ca2f3645121c9935b672592d93ecaa0 100644 (file)
@@ -54,6 +54,7 @@ static inline int __attribute__((__cold__)) kr_error(int x) {
 #define KR_CONN_RETRY 300    /* Retry interval for network activity */
 #define KR_ITER_LIMIT 50     /* Built-in iterator limit */
 #define KR_CNAME_CHAIN_LIMIT 40 /* Built-in maximum CNAME chain length */
+#define KR_TIMEOUT_LIMIT 4   /* Maximum number of retries after timeout. */
 
 /*
  * Defines.
index 8ef2d1c39ba0523f5395f425c1ee3d712573323d..b3de6e7eddbe96520cb4168623c677fd8769195b 100644 (file)
@@ -114,6 +114,7 @@ void kr_zonecut_deinit(struct kr_zonecut *cut)
        map_clear(&cut->nsset);
        knot_rrset_free(&cut->key, cut->pool);
        knot_rrset_free(&cut->trust_anchor, cut->pool);
+       cut->name = NULL;
 }
 
 void kr_zonecut_set(struct kr_zonecut *cut, const knot_dname_t *name)
index dbb0efeef794adebd75d044b602f8e01ae3a3e56..c971de628f3d92d2442d65b5d4afbdf4969df828 100644 (file)
@@ -10,8 +10,9 @@ deckard_DIR := tests/deckard
 TESTS := sets/resolver
 TEMPLATE := template/kresd.j2
 $(deckard_DIR)/Makefile:
-       @git submodule update --init
+       @git submodule update --init --recursive
 check-integration: $(deckard_DIR)/Makefile
+       @mkdir -p $(deckard_DIR)/contrib/libswrap/obj
        @$(MAKE) -s -C $(deckard_DIR) TESTS=$(TESTS) DAEMON=$(abspath daemon/kresd) TEMPLATE=$(TEMPLATE) DYLD_LIBRARY_PATH=$(DYLD_LIBRARY_PATH)
 deckard: check-integration