]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
WIP: daemon/worker: weak pointer logic for tasks
authorOto Šťáva <oto.stava@nic.cz>
Fri, 10 Jun 2022 08:08:26 +0000 (10:08 +0200)
committerOto Šťáva <oto.stava@nic.cz>
Thu, 23 Jun 2022 10:16:57 +0000 (12:16 +0200)
Replaces the reference-counting logic for `struct qr_task` with
weak-pointer-like logic (in `lib/weakptr`). Pointers to `struct qr_task`
outside of `daemon/worker` are replaced with a unique task identifier
(`qr_task_weakptr_t`), which may be passed to worker functions. If a
weak pointer is no longer valid, operations are (usually silently)
skipped, because it is assumed that a task that has been freed has
already been finished, and any pending operations on it are no longer
needed.

(This is a work in progress and still has some bugs that need to be
resolved)

18 files changed:
daemon/io.c
daemon/lua/kres-gen-30.lua
daemon/lua/kres-gen-31.lua
daemon/lua/kres-gen.sh
daemon/lua/sandbox.lua.in
daemon/main.c
daemon/session.c
daemon/session.h
daemon/tls.h
daemon/udp_queue.c
daemon/udp_queue.h
daemon/worker.c
daemon/worker.h
lib/log.c
lib/log.h
lib/meson.build
lib/weakptr.c [new file with mode: 0644]
lib/weakptr.h [new file with mode: 0644]

