]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
Merge remote-tracking branch 'origin/master' into query-trace
authorMarek Vavruša <marek@vavrusa.com>
Mon, 8 Jan 2018 21:32:21 +0000 (13:32 -0800)
committerMarek Vavruša <marek@vavrusa.com>
Mon, 8 Jan 2018 21:52:19 +0000 (13:52 -0800)
14 files changed:
1  2 
daemon/bindings.c
daemon/io.c
daemon/lua/kres-gen.lua
daemon/lua/kres-gen.sh
daemon/lua/kres.lua
daemon/main.c
daemon/worker.c
daemon/worker.h
lib/layer/iterate.c
lib/resolve.h
lib/utils.c
lib/utils.h
modules/http/http_trace.lua
tests/config/test.cfg

index f91f54d1b14286562ce74fb2d51cc0e19ae1e88f,9da34be011843bd67afe988f75d101194b7e4be9..42aee6020b271c6b35acee94efa82522d6677e7e
@@@ -1240,27 -1431,18 +1423,27 @@@ static int wrk_resolve(lua_State *L
                knot_wire_set_cd(pkt->wire);
        }
  
 +      /* Create task and start with a first question */
 +      struct qr_task *task = worker_resolve_start(worker, pkt, *options);
 +      if (!task) {
 +              knot_rrset_free(&pkt->opt_rr, NULL);
 +              knot_pkt_free(&pkt);
 +              lua_pushstring(L, "couldn't create a resolution request");
 +              lua_error(L);
 +      }
 +
 +      /* Add initialisation callback */
        if (lua_isfunction(L, 5)) {
 -              /* Store callback in registry */
                lua_pushvalue(L, 5);
-               lua_pushlightuserdata(L, &task->req);
 -              int cb = luaL_ref(L, LUA_REGISTRYINDEX);
 -              ret = worker_resolve(worker, pkt, *options, resolve_callback, (void *) (intptr_t)cb);
 -      } else {
 -              ret = worker_resolve(worker, pkt, *options, NULL, NULL);
++              lua_pushlightuserdata(L, worker_task_request(task));
 +              (void) execute_callback(L, 1);
        }
 -      
 +
 +      /* Start execution */
 +      int ret = worker_resolve_exec(task, pkt);
 +      lua_pushboolean(L, ret == 0);
        knot_rrset_free(&pkt->opt_rr, NULL);
        knot_pkt_free(&pkt);
 -      lua_pushboolean(L, ret == 0);
        return 1;
  }
  
diff --cc daemon/io.c
index 5ebd3ac87f3d11be134219c691c2d06f0fe7111c,0cfe5b3c5de4b2d92053fb0b93ef2700c8814fe4..d8a6efb9e1b701fb90ca2dd76d6765e3794320e6
@@@ -101,10 -108,21 +108,11 @@@ static void session_release(struct work
        }
  }
  
--static uv_stream_t *handle_alloc(uv_loop_t *loop)
 -{
 -      uv_stream_t *handle = calloc(1, sizeof(uv_handles_t));
 -      if (!handle) {
 -              return NULL;
 -      }
 -
 -      return handle;
 -}
 -