index 7724120bb1f7ca33cf8418fb1fdbe39779c75a7e..db6c42d87b8b7c0f14e088bfcb69bde4e5b78e6c 100644 (file)
@@ -302,9 +302,8 @@ void tcp_timeout_trigger(uv_timer_t *timer)
                /* Normally it should not happen,
                 * but better to check if there anything in this list. */
                while (!session_waitinglist_is_empty(s)) {
-                       struct qr_task *t = session_waitinglist_pop(s, false);
+                       qr_task_weakptr_t t = session_waitinglist_pop(s);
                        worker_task_finalize(t, KR_STATE_FAIL);
-                       worker_task_unref(t);
                        the_worker->stats.timeout += 1;
                        if (session_flags(s)->closing) {
                                return;
index 8b86935e02e229c94e36c48cda2fa477addbd449..091788e50e839482e2d78a6d485149bdcceb45de 100644 (file)
@@ -313,7 +313,8 @@ struct kr_server_selection {
        struct local_state *local_state;
 };
 typedef int kr_log_level_t;
-enum kr_log_group {LOG_GRP_UNKNOWN = -1, LOG_GRP_SYSTEM = 1, LOG_GRP_CACHE, LOG_GRP_IO, LOG_GRP_NETWORK, LOG_GRP_TA, LOG_GRP_TLS, LOG_GRP_GNUTLS, LOG_GRP_TLSCLIENT, LOG_GRP_XDP, LOG_GRP_DOH, LOG_GRP_DNSSEC, LOG_GRP_HINT, LOG_GRP_PLAN, LOG_GRP_ITERATOR, LOG_GRP_VALIDATOR, LOG_GRP_RESOLVER, LOG_GRP_SELECTION, LOG_GRP_ZCUT, LOG_GRP_COOKIES, LOG_GRP_STATISTICS, LOG_GRP_REBIND, LOG_GRP_WORKER, LOG_GRP_POLICY, LOG_GRP_TASENTINEL, LOG_GRP_TASIGNALING, LOG_GRP_TAUPDATE, LOG_GRP_DAF, LOG_GRP_DETECTTIMEJUMP, LOG_GRP_DETECTTIMESKEW, LOG_GRP_GRAPHITE, LOG_GRP_PREFILL, LOG_GRP_PRIMING, LOG_GRP_SRVSTALE, LOG_GRP_WATCHDOG, LOG_GRP_NSID, LOG_GRP_DNSTAP, LOG_GRP_TESTS, LOG_GRP_DOTAUTH, LOG_GRP_HTTP, LOG_GRP_CONTROL, LOG_GRP_MODULE, LOG_GRP_DEVEL, LOG_GRP_RENUMBER, LOG_GRP_EDE, LOG_GRP_REQDBG};
+enum kr_log_group {LOG_GRP_UNKNOWN = -1, LOG_GRP_SYSTEM = 1, LOG_GRP_CACHE, LOG_GRP_IO, LOG_GRP_NETWORK, LOG_GRP_TA, LOG_GRP_TLS, LOG_GRP_GNUTLS, LOG_GRP_TLSCLIENT, LOG_GRP_XDP, LOG_GRP_DOH, LOG_GRP_DNSSEC, LOG_GRP_HINT, LOG_GRP_PLAN, LOG_GRP_ITERATOR, LOG_GRP_VALIDATOR, LOG_GRP_RESOLVER, LOG_GRP_SELECTION, LOG_GRP_ZCUT, LOG_GRP_COOKIES, LOG_GRP_STATISTICS, LOG_GRP_REBIND, LOG_GRP_WORKER, LOG_GRP_POLICY, LOG_GRP_TASENTINEL, LOG_GRP_TASIGNALING, LOG_GRP_TAUPDATE, LOG_GRP_DAF, LOG_GRP_DETECTTIMEJUMP, LOG_GRP_DETECTTIMESKEW, LOG_GRP_GRAPHITE, LOG_GRP_PREFILL, LOG_GRP_PRIMING, LOG_GRP_SRVSTALE, LOG_GRP_WATCHDOG, LOG_GRP_NSID, LOG_GRP_DNSTAP, LOG_GRP_TESTS, LOG_GRP_DOTAUTH, LOG_GRP_HTTP, LOG_GRP_CONTROL, LOG_GRP_MODULE, LOG_GRP_DEVEL, LOG_GRP_RENUMBER, LOG_GRP_EDE, LOG_GRP_WEAKPTR, LOG_GRP_REQDBG};
+typedef unsigned long weakptr_t;
 
 kr_layer_t kr_layer_t_static;
 _Bool kr_dbg_assertion_abort;
@@ -456,6 +457,7 @@ int kr_cache_remove(struct kr_cache *, const knot_dname_t *, uint16_t);
 int kr_cache_remove_subtree(struct kr_cache *, const knot_dname_t *, _Bool, int);
 int kr_cache_commit(struct kr_cache *);
 uint32_t packet_ttl(const knot_pkt_t *, _Bool);
+typedef unsigned long qr_task_weakptr_t;
 typedef struct {
        int sock_type;
        _Bool tls;
@@ -518,16 +520,13 @@ struct endpoint {
 };
 struct request_ctx {
        struct kr_request req;
-       struct qr_task *task;
+       qr_task_weakptr_t taskptr;
        /* beware: hidden stub, to avoid hardcoding sockaddr lengths */
 };
-struct qr_task {
-       struct request_ctx *ctx;
-       /* beware: hidden stub, to avoid qr_tasklist_t */
-};
-int worker_resolve_exec(struct qr_task *, knot_pkt_t *);
+int worker_resolve_exec(qr_task_weakptr_t, knot_pkt_t *);
+struct kr_request *worker_task_request(qr_task_weakptr_t);
 knot_pkt_t *worker_resolve_mk_pkt(const char *, uint16_t, uint16_t, const struct kr_qflags *);
-struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags);
+qr_task_weakptr_t worker_resolve_start(knot_pkt_t *, struct kr_qflags);
 int zi_zone_import(const zi_config_t);
 struct engine {
        char _stub[];
index 6c569f5948198a72697fc2e17bb5b24ee83fed2b..72c89b403c4eef188fce590fa534ffd40c14e2f1 100644 (file)
@@ -313,7 +313,8 @@ struct kr_server_selection {
        struct local_state *local_state;
 };
 typedef int kr_log_level_t;
-enum kr_log_group {LOG_GRP_UNKNOWN = -1, LOG_GRP_SYSTEM = 1, LOG_GRP_CACHE, LOG_GRP_IO, LOG_GRP_NETWORK, LOG_GRP_TA, LOG_GRP_TLS, LOG_GRP_GNUTLS, LOG_GRP_TLSCLIENT, LOG_GRP_XDP, LOG_GRP_DOH, LOG_GRP_DNSSEC, LOG_GRP_HINT, LOG_GRP_PLAN, LOG_GRP_ITERATOR, LOG_GRP_VALIDATOR, LOG_GRP_RESOLVER, LOG_GRP_SELECTION, LOG_GRP_ZCUT, LOG_GRP_COOKIES, LOG_GRP_STATISTICS, LOG_GRP_REBIND, LOG_GRP_WORKER, LOG_GRP_POLICY, LOG_GRP_TASENTINEL, LOG_GRP_TASIGNALING, LOG_GRP_TAUPDATE, LOG_GRP_DAF, LOG_GRP_DETECTTIMEJUMP, LOG_GRP_DETECTTIMESKEW, LOG_GRP_GRAPHITE, LOG_GRP_PREFILL, LOG_GRP_PRIMING, LOG_GRP_SRVSTALE, LOG_GRP_WATCHDOG, LOG_GRP_NSID, LOG_GRP_DNSTAP, LOG_GRP_TESTS, LOG_GRP_DOTAUTH, LOG_GRP_HTTP, LOG_GRP_CONTROL, LOG_GRP_MODULE, LOG_GRP_DEVEL, LOG_GRP_RENUMBER, LOG_GRP_EDE, LOG_GRP_REQDBG};
+enum kr_log_group {LOG_GRP_UNKNOWN = -1, LOG_GRP_SYSTEM = 1, LOG_GRP_CACHE, LOG_GRP_IO, LOG_GRP_NETWORK, LOG_GRP_TA, LOG_GRP_TLS, LOG_GRP_GNUTLS, LOG_GRP_TLSCLIENT, LOG_GRP_XDP, LOG_GRP_DOH, LOG_GRP_DNSSEC, LOG_GRP_HINT, LOG_GRP_PLAN, LOG_GRP_ITERATOR, LOG_GRP_VALIDATOR, LOG_GRP_RESOLVER, LOG_GRP_SELECTION, LOG_GRP_ZCUT, LOG_GRP_COOKIES, LOG_GRP_STATISTICS, LOG_GRP_REBIND, LOG_GRP_WORKER, LOG_GRP_POLICY, LOG_GRP_TASENTINEL, LOG_GRP_TASIGNALING, LOG_GRP_TAUPDATE, LOG_GRP_DAF, LOG_GRP_DETECTTIMEJUMP, LOG_GRP_DETECTTIMESKEW, LOG_GRP_GRAPHITE, LOG_GRP_PREFILL, LOG_GRP_PRIMING, LOG_GRP_SRVSTALE, LOG_GRP_WATCHDOG, LOG_GRP_NSID, LOG_GRP_DNSTAP, LOG_GRP_TESTS, LOG_GRP_DOTAUTH, LOG_GRP_HTTP, LOG_GRP_CONTROL, LOG_GRP_MODULE, LOG_GRP_DEVEL, LOG_GRP_RENUMBER, LOG_GRP_EDE, LOG_GRP_WEAKPTR, LOG_GRP_REQDBG};
+typedef unsigned long weakptr_t;
 
 kr_layer_t kr_layer_t_static;
 _Bool kr_dbg_assertion_abort;
@@ -456,6 +457,7 @@ int kr_cache_remove(struct kr_cache *, const knot_dname_t *, uint16_t);
 int kr_cache_remove_subtree(struct kr_cache *, const knot_dname_t *, _Bool, int);
 int kr_cache_commit(struct kr_cache *);
 uint32_t packet_ttl(const knot_pkt_t *, _Bool);
+typedef unsigned long qr_task_weakptr_t;
 typedef struct {
        int sock_type;
        _Bool tls;
@@ -518,16 +520,13 @@ struct endpoint {
 };
 struct request_ctx {
        struct kr_request req;
-       struct qr_task *task;
+       qr_task_weakptr_t taskptr;
        /* beware: hidden stub, to avoid hardcoding sockaddr lengths */
 };
-struct qr_task {
-       struct request_ctx *ctx;
-       /* beware: hidden stub, to avoid qr_tasklist_t */
-};
-int worker_resolve_exec(struct qr_task *, knot_pkt_t *);
+int worker_resolve_exec(qr_task_weakptr_t, knot_pkt_t *);
+struct kr_request *worker_task_request(qr_task_weakptr_t);
 knot_pkt_t *worker_resolve_mk_pkt(const char *, uint16_t, uint16_t, const struct kr_qflags *);
-struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags);
+qr_task_weakptr_t worker_resolve_start(knot_pkt_t *, struct kr_qflags);
 int zi_zone_import(const zi_config_t);
 struct engine {
        char _stub[];
index 6be72aa73a2d6f15d20fb35326dfb6aee730b80e..e1ed4caa7f54b184085411a1d93ea054f724776a 100755 (executable)
@@ -140,6 +140,8 @@ ${CDEFS} ${LIBKRES} types <<-EOF
        struct kr_server_selection
        kr_log_level_t
        enum kr_log_group
+       # lib/weakptr.h
+       typedef weakptr_t
 EOF
 
 # static variables; these lines might not be simple to generate
@@ -287,6 +289,7 @@ EOF
 ## kresd itself: worker stuff
 
 ${CDEFS} ${KRESD} types <<-EOF
+       typedef qr_task_weakptr_t
        endpoint_flags_t
        # struct args is a bit complex
        addr_array_t
@@ -302,12 +305,10 @@ echo "struct endpoint"    | ${CDEFS} ${KRESD} types | sed 's/uv_handle_t \*/void
 echo "struct request_ctx" | ${CDEFS} ${KRESD} types | sed '/struct {/,$ d'
 printf "\t/* beware: hidden stub, to avoid hardcoding sockaddr lengths */\n};\n"
 
-echo "struct qr_task" | ${CDEFS} ${KRESD} types | sed '/pktbuf/,$ d'
-printf "\t/* beware: hidden stub, to avoid qr_tasklist_t */\n};\n"
-
 
 ${CDEFS} ${KRESD} functions <<-EOF
        worker_resolve_exec
+       worker_task_request
        worker_resolve_mk_pkt
        worker_resolve_start
        zi_zone_import
index 60fd299254799680befe4b32300d8a9c3a124b71..b977eb469828adff6a9c00a5e7052f47e7a484ef 100644 (file)
@@ -87,6 +87,9 @@ worker.resolve_pkt = function (pkt, options, finish, init)
        local task = ffi.C.worker_resolve_start(pkt, options)
 
        -- Deal with finish and init callbacks
+       local request = ffi.C.worker_task_request(task)
+       assert(request ~= nil, 'request was nil for a just-created task')
+
        if finish ~= nil then
                local finish_cb
                finish_cb = ffi.cast('trace_callback_f',
@@ -95,10 +98,14 @@ worker.resolve_pkt = function (pkt, options, finish, init)
                                finish(req.answer, req)
                                finish_cb:free()
                        end)
-               task.ctx.req.trace_finish = finish_cb
+               if request ~= nil then
+                       request.trace_finish = finish_cb
+               else
+                       finish_cb(nil)
+               end
        end
-       if init ~= nil then
-               init(task.ctx.req)
+       if init ~= nil and request ~= nil then
+               init(request)
        end
 
        return ffi.C.worker_resolve_exec(task, pkt) == 0
index 09fcb6acd03da08f23d6b7412c89d5b9cb29baf7..2bc6213de00ebef553f8cca41e8b7c43ffa4b116 100644 (file)
@@ -5,6 +5,7 @@
 #include "kresconfig.h"
 
 #include "contrib/ucw/mempool.h"
+#include "daemon/bindings/api.h"
 #include "daemon/engine.h"
 #include "daemon/io.h"
 #include "daemon/network.h"
@@ -13,6 +14,7 @@
 #include "lib/defines.h"
 #include "lib/dnssec.h"
 #include "lib/log.h"
+#include "lib/weakptr.h"
 
 #include <arpa/inet.h>
 #include <getopt.h>
@@ -490,6 +492,13 @@ int main(int argc, char **argv)
 
        kr_crypto_init();
 
+       ret = weakptr_manager_init();
+       if (ret != 0) {
+               kr_log_error(SYSTEM, "failed to initialize weakptr manager: %s\n",
+                               kr_strerror(ret));
+               return EXIT_FAILURE;
+       }
+
        network_init(uv_default_loop(), TCP_BACKLOG_DEFAULT);
 
        /* Create a server engine. */
@@ -505,6 +514,10 @@ int main(int argc, char **argv)
                kr_log_error(SYSTEM, "failed to initialize resolver: %s\n", kr_strerror(ret));
                return EXIT_FAILURE;
        }
+
+       /* Register Lua bindings. */
+       kr_bindings_register(the_engine->L);
+
        /* Initialize the worker. */
        ret = worker_init(the_args->forks);
        if (ret != 0) {
@@ -602,6 +615,7 @@ cleanup:/* Cleanup. */
        if (loop != NULL) {
                uv_loop_close(loop);
        }
+       weakptr_manager_deinit();
 cleanup_args:
        args_deinit(the_args);
        kr_crypto_cleanup();
index 795a445e9d693190b430b4678ddb488f468e4df7..1f6adbe1abcdcd7de8aca53db17bb810be576c1f 100644 (file)
@@ -42,15 +42,15 @@ struct session {
        struct http_ctx *http_ctx;  /**< server side http-related data. */
 #endif
 
-       trie_t *tasks;                /**< list of tasks associated with given session. */
-       queue_t(struct qr_task *) waiting;  /**< list of tasks waiting for sending to upstream. */
-
-       uint8_t *wire_buf;            /**< Buffer for DNS message, except for XDP. */
-       ssize_t wire_buf_size;        /**< Buffer size. */
-       ssize_t wire_buf_start_idx;   /**< Data start offset in wire_buf. */
-       ssize_t wire_buf_end_idx;     /**< Data end offset in wire_buf. */
-       uint64_t last_activity;       /**< Time of last IO activity (if any occurs).
-                                      *   Otherwise session creation time. */
+       trie_t *tasks;                       /**< list of tasks associated with given session. */
+       queue_t(qr_task_weakptr_t) waiting;  /**< list of tasks waiting for sending to upstream. */
+
+       uint8_t *wire_buf;                   /**< Buffer for DNS message, except for XDP. */
+       ssize_t wire_buf_size;               /**< Buffer size. */
+       ssize_t wire_buf_start_idx;          /**< Data start offset in wire_buf. */
+       ssize_t wire_buf_end_idx;            /**< Data end offset in wire_buf. */
+       uint64_t last_activity;              /**< Time of last IO activity (if any occurs).
+                                             *   Otherwise session creation time. */
 };
 
 static void on_session_close(uv_handle_t *handle)
@@ -132,126 +132,147 @@ int session_stop_read(struct session *session)
        return io_stop_read(session->handle);
 }
 
-int session_waitinglist_push(struct session *session, struct qr_task *task)
+int session_waitinglist_push(struct session *session, qr_task_weakptr_t taskptr)
 {
-       queue_push(session->waiting, task);
-       worker_task_ref(task);
+       if (kr_fails_assert(worker_task_exists(taskptr)))
+               return kr_error(EINVAL);
+       queue_push(session->waiting, taskptr);
        return kr_ok();
 }
 
-struct qr_task *session_waitinglist_get(const struct session *session)
+qr_task_weakptr_t session_waitinglist_get(struct session *session)
 {
-       return (queue_len(session->waiting) > 0) ? (queue_head(session->waiting)) : NULL;
+       do {
+               if (queue_len(session->waiting) == 0)
+                       return WEAKPTR_NULL;
+               qr_task_weakptr_t ptr = queue_head(session->waiting);
+               if (!ptr || !worker_task_exists(ptr)) {
+                       queue_pop(session->waiting);
+                       continue;
+               }
+               return ptr;
+       } while (true);
 }
 
-struct qr_task *session_waitinglist_pop(struct session *session, bool deref)
+qr_task_weakptr_t session_waitinglist_pop(struct session *session)
 {
-       struct qr_task *t = session_waitinglist_get(session);
+       qr_task_weakptr_t t = session_waitinglist_get(session);
        queue_pop(session->waiting);
-       if (deref) {
-               worker_task_unref(t);
-       }
        return t;
 }
 
-int session_tasklist_add(struct session *session, struct qr_task *task)
+int session_tasklist_add(struct session *session, qr_task_weakptr_t taskptr)
 {
+       if (kr_fails_assert(worker_task_exists(taskptr)))
+               return kr_error(ENOENT);
+
        trie_t *t = session->tasks;
        uint16_t task_msg_id = 0;
        const char *key = NULL;
        size_t key_len = 0;
        if (session->sflags.outgoing) {
-               knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+               knot_pkt_t *pktbuf = worker_task_get_pktbuf(taskptr);
+               kr_require(pktbuf);
                task_msg_id = knot_wire_get_id(pktbuf->wire);
                key = (const char *)&task_msg_id;
                key_len = sizeof(task_msg_id);
        } else {
-               key = (const char *)&task;
-               key_len = sizeof(char *);
+               key = (const char *)&taskptr;
+               key_len = sizeof(taskptr);
        }
        trie_val_t *v = trie_get_ins(t, key, key_len);
        if (kr_fails_assert(v))
                return kr_error(ENOMEM);
        if (*v == NULL) {
-               *v = task;
-               worker_task_ref(task);
-       } else if (kr_fails_assert(*v == task)) {
+               *v = (void *) taskptr;
+       } else if (kr_fails_assert((qr_task_weakptr_t) *v == taskptr)) {
                return kr_error(EINVAL);
        }
        return kr_ok();
 }
 
-int session_tasklist_del(struct session *session, struct qr_task *task)
+int session_tasklist_del(struct session *session, qr_task_weakptr_t taskptr)
 {
+       if (kr_fails_assert(worker_task_exists(taskptr)))
+               return kr_error(ENOENT);
+
        trie_t *t = session->tasks;
        uint16_t task_msg_id = 0;
        const char *key = NULL;
        size_t key_len = 0;
-       trie_val_t val;
        if (session->sflags.outgoing) {
-               knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+               knot_pkt_t *pktbuf = worker_task_get_pktbuf(taskptr);
                task_msg_id = knot_wire_get_id(pktbuf->wire);
                key = (const char *)&task_msg_id;
                key_len = sizeof(task_msg_id);
        } else {
-               key = (const char *)&task;
-               key_len = sizeof(char *);
-       }
-       int ret = trie_del(t, key, key_len, &val);
-       if (ret == KNOT_EOK) {
-               kr_require(val == task);
-               worker_task_unref(val);
+               key = (const char *)&taskptr;
+               key_len = sizeof(taskptr);
        }
+       int ret = trie_del(t, key, key_len, NULL);
+
        return ret;
 }
 
-struct qr_task *session_tasklist_get_first(struct session *session)
+qr_task_weakptr_t session_tasklist_get_first(struct session *session)
 {
-       trie_val_t *val = trie_get_first(session->tasks, NULL, NULL);
-       return val ? (struct qr_task *) *val : NULL;
+       do {
+               trie_val_t *val = trie_get_first(session->tasks, NULL, NULL);
+               if (!val)
+                       return WEAKPTR_NULL;
+               if (!worker_task_exists((qr_task_weakptr_t) *val)) {
+                       // Task is not valid anymore, fetch next
+                       trie_del_first(session->tasks, NULL, NULL, NULL);
+                       continue;
+               }
+
+               return (qr_task_weakptr_t) *val;
+       } while (true);
 }
 
-struct qr_task *session_tasklist_del_first(struct session *session, bool deref)
+qr_task_weakptr_t session_tasklist_del_first(struct session *session)
 {
-       trie_val_t val = NULL;
-       int res = trie_del_first(session->tasks, NULL, NULL, &val);
-       if (res != KNOT_EOK) {
-               val = NULL;
-       } else if (deref) {
-               worker_task_unref(val);
-       }
-       return (struct qr_task *)val;
+       do {
+               trie_val_t val = NULL;
+               int res = trie_del_first(session->tasks, NULL, NULL, &val);
+               if (res != KNOT_EOK)
+                       return WEAKPTR_NULL;
+               if (!worker_task_exists((qr_task_weakptr_t) val)) {
+                       // Task is not valid anymore, delete next
+                       continue;
+               }
+               return (qr_task_weakptr_t)val;
+       } while (true);
 }
-struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id)
+
+qr_task_weakptr_t session_tasklist_del_msgid(const struct session *session, uint16_t msg_id)
 {
        if (kr_fails_assert(session->sflags.outgoing))
-               return NULL;
+               return WEAKPTR_NULL;
        trie_t *t = session->tasks;
-       struct qr_task *ret = NULL;
        const char *key = (const char *)&msg_id;
        size_t key_len = sizeof(msg_id);
        trie_val_t val;
        int res = trie_del(t, key, key_len, &val);
-       if (res == KNOT_EOK) {
-               if (worker_task_numrefs(val) > 1) {
-                       ret = val;
-               }
-               worker_task_unref(val);
-       }
-       return ret;
+       if (res != KNOT_EOK || !worker_task_exists((qr_task_weakptr_t)val))
+               return WEAKPTR_NULL;
+       return (qr_task_weakptr_t)val;
 }
 
-struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id)
+qr_task_weakptr_t session_tasklist_find_msgid(const struct session *session, uint16_t msg_id)
 {
        if (kr_fails_assert(session->sflags.outgoing))
-               return NULL;
+               return WEAKPTR_NULL;
        trie_t *t = session->tasks;
-       struct qr_task *ret = NULL;
        trie_val_t *val = trie_get_try(t, (char *)&msg_id, sizeof(msg_id));
        if (val) {
-               ret = *val;
+               if (!worker_task_exists((qr_task_weakptr_t)*val)) {
+                       trie_del(t, (char *)&msg_id, sizeof(msg_id), NULL);
+                       return WEAKPTR_NULL;
+               }
+               return (qr_task_weakptr_t)*val;
        }
-       return ret;
+       return WEAKPTR_NULL;
 }
 
 struct session_flags *session_flags(struct session *session)
@@ -382,27 +403,29 @@ struct session *session_new(uv_handle_t *handle, bool has_tls, bool has_http)
        return session;
 }
 
-size_t session_tasklist_get_len(const struct session *session)
+size_t session_tasklist_get_len(struct session *session)
 {
+       session_tasklist_get_first(session); // Clean up no-longer-valid tasks
        return trie_weight(session->tasks);
 }
 
-size_t session_waitinglist_get_len(const struct session *session)
+size_t session_waitinglist_get_len(struct session *session)
 {
+       session_waitinglist_get(session); // Clean up no-longer-valid tasks
        return queue_len(session->waiting);
 }
 
-bool session_tasklist_is_empty(const struct session *session)
+bool session_tasklist_is_empty(struct session *session)
 {
        return session_tasklist_get_len(session) == 0;
 }
 
-bool session_waitinglist_is_empty(const struct session *session)
+bool session_waitinglist_is_empty(struct session *session)
 {
        return session_waitinglist_get_len(session) == 0;
 }
 
-bool session_is_empty(const struct session *session)
+bool session_is_empty(struct session *session)
 {
        return session_tasklist_is_empty(session) &&
               session_waitinglist_is_empty(session);
@@ -421,21 +444,19 @@ void session_set_has_tls(struct session *session, bool has_tls)
 void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt)
 {
        while (!session_waitinglist_is_empty(session)) {
-               struct qr_task *task = session_waitinglist_pop(session, false);
+               qr_task_weakptr_t t = session_waitinglist_pop(session);
                if (increase_timeout_cnt) {
-                       worker_task_timeout_inc(task);
+                       worker_task_timeout_inc(t);
                }
-               worker_task_step(task, &session->peer.ip, NULL);
-               worker_task_unref(task);
+               worker_task_step(t, &session->peer.ip, NULL);
        }
 }
 
 void session_waitinglist_finalize(struct session *session, int status)
 {
        while (!session_waitinglist_is_empty(session)) {
-               struct qr_task *t = session_waitinglist_pop(session, false);
+               qr_task_weakptr_t t = session_waitinglist_pop(session);
                worker_task_finalize(t, status);
-               worker_task_unref(t);
        }
 }
 
@@ -457,62 +478,65 @@ struct proxy_result *session_proxy_get(struct session *session)
 void session_tasklist_finalize(struct session *session, int status)
 {
        while (session_tasklist_get_len(session) > 0) {
-               struct qr_task *t = session_tasklist_del_first(session, false);
-               kr_require(worker_task_numrefs(t) > 0);
+               qr_task_weakptr_t t = session_tasklist_del_first(session);
                worker_task_finalize(t, status);
-               worker_task_unref(t);
        }
 }
 
 int session_tasklist_finalize_expired(struct session *session)
 {
        int ret = 0;
-       queue_t(struct qr_task *) q;
+       queue_t(qr_task_weakptr_t) q;
        uint64_t now = kr_now();
        trie_t *t = session->tasks;
        trie_it_t *it;
        queue_init(q);
        for (it = trie_it_begin(t); !trie_it_finished(it); trie_it_next(it)) {
                trie_val_t *v = trie_it_val(it);
-               struct qr_task *task = (struct qr_task *)*v;
-               if ((now - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
-                       struct kr_request *req = worker_task_request(task);
+               qr_task_weakptr_t taskptr = (qr_task_weakptr_t)*v;
+               if (!worker_task_exists(taskptr))
+                       continue;
+
+               if ((now - worker_task_creation_time(taskptr)) >= KR_RESOLVE_TIME_LIMIT
+                               && worker_task_exists(taskptr)) {
+                       struct kr_request *req = worker_task_request(taskptr);
                        if (!kr_fails_assert(req))
                                kr_query_inform_timeout(req, req->current_query);
-                       queue_push(q, task);
-                       worker_task_ref(task);
+                       queue_push(q, taskptr);
                }
        }
        trie_it_free(it);
 
-       struct qr_task *task = NULL;
+       qr_task_weakptr_t taskptr = WEAKPTR_NULL;
        uint16_t msg_id = 0;
-       char *key = (char *)&task;
-       int32_t keylen = sizeof(struct qr_task *);
+       char *key = (char *)&taskptr;
+       int32_t keylen = sizeof(taskptr);
        if (session->sflags.outgoing) {
                key = (char *)&msg_id;
                keylen = sizeof(msg_id);
        }
        while (queue_len(q) > 0) {
-               task = queue_head(q);
+               taskptr = queue_head(q);
+               if (!worker_task_exists(taskptr)) {
+                       queue_pop(q);
+                       continue;
+               }
+
                if (session->sflags.outgoing) {
-                       knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+                       knot_pkt_t *pktbuf = worker_task_get_pktbuf(taskptr);
+                       kr_require(pktbuf); // should never happen - task check above
                        msg_id = knot_wire_get_id(pktbuf->wire);
                }
-               int res = trie_del(t, key, keylen, NULL);
-               if (!worker_task_finished(task)) {
+               trie_del(t, key, keylen, NULL);
+               if (!worker_task_finished(taskptr)) {
                        /* task->pending_count must be zero,
                         * but there are can be followers,
                         * so run worker_task_subreq_finalize() to ensure retrying
                         * for all the followers. */
-                       worker_task_subreq_finalize(task);
-                       worker_task_finalize(task, KR_STATE_FAIL);
-               }
-               if (res == KNOT_EOK) {
-                       worker_task_unref(task);
+                       worker_task_subreq_finalize(taskptr);
+                       worker_task_finalize(taskptr, KR_STATE_FAIL);
                }
                queue_pop(q);
-               worker_task_unref(task);
                ++ret;
        }
 
@@ -803,7 +827,7 @@ int session_wirebuf_process(struct session *session, struct io_comm_data *comm)
        return ret;
 }
 
-void session_kill_ioreq(struct session *session, struct qr_task *task)
+void session_kill_ioreq(struct session *session, qr_task_weakptr_t task)
 {
        if (!session || session->sflags.closing)
                return;
index e27187458cd96f2be09109c55dc26193ccb01f65..c1c881dc958c30e0a3f32cfd188264cbdfc90cc5 100644 (file)
@@ -9,8 +9,9 @@
 #include <stdbool.h>
 #include <uv.h>
 #include "lib/defines.h"
+#include "lib/weakptr.h"
 
-struct qr_task;
+typedef weakptr_t qr_task_weakptr_t;
 struct worker_ctx;
 struct session;
 struct io_comm_data;
@@ -45,15 +46,15 @@ int session_stop_read(struct session *session);
 
 /** List of tasks been waiting for IO. */
 /** Check if list is empty. */
-bool session_waitinglist_is_empty(const struct session *session);
+bool session_waitinglist_is_empty(struct session *session);
 /** Add task to the end of the list. */
-int session_waitinglist_push(struct session *session, struct qr_task *task);
+int session_waitinglist_push(struct session *session, qr_task_weakptr_t taskptr);
 /** Get the first element. */
-struct qr_task *session_waitinglist_get(const struct session *session);
+qr_task_weakptr_t session_waitinglist_get(struct session *session);
 /** Get the first element and remove it from the list. */
-struct qr_task *session_waitinglist_pop(struct session *session, bool deref);
+qr_task_weakptr_t session_waitinglist_pop(struct session *session);
 /** Get the list length. */
-size_t session_waitinglist_get_len(const struct session *session);
+size_t session_waitinglist_get_len(struct session *session);
 /** Retry resolution for each task in the list. */
 void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt);
 /** Finalize all tasks in the list. */
@@ -68,21 +69,21 @@ struct proxy_result *session_proxy_get(struct session *session);
 
 /** List of tasks associated with session. */
 /** Check if list is empty. */
-bool session_tasklist_is_empty(const struct session *session);
+bool session_tasklist_is_empty(struct session *session);
 /** Get the first element. */
-struct qr_task *session_tasklist_get_first(struct session *session);
+qr_task_weakptr_t session_tasklist_get_first(struct session *session);
 /** Get the first element and remove it from the list. */
-struct qr_task *session_tasklist_del_first(struct session *session, bool deref);
+qr_task_weakptr_t session_tasklist_del_first(struct session *session);
 /** Get the list length. */
-size_t session_tasklist_get_len(const struct session *session);
+size_t session_tasklist_get_len(struct session *session);
 /** Add task to the list. */
-int session_tasklist_add(struct session *session, struct qr_task *task);
+int session_tasklist_add(struct session *session, qr_task_weakptr_t taskptr);
 /** Remove task from the list. */
-int session_tasklist_del(struct session *session, struct qr_task *task);
+int session_tasklist_del(struct session *session, qr_task_weakptr_t taskptr);
 /** Remove task with given msg_id, session_flags(session)->outgoing must be true. */
-struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id);
+qr_task_weakptr_t session_tasklist_del_msgid(const struct session *session, uint16_t msg_id);
 /** Find task with given msg_id */
-struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
+qr_task_weakptr_t session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
 /** Finalize all tasks in the list. */
 void session_tasklist_finalize(struct session *session, int status);
 /** Finalize all expired tasks in the list. */
@@ -90,7 +91,7 @@ int session_tasklist_finalize_expired(struct session *session);
 
 /** Both of task lists (associated & waiting). */
 /** Check if empty. */
-bool session_is_empty(const struct session *session);
+bool session_is_empty(struct session *session);
 /** Get pointer to session flags */
 struct session_flags *session_flags(struct session *session);
 /** Get pointer to peer address. */
@@ -155,7 +156,7 @@ void session_unpoison(struct session *session);
 knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
 int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
 
-void session_kill_ioreq(struct session *session, struct qr_task *task);
+void session_kill_ioreq(struct session *session, qr_task_weakptr_t taskptr);
 /** Update timestamp */
 void session_touch(struct session *session);
 /** Returns either creation time or time of last IO activity if any occurs. */
index 1eb43a65e13c543340e093553b74b7e4d6dfee8d..45a0719442844a36bc017c6f314129746b1542d9 100644 (file)
@@ -95,7 +95,6 @@ void tls_client_params_free(tls_client_params_t *params);
 
 
 struct worker_ctx;
-struct qr_task;
 struct network;
 struct engine;
 
index 7004d1efa5098c8283e170e406bf97b6ed4e5ccc..7e1af1b8eac78d5530efb6ed8eea0db2909dfeab 100644 (file)
@@ -9,8 +9,6 @@
 #include "lib/generic/array.h"
 #include "lib/utils.h"
 
-struct qr_task;
-
 #include <sys/socket.h>
 
 
@@ -20,7 +18,7 @@ int udp_queue_init_global(uv_loop_t *loop)
        return 0;
 }
 /* Appease the linker in case this unused call isn't optimized out. */
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+void udp_queue_push(int fd, struct kr_request *req, qr_task_weakptr_t taskptr)
 {
        abort();
 }
@@ -35,7 +33,7 @@ typedef struct {
        int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
        struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg() */
        struct {
-               struct qr_task *task; /**< Links for completion callbacks. */
+               qr_task_weakptr_t taskptr; /**< Links for completion callbacks. */
                struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov */
        } items[UDP_QUEUE_LEN];
 } udp_queue_t;
@@ -77,8 +75,7 @@ static void udp_queue_send(int fd)
        /* ATM we don't really do anything about failures. */
        int err = sent_len < 0 ? errno : EAGAIN /* unknown error, really */;
        for (int i = 0; i < q->len; ++i) {
-               qr_task_on_send(q->items[i].task, NULL, i < sent_len ? 0 : err);
-               worker_task_unref(q->items[i].task);
+               qr_task_on_send(q->items[i].taskptr, NULL, i < sent_len ? 0 : err);
        }
        q->len = 0;
 }