+ static uv_stream_t *handle_borrow(uv_loop_t *loop)
  {
-       uv_stream_t *handle = calloc(1, sizeof(*handle));
-       if (!handle) {
+       struct worker_ctx *worker = loop->data;
+       void *req = worker_iohandle_borrow(worker);
+       if (!req) {
                return NULL;
        }
  
@@@ -189,16 -216,12 +206,11 @@@ int udp_bindfd(uv_udp_t *handle, int fd
        return udp_bind_finalize((uv_handle_t *)handle);
  }
  
- static void tcp_timeout(uv_handle_t *timer)
- {
-       uv_handle_t *handle = timer->data;
-       uv_close(handle, io_free);
- }
  static void tcp_timeout_trigger(uv_timer_t *timer)
  {
-       uv_handle_t *handle = timer->data;
-       struct session *session = handle->data;
+       struct session *session = timer->data;
 -      struct worker_ctx *worker = timer->loop->data;
+       assert(session->outgoing == false);
        if (session->tasks.len > 0) {
                uv_timer_again(timer);
        } else {
Simple merge
Simple merge
index 99f9ffd85ada0d5c8492814f03491ab25465621e,216911975f5ca84f06aa948b6c281d2001415c32..dd4e567c2f2504a2ca6a39d6f4b98c4eea32415b
@@@ -151,26 -181,13 +181,27 @@@ local const_rcode = 
        BADVERS    = 16,
        BADCOOKIE  = 23,
  }
 +-- This corresponds to `enum kr_rank`, it's not possible to do this without introspection unfortunately
 +local const_rank = {
 +      INITIAL = 0,
 +      OMIT = 1,
 +      TRY = 2,
 +      INDET = 4,
 +      BOGUS = 5,
 +      MISMATCH = 6,
 +      MISSING = 7,
 +      INSECURE = 8,
 +      AUTH = 16,
 +      SECURE = 32
 +}
  
- -- Create inverse table
- local const_rank_tostring = {}
- for k, v in pairs(const_rank) do
-       const_rank_tostring[v] = k
- end
+ -- Constant tables
+ local const_class_str = itable(const_class)
+ local const_type_str = itable(const_type)
+ local const_rcode_str = itable(const_rcode)
+ local const_opcode_str = itable(const_opcode)
+ local const_section_str = itable(const_section)
++local const_rank_str = itable(const_rank)
  
  -- Metatype for RR types to allow anonymous types
  setmetatable(const_type, {
@@@ -204,9 -242,8 +256,9 @@@ local knot_rrset_t = ffi.typeof('knot_r
  ffi.metatype( knot_rrset_t, {
        -- beware: `owner` and `rdata` are typed as a plain lua strings
        --         and not the real types they represent.
 +      __tostring = function(rr) return rr:txt_dump() end,
        __index = {
-               owner = function(rr) return ffi.string(rr._owner, knot.knot_dname_size(rr._owner)) end,
+               owner = function(rr) return dname2wire(rr._owner) end,
                ttl = function(rr) return tonumber(knot.knot_rrset_ttl(rr)) end,
                rdata = function(rr, i)
                        local rdata = knot.knot_rdataset_at(rr.rrs, i)
@@@ -339,35 -529,6 +544,30 @@@ ffi.metatype( kr_request_t, 
        },
  })
  
- -- Pretty print for domain name
- local function dname2str(dname)
-       return ffi.string(ffi.gc(C.knot_dname_to_str(nil, dname, 0), C.free))
- end
 +-- C array iterator
 +local function c_array_iter(t, i)
 +      i = i + 1
 +      if i >= t.len then return end
 +      return i, t.at[i][0]
 +end
 +
 +-- Metatype for ranked record array
 +local ranked_rr_array_t = ffi.typeof('ranked_rr_array_t')
 +ffi.metatype(ranked_rr_array_t, {
 +      __len = function(self)
 +              return tonumber(self.len)
 +      end,
 +      __ipairs = function (self)
 +              return c_array_iter, self, -1
 +      end,
 +      __index = {
 +              get = function (self, i)
 +                      if i < 0 or i > self.len then return nil end
 +                      return self.at[i][0]
 +              end,
 +      }
 +})
 +
  -- Pretty-print a single RR (which is a table with .owner .ttl .type .rdata)
  -- Extension: append .comment if exists.
  local function rr2str(rr, style)
@@@ -401,8 -562,16 +601,18 @@@ kres = 
        type = const_type,
        section = const_section,
        rcode = const_rcode,
-       rank_tostring = const_rank_tostring,
+       opcode = const_opcode,
 +      rank = const_rank,
+       -- Constants to strings
+       tostring = {
+               class = const_class_str,
+               type = const_type_str,
+               section = const_section_str,
+               rcode = const_rcode_str,
+               opcode = const_opcode_str,
++              rank = const_rank_str,
+       },
  
        -- Create a struct kr_qflags from a single flag name or a list of names.
        mk_qflags = function (names)
diff --cc daemon/main.c
Simple merge
diff --cc daemon/worker.c
index 44e9d0932c80a9cf07d5fe371044c172faf581e7,522e5803b66f76e1367e301bc7f298b24bf96f6b..fbac669e1f2ea2fcc098e09aff4467001dfb5be0
  
  #define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt)
  
- /* @internal Union of various libuv objects for freelist. */
- struct req
- {
-       union {
-               /* Socket handles, these have session as their `handle->data` and own it. */
-               uv_udp_t      udp;
-               uv_tcp_t      tcp;
-               /* I/O events, these have only a reference to the task they're operating on. */
-               uv_udp_send_t send;
-               uv_write_t    write;
-               uv_connect_t  connect;
-               /* Timer events */
-               uv_timer_t    timer;
-       } as;
+ /** Client request state. */
+ struct request_ctx
+ {
+       struct kr_request req;
+       struct {
+               union inaddr addr;
+               union inaddr dst_addr;
+               /* uv_handle_t *handle; */
+               /** NULL if the request didn't come over network. */
+               struct session *session;
+       } source;
+       struct worker_ctx *worker;
+       qr_tasklist_t tasks;
+ };
+ /** Query resolution task. */
+ struct qr_task
+ {
+       struct request_ctx *ctx;
+       knot_pkt_t *pktbuf;
+       qr_tasklist_t waiting;
+       uv_handle_t *pending[MAX_PENDING];
+       uint16_t pending_count;
+       uint16_t addrlist_count;
+       uint16_t addrlist_turn;
+       uint16_t timeouts;
+       uint16_t iter_count;
+       uint16_t bytes_remaining;
+       struct sockaddr *addrlist;
 -      worker_cb_t on_complete;
 -      void *baton;
+       uint32_t refs;
+       bool finished : 1;
+       bool leading  : 1;
  };
  
  /* Convenience macros */
  #define qr_task_ref(task) \
        do { ++(task)->refs; } while(0)
  #define qr_task_unref(task) \
 -      do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
 +      do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)
  #define qr_valid_handle(task, checked) \
-       (!uv_is_closing((checked)) || (task)->source.handle == (checked))
+       (!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))
+ /** @internal get key for tcp session
+  *  @note kr_straddr() return pointer to static string
+  */
+ #define tcpsess_key(addr) kr_straddr(addr)
  
  /* Forward decls */
  static void qr_task_free(struct qr_task *task);
@@@ -71,26 -129,71 +127,69 @@@ static inline struct worker_ctx *get_wo
        return uv_default_loop()->data;
  }
  
- static inline struct req *req_borrow(struct worker_ctx *worker)
+ static inline void *iohandle_borrow(struct worker_ctx *worker)
+ {
+       void *h = NULL;
+       const size_t size = sizeof(uv_handles_t);
+       if (worker->pool_iohandles.len > 0) {
+               h = array_tail(worker->pool_iohandles);
+               array_pop(worker->pool_iohandles);
+               kr_asan_unpoison(h, size);
+       } else {
+               h = malloc(size);
+       }
+       return h;
+ }
+ static inline void iohandle_release(struct worker_ctx *worker, void *h)
+ {
+       assert(h);
 -      const size_t size = sizeof(uv_handles_t);
+       if (worker->pool_iohandles.len < MP_FREELIST_SIZE) {
+               array_push(worker->pool_iohandles, h);
 -              kr_asan_poison(h, size);
++              kr_asan_poison(h, sizeof(uv_handles_t));
+       } else {
+               free(h);
+       }
+ }
+ void *worker_iohandle_borrow(struct worker_ctx *worker)
+ {
+       return iohandle_borrow(worker);
+ }
+ void worker_iohandle_release(struct worker_ctx *worker, void *h)
+ {
+       iohandle_release(worker, h);
+ }
+ static inline void *iorequest_borrow(struct worker_ctx *worker)
  {
-       struct req *req = NULL;
-       if (worker->pool_ioreq.len > 0) {
-               req = array_tail(worker->pool_ioreq);
-               array_pop(worker->pool_ioreq);
-               kr_asan_unpoison(req, sizeof(*req));
+       void *r = NULL;
+       const size_t size = sizeof(uv_reqs_t);
+       if (worker->pool_ioreqs.len > 0) {
+               r = array_tail(worker->pool_ioreqs);
+               array_pop(worker->pool_ioreqs);
+               kr_asan_unpoison(r, size);
        } else {
-               req = malloc(sizeof(*req));
+               r = malloc(size);
        }
-       return req;
+       return r;
  }
  
- static inline void req_release(struct worker_ctx *worker, struct req *req)
+ static inline void iorequest_release(struct worker_ctx *worker, void *r)
  {
-       if (!req || worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) {
-               array_push(worker->pool_ioreq, req);
-               kr_asan_poison(req, sizeof(*req));
+       assert(r);
 -      const size_t size = sizeof(uv_reqs_t);
+       if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) {
+               array_push(worker->pool_ioreqs, r);
 -              kr_asan_poison(r, size);
++              kr_asan_poison(r, sizeof(uv_reqs_t));
        } else {
-               free(req);
+               free(r);
        }
  }
  
@@@ -250,38 -505,30 +501,31 @@@ static struct request_ctx *request_crea
                .alloc = (knot_mm_alloc_t) mp_alloc
        };
  
-       /* Create resolution task */
-       struct qr_task *task = mm_alloc(&pool, sizeof(*task));
-       if (!task) {
-               mp_delete(pool.ctx);
+       /* Create request context */
+       struct request_ctx *ctx = mm_alloc(&pool, sizeof(*ctx));
+       if (!ctx) {
+               pool_release(worker, pool.ctx);
                return NULL;
        }
-       memset(&task->req, 0, sizeof(task->req));
 +
-       /* Create packet buffers for answer and subrequests */
-       task->req.pool = pool;
-       knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool);
-       if (!pktbuf) {
-               mp_delete(pool.ctx);
-               return NULL;
+       memset(ctx, 0, sizeof(*ctx));
+       /* TODO Relocate pool to struct request */
+       ctx->worker = worker;
+       array_init(ctx->tasks);
+       struct session *session = handle ? handle->data : NULL;
+       if (session) {
+               assert(session->outgoing == false);
        }
-       pktbuf->size = 0;
-       task->pktbuf = pktbuf;
-       array_init(task->waiting);
-       task->addrlist = NULL;
-       task->pending_count = 0;
-       task->bytes_remaining = 0;
-       task->iter_count = 0;
-       task->timeouts = 0;
-       task->refs = 1;
-       task->finished = false;
-       task->leading = false;
-       task->worker = worker;
-       task->session = NULL;
-       task->source.handle = handle;
-       task->timeout = NULL;
+       ctx->source.session = session;
+       struct kr_request *req = &ctx->req;
+       req->pool = pool;
        /* Remember query source addr */
-       if (addr) {
+       if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
+               ctx->source.addr.ip.sa_family = AF_UNSPEC;
+       } else {
                size_t addr_len = sizeof(struct sockaddr_in);
                if (addr->sa_family == AF_INET6)
                        addr_len = sizeof(struct sockaddr_in6);
@@@ -414,12 -756,24 +753,22 @@@ static int qr_task_register(struct qr_t
  
  static void qr_task_complete(struct qr_task *task)
  {
 -      struct worker_ctx *worker = ctx->worker;
+       struct request_ctx *ctx = task->ctx;
++
        /* Kill pending I/O requests */
-       ioreq_killall(task);
+       ioreq_kill_pending(task);
        assert(task->waiting.len == 0);
        assert(task->leading == false);
 -      /* Run the completion callback. */
 -      if (task->on_complete) {
 -              task->on_complete(worker, &ctx->req, task->baton);
 -      }
++
+       struct session *source_session = ctx->source.session;
+       if (source_session) {
+               assert(source_session->outgoing == false &&
+                      source_session->waiting.len == 0);
+               session_del_tasks(source_session, task);
+       }
++
        /* Release primary reference to task. */
-       qr_task_unref(task);
+       request_del_tasks(ctx, task);
  }
  
  /* This is called when we send subrequest / answer */
@@@ -504,64 -944,283 +939,299 @@@ static int qr_task_send(struct qr_task 
                        return ret;
                }
        }
-       /* Send using given protocol */
-       if (handle->type == UV_UDP) {
-               uv_buf_t buf = { (char *)pkt->wire, pkt->size };
-               send_req->as.send.data = task;
-               ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
-       } else {
-               uint16_t pkt_size = htons(pkt->size);
-               uv_buf_t buf[2] = {
-                       { (char *)&pkt_size, sizeof(pkt_size) },
-                       { (char *)pkt->wire, pkt->size }
-               };
-               send_req->as.write.data = task;
-               ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
-       }
-       if (ret == 0) {
-               qr_task_ref(task); /* Pending ioreq on current task */
-       } else {
-               req_release(task->worker, send_req);
+       /* Send using given protocol */
+       if (handle->type == UV_UDP) {
+               uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
+               uv_buf_t buf = { (char *)pkt->wire, pkt->size };
+               send_req->data = task;
+               ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
+       } else if (handle->type == UV_TCP) {
+               uv_write_t *write_req = (uv_write_t *)ioreq;
+               uint16_t pkt_size = htons(pkt->size);
+               uv_buf_t buf[2] = {
+                       { (char *)&pkt_size, sizeof(pkt_size) },
+                       { (char *)pkt->wire, pkt->size }
+               };
+               write_req->data = task;
+               ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
+       } else {
+               assert(false);
+       }
+       if (ret == 0) {
+               qr_task_ref(task); /* Pending ioreq on current task */
+               if (worker->too_many_open &&
+                   worker->stats.rconcurrent <
+                       worker->rconcurrent_highwatermark - 10) {
+                       worker->too_many_open = false;
+               }
+       } else {
+               iorequest_release(worker, ioreq);
+               if (ret == UV_EMFILE) {
+                       worker->too_many_open = true;
+                       worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
+               }
+       }
+       /* Update statistics */
+       if (ctx->source.session &&
+           handle != ctx->source.session->handle &&
+           addr) {
+               if (handle->type == UV_UDP)
+                       worker->stats.udp += 1;
+               else
+                       worker->stats.tcp += 1;
+               if (addr->sa_family == AF_INET6)
+                       worker->stats.ipv6 += 1;
+               else if (addr->sa_family == AF_INET)
+                       worker->stats.ipv4 += 1;
+       }
+       return ret;
+ }
+ static int session_next_waiting_send(struct session *session)
+ {
+       union inaddr *peer = &session->peer;
+       int ret = kr_ok();
+       if (session->waiting.len > 0) {
+               struct qr_task *task = session->waiting.at[0];
+               ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf);
+       }
+       session->timeout.data = session;
+       timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+       return ret;
+ }
+ static int session_tls_hs_cb(struct session *session, int status)
+ {
+       VERBOSE_MSG(NULL, "=> server: '%s' TLS handshake has %s\n",
+                   kr_straddr(&session->peer.ip), status ? "failed" : "completed");
+       struct worker_ctx *worker = get_worker();
+       union inaddr *peer = &session->peer;
+       int deletion_res = worker_del_tcp_waiting(worker, &peer->ip);
+       if (status) {
+               for (size_t i = 0; i < session->waiting.len; ++i) {
+                       struct qr_task *task = session->waiting.at[0];
+                       struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
+                       kr_nsrep_update_rtt(&qry->ns, &peer->ip, KR_NS_TIMEOUT,
+                                           worker->engine->resolver.cache_rtt, KR_NS_UPDATE);
+               }
+       } else {
+               if (deletion_res != 0) {
+                       /* session isn't in list of waiting queries, *
+                        * something gone wrong */
+                       while (session->waiting.len > 0) {
+                               struct qr_task *task = session->waiting.at[0];
+                               session_del_tasks(session, task);
+                               array_del(session->waiting, 0);
+                               qr_task_finalize(task, KR_STATE_FAIL);
+                               qr_task_unref(task);
+                       }
+                       assert(session->tasks.len == 0);
+                       session_close(session);
+                       return kr_ok();
+               }
+               int ret = session_next_waiting_send(session);
+               if (ret == kr_ok()) {
+                       struct worker_ctx *worker = get_worker();
+                       union inaddr *peer = &session->peer;
+                       int ret = worker_add_tcp_connected(worker, &peer->ip, session);
+                       assert(ret == 0);
+               }
+       }
+       return kr_ok();
+ }
++static struct kr_query *session_current_query(struct session *session)
++{
++      if (session->waiting.len == 0) {
++              return NULL;
++      }
++
++      struct qr_task *task = session->waiting.at[0];
++      if (task->ctx->req.rplan.pending.len == 0) {
++              return NULL;
++      }
++
++      return array_tail(task->ctx->req.rplan.pending);
++}
++
+ static void on_connect(uv_connect_t *req, int status)
+ {
+       struct worker_ctx *worker = get_worker();
+       uv_stream_t *handle = req->handle;
+       struct session *session = handle->data;
+       union inaddr *peer = &session->peer;
+       uv_timer_stop((uv_timer_t *)&session->timeout);
+       if (status == UV_ECANCELED) {
+               worker_del_tcp_waiting(worker, &peer->ip);
+               assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0);
+               iorequest_release(worker, req);
+               return;
+       }
+       if (session->closing) {
+               worker_del_tcp_waiting(worker, &peer->ip);
+               assert(session->waiting.len == 0 && session->tasks.len == 0);
+               iorequest_release(worker, req);
+               return;
+       }
+       if (status != 0) {
+               worker_del_tcp_waiting(worker, &peer->ip);
+               while (session->waiting.len > 0) {
+                       struct qr_task *task = session->waiting.at[0];
+                       session_del_tasks(session, task);
+                       array_del(session->waiting, 0);
+                       assert(task->refs > 1);
+                       qr_task_unref(task);
+                       qr_task_step(task, NULL, NULL);
+               }
+               assert(session->tasks.len == 0);
+               iorequest_release(worker, req);
+               session_close(session);
+               return;
+       }
+       if (!session->has_tls) {
+               /* if there is a TLS, session still waiting for handshake,
+                * otherwise remove it from waiting list */
+               if (worker_del_tcp_waiting(worker, &peer->ip) != 0) {
+                       /* session isn't in list of waiting queries, *
+                        * something gone wrong */
+                       while (session->waiting.len > 0) {
+                               struct qr_task *task = session->waiting.at[0];
+                               session_del_tasks(session, task);
+                               array_del(session->waiting, 0);
+                               qr_task_finalize(task, KR_STATE_FAIL);
+                               qr_task_unref(task);
+                       }
+                       assert(session->tasks.len == 0);
+                       iorequest_release(worker, req);
+                       session_close(session);
+                       return;
+               }
+       }
 -      WITH_VERBOSE {
++      struct kr_query *qry = session_current_query(session);
++      WITH_VERBOSE (qry) {
+               char addr_str[INET6_ADDRSTRLEN];
+               inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
+                         addr_str, sizeof(addr_str));
 -              VERBOSE_MSG(NULL, "=> connected to '%s'\n", addr_str);
++              VERBOSE_MSG(qry, "=> connected to '%s'\n", addr_str);
+       }
+       session->connected = true;
+       session->handle = (uv_handle_t *)handle;
+       int ret = kr_ok();
+       if (session->has_tls) {
+               ret = tls_client_connect_start(session->tls_client_ctx,
+                                              session, session_tls_hs_cb);
+               if (ret == kr_error(EAGAIN)) {
+                       iorequest_release(worker, req);
+                       io_start_read(session->handle);
+                       return;
+               }
+       }
+       if (ret == kr_ok()) {
+               ret = session_next_waiting_send(session);
+               if (ret == kr_ok()) {
+                       worker_add_tcp_connected(worker, &session->peer.ip, session);
+                       iorequest_release(worker, req);
+                       return;
+               }
        }
  
-       /* Update statistics */
-       if (handle != task->source.handle && addr) {
-               if (handle->type == UV_UDP)
-                       task->worker->stats.udp += 1;
-               else
-                       task->worker->stats.tcp += 1;
-               if (addr->sa_family == AF_INET6)
-                       task->worker->stats.ipv6 += 1;
-               else
-                       task->worker->stats.ipv4 += 1;
+       while (session->waiting.len > 0) {
+               struct qr_task *task = session->waiting.at[0];
+               session_del_tasks(session, task);
+               array_del(session->waiting, 0);
+               qr_task_finalize(task, KR_STATE_FAIL);
+               qr_task_unref(task);
        }
-       return ret;
+       assert(session->tasks.len == 0);
+       iorequest_release(worker, req);
+       session_close(session);
  }
  
- static void on_connect(uv_connect_t *req, int status)
+ static void on_tcp_connect_timeout(uv_timer_t *timer)
  {
+       struct session *session = timer->data;
+       uv_timer_stop(timer);
        struct worker_ctx *worker = get_worker();
-       struct qr_task *task = req->data;
-       uv_stream_t *handle = req->handle;
-       if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
-               if (status == 0) {
-                       struct sockaddr_storage addr;
-                       int addr_len = sizeof(addr);
-                       uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addr_len);
-                       qr_task_send(task, (uv_handle_t *)handle, (struct sockaddr *)&addr, task->pktbuf);
-               } else {
-                       qr_task_step(task, task->addrlist, NULL);
-               }
+       assert (session->waiting.len == session->tasks.len);
+       union inaddr *peer = &session->peer;
+       worker_del_tcp_waiting(worker, &peer->ip);
 -      WITH_VERBOSE {
++      struct kr_query *qry = session_current_query(session);
++      WITH_VERBOSE (qry) {
+               char addr_str[INET6_ADDRSTRLEN];
+               inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str));
 -              VERBOSE_MSG(NULL, "=> connection to '%s' failed\n", addr_str);
++              VERBOSE_MSG(qry, "=> connection to '%s' failed\n", addr_str);
        }
-       qr_task_unref(task);
-       req_release(worker, (struct req *)req);
+       while (session->waiting.len > 0) {
+               struct qr_task *task = session->waiting.at[0];
+               struct request_ctx *ctx = task->ctx;
+               assert(ctx);
+               task->timeouts += 1;
+               worker->stats.timeout += 1;
+               session_del_tasks(session, task);
+               array_del(session->waiting, 0);
+               assert(task->refs > 1);
+               qr_task_unref(task);
+               qr_task_step(task, NULL, NULL);
+       }
+       assert (session->tasks.len == 0);
+       session_close(session);
  }
  
- static void on_timer_close(uv_handle_t *handle)
+ static void on_tcp_watchdog_timeout(uv_timer_t *timer)
  {
-       struct qr_task *task = handle->data;
-       req_release(task->worker, (struct req *)handle);
-       qr_task_unref(task);
+       struct session *session = timer->data;
+       assert(session->outgoing);
+       uv_timer_stop(timer);
+       struct worker_ctx *worker = get_worker();
+       if (session->outgoing) {
+               worker_del_tcp_connected(worker, &session->peer.ip);
+               while (session->waiting.len > 0) {
+                       struct qr_task *task = session->waiting.at[0];
+                       task->timeouts += 1;
+                       worker->stats.timeout += 1;
+                       array_del(session->waiting, 0);
+                       session_del_tasks(session, task);
+                       qr_task_finalize(task, KR_STATE_FAIL);
+                       qr_task_unref(task);
+               }
+       }
+       while (session->tasks.len > 0) {
+               struct qr_task *task = session->tasks.at[0];
+               task->timeouts += 1;
+               worker->stats.timeout += 1;
+               assert(task->refs > 1);
+               array_del(session->tasks, 0);
+               qr_task_finalize(task, KR_STATE_FAIL);
+               qr_task_unref(task);
+       }
+       session_close(session);
  }
  
  /* This is called when I/O timeouts */
@@@ -789,33 -1492,179 +1502,180 @@@ static int qr_task_step(struct qr_task 
                 * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
                 */
                subreq_lead(task);
-       } else {
-               uv_connect_t *conn = (uv_connect_t *)req_borrow(task->worker);
-               if (!conn) {
-                       return qr_task_step(task, NULL, NULL);
+               struct session *session = handle->data;
+               assert(session->handle->type == UV_UDP);
+               ret = timer_start(session, on_retransmit, timeout, 0);
+               /* Start next step with timeout, fatal if can't start a timer. */
+               if (ret != 0) {
+                       subreq_finalize(task, packet_source, packet);
+                       return qr_task_finalize(task, KR_STATE_FAIL);
                }
+       } else {
+               assert (sock_type == SOCK_STREAM);
                const struct sockaddr *addr =
                        packet_source ? packet_source : task->addrlist;
-               uv_handle_t *client = ioreq_spawn(task, sock_type, addr->sa_family);
-               if (!client) {
-                       req_release(task->worker, (struct req *)conn);
-                       return qr_task_step(task, NULL, NULL);
-               }
-               conn->data = task;
-               if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect) != 0) {
-                       req_release(task->worker, (struct req *)conn);
-                       return qr_task_step(task, NULL, NULL);
+               if (addr->sa_family == AF_UNSPEC) {
+                       subreq_finalize(task, packet_source, packet);
+                       return qr_task_finalize(task, KR_STATE_FAIL);
                }
-               qr_task_ref(task); /* Connect request borrows task */
-               ret = timer_start(task, on_timeout, KR_CONN_RTT_MAX, 0);
-       }
+               struct session* session = NULL;
+               if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
+                       assert(session->outgoing);
+                       if (session->closing) {
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+                       /* There are waiting tasks.
+                        * It means that connection establishing or data sending
+                        * is coming right now. */
+                       /* Task will be notified in on_connect() or qr_task_on_send(). */
+                       ret = session_add_waiting(session, task);
+                       if (ret < 0) {
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+                       ret = session_add_tasks(session, task);
+                       if (ret < 0) {
+                               session_del_waiting(session, task);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+               } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
+                       /* Connection has been already established */
+                       assert(session->outgoing);
+                       if (session->closing) {
+                               session_del_tasks(session, task);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
  
-       /* Start next step with timeout, fatal if can't start a timer. */
-       if (ret != 0) {
-               subreq_finalize(task, packet_source, packet);
-               return qr_task_finalize(task, KR_STATE_FAIL);
+                       if (session->tasks.len >= worker->tcp_pipeline_max) {
+                               session_del_tasks(session, task);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+                       /* will be removed in qr_task_on_send() */
+                       ret = session_add_waiting(session, task);
+                       if (ret < 0) {
+                               session_del_tasks(session, task);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+                       ret = session_add_tasks(session, task);
+                       if (ret < 0) {
+                               session_del_waiting(session, task);
+                               session_del_tasks(session, task);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+                       if (session->waiting.len == 1) {
+                               ret = qr_task_send(task, session->handle,
+                                                  &session->peer.ip, task->pktbuf);
+                               if (ret < 0) {
+                                       session_del_waiting(session, task);
+                                       session_del_tasks(session, task);
+                                       subreq_finalize(task, packet_source, packet);
+                                       return qr_task_finalize(task, KR_STATE_FAIL);
+                               }
+                               ret = timer_start(session, on_tcp_watchdog_timeout,
+                                                 KR_CONN_RTT_MAX, 0);
+                               if (ret < 0) {
+                                       assert(false);
+                                       session_del_waiting(session, task);
+                                       session_del_tasks(session, task);
+                                       subreq_finalize(task, packet_source, packet);
+                                       return qr_task_finalize(task, KR_STATE_FAIL);
+                               }
+                       }
+                       task->pending[task->pending_count] = session->handle;
+                       task->pending_count += 1;
+               } else {
+                       /* Make connection */
+                       uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(ctx->worker);
+                       if (!conn) {
+                               return qr_task_step(task, NULL, NULL);
+                       }
+                       uv_handle_t *client = ioreq_spawn(task, sock_type,
+                                                         addr->sa_family);
+                       if (!client) {
+                               iorequest_release(ctx->worker, conn);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+                       session = client->data;
+                       ret = worker_add_tcp_waiting(ctx->worker, addr, session);
+                       if (ret < 0) {
+                               session_del_tasks(session, task);
+                               iorequest_release(ctx->worker, conn);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+                       /* will be removed in qr_task_on_send() */
+                       ret = session_add_waiting(session, task);
+                       if (ret < 0) {
+                               session_del_tasks(session, task);
+                               worker_del_tcp_waiting(ctx->worker, addr);
+                               iorequest_release(ctx->worker, conn);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
+                       /* Check if there must be TLS */
+                       struct engine *engine = ctx->worker->engine;
+                       struct network *net = &engine->net;
+                       const char *key = tcpsess_key(addr);
+                       struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
+                       if (entry) {
+                               assert(session->tls_client_ctx == NULL);
+                               struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry);
+                               if (!tls_ctx) {
+                                       session_del_tasks(session, task);
+                                       session_del_waiting(session, task);
+                                       worker_del_tcp_waiting(ctx->worker, addr);
+                                       iorequest_release(ctx->worker, conn);
+                                       subreq_finalize(task, packet_source, packet);
+                                       return qr_task_step(task, NULL, NULL);
+                               }
+                               tls_client_ctx_set_params(tls_ctx, entry, session);
+                               session->tls_client_ctx = tls_ctx;
+                               session->has_tls = true;
+                       }
+                       conn->data = session;
+                       memcpy(&session->peer, addr, sizeof(session->peer));
+                       ret = timer_start(session, on_tcp_connect_timeout,
+                                         KR_CONN_RTT_MAX, 0);
+                       if (ret != 0) {
+                               session_del_tasks(session, task);
+                               session_del_waiting(session, task);
+                               worker_del_tcp_waiting(ctx->worker, addr);
+                               iorequest_release(ctx->worker, conn);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
 -                      WITH_VERBOSE {
++                      struct kr_query *qry = session_current_query(session);
++                      WITH_VERBOSE (qry) {
+                               char addr_str[INET6_ADDRSTRLEN];
+                               inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), addr_str, sizeof(addr_str));
 -                              VERBOSE_MSG(NULL, "=> connecting to: '%s'\n", addr_str);
++                              VERBOSE_MSG(qry, "=> connecting to: '%s'\n", addr_str);
+                       }
+                       if (uv_tcp_connect(conn, (uv_tcp_t *)client,
+                                          addr , on_connect) != 0) {
+                               uv_timer_stop(&session->timeout);
+                               session_del_tasks(session, task);
+                               session_del_waiting(session, task);
+                               worker_del_tcp_waiting(ctx->worker, addr);
+                               iorequest_release(ctx->worker, conn);
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_step(task, NULL, NULL);
+                       }
+               }
        }
-       return 0;
+       return kr_ok();
  }
  
  static int parse_packet(knot_pkt_t *query)
@@@ -917,77 -1889,221 +1900,223 @@@ int worker_process_tcp(struct worker_ct
        if (len <= 0 || !msg) {
                /* If we have pending tasks, we must dissociate them from the
                 * connection so they don't try to access closed and freed handle.
-                * @warning Do not modify task if this is outgoing request as it is shared with originator.
+                * @warning Do not modify task if this is outgoing request
+                * as it is shared with originator.
                 */
-               if (!session->outgoing) {
-                       for (size_t i = 0; i < session->tasks.len; ++i) {
-                               struct qr_task *task = session->tasks.at[i];
-                               task->session = NULL;
-                               task->source.handle = NULL;
 -              WITH_VERBOSE {
++              struct kr_query *qry = session_current_query(session);
++              WITH_VERBOSE (qry) {
+                       char addr_str[INET6_ADDRSTRLEN];
+                       inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
+                                 addr_str, sizeof(addr_str));
 -                      VERBOSE_MSG(NULL, "=> connection to '%s' closed by peer\n", addr_str);
++                      VERBOSE_MSG(qry, "=> connection to '%s' closed by peer\n", addr_str);
+               }
+               uv_timer_t *timer = &session->timeout;
+               uv_timer_stop(timer);
+               struct sockaddr *peer = &session->peer.ip;
+               worker_del_tcp_connected(worker, peer);
+               session->connected = false;
+               if (session->tls_client_ctx) {
+                       /* Avoid gnutls_bye() call */
+                       tls_client_set_hs_state(session->tls_client_ctx,
+                                               TLS_HS_NOT_STARTED);
+               }
+               if (session->outgoing && session->buffering) {
+                       session->buffering = NULL;
+               }
+               assert(session->tasks.len >= session->waiting.len);
+               while (session->waiting.len > 0) {
+                       struct qr_task *task = session->waiting.at[0];
+                       array_del(session->waiting, 0);
+                       assert(task->refs > 1);
+                       session_del_tasks(session, task);
+                       if (session->outgoing) {
+                               if (task->ctx->req.options.FORWARD) {
+                                       /* We are in TCP_FORWARD mode.
+                                        * To prevent failing at kr_resolve_consume()
+                                        * qry.flags.TCP must be cleared.
+                                        * TODO - refactoring is needed. */
+                                       struct kr_request *req = &task->ctx->req;
+                                       struct kr_rplan *rplan = &req->rplan;
+                                       struct kr_query *qry = array_tail(rplan->pending);
+                                       qry->flags.TCP = false;
+                               }
+                               qr_task_step(task, NULL, NULL);
+                       } else {
+                               assert(task->ctx->source.session == session);
+                               task->ctx->source.session = NULL;
+                       }
+                       qr_task_unref(task);
+               }
+               while (session->tasks.len > 0) {
+                       struct qr_task *task = session->tasks.at[0];
+                       if (session->outgoing) {
+                               if (task->ctx->req.options.FORWARD) {
+                                       struct kr_request *req = &task->ctx->req;
+                                       struct kr_rplan *rplan = &req->rplan;
+                                       struct kr_query *qry = array_tail(rplan->pending);
+                                       qry->flags.TCP = false;
+                               }
+                               qr_task_step(task, NULL, NULL);
+                       } else {
+                               assert(task->ctx->source.session == session);
+                               task->ctx->source.session = NULL;
                        }
-                       session->tasks.len = 0;
+                       session_del_tasks(session, task);
+               }
+               session_close(session);
+               return kr_ok();
+       }
+       if (session->outgoing) {
+               uv_timer_stop(&session->timeout);
+               timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+       }
+       if (session->bytes_to_skip) {
+               assert(session->buffering == NULL);
+               ssize_t min_len = MIN(session->bytes_to_skip, len);
+               len -= min_len;
+               msg += min_len;
+               session->bytes_to_skip -= min_len;
+               if (len < 0 || session->bytes_to_skip < 0) {
+                       /* Something gone wrong.
+                        * Better kill the connection */
+                       assert(false);
+                       return kr_error(EILSEQ);
+               }
+               if (len == 0) {
+                       return kr_ok();
                }
-               return kr_error(ECONNRESET);
+               assert(session->bytes_to_skip == 0);
        }
  
        int submitted = 0;
        struct qr_task *task = session->buffering;
+       knot_pkt_t *pkt_buf = NULL;
+       if (task) {
+               pkt_buf = task->pktbuf;
+       } else {
+               /* Update DNS header in session->msg_hdr* */
+               assert(session->msg_hdr_idx <= sizeof(session->msg_hdr));
+               ssize_t hdr_amount = sizeof(session->msg_hdr) -
+                                    session->msg_hdr_idx;
+               if (hdr_amount > len) {
+                       hdr_amount = len;
+               }
+               if (hdr_amount > 0) {
+                       memcpy(session->msg_hdr + session->msg_hdr_idx, msg, hdr_amount);
+                       session->msg_hdr_idx += hdr_amount;
+                       len -= hdr_amount;
+                       msg += hdr_amount;
+               }
+               if (len == 0) { /* no data beyond msg_hdr -> not much to do */
+                       return kr_ok();
+               }
+               assert(session->msg_hdr_idx == sizeof(session->msg_hdr));
+               session->msg_hdr_idx = 0;
+               uint16_t msg_size = get_msg_size(session->msg_hdr);
+               uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2);
+               if (msg_size < KNOT_WIRE_HEADER_SIZE) {
+                       /* better kill the connection; we would probably get out of sync */
+                       uv_timer_t *timer = &session->timeout;
+                       uv_timer_stop(timer);
+                       while (session->waiting.len > 0) {
+                               struct qr_task *task = session->waiting.at[0];
+                               if (session->outgoing) {
+                                       qr_task_finalize(task, KR_STATE_FAIL);
+                               } else {
+                                       assert(task->ctx->source.session == session);
+                                       task->ctx->source.session = NULL;
+                               }
+                               array_del(session->waiting, 0);
+                               session_del_tasks(session, task);
+                               qr_task_unref(task);
+                       }
+                       while (session->tasks.len > 0) {
+                               struct qr_task *task = session->tasks.at[0];
+                               if (session->outgoing) {
+                                       qr_task_finalize(task, KR_STATE_FAIL);
+                               } else {
+                                       assert(task->ctx->source.session == session);
+                                       task->ctx->source.session = NULL;
+                               }
+                               session_del_tasks(session, task);
+                       }
+                       session_close(session);
  
-       /* If this is a new query, create a new task that we can use
-        * to buffer incoming message until it's complete. */
-       if (!session->outgoing) {
-               if (!task) {
-                       /* Get TCP peer name, keep zeroed address if it fails. */
-                       struct sockaddr_storage addr;
-                       memset(&addr, 0, sizeof(addr));
-                       int addr_len = sizeof(addr);
-                       uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addr_len);
-                       task = qr_task_create(worker, (uv_handle_t *)handle, (struct sockaddr *)&addr);
+                       return kr_ok();
+               }
+               /* get task */
+               if (!session->outgoing) {
+                       /* This is a new query, create a new task that we can use
+                        * to buffer incoming message until it's complete. */
+                       struct sockaddr *addr = &(session->peer.ip);
+                       assert(addr->sa_family != AF_UNSPEC);
+                       struct request_ctx *ctx = request_create(worker,
+                                                                (uv_handle_t *)handle,
+                                                                addr);
+                       if (!ctx) {
+                               assert(false);
+                               return kr_error(ENOMEM);
+                       }
+                       task = qr_task_create(ctx);
                        if (!task) {
+                               assert(false);
+                               request_free(ctx);
                                return kr_error(ENOMEM);
                        }
-                       session->buffering = task;
+               } else {
+                       /* Start of response from upstream.
+                        * The session task list must contain a task
+                        * with the same msg id. */
+                       task = find_task(session, msg_id);
+                       /* FIXME: on high load over one connection, it's likely
+                        * that we will get multiple matches sooner or later (!) */
+                       if (task) {
+                               knot_pkt_clear(task->pktbuf);
+                               assert(task->leading == false);
+                       } else  {
+                               session->bytes_to_skip = msg_size - 2;
+                               ssize_t min_len = MIN(session->bytes_to_skip, len);
+                               len -= min_len;
+                               msg += min_len;
+                               session->bytes_to_skip -= min_len;
+                               if (len < 0 || session->bytes_to_skip < 0) {
+                                       /* Something gone wrong.
+                                        * Better kill the connection */
+                                       assert(false);
+                                       return kr_error(EILSEQ);
+                               }
+                               if (len == 0) {
+                                       return submitted;
+                               }
+                               assert(session->bytes_to_skip == 0);
+                               int ret = worker_process_tcp(worker, handle, msg, len);
+                               if (ret < 0) {
+                                       submitted = ret;
+                               } else {
+                                       submitted += ret;
+                               }
+                               return submitted;
+                       }
                }
-       } else {
-               assert(session->tasks.len > 0);
-               task = array_tail(session->tasks);
+               pkt_buf = task->pktbuf;
+               knot_wire_set_id(pkt_buf->wire, msg_id);
+               pkt_buf->size = 2;
+               task->bytes_remaining = msg_size - 2;
+               assert(session->buffering == NULL);
+               session->buffering = task;
        }
-       /* At this point session must have either created new task or it's already assigned. */
+       /* At this point session must have either created new task
+        * or it's already assigned. */
        assert(task);
        assert(len > 0);
-       /* Start reading DNS/TCP message length */
-       knot_pkt_t *pkt_buf = task->pktbuf;
-       if (task->bytes_remaining == 0 && pkt_buf->size == 0) {
-               knot_pkt_clear(pkt_buf);
-               /* Make sure we can process maximum packet sizes over TCP for outbound queries.
-                * Previous packet is allocated with mempool, so there's no need to free it manually. */
-               if (session->outgoing && pkt_buf->max_size < KNOT_WIRE_MAX_PKTSIZE) {
-                       pkt_buf = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
-                       if (!pkt_buf) {
-                               return kr_error(ENOMEM);
-                       }
-                       task->pktbuf = pkt_buf;
-               }
-               /* Read only one byte as TCP fragment may end at a 1B boundary
-                * which would lead to OOB read or improper reassembly length. */
-               pkt_buf->size = 1;
-               pkt_buf->wire[0] = msg[0];
-               len -= 1;
-               msg += 1;
-               if (len == 0) {
-                       return 0;
-               }
-       }
-       /* Finish reading DNS/TCP message length. */
-       if (task->bytes_remaining == 0 && pkt_buf->size == 1) {
-               pkt_buf->wire[1] = msg[0];
-               ssize_t nbytes = msg_size(pkt_buf->wire);
-               len -= 1;
-               msg += 1;
-               /* Cut off fragment length and start reading DNS message. */
-               pkt_buf->size = 0;
-               task->bytes_remaining = nbytes;
-       }
++
        /* Message is too long, can't process it. */
        ssize_t to_read = MIN(len, task->bytes_remaining);
        if (pkt_buf->size + to_read > pkt_buf->max_size) {
        return submitted;
  }
  
 -int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query,
 -                 struct kr_qflags options, worker_cb_t on_complete,
 -                 void *baton)
 +struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options)
  {
        if (!worker || !query) {
 -              assert(false);
 -              return kr_error(EINVAL);
 +              return NULL;
        }
  
-       struct qr_task *task = qr_task_create(worker, NULL, NULL);
+       struct request_ctx *ctx = request_create(worker, NULL, NULL);
+       if (!ctx) {
 -              return kr_error(ENOMEM);
++              return NULL;
+       }
+       /* Create task */
+       struct qr_task *task = qr_task_create(ctx);
        if (!task) {
 -              return kr_error(ENOMEM);
+               request_free(ctx);
 +              return NULL;
        }
 -      task->baton = baton;
 -      task->on_complete = on_complete;
 +
-       int ret = qr_task_start(task, query);
+       /* Start task */
+       int ret = request_start(ctx, query);
 +      if (ret != 0) {
++              request_free(ctx);
 +              qr_task_unref(task);
 +              return NULL;
 +      }
  
        /* Set options late, as qr_task_start() -> kr_resolve_begin() rewrite it. */
-       kr_qflags_set(&task->req.options, options);
+       kr_qflags_set(&task->ctx->req.options, options);
 +      return task;
 +}
  
 -      if (ret != 0) {
 -              request_free(ctx);
 -              qr_task_unref(task);
 -              return ret;
 +int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query)
 +{
 +      if (!task) {
 +              return kr_error(EINVAL);
        }
        return qr_task_step(task, NULL, query);
  }
  
- int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options)
++struct kr_request *worker_task_request(struct qr_task *task)
 +{
-       if (!worker || !query) {
-               return kr_error(EINVAL);
++      if (!task || !task->ctx) {
++              return NULL;
 +      }
 +
-       /* Create task */
-       struct qr_task *task = worker_resolve_start(worker, query, options);
-       if (!task) {
-               return kr_error(ENOMEM);
-       }
++      return &task->ctx->req;
++}
 +
-       return worker_resolve_exec(task, query);
+ void worker_session_close(struct session *session)
+ {
+       session_close(session);
  }
  
  /** Reserve worker buffers */
diff --cc daemon/worker.h
index e88cfdd2523ed80624b26f700cf81865a0017b07,231b239759acd3edeccf21a13c8f595dd249b278..b77281fa24164c6fed2e697634614bf6b4b4391d
  #include "lib/generic/map.h"
  
  
+ /** Query resolution task (opaque). */
+ struct qr_task;
  /** Worker state (opaque). */
  struct worker_ctx;
- struct qr_task;
+ /** Transport session (opaque). */
+ struct session;
 -/** Union of various libuv objects for freelist. */
 -/** Worker callback */
 -typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
  
  /** Create and initialize the worker. */
  struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
@@@ -52,29 -64,16 +61,22 @@@ int worker_process_tcp(struct worker_ct
  int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle);
  
  /**
 - * Schedule query for resolution.
 + * Start query resolution with given query.
   *
 - * After resolution finishes, invoke on_complete with baton.
 - * @return 0 or an error code
 + * @return task or NULL
 + */
 +struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options);
 +
 +/**
 + * Execute a request with given query.
 + * It expects task to be created with \fn worker_resolve_start.
   *
 - * @note the options passed are |-combined with struct kr_context::options
 - * @todo maybe better semantics for this?
 + * @return 0 or an error code
   */
 -int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options,
 -                 worker_cb_t on_complete, void *baton);
 +int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query);
 +
- /**
-  * Schedule query for resolution.
-  *
-  * @return 0 or an error code
-  *
-  * @note the options passed are |-combined with struct kr_context::options
-  * @todo maybe better semantics for this?
-  */
- int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options);
++/** @return struct kr_request associated with opaque task */
++struct kr_request *worker_task_request(struct qr_task *task);
  
  /** Collect worker mempools */
  void worker_reclaim(struct worker_ctx *worker);
index e1c0cf10632d376a7d6c2bf1ee4ff06714b4bf49,9a9d3114059f80dc39b9696a6329f2ffc9e9a5da..aa4d98cd607643a8bcca7b93238fc76484ba5b35
@@@ -824,7 -823,7 +824,7 @@@ int kr_make_query(struct kr_query *quer
                char name_str[KNOT_DNAME_MAXLEN], type_str[16];
                knot_dname_to_str(name_str, query->sname, sizeof(name_str));
                knot_rrtype_to_string(query->stype, type_str, sizeof(type_str));
-               QVERBOSE_MSG(query, "'%s' type '%s' created outbound query, parent id %hu\n",
 -              QVERBOSE_MSG(query, "'%s' type '%s' id was assigned, parent id %u\n",
++              QVERBOSE_MSG(query, "'%s' type '%s' id was assigned, parent id %hu\n",
                            name_str, type_str, query->parent ? query->parent->id : 0);
        }
        return kr_ok();
diff --cc lib/resolve.h
Simple merge
diff --cc lib/utils.c
Simple merge
diff --cc lib/utils.h
Simple merge
index b572950346e3db0b20fa198edcbdd964d5351eaa,0000000000000000000000000000000000000000..9e4d4667d7bb964fbae0b230f742c01fee738bd5
mode 100644,000000..100644
--- /dev/null
@@@ -1,109 -1,0 +1,109 @@@
-               local rank_name = kres.rank_tostring[rank] or tostring(rank)
 +local ffi = require('ffi')
 +local bit = require('bit')
 +local condition = require('cqueues.condition')
 +
 +-- Buffer selected record information to a table
 +local function add_selected_records(dst, records)
 +      for _, rec in ipairs(records) do
 +              local rank = rec.rank
 +              -- Separate the referral chain verified flag
 +              local verified = bit.band(rec.rank, kres.rank.AUTH)
 +              if verified then
 +                      rank = bit.band(rank, bit.bnot(kres.rank.AUTH))
 +              end
++              local rank_name = kres.tostring.rank[rank] or tostring(rank)
 +              -- Write out each individual RR
 +              for rr in tostring(rec.rr):gmatch('[^\n]+\n?') do
 +                      local row = string.format('cached: %s, rank: %s, record: %s',
 +                              rec.cached, rank_name:lower(), rr)
 +                      table.insert(dst, row)
 +              end
 +      end
 +end
 +
 +local function format_selected_records(header, records)
 +      if #records == 0 then return '' end
 +      return string.format('%s\n%s\n', header, string.rep('-', #header))
 +             .. table.concat(records, '') .. '\n'
 +end
 +
 +-- Trace execution of DNS queries
 +local function serve_trace(h, _)
 +      local path = h:get(':path')
 +      local qname, qtype_str = path:match('/trace/([^/]+)/?([^/]*)')
 +      if not qname then
 +              return 400, 'expected /trace/<query name>/<query type>'
 +      end
 +
 +      -- Parse query type (or default to A)
 +      if not qtype_str or #qtype_str == 0 then
 +              qtype_str = 'A'
 +      end
 +
 +      local qtype = kres.type[qtype_str]
 +      if not qtype then
 +              return 400, string.format('unexpected query type: %s', qtype_str)
 +      end
 +
 +      -- Create logging handler callback
 +      local buffer = {}
 +      local buffer_log_cb = ffi.cast('trace_log_f', function (query, source, msg)
 +              local message = string.format('[%5s] [%s] %s',
 +                      query.id, ffi.string(source), ffi.string(msg))
 +              table.insert(buffer, message)
 +      end)
 +
 +      -- Wait for the result of the query
 +      -- Note: We can't do non-blocking write to stream directly from resolve callbacks
 +      -- because they don't run inside cqueue.
 +      local answers, authority = {}, {}
 +      local cond = condition.new()
 +      local waiting, done = false, false
 +      local finish_cb = ffi.cast('trace_callback_f', function (req)
 +              req = kres.request_t(req)
 +              add_selected_records(answers, req.answ_selected)
 +              add_selected_records(authority, req.auth_selected)
 +              if waiting then
 +                      cond:signal()
 +              end
 +              done = true
 +      end)
 +
 +      -- Resolve query and buffer logs into table
 +      resolve {
 +              name = qname,
 +              type = qtype,
 +              options = {'TRACE'},
 +              init = function (req)
 +                      req = kres.request_t(req)
 +                      req.trace_log = buffer_log_cb
 +                      req.trace_finish = finish_cb
 +              end
 +      }
 +
 +      -- Wait for asynchronous query and free callbacks
 +      if not done then
 +              waiting = true
 +              cond:wait()
 +      end
 +
 +      buffer_log_cb:free()
 +      finish_cb:free()
 +
 +      -- Build the result
 +      local result = table.concat(buffer, '') .. '\n'
 +                     .. format_selected_records('Used records from answer:', answers)
 +                     .. format_selected_records('Used records from authority:', authority)
 +      -- Return buffered data
 +      if not done then
 +              return 504, result
 +      end
 +      return result
 +end
 +
 +-- Export endpoints
 +return {
 +      endpoints = {
 +              ['/trace']   = {'text/plain', serve_trace},
 +      }
 +}
Simple merge