@@ -99,13 +96,14 @@ int udp_queue_init_global(uv_loop_t *loop)
        return ret;
 }
 
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+void udp_queue_push(int fd, struct kr_request *req, qr_task_weakptr_t taskptr)
 {
        if (fd < 0) {
                kr_log_error(SYSTEM, "ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
                abort();
        }
-       worker_task_ref(task);
+       if (kr_fails_assert(worker_task_exists(taskptr)))
+               return;
        /* Get a valid correct queue. */
        if (fd >= state.udp_queues_len) {
                const int new_len = fd + 1;
@@ -124,7 +122,7 @@ void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
        struct sockaddr *sa = (struct sockaddr *)/*const-cast*/req->qsource.comm_addr;
        q->msgvec[q->len].msg_hdr.msg_name = sa;
        q->msgvec[q->len].msg_hdr.msg_namelen = kr_sockaddr_len(sa);
-       q->items[q->len].task = task;
+       q->items[q->len].taskptr = taskptr;
        q->items[q->len].msg_iov[0] = (struct iovec){
                .iov_base = req->answer->wire,
                .iov_len  = req->answer->size,
index 43fd56f1b736295c3d069ce126db8cbe89ec4ade..578fb079c41af34a668d729a5ab232372b1e0f69 100644 (file)
@@ -5,12 +5,14 @@
 #pragma once
 
 #include <uv.h>
+#include "lib/weakptr.h"
+
 struct kr_request;
-struct qr_task;
+typedef weakptr_t qr_task_weakptr_t;
 
 /** Initialize the global state for udp_queue. */
 int udp_queue_init_global(uv_loop_t *loop);
 
 /** Send req->answer via UDP, possibly not immediately. */
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task);
+void udp_queue_push(int fd, struct kr_request *req, qr_task_weakptr_t task);
 
index 2a042957ea241b9e3012cb11c4a4355442f85ccf..73aee64c9ff3fa43419e9bbb10f20cc3d396b09f 100644 (file)
@@ -24,7 +24,6 @@
        #include <libknot/xdp/xdp.h>
 #endif
 
-#include "daemon/bindings/api.h"
 #include "daemon/engine.h"
 #include "daemon/io.h"
 #include "daemon/proxyv2.h"
@@ -58,7 +57,7 @@ struct request_ctx
 {
        struct kr_request req;
 
-       struct qr_task *task;
+       qr_task_weakptr_t taskptr;
        struct {
                /** NULL if the request didn't come over network. */
                struct session *session;
@@ -84,7 +83,7 @@ struct qr_task
        uint16_t pending_count;
        uint16_t timeouts;
        uint16_t iter_count;
-       uint32_t refs;
+       qr_task_weakptr_t weakptr; /**< Weak pointer to self. */
        bool finished : 1;
        bool leading  : 1;
        uint64_t creation_time;
@@ -94,18 +93,10 @@ struct qr_task
 };
 
 
-/* Convenience macros */
-#define qr_task_ref(task) \
-       do { ++(task)->refs; } while(0)
-#define qr_task_unref(task) \
-       do { \
-               if (task) \
-                       kr_require((task)->refs > 0); \
-               if ((task) && --(task)->refs == 0) \
-                       qr_task_free((task)); \
-       } while (0)
-
 /* Forward decls */
+static struct session* worker_find_tcp_connected(const struct sockaddr* addr);
+static struct session* worker_find_tcp_waiting(const struct sockaddr* addr);
+static int worker_add_tcp_connected(const struct sockaddr* addr, struct session *session);
 static void qr_task_free(struct qr_task *task);
 static int qr_task_step(struct qr_task *task,
                        const struct sockaddr *packet_source,
@@ -114,6 +105,7 @@ static int qr_task_send(struct qr_task *task, struct session *session,
                        const struct sockaddr *addr, knot_pkt_t *pkt);
 static int qr_task_finalize(struct qr_task *task, int state);
 static void qr_task_complete(struct qr_task *task);
+static void qr_task_assert_weakptr(struct qr_task *task);
 static int worker_add_tcp_waiting(const struct sockaddr* addr, struct session *session);
 static void on_tcp_connect_timeout(uv_timer_t *timer);
 static void on_udp_timeout(uv_timer_t *timer);
@@ -123,6 +115,23 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
 struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
 struct worker_ctx *the_worker = NULL;
 
+static struct kr_query *task_get_last_pending_query(struct qr_task *task)
+{
+       if (!task || task->ctx->req.rplan.pending.len == 0) {
+               return NULL;
+       }
+
+       return array_tail(task->ctx->req.rplan.pending);
+}
+
+static inline void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
+{
+       knot_pkt_t *pktbuf = task->pktbuf;
+       knot_wire_set_id(pktbuf->wire, msgid);
+       struct kr_query *q = task_get_last_pending_query(task);
+       q->id = msgid;
+}
+
 /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
  *  socktype is SOCK_* */
 static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family, bool has_tls,
@@ -186,8 +195,9 @@ static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family, bool has_tls,
 
 static void ioreq_kill_pending(struct qr_task *task)
 {
+       qr_task_weakptr_t taskptr = task->weakptr;
        for (uint16_t i = 0; i < task->pending_count; ++i) {
-               session_kill_ioreq(task->pending[i], task);
+               session_kill_ioreq(task->pending[i], taskptr);
        }
        task->pending_count = 0;
 }
@@ -510,7 +520,7 @@ static void request_free(struct request_ctx *ctx)
        the_worker->stats.rconcurrent -= 1;
 }
 
-static struct qr_task *qr_task_create(struct request_ctx *ctx)
+static struct qr_task *qr_task_create(struct request_ctx *ctx, qr_task_weakptr_t *out_weakptr)
 {
        /* Choose (initial) pktbuf size.  As it is now, pktbuf can be used
         * for UDP answers from upstream *and* from cache
@@ -522,11 +532,16 @@ static struct qr_task *qr_task_create(struct request_ctx *ctx)
        }
 
        /* Create resolution task */
-       struct qr_task *task = mm_calloc(&ctx->req.pool, 1, sizeof(*task));
-       if (!task) {
+       struct qr_task *task;
+       qr_task_weakptr_t taskptr = weakptr_mm_alloc(&ctx->req.pool,
+                       sizeof(*task), (void **)&task);
+       if (!taskptr || !task) {
                return NULL;
        }
 
+       /* Initialize task to zero + make self-reference with `weakptr` */
+       *task = (struct qr_task) { .weakptr = taskptr };
+
        /* Create packet buffers for answer and subrequests */
        knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
        if (!pktbuf) {
@@ -538,25 +553,29 @@ static struct qr_task *qr_task_create(struct request_ctx *ctx)
        task->ctx = ctx;
        task->pktbuf = pktbuf;
        array_init(task->waiting);
-       task->refs = 0;
-       kr_assert(ctx->task == NULL);
-       ctx->task = task;
-       /* Make the primary reference to task. */
-       qr_task_ref(task);
+       kr_assert(ctx->taskptr == WEAKPTR_NULL);
+       ctx->taskptr = taskptr;
        task->creation_time = kr_now();
        the_worker->stats.concurrent += 1;
+       if (out_weakptr)
+               *out_weakptr = taskptr;
        return task;
 }
 
-/* This is called when the task refcount is zero, free memory. */
 static void qr_task_free(struct qr_task *task)
 {
+       if (!task)
+               return;
+
+       qr_task_assert_weakptr(task);
+
        struct request_ctx *ctx = task->ctx;
+       weakptr_mm_free(&ctx->req.pool, task->weakptr);
 
        if (kr_fails_assert(ctx))
                return;
 
-       if (ctx->task == NULL) {
+       if (ctx->taskptr == WEAKPTR_NULL) {
                request_free(ctx);
        }
 
@@ -570,7 +589,7 @@ static int qr_task_register(struct qr_task *task, struct session *session)
        if (kr_fails_assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP))
                return kr_error(EINVAL);
 
-       session_tasklist_add(session, task);
+       session_tasklist_add(session, task->weakptr);
 
        struct request_ctx *ctx = task->ctx;
        if (kr_fails_assert(ctx && (ctx->source.session == NULL || ctx->source.session == session)))
@@ -603,42 +622,54 @@ static void qr_task_complete(struct qr_task *task)
        if (s) {
                kr_require(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
                ctx->source.session = NULL;
-               session_tasklist_del(s, task);
+               session_tasklist_del(s, task->weakptr);
        }
 
-       /* Release primary reference to task. */
-       if (ctx->task == task) {
-               ctx->task = NULL;
-               qr_task_unref(task);
-       }
+       if (ctx->taskptr == task->weakptr)
+               ctx->taskptr = WEAKPTR_NULL;
+}
+
+/** A debug function checking if `task->weakptr` is not corrupted, i.e. still
+ * pointing to `task` itself. */
+static inline void qr_task_assert_weakptr(struct qr_task *task)
+{
+#ifndef NDEBUG
+       if (!task)
+               return;
+       struct qr_task *self = weakptr_get(task->weakptr);
+       kr_require(self); /* Weakptr did not exist */
+       kr_require(self == task); /* Weakptr existed but did not point to `task` */
+#endif
 }
 
 /* This is called when we send subrequest / answer */
-int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
+static int qr_task_on_send_internal(struct qr_task *task, const uv_handle_t *handle, int status)
 {
+       qr_task_assert_weakptr(task);
        if (task->finished) {
                kr_require(task->leading == false);
                qr_task_complete(task);
        }
 
        if (!handle || kr_fails_assert(handle->data))
-               return status;
+               goto cleanup;
        struct session* s = handle->data;
 
        if (handle->type == UV_UDP && session_flags(s)->outgoing) {
                // This should ensure that we are only dealing with our question to upstream
                if (kr_fails_assert(!knot_wire_get_qr(task->pktbuf->wire)))
-                       return status;
+                       goto cleanup;
                // start the timer
                struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
                if (kr_fails_assert(qry && task->transport))
-                       return status;
+                       goto cleanup;
                size_t timeout = task->transport->timeout;
                int ret = session_timer_start(s, on_udp_timeout, timeout, 0);
                /* Start next step with timeout, fatal if can't start a timer. */
                if (ret != 0) {
                        subreq_finalize(task, &task->transport->address.ip, task->pktbuf);
                        qr_task_finalize(task, KR_STATE_FAIL);
+                       return status;
                }
        }
 
@@ -659,11 +690,11 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
                                                peer_str, uv_strerror(status));
                        }
                        worker_end_tcp(s);
-                       return status;
+                       goto cleanup;
                }
 
                if (session_flags(s)->outgoing || session_flags(s)->closing)
-                       return status;
+                       goto cleanup;
 
                if (session_flags(s)->throttled &&
                    session_tasklist_get_len(s) < the_worker->tcp_pipeline_max/2) {
@@ -674,24 +705,36 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
                }
        }
 
+cleanup:;
+       if (task->finished) {
+               /* Answer has been sent or an error has occurred,
+                * the task is complete, we can free it. */
+               qr_task_free(task);
+       }
        return status;
 }
 
+int qr_task_on_send(qr_task_weakptr_t taskptr, const uv_handle_t *handle, int status)
+{
+       struct qr_task *task = weakptr_get(taskptr);
+       if (!task)
+               return kr_error(ENOENT);
+       return qr_task_on_send_internal(task, handle, status);
+}
+
 static void on_send(uv_udp_send_t *req, int status)
 {
-       struct qr_task *task = req->data;
+       qr_task_weakptr_t taskptr = (qr_task_weakptr_t) req->data;
        uv_handle_t *h = (uv_handle_t *)req->handle;
-       qr_task_on_send(task, h, status);
-       qr_task_unref(task);
+       qr_task_on_send(taskptr, h, status);
        free(req);
 }
 
 static void on_write(uv_write_t *req, int status)
 {
-       struct qr_task *task = req->data;
+       qr_task_weakptr_t taskptr = (qr_task_weakptr_t) req->data;
        uv_handle_t *h = (uv_handle_t *)req->handle;
-       qr_task_on_send(task, h, status);
-       qr_task_unref(task);
+       qr_task_on_send(taskptr, h, status);
        free(req);
 }
 
@@ -699,14 +742,14 @@ static int qr_task_send(struct qr_task *task, struct session *session,
                        const struct sockaddr *addr, knot_pkt_t *pkt)
 {
        if (!session)
-               return qr_task_on_send(task, NULL, kr_error(EIO));
+               return qr_task_on_send_internal(task, NULL, kr_error(EIO));
 
        int ret = 0;
        struct request_ctx *ctx = task->ctx;
 
        uv_handle_t *handle = session_get_handle(session);
        if (kr_fails_assert(handle && handle->data == session))
-               return qr_task_on_send(task, NULL, kr_error(EINVAL));
+               return qr_task_on_send_internal(task, NULL, kr_error(EINVAL));
        const bool is_stream = handle->type == UV_TCP;
        kr_require(is_stream || handle->type == UV_UDP);
 
@@ -714,7 +757,7 @@ static int qr_task_send(struct qr_task *task, struct session *session,
                addr = session_get_peer(session);
 
        if (pkt == NULL)
-               pkt = worker_task_get_pktbuf(task);
+               pkt = task->pktbuf;
 
        if (session_flags(session)->outgoing && handle->type == UV_TCP) {
                size_t try_limit = session_tasklist_get_len(session) + 1;
@@ -725,8 +768,10 @@ static int qr_task_send(struct qr_task *task, struct session *session,
                        ++msg_id;
                        ++try_count;
                }
-               if (try_count > try_limit)
+               if (try_count > try_limit) {
+                       qr_task_free(task);
                        return kr_error(ENOENT);
+               }
                worker_task_pkt_set_msgid(task, msg_id);
        }
 
@@ -735,31 +780,28 @@ static int qr_task_send(struct qr_task *task, struct session *session,
        task->recv_time = 0; // task structure is being reused so we have to zero this out here
        /* Send using given protocol */
        if (kr_fails_assert(!session_flags(session)->closing))
-               return qr_task_on_send(task, NULL, kr_error(EIO));
+               return qr_task_on_send_internal(task, NULL, kr_error(EIO));
 
        uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
        if (!ioreq)
-               return qr_task_on_send(task, handle, kr_error(ENOMEM));
-
-       /* Pending ioreq on current task */
-       qr_task_ref(task);
+               return qr_task_on_send_internal(task, handle, kr_error(ENOMEM));
 
        if (session_flags(session)->has_http) {
 #if ENABLE_DOH2
                uv_write_t *write_req = (uv_write_t *)ioreq;
-               write_req->data = task;
+               write_req->data = (void *)task->weakptr;
                ret = http_write(write_req, handle, pkt, ctx->req.qsource.stream_id, &on_write);
 #else
                ret = kr_error(ENOPROTOOPT);
 #endif
        } else if (session_flags(session)->has_tls) {
                uv_write_t *write_req = (uv_write_t *)ioreq;
-               write_req->data = task;
+               write_req->data = (void *)task->weakptr;
                ret = tls_write(write_req, handle, pkt, &on_write);
        } else if (handle->type == UV_UDP) {
                uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
                uv_buf_t buf = { (char *)pkt->wire, pkt->size };
-               send_req->data = task;
+               send_req->data = (void *)task->weakptr;
                ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
        } else if (handle->type == UV_TCP) {
                uv_write_t *write_req = (uv_write_t *)ioreq;
@@ -783,7 +825,7 @@ static int qr_task_send(struct qr_task *task, struct session *session,
                        { (char *)&pkt->size + lsbi,  1 },
                        { (char *)pkt->wire, pkt->size },
                };
-               write_req->data = task;
+               write_req->data = (void *)task->weakptr;
                ret = uv_write(write_req, (uv_stream_t *)handle, buf, 3, &on_write);
        } else {
                kr_assert(false);
@@ -792,7 +834,7 @@ static int qr_task_send(struct qr_task *task, struct session *session,
        if (ret == 0) {
                session_touch(session);
                if (session_flags(session)->outgoing) {
-                       session_tasklist_add(session, task);
+                       session_tasklist_add(session, task->weakptr);
                }
                if (the_worker->too_many_open &&
                    the_worker->stats.rconcurrent <
@@ -801,7 +843,6 @@ static int qr_task_send(struct qr_task *task, struct session *session,
                }
        } else {
                free(ioreq);
-               qr_task_unref(task);
                if (ret == UV_EMFILE) {
                        the_worker->too_many_open = true;
                        the_worker->rconcurrent_highwatermark = the_worker->stats.rconcurrent;
@@ -835,15 +876,6 @@ static int qr_task_send(struct qr_task *task, struct session *session,
        return ret;
 }
 
-static struct kr_query *task_get_last_pending_query(struct qr_task *task)
-{
-       if (!task || task->ctx->req.rplan.pending.len == 0) {
-               return NULL;
-       }
-
-       return array_tail(task->ctx->req.rplan.pending);
-}
-
 static int session_tls_hs_cb(struct session *session, int status)
 {
        if (kr_fails_assert(session_flags(session)->outgoing))
@@ -853,9 +885,10 @@ static int session_tls_hs_cb(struct session *session, int status)
        int ret = kr_ok();
 
        if (status) {
-               struct qr_task *task = session_waitinglist_get(session);
-               if (task) {
+               qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+               if (taskptr) {
                        // TLS handshake failed, report it to server selection
+                       struct qr_task *task = weakptr_get(taskptr);
                        struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
                        qry->server_selection.error(qry, task->transport, KR_SELECTION_TLS_HANDSHAKE_FAILED);
                }
@@ -925,12 +958,15 @@ static int session_tls_hs_cb(struct session *session, int status)
        }
        if (ret == kr_ok()) {
                while (!session_waitinglist_is_empty(session)) {
-                       struct qr_task *t = session_waitinglist_get(session);
-                       ret = qr_task_send(t, session, NULL, NULL);
-                       if (ret != 0) {
-                               break;
+                       qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+                       struct qr_task *t = weakptr_get(taskptr);
+                       if (t) {
+                               ret = qr_task_send(t, session, NULL, NULL);
+                               if (ret != 0) {
+                                       break;
+                               }
                        }
-                       session_waitinglist_pop(session, true);
+                       session_waitinglist_pop(session);
                }
        } else {
                ret = kr_error(EINVAL);
@@ -956,17 +992,20 @@ static int send_waiting(struct session *session)
 {
        int ret = 0;
        while (!session_waitinglist_is_empty(session)) {
-               struct qr_task *t = session_waitinglist_get(session);
-               ret = qr_task_send(t, session, NULL, NULL);
-               if (ret != 0) {
-                       struct sockaddr *peer = session_get_peer(session);
-                       session_waitinglist_finalize(session, KR_STATE_FAIL);
-                       session_tasklist_finalize(session, KR_STATE_FAIL);
-                       worker_del_tcp_connected(peer);
-                       session_close(session);
-                       break;
+               qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+               struct qr_task *t = weakptr_get(taskptr);
+               if (t) {
+                       ret = qr_task_send(t, session, NULL, NULL);
+                       if (ret != 0) {
+                               struct sockaddr *peer = session_get_peer(session);
+                               session_waitinglist_finalize(session, KR_STATE_FAIL);
+                               session_tasklist_finalize(session, KR_STATE_FAIL);
+                               worker_del_tcp_connected(peer);
+                               session_close(session);
+                               break;
+                       }
                }
-               session_waitinglist_pop(session, true);
+               session_waitinglist_pop(session);
        }
        return ret;
 }
@@ -1034,7 +1073,8 @@ static void on_connect(uv_connect_t *req, int status)
                                        peer_str ? peer_str : "", uv_strerror(status));
                }
                worker_del_tcp_waiting(peer);
-               struct qr_task *task = session_waitinglist_get(session);
+               qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+               struct qr_task *task = weakptr_get(taskptr);
                if (task && status != UV_ETIMEDOUT) {
                        /* Penalize upstream.
                        * In case of UV_ETIMEDOUT upstream has been
@@ -1105,7 +1145,8 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
        struct sockaddr *peer = session_get_peer(session);
        worker_del_tcp_waiting(peer);
 
-       struct qr_task *task = session_waitinglist_get(session);
+       qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+       struct qr_task *task = weakptr_get(taskptr);
        if (!task) {
                /* Normally shouldn't happen. */
                const char *peer_str = kr_straddr(peer);
@@ -1146,7 +1187,8 @@ static void on_udp_timeout(uv_timer_t *timer)
 
        uv_timer_stop(timer);
 
-       struct qr_task *task = session_tasklist_get_first(session);
+       qr_task_weakptr_t taskptr = session_tasklist_get_first(session);
+       struct qr_task *task = weakptr_get(taskptr);
        if (!task)
                return;
 
@@ -1162,44 +1204,39 @@ static void on_udp_timeout(uv_timer_t *timer)
 
 static uv_handle_t *transmit(struct qr_task *task)
 {
-       uv_handle_t *ret = NULL;
 
-       if (task) {
-               struct kr_transport* transport = task->transport;
+       if (!task)
+               return NULL;
+
+       struct kr_transport* transport = task->transport;
+       struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
 
-               struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
+       if (!choice || task->pending_count >= MAX_PENDING)
+               return NULL;
 
-               if (!choice) {
-                       return ret;
-               }
-               if (task->pending_count >= MAX_PENDING) {
-                       return ret;
-               }
-               /* Checkout answer before sending it */
-               struct request_ctx *ctx = task->ctx;
-               if (kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf) != 0) {
-                       return ret;
-               }
-               ret = ioreq_spawn(SOCK_DGRAM, choice->sin6_family, false, false);
-               if (!ret) {
-                       return ret;
-               }
-               struct sockaddr *addr = (struct sockaddr *)choice;
-               struct session *session = ret->data;
-               struct sockaddr *peer = session_get_peer(session);
-               kr_assert(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
-               kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
-               memcpy(peer, addr, kr_sockaddr_len(addr));
-               if (qr_task_send(task, session, (struct sockaddr *)choice,
-                                task->pktbuf) != 0) {
-                       session_close(session);
-                       ret = NULL;
-               } else {
-                       task->pending[task->pending_count] = session;
-                       task->pending_count += 1;
-                       session_start_read(session); /* Start reading answer */
-               }
+       /* Checkout answer before sending it */
+       struct request_ctx *ctx = task->ctx;
+       if (kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf) != 0)
+               return NULL;
+
+       uv_handle_t *ret = ioreq_spawn(SOCK_DGRAM, choice->sin6_family, false, false);
+       if (!ret)
+               return ret;
+
+       struct sockaddr *addr = (struct sockaddr *)choice;
+       struct session *session = ret->data;
+       struct sockaddr *peer = session_get_peer(session);
+       kr_assert(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
+       kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
+       memcpy(peer, addr, kr_sockaddr_len(addr));
+       if (qr_task_send(task, session, (struct sockaddr *)choice, task->pktbuf) != 0) {
+               session_close(session);
+               return NULL;
        }
+
+       task->pending[task->pending_count] = session;
+       task->pending_count += 1;
+       session_start_read(session); /* Start reading answer */
        return ret;
 }
 
@@ -1224,7 +1261,11 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
        /* Notify waiting tasks. */
        struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
        for (size_t i = task->waiting.len; i > 0; i--) {
-               struct qr_task *follower = task->waiting.at[i - 1];
+               qr_task_weakptr_t followerptr = task->waiting.at[i - 1];
+               struct qr_task *follower = weakptr_get(followerptr);
+               if (!follower)
+                       continue;
+
                /* Reuse MSGID and 0x20 secret */
                if (follower->ctx->req.rplan.pending.len > 0) {
                        struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
@@ -1239,7 +1280,6 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
                        leader_qry->secret = 0; /* Next will be already decoded */
                }
                qr_task_step(follower, packet_source, pkt);
-               qr_task_unref(follower);
        }
        task->waiting.len = 0;
        task->leading = false;
@@ -1276,11 +1316,11 @@ static bool subreq_enqueue(struct qr_task *task)
        if (!leader /*ENOMEM*/ || !*leader)
                return false;
        /* Enqueue itself to leader for this subrequest. */
-       int ret = array_push_mm((*leader)->waiting, task,
+
+       int ret = array_push_mm((*leader)->waiting, task->weakptr,
                                kr_memreserve, &(*leader)->ctx->req.pool);
        if (unlikely(ret < 0)) /*ENOMEM*/
                return false;
-       qr_task_ref(task);
        return true;
 }
 
@@ -1306,7 +1346,7 @@ static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
        struct request_ctx *ctx = task->ctx;
        xdp_handle_data_t *xhd = src_handle->data;
        if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session))
-               return qr_task_on_send(task, src_handle, kr_error(EINVAL));
+               return qr_task_on_send_internal(task, src_handle, kr_error(EINVAL));
 
        knot_xdp_msg_t msg;
 #if KNOT_VERSION_HEX >= 0x030100
@@ -1332,7 +1372,7 @@ static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
        uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
        kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
 
-       return qr_task_on_send(task, src_handle, ret);
+       return qr_task_on_send_internal(task, src_handle, ret);
 #else
        kr_assert(!EINVAL);
        return kr_error(EINVAL);
@@ -1342,7 +1382,9 @@ static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
 static int qr_task_finalize(struct qr_task *task, int state)
 {
        kr_require(task && task->leading == false);
+       qr_task_assert_weakptr(task);
        if (task->finished) {
+               qr_task_free(task);
                return kr_ok();
        }
        struct request_ctx *ctx = task->ctx;
@@ -1351,7 +1393,7 @@ static int qr_task_finalize(struct qr_task *task, int state)
 
        task->finished = true;
        if (source_session == NULL) {
-               (void) qr_task_on_send(task, NULL, kr_error(EIO));
+               (void) qr_task_on_send_internal(task, NULL, kr_error(EIO));
                return state == KR_STATE_DONE ? kr_ok() : kr_error(EIO);
        }
 
@@ -1360,20 +1402,21 @@ static int qr_task_finalize(struct qr_task *task, int state)
                /* For NO_ANSWER, a well-behaved layer should set the state to FAIL */
                kr_assert(!ctx->req.options.NO_ANSWER || (ctx->req.state & KR_STATE_FAIL));
 
-               (void) qr_task_on_send(task, NULL, kr_ok());
+               (void) qr_task_on_send_internal(task, NULL, kr_ok());
                return kr_ok();
        }
 
        if (session_flags(source_session)->closing ||
-           ctx->source.addr.ip.sa_family == AF_UNSPEC)
+           ctx->source.addr.ip.sa_family == AF_UNSPEC) {
+               qr_task_free(task);
                return kr_error(EINVAL);
-
-       /* Reference task as the callback handler can close it */
-       qr_task_ref(task);
+       }
 
        /* Send back answer */
        int ret;
        const uv_handle_t *src_handle = session_get_handle(source_session);
+       qr_task_assert_weakptr(task);
+       qr_task_weakptr_t taskptr = task->weakptr;
        if (kr_fails_assert(src_handle->type == UV_UDP || src_handle->type == UV_TCP
                       || src_handle->type == UV_POLL)) {
                ret = kr_error(EINVAL);
@@ -1382,32 +1425,32 @@ static int qr_task_finalize(struct qr_task *task, int state)
        } else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
                int fd;
                ret = uv_fileno(src_handle, &fd);
-               if (ret == 0)
-                       udp_queue_push(fd, &ctx->req, task);
-               else
+               if (ret == 0) {
+                       udp_queue_push(fd, &ctx->req, taskptr);
+               } else {
                        kr_assert(false);
+               }
        } else {
                ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
        }
 
        if (ret != kr_ok()) {
-               (void) qr_task_on_send(task, NULL, kr_error(EIO));
+               if (worker_task_exists(taskptr)) /* May have been freed by `qr_task_send` above */
+                       (void) qr_task_on_send_internal(task, NULL, kr_error(EIO));
+
                /* Since source session is erroneous detach all tasks. */
                while (!session_tasklist_is_empty(source_session)) {
-                       struct qr_task *t = session_tasklist_del_first(source_session, false);
+                       qr_task_weakptr_t tptr = session_tasklist_del_first(source_session);
+                       struct qr_task *t = weakptr_get(tptr);
+                       if (!t) continue;
+
                        struct request_ctx *c = t->ctx;
                        kr_assert(c->source.session == source_session);
                        c->source.session = NULL;
-                       /* Don't finalize them as there can be other tasks
-                        * waiting for answer to this particular task.
-                        * (ie. task->leading is true) */
-                       worker_task_unref(t);
                }
                session_close(source_session);
        }
 
-       qr_task_unref(task);
-
        if (ret != kr_ok() || state != KR_STATE_DONE)
                return kr_error(EIO);
        return kr_ok();
@@ -1441,7 +1484,7 @@ static int tcp_task_waiting_connection(struct session *session, struct qr_task *
                return kr_error(EINVAL);
        /* Add task to the end of list of waiting tasks.
         * It will be notified in on_connect() or qr_task_on_send(). */
-       int ret = session_waitinglist_push(session, task);
+       int ret = session_waitinglist_push(session, task->weakptr);
        if (ret < 0) {
                return kr_error(EINVAL);
        }
@@ -1561,7 +1604,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr
 
        /* Add task to the end of list of waiting tasks.
         * Will be notified either in on_connect() or in qr_task_on_send(). */
-       ret = session_waitinglist_push(session, task);
+       ret = session_waitinglist_push(session, task->weakptr);
        if (ret < 0) {
                session_timer_stop(session);
                worker_del_tcp_waiting(addr);
@@ -1632,8 +1675,8 @@ static int qr_task_step(struct qr_task *task,
 
        /* Close pending I/O requests */
        subreq_finalize(task, packet_source, packet);
-       if ((kr_now() - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
-               struct kr_request *req = worker_task_request(task);
+       if ((kr_now() - task->creation_time) >= KR_RESOLVE_TIME_LIMIT) {
+               struct kr_request *req = &task->ctx->req;
                if (!kr_fails_assert(req))
                        kr_query_inform_timeout(req, req->current_query);
                return qr_task_finalize(task, KR_STATE_FAIL);
@@ -1790,7 +1833,7 @@ int worker_submit(struct session *session, struct io_comm_data *comm,
                        return kr_error(ENOMEM);
                }
 
-               task = qr_task_create(ctx);
+               task = qr_task_create(ctx, NULL);
                if (!task) {
                        request_free(ctx);
                        return kr_error(ENOMEM);
@@ -1801,12 +1844,18 @@ int worker_submit(struct session *session, struct io_comm_data *comm,
                }
        } else { /* response from upstream */
                const uint16_t id = knot_wire_get_id(pkt->wire);
-               task = session_tasklist_del_msgid(session, id);
-               if (task == NULL) {
+               qr_task_weakptr_t taskptr = session_tasklist_del_msgid(session, id);
+               if (taskptr == WEAKPTR_NULL) {
                        VERBOSE_MSG(NULL, "=> ignoring packet with mismatching ID %d\n",
                                        (int)id);
                        return kr_error(ENOENT);
                }
+               task = weakptr_get(taskptr);
+               if (task == NULL) {
+                       VERBOSE_MSG(NULL, "=> ignoring packet for no longer valid task (ID %d)\n",
+                                       (int)id);
+                       return kr_error(ENOENT);
+               }
                if (kr_fails_assert(!session_flags(session)->closing))
                        return kr_error(EINVAL);
                addr = (comm) ? comm->src_addr : NULL;
@@ -1922,9 +1971,11 @@ int worker_end_tcp(struct session *session)
        }
 
        while (!session_waitinglist_is_empty(session)) {
-               struct qr_task *task = session_waitinglist_pop(session, false);
-               kr_assert(task->refs > 1);
-               session_tasklist_del(session, task);
+               qr_task_weakptr_t taskptr = session_waitinglist_pop(session);
+               struct qr_task *task = weakptr_get(taskptr);
+               if (!task) continue;
+
+               session_tasklist_del(session, taskptr);
                if (session_flags(session)->outgoing) {
                        if (task->ctx->req.options.FORWARD) {
                                /* We are in TCP_FORWARD mode.
@@ -1941,10 +1992,12 @@ int worker_end_tcp(struct session *session)
                        kr_assert(task->ctx->source.session == session);
                        task->ctx->source.session = NULL;
                }
-               worker_task_unref(task);
        }
        while (!session_tasklist_is_empty(session)) {
-               struct qr_task *task = session_tasklist_del_first(session, false);
+               qr_task_weakptr_t taskptr = session_tasklist_del_first(session);
+               struct qr_task *task = weakptr_get(taskptr);
+               if (!task) continue;
+
                if (session_flags(session)->outgoing) {
                        if (task->ctx->req.options.FORWARD) {
                                struct kr_request *req = &task->ctx->req;
@@ -1957,7 +2010,6 @@ int worker_end_tcp(struct session *session)
                        kr_assert(task->ctx->source.session == session);
                        task->ctx->source.session = NULL;
                }
-               worker_task_unref(task);
        }
        session_close(session);
        return kr_ok();
@@ -2009,33 +2061,34 @@ knot_pkt_t *worker_resolve_mk_pkt(const char *qname_str, uint16_t qtype, uint16_
        return worker_resolve_mk_pkt_dname(qname, qtype, qclass, options);
 }
 
-struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options)
+qr_task_weakptr_t worker_resolve_start(knot_pkt_t *query, struct kr_qflags options)
 {
        if (kr_fails_assert(the_worker && query))
-               return NULL;
+               return WEAKPTR_NULL;
 
 
        struct request_ctx *ctx = request_create(NULL, NULL, NULL, NULL,
                                                 the_worker->next_request_uid);
        if (!ctx)
-               return NULL;
+               return WEAKPTR_NULL;
 
        /* Create task */
-       struct qr_task *task = qr_task_create(ctx);
+       qr_task_weakptr_t taskptr;
+       struct qr_task *task = qr_task_create(ctx, &taskptr);
        if (!task) {
                request_free(ctx);
-               return NULL;
+               return WEAKPTR_NULL;
        }
 
        /* Start task */
        int ret = request_start(ctx, query);
        if (ret != 0) {
                /* task is attached to request context,
-                * so dereference (and deallocate) it first */
-               ctx->task = NULL;
-               qr_task_unref(task);
+                * so deallocate it first */
+               ctx->taskptr = WEAKPTR_NULL;
+               qr_task_free(task);
                request_free(ctx);
-               return NULL;
+               return WEAKPTR_NULL;
        }
 
        the_worker->next_request_uid += 1;
@@ -2044,68 +2097,70 @@ struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options
 
        /* Set options late, as qr_task_start() -> kr_resolve_begin() rewrite it. */
        kr_qflags_set(&task->ctx->req.options, options);
-       return task;
+       return taskptr;
 }
 
-int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query)
+int worker_resolve_exec(qr_task_weakptr_t taskptr, knot_pkt_t *query)
 {
-       if (!task)
+       if (!taskptr)
                return kr_error(EINVAL);
+       struct qr_task *task = weakptr_get(taskptr);
+       if (!task)
+               return kr_error(ENOENT);
        return qr_task_step(task, NULL, query);
 }
 
-int worker_task_numrefs(const struct qr_task *task)
+bool worker_task_exists(qr_task_weakptr_t taskptr)
 {
-       return task->refs;
+       return weakptr_get(taskptr) != NULL;
 }
 
-struct kr_request *worker_task_request(struct qr_task *task)
+struct kr_request *worker_task_request(qr_task_weakptr_t taskptr)
 {
+       if (!taskptr)
+               return NULL;
+       struct qr_task *task = weakptr_get(taskptr);
        if (!task || !task->ctx)
                return NULL;
 
        return &task->ctx->req;
 }
 
-int worker_task_finalize(struct qr_task *task, int state)
+int worker_task_finalize(qr_task_weakptr_t taskptr, int state)
 {
+       struct qr_task *task = weakptr_get(taskptr);
+       if (!task)
+               return kr_error(ENOENT);
        return qr_task_finalize(task, state);
 }
 
- int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
-                     knot_pkt_t *packet)
- {
-        return qr_task_step(task, packet_source, packet);
- }
-
-void worker_task_complete(struct qr_task *task)
-{
-       qr_task_complete(task);
-}
-
-void worker_task_ref(struct qr_task *task)
+int worker_task_step(qr_task_weakptr_t taskptr, const struct sockaddr *packet_source,
+               knot_pkt_t *packet)
 {
-       qr_task_ref(task);
-}
-
-void worker_task_unref(struct qr_task *task)
-{
-       qr_task_unref(task);
+       struct qr_task *task = weakptr_get(taskptr);
+       if (!task)
+               return kr_error(ENOENT);
+       return qr_task_step(task, packet_source, packet);
 }
 
-void worker_task_timeout_inc(struct qr_task *task)
+void worker_task_complete(qr_task_weakptr_t taskptr)
 {
-       task->timeouts += 1;
+       struct qr_task *task = weakptr_get(taskptr);
+       if (task)
+               qr_task_complete(task);
 }
 
-knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task)
+void worker_task_timeout_inc(qr_task_weakptr_t taskptr)
 {
-       return task->pktbuf;
+       struct qr_task *task = weakptr_get(taskptr);
+       if (task)
+               task->timeouts += 1;
 }
 
-struct request_ctx *worker_task_get_request(struct qr_task *task)
+knot_pkt_t *worker_task_get_pktbuf(qr_task_weakptr_t taskptr)
 {
-       return task->ctx;
+       struct qr_task *task = weakptr_get(taskptr);
+       return (task) ? task->pktbuf : NULL;
 }
 
 struct session *worker_request_get_source_session(const struct kr_request *req)
@@ -2115,34 +2170,27 @@ struct session *worker_request_get_source_session(const struct kr_request *req)
        return ((struct request_ctx *)req)->source.session;
 }
 
-uint16_t worker_task_pkt_get_msgid(struct qr_task *task)
-{
-       knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
-       uint16_t msg_id = knot_wire_get_id(pktbuf->wire);
-       return msg_id;
-}
-
-void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
-{
-       knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
-       knot_wire_set_id(pktbuf->wire, msgid);
-       struct kr_query *q = task_get_last_pending_query(task);
-       q->id = msgid;
-}
-
-uint64_t worker_task_creation_time(struct qr_task *task)
+uint64_t worker_task_creation_time(qr_task_weakptr_t taskptr)
 {
-       return task->creation_time;
+       struct qr_task *task = weakptr_get(taskptr);
+       return (task) ? task->creation_time : UINT64_MAX;
 }
 
-void worker_task_subreq_finalize(struct qr_task *task)
+void worker_task_subreq_finalize(qr_task_weakptr_t taskptr)
 {
-       subreq_finalize(task, NULL, NULL);
+       struct qr_task *task = weakptr_get(taskptr);
+       if (task)
+               subreq_finalize(task, NULL, NULL);
 }
 
-bool worker_task_finished(struct qr_task *task)
+bool worker_task_finished(qr_task_weakptr_t taskptr)
 {
-       return task->finished;
+       struct qr_task *task = weakptr_get(taskptr);
+       return (task)
+               ? task->finished
+               : true; /* When the weak pointer is not valid, the task has most
+                          probably finished. May also be a corruption, but
+                          that's not very likely, I think. */
 }
 
 /** Reserve worker buffers.  We assume worker's been zeroed. */
@@ -2196,7 +2244,6 @@ int worker_init(void)
 {
        if (kr_fails_assert(the_worker == NULL))
                return kr_error(EINVAL);
-       kr_bindings_register(the_engine->L); // TODO move
 
        /* Create main worker. */
        the_worker = &the_worker_value;
index 050db0665bc58e6ac695a22d0f2c373ed6497bbe..e077243f6cd93001d695a0986fb36515f8b1c406 100644 (file)
@@ -7,12 +7,82 @@
 #include "daemon/engine.h"
 #include "lib/generic/array.h"
 #include "lib/generic/trie.h"
+#include "lib/weakptr.h"
 
 
-/** Query resolution task (opaque). */
-struct qr_task;
-/** Worker state (opaque). */
-struct worker_ctx;
+/** Query resolution task weak pointer. */
+typedef weakptr_t qr_task_weakptr_t;
+
+/** Number of request within timeout window. */
+#define MAX_PENDING 4
+
+/** Maximum response time from TCP upstream, milliseconds */
+#define MAX_TCP_INACTIVITY (KR_RESOLVE_TIME_LIMIT + KR_CONN_RTT_MAX)
+
+#ifndef RECVMMSG_BATCH /* see check_bufsize() */
+#define RECVMMSG_BATCH 1
+#endif
+
+/** Various worker statistics.  Sync with wrk_stats() */
+struct worker_stats {
+       size_t queries;     /**< Total number of requests (from clients and internal ones). */
+       size_t concurrent;  /**< The number of requests currently in processing. */
+       size_t rconcurrent; /*< TODO: remove?  I see no meaningful difference from .concurrent. */
+       size_t dropped;     /**< The number of requests dropped due to being badly formed.  See #471. */
+
+       size_t timeout; /**< Number of outbound queries that timed out. */
+       size_t udp;  /**< Number of outbound queries over UDP. */
+       size_t tcp;  /**< Number of outbound queries over TCP (excluding TLS). */
+       size_t tls;  /**< Number of outbound queries over TLS. */
+       size_t ipv4; /**< Number of outbound queries over IPv4.*/
+       size_t ipv6; /**< Number of outbound queries over IPv6. */
+
+       size_t err_udp;  /**< Total number of write errors for UDP transport. */
+       size_t err_tcp;  /**< Total number of write errors for TCP transport. */
+       size_t err_tls;  /**< Total number of write errors for TLS transport. */
+       size_t err_http;  /**< Total number of write errors for HTTP(S) transport. */
+};
+
+/** Freelist of available mempools. */
+typedef array_t(struct mempool *) mp_freelist_t;
+
+/** List of query resolution tasks. */
+typedef array_t(qr_task_weakptr_t) qr_tasklist_t;
+
+/** List of HTTP header names. */
+typedef array_t(const char *) doh_headerlist_t;
+
+/** \details Worker state is meant to persist during the whole life of daemon. */
+struct worker_ctx {
+       uv_loop_t *loop;
+       int count;  /** unreliable, does not count systemd instance, do not use */
+       int vars_table_ref;
+       unsigned tcp_pipeline_max;
+
+       /** Addresses to bind for outgoing connections or AF_UNSPEC. */
+       struct sockaddr_in out_addr4;
+       struct sockaddr_in6 out_addr6;
+
+       uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
+
+       struct worker_stats stats;
+
+       bool too_many_open;
+       size_t rconcurrent_highwatermark;
+       /** List of active outbound TCP sessions */
+       trie_t *tcp_connected;
+       /** List of outbound TCP sessions waiting to be accepted */
+       trie_t *tcp_waiting;
+       /** Subrequest leaders (qr_task_weakptr_t), indexed by qname+qtype+qclass. */
+       trie_t *subreq_out;
+       mp_freelist_t pool_mp;
+       knot_mm_t pkt_pool;
+       unsigned int next_request_uid;
+
+       /* HTTP Headers for DoH. */
+       doh_headerlist_t doh_qry_headers;
+};
+
 /** Transport session (opaque). */
 struct session;
 /** Zone import context (opaque). */
@@ -59,132 +129,48 @@ worker_resolve_mk_pkt(const char *qname_str, uint16_t qtype, uint16_t qclass,
                        const struct kr_qflags *options);
 
 /**
- * Start query resolution with given query.
+ * Start query resolution with given query. Creates a new query resolution task,
+ * to which it returns a managed weak pointer.
  *
- * @return task or NULL
+ * @return task weak pointer or WEAKPTR_NULL
  */
-KR_EXPORT struct qr_task *
-worker_resolve_start(knot_pkt_t *query, struct kr_qflags options);
+KR_EXPORT
+qr_task_weakptr_t worker_resolve_start(knot_pkt_t *query, struct kr_qflags options);
 
 /**
  * Execute a request with given query.
- * It expects task to be created with \fn worker_resolve_start.
+ * It expects `taskptr` to be created with \fn worker_resolve_start.
  *
  * @return 0 or an error code
  */
-KR_EXPORT int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query);
+KR_EXPORT int worker_resolve_exec(qr_task_weakptr_t taskptr, knot_pkt_t *query);
+
+/** Checks whether the task associated with the specified weak pointer still
+ * exists, or if it has been freed. It is safe to pass WEAKPTR_NULL to this
+ * function. */
+bool worker_task_exists(qr_task_weakptr_t taskptr);
 
 /** @return struct kr_request associated with opaque task */
-struct kr_request *worker_task_request(struct qr_task *task);
+KR_EXPORT struct kr_request *worker_task_request(qr_task_weakptr_t taskptr);
 
-int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
+int worker_task_step(qr_task_weakptr_t taskptr, const struct sockaddr *packet_source,
                     knot_pkt_t *packet);
 
-int worker_task_numrefs(const struct qr_task *task);
-
 /** Finalize given task */
-int worker_task_finalize(struct qr_task *task, int state);
-
-void worker_task_complete(struct qr_task *task);
-
-void worker_task_ref(struct qr_task *task);
+int worker_task_finalize(qr_task_weakptr_t taskptr, int state);
 
-void worker_task_unref(struct qr_task *task);
+void worker_task_timeout_inc(qr_task_weakptr_t taskptr);
 
-void worker_task_timeout_inc(struct qr_task *task);
-
-int worker_add_tcp_connected(const struct sockaddr *addr, struct session *session);
 int worker_del_tcp_connected(const struct sockaddr *addr);
 int worker_del_tcp_waiting(const struct sockaddr* addr);
-struct session* worker_find_tcp_waiting(const struct sockaddr* addr);
-struct session* worker_find_tcp_connected(const struct sockaddr* addr);
-knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task);
-
-struct request_ctx *worker_task_get_request(struct qr_task *task);
+knot_pkt_t *worker_task_get_pktbuf(qr_task_weakptr_t taskptr);
 
 /** Note: source session is NULL in case the request hasn't come over network. */
 KR_EXPORT struct session *worker_request_get_source_session(const struct kr_request *req);
 
-uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
-void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
-uint64_t worker_task_creation_time(struct qr_task *task);
-void worker_task_subreq_finalize(struct qr_task *task);
-bool worker_task_finished(struct qr_task *task);
+uint64_t worker_task_creation_time(qr_task_weakptr_t taskptr);
+void worker_task_subreq_finalize(qr_task_weakptr_t taskptr);
+bool worker_task_finished(qr_task_weakptr_t taskptr);
 
 /** To be called after sending a DNS message.  It mainly deals with cleanups. */
-int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status);
-
-/** Various worker statistics.  Sync with wrk_stats() */
-struct worker_stats {
-       size_t queries;     /**< Total number of requests (from clients and internal ones). */
-       size_t concurrent;  /**< The number of requests currently in processing. */
-       size_t rconcurrent; /*< TODO: remove?  I see no meaningful difference from .concurrent. */
-       size_t dropped;     /**< The number of requests dropped due to being badly formed.  See #471. */
-
-       size_t timeout; /**< Number of outbound queries that timed out. */
-       size_t udp;  /**< Number of outbound queries over UDP. */
-       size_t tcp;  /**< Number of outbound queries over TCP (excluding TLS). */
-       size_t tls;  /**< Number of outbound queries over TLS. */
-       size_t ipv4; /**< Number of outbound queries over IPv4.*/
-       size_t ipv6; /**< Number of outbound queries over IPv6. */
-
-       size_t err_udp;  /**< Total number of write errors for UDP transport. */
-       size_t err_tcp;  /**< Total number of write errors for TCP transport. */
-       size_t err_tls;  /**< Total number of write errors for TLS transport. */
-       size_t err_http;  /**< Total number of write errors for HTTP(S) transport. */
-};
-
-/** @cond internal */
-
-/** Number of request within timeout window. */
-#define MAX_PENDING 4
-
-/** Maximum response time from TCP upstream, milliseconds */
-#define MAX_TCP_INACTIVITY (KR_RESOLVE_TIME_LIMIT + KR_CONN_RTT_MAX)
-
-#ifndef RECVMMSG_BATCH /* see check_bufsize() */
-#define RECVMMSG_BATCH 1
-#endif
-
-/** Freelist of available mempools. */
-typedef array_t(struct mempool *) mp_freelist_t;
-
-/** List of query resolution tasks. */
-typedef array_t(struct qr_task *) qr_tasklist_t;
-
-/** List of HTTP header names. */
-typedef array_t(const char *) doh_headerlist_t;
-
-/** \details Worker state is meant to persist during the whole life of daemon. */
-struct worker_ctx {
-       uv_loop_t *loop;
-       int count;  /** unreliable, does not count systemd instance, do not use */
-       int vars_table_ref;
-       unsigned tcp_pipeline_max;
-
-       /** Addresses to bind for outgoing connections or AF_UNSPEC. */
-       struct sockaddr_in out_addr4;
-       struct sockaddr_in6 out_addr6;
-
-       uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
-
-       struct worker_stats stats;
-
-       bool too_many_open;
-       size_t rconcurrent_highwatermark;
-       /** List of active outbound TCP sessions */
-       trie_t *tcp_connected;
-       /** List of outbound TCP sessions waiting to be accepted */
-       trie_t *tcp_waiting;
-       /** Subrequest leaders (struct qr_task*), indexed by qname+qtype+qclass. */
-       trie_t *subreq_out;
-       mp_freelist_t pool_mp;
-       knot_mm_t pkt_pool;
-       unsigned int next_request_uid;
-
-       /* HTTP Headers for DoH. */
-       doh_headerlist_t doh_qry_headers;
-};
-
-/** @endcond */
-
+int qr_task_on_send(qr_task_weakptr_t task, const uv_handle_t *handle, int status);
index 92dabab98745156d3ff959dcedea340a4175afa2..2cf1716042a9ef815d0d52962fee76f48abdc7e6 100644 (file)
--- a/lib/log.c
+++ b/lib/log.c
@@ -78,6 +78,7 @@ const log_group_names_t log_group_names[] = {
        GRP_NAME_ITEM(LOG_GRP_DEVEL),
        GRP_NAME_ITEM(LOG_GRP_RENUMBER),
        GRP_NAME_ITEM(LOG_GRP_EDE),
+       GRP_NAME_ITEM(LOG_GRP_WEAKPTR),
        GRP_NAME_ITEM(LOG_GRP_REQDBG),
        { NULL, LOG_GRP_UNKNOWN },
 };
index 2f8a2b0ab22348e2d7ad1750567a27e6a14d0560..12330292983c66bb02cbd38d4fe12001886546ac 100644 (file)
--- a/lib/log.h
+++ b/lib/log.h
@@ -78,6 +78,7 @@ enum kr_log_group {
        LOG_GRP_DEVEL,
        LOG_GRP_RENUMBER,
        LOG_GRP_EDE,
+       LOG_GRP_WEAKPTR,
        /* ^^ Add new log groups above ^^. */
        LOG_GRP_REQDBG, /* Must be first non-displayed entry in enum! */
 };
@@ -130,6 +131,7 @@ enum kr_log_group {
 #define LOG_GRP_DEVEL_TAG              "devel"         /**< ``devel``: for development purposes */
 #define LOG_GRP_RENUMBER_TAG           "renum"         /**< ``renum``: operation related to renumber */
 #define LOG_GRP_EDE_TAG                        "exterr"        /**< ``exterr``: extended error module */
+#define LOG_GRP_WEAKPTR_TAG            "weakptr"       /**< ``weakptr``: weak pointer manager */
 #define LOG_GRP_REQDBG_TAG             "reqdbg"        /**< ``reqdbg``: debug logs enabled by policy actions */
 ///@}
 
index ec11da9f52cc840fda08bd880298644a8d70e862..fae4eb3d1657ecd15e7783c24831ca7e3adfd28e 100644 (file)
@@ -30,6 +30,7 @@ libkres_src = files([
   'selection_forward.c',
   'selection_iter.c',
   'utils.c',
+  'weakptr.c',
   'zonecut.c',
 ])
 c_src_lint += libkres_src
@@ -60,6 +61,7 @@ libkres_headers = files([
   'selection_forward.h',
   'selection_iter.h',
   'utils.h',
+  'weakptr.h',
   'zonecut.h',
 ])
 
diff --git a/lib/weakptr.c b/lib/weakptr.c
new file mode 100644 (file)
index 0000000..7a58619
--- /dev/null
@@ -0,0 +1,120 @@
+/*  Copyright (C) 2015-2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+ *  SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include "contrib/mempattern.h"
+#include "lib/generic/trie.h"
+
+#include "lib/weakptr.h"
+
+struct weakptr_manager {
+       trie_t *weak_to_mem; /**< Key: weakptr_t;
+                             *   Value: Pointer to managed memory */
+       weakptr_t next_val;  /**< Value of the next created weakptr_t */
+};
+
+static struct weakptr_manager the_manager_val = {0};
+static struct weakptr_manager *the_manager = NULL;
+
+static inline weakptr_t weakptr_next()
+{
+       weakptr_t ptr = the_manager->next_val++;
+       if (the_manager->next_val == WEAKPTR_NULL)
+               the_manager->next_val++;
+       return ptr;
+}
+
+int weakptr_manager_init()
+{
+       kr_assert(!the_manager);
+       the_manager = &the_manager_val;
+
+       the_manager->next_val = 1;
+       the_manager->weak_to_mem = trie_create(NULL);
+       kr_require(the_manager->weak_to_mem);
+
+       return kr_ok();
+}
+
+void weakptr_manager_deinit()
+{
+       kr_assert(the_manager);
+       size_t leaked = trie_weight(the_manager->weak_to_mem);
+       if (leaked) {
+               kr_log_debug(WEAKPTR, "Leaked %zu weak pointers!\n", leaked);
+               /* TODO: Add an assertion here when proper session cleanup
+                *       is implemented. Without it, these messages are pretty
+                *       much useless false-positives. */
+       }
+       trie_free(the_manager->weak_to_mem);
+       the_manager = NULL;
+}
+
+weakptr_t weakptr_mm_alloc(knot_mm_t *mm, size_t size, void **naked_ptr)
+{
+       bool rolled_over = false;
+       weakptr_t initial_ptr = weakptr_next();
+       weakptr_t ptr = initial_ptr;
+       trie_val_t *val = NULL;
+
+       do {
+               /* Create reference to managed memory */
+               val = trie_get_ins(the_manager->weak_to_mem,
+                               (const char *)&ptr, sizeof(weakptr_t));
+               if (!val)
+                       goto exit_err;
+
+               if (*val) {
+                       /* The pointer already exists, find a new one */
+
+                       if (ptr == initial_ptr && rolled_over) {
+                               /* Ran out of weak-pointer space */
+                               return WEAKPTR_NULL;
+                       }
+
+                       rolled_over = true;
+                       ptr = weakptr_next();
+                       continue;
+               }
+
+               /* Allocate the managed memory */
+               *val = mm_alloc(mm, size);
+               if (!*val)
+                       goto exit_err;
+
+               *naked_ptr = *val;
+               return ptr;
+       } while (true);
+
+exit_err:
+       if (val) {
+               trie_del(the_manager->weak_to_mem,
+                               (const char *)&ptr, sizeof(weakptr_t), NULL);
+       }
+       return WEAKPTR_NULL;
+}
+
+void weakptr_mm_free(knot_mm_t *mm, weakptr_t ptr)
+{
+       if (!ptr)
+               return;
+
+       /* Delete reference to managed memory */
+       trie_val_t val;
+       int ret = trie_del(the_manager->weak_to_mem,
+                       (const char *)&ptr, sizeof(weakptr_t), &val);
+       if (kr_fails_assert(ret == KNOT_EOK))
+               return;
+
+       mm_free(mm, val);
+}
+
+void *weakptr_get(weakptr_t ptr)
+{
+       if (ptr == WEAKPTR_NULL)
+               return NULL;
+
+       trie_val_t *val = trie_get_try(the_manager->weak_to_mem,
+                       (const char *)&ptr, sizeof(weakptr_t));
+       return (val) ? *val : NULL;
+}
diff --git a/lib/weakptr.h b/lib/weakptr.h
new file mode 100644 (file)
index 0000000..bf05df5
--- /dev/null
@@ -0,0 +1,76 @@
+/*  Copyright (C) 2015-2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+ *  SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <libknot/mm_ctx.h>
+#include "lib/defines.h"
+#include "lib/utils.h"
+
+/** A managed weak pointer to dynamic memory. Its actual pointer value may
+ * be retrieved using `weakptr_get()`.
+ *
+ * Like a normal pointer, a weak pointer may have a special value
+ * of 0 (WEAKPTR_NULL), indicating that the pointer represents no valid memory.
+ *
+ * For readability, it is encouraged to create own `typedef` declarations
+ * for weak pointers to particular data types. Consider `weakptr_t`
+ * an equivalent of `void *`. */
+typedef unsigned long weakptr_t;
+
+/** Special value for `weakptr_t` indicating that it represents no valid memory. */
+#define WEAKPTR_NULL ((weakptr_t) 0)
+
+/** Initializes the weak pointer manager structure. Must be deinitialized
+ * using `weakptr_manager_deinit()`. */
+KR_EXPORT
+int weakptr_manager_init();
+
+/** Deinitializes the weak pointer manager structure. */
+KR_EXPORT
+void weakptr_manager_deinit();
+
+/** Allocates weak-pointer-managed dynamic memory of `size` using the memory
+ * pool `mm`. `mm` may be `NULL`, `malloc()` will then be used instead of a memory
+ * pool.
+ *
+ * If `naked_ptr` is not `NULL`, its target is assigned the naked (non-weak)
+ * pointer to the newly allocated memory on success. On error, `*naked_ptr`
+ * remains unchanged.
+ *
+ * Returns `WEAKPTR_NULL` on error. */
+KR_EXPORT
+weakptr_t weakptr_mm_alloc(knot_mm_t *mm, size_t size, void **naked_ptr);
+
+/** Allocates weak-pointer-managed dynamic memory of `size` using `malloc()`.
+ *
+ * If `naked_ptr` is not `NULL`, its target is assigned the naked (non-weak)
+ * pointer to the newly allocated memory on success. On error, `*naked_ptr`
+ * remains unchanged.
+ *
+ * Returns `WEAKPTR_NULL` on error. */
+static inline weakptr_t weakptr_malloc(size_t size, void **naked_ptr)
+{
+       return weakptr_mm_alloc(NULL, size, naked_ptr);
+}
+
+/** Frees the specified `ptr` using the memory pool `mm`. `mm` may be `NULL`,
+ * `free()` will then be used instead of a memory pool. */
+KR_EXPORT
+void weakptr_mm_free(knot_mm_t *mm, weakptr_t ptr);
+
+/** Frees the specified `ptr` using `free()`. If `ptr` is `WEAKPTR_NULL`,
+ * no operation is performed. */
+static inline void weakptr_free(weakptr_t ptr)
+{
+       weakptr_mm_free(NULL, ptr);
+}
+
+/** Retrieves the value of the `ptr`. Returns `NULL` if the memory pointed to
+ * by `ptr` has already been freed, or if `ptr` itself is `WEAKPTR_NULL`. */
+KR_EXPORT
+void *weakptr_get(weakptr_t ptr);