/* Normally it should not happen,
 * but better to check whether there is anything left in this list. */
while (!session_waitinglist_is_empty(s)) {
- struct qr_task *t = session_waitinglist_pop(s, false);
+ qr_task_weakptr_t t = session_waitinglist_pop(s);
worker_task_finalize(t, KR_STATE_FAIL);
- worker_task_unref(t);
the_worker->stats.timeout += 1;
if (session_flags(s)->closing) {
return;
struct local_state *local_state;
};
typedef int kr_log_level_t;
-enum kr_log_group {LOG_GRP_UNKNOWN = -1, LOG_GRP_SYSTEM = 1, LOG_GRP_CACHE, LOG_GRP_IO, LOG_GRP_NETWORK, LOG_GRP_TA, LOG_GRP_TLS, LOG_GRP_GNUTLS, LOG_GRP_TLSCLIENT, LOG_GRP_XDP, LOG_GRP_DOH, LOG_GRP_DNSSEC, LOG_GRP_HINT, LOG_GRP_PLAN, LOG_GRP_ITERATOR, LOG_GRP_VALIDATOR, LOG_GRP_RESOLVER, LOG_GRP_SELECTION, LOG_GRP_ZCUT, LOG_GRP_COOKIES, LOG_GRP_STATISTICS, LOG_GRP_REBIND, LOG_GRP_WORKER, LOG_GRP_POLICY, LOG_GRP_TASENTINEL, LOG_GRP_TASIGNALING, LOG_GRP_TAUPDATE, LOG_GRP_DAF, LOG_GRP_DETECTTIMEJUMP, LOG_GRP_DETECTTIMESKEW, LOG_GRP_GRAPHITE, LOG_GRP_PREFILL, LOG_GRP_PRIMING, LOG_GRP_SRVSTALE, LOG_GRP_WATCHDOG, LOG_GRP_NSID, LOG_GRP_DNSTAP, LOG_GRP_TESTS, LOG_GRP_DOTAUTH, LOG_GRP_HTTP, LOG_GRP_CONTROL, LOG_GRP_MODULE, LOG_GRP_DEVEL, LOG_GRP_RENUMBER, LOG_GRP_EDE, LOG_GRP_REQDBG};
+enum kr_log_group {LOG_GRP_UNKNOWN = -1, LOG_GRP_SYSTEM = 1, LOG_GRP_CACHE, LOG_GRP_IO, LOG_GRP_NETWORK, LOG_GRP_TA, LOG_GRP_TLS, LOG_GRP_GNUTLS, LOG_GRP_TLSCLIENT, LOG_GRP_XDP, LOG_GRP_DOH, LOG_GRP_DNSSEC, LOG_GRP_HINT, LOG_GRP_PLAN, LOG_GRP_ITERATOR, LOG_GRP_VALIDATOR, LOG_GRP_RESOLVER, LOG_GRP_SELECTION, LOG_GRP_ZCUT, LOG_GRP_COOKIES, LOG_GRP_STATISTICS, LOG_GRP_REBIND, LOG_GRP_WORKER, LOG_GRP_POLICY, LOG_GRP_TASENTINEL, LOG_GRP_TASIGNALING, LOG_GRP_TAUPDATE, LOG_GRP_DAF, LOG_GRP_DETECTTIMEJUMP, LOG_GRP_DETECTTIMESKEW, LOG_GRP_GRAPHITE, LOG_GRP_PREFILL, LOG_GRP_PRIMING, LOG_GRP_SRVSTALE, LOG_GRP_WATCHDOG, LOG_GRP_NSID, LOG_GRP_DNSTAP, LOG_GRP_TESTS, LOG_GRP_DOTAUTH, LOG_GRP_HTTP, LOG_GRP_CONTROL, LOG_GRP_MODULE, LOG_GRP_DEVEL, LOG_GRP_RENUMBER, LOG_GRP_EDE, LOG_GRP_WEAKPTR, LOG_GRP_REQDBG};
+typedef unsigned long weakptr_t;
kr_layer_t kr_layer_t_static;
_Bool kr_dbg_assertion_abort;
int kr_cache_remove_subtree(struct kr_cache *, const knot_dname_t *, _Bool, int);
int kr_cache_commit(struct kr_cache *);
uint32_t packet_ttl(const knot_pkt_t *, _Bool);
+typedef unsigned long qr_task_weakptr_t;
typedef struct {
int sock_type;
_Bool tls;
};
struct request_ctx {
struct kr_request req;
- struct qr_task *task;
+ qr_task_weakptr_t taskptr;
/* beware: hidden stub, to avoid hardcoding sockaddr lengths */
};
-struct qr_task {
- struct request_ctx *ctx;
- /* beware: hidden stub, to avoid qr_tasklist_t */
-};
-int worker_resolve_exec(struct qr_task *, knot_pkt_t *);
+int worker_resolve_exec(qr_task_weakptr_t, knot_pkt_t *);
+struct kr_request *worker_task_request(qr_task_weakptr_t);
knot_pkt_t *worker_resolve_mk_pkt(const char *, uint16_t, uint16_t, const struct kr_qflags *);
-struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags);
+qr_task_weakptr_t worker_resolve_start(knot_pkt_t *, struct kr_qflags);
int zi_zone_import(const zi_config_t);
struct engine {
char _stub[];
struct local_state *local_state;
};
typedef int kr_log_level_t;
-enum kr_log_group {LOG_GRP_UNKNOWN = -1, LOG_GRP_SYSTEM = 1, LOG_GRP_CACHE, LOG_GRP_IO, LOG_GRP_NETWORK, LOG_GRP_TA, LOG_GRP_TLS, LOG_GRP_GNUTLS, LOG_GRP_TLSCLIENT, LOG_GRP_XDP, LOG_GRP_DOH, LOG_GRP_DNSSEC, LOG_GRP_HINT, LOG_GRP_PLAN, LOG_GRP_ITERATOR, LOG_GRP_VALIDATOR, LOG_GRP_RESOLVER, LOG_GRP_SELECTION, LOG_GRP_ZCUT, LOG_GRP_COOKIES, LOG_GRP_STATISTICS, LOG_GRP_REBIND, LOG_GRP_WORKER, LOG_GRP_POLICY, LOG_GRP_TASENTINEL, LOG_GRP_TASIGNALING, LOG_GRP_TAUPDATE, LOG_GRP_DAF, LOG_GRP_DETECTTIMEJUMP, LOG_GRP_DETECTTIMESKEW, LOG_GRP_GRAPHITE, LOG_GRP_PREFILL, LOG_GRP_PRIMING, LOG_GRP_SRVSTALE, LOG_GRP_WATCHDOG, LOG_GRP_NSID, LOG_GRP_DNSTAP, LOG_GRP_TESTS, LOG_GRP_DOTAUTH, LOG_GRP_HTTP, LOG_GRP_CONTROL, LOG_GRP_MODULE, LOG_GRP_DEVEL, LOG_GRP_RENUMBER, LOG_GRP_EDE, LOG_GRP_REQDBG};
+enum kr_log_group {LOG_GRP_UNKNOWN = -1, LOG_GRP_SYSTEM = 1, LOG_GRP_CACHE, LOG_GRP_IO, LOG_GRP_NETWORK, LOG_GRP_TA, LOG_GRP_TLS, LOG_GRP_GNUTLS, LOG_GRP_TLSCLIENT, LOG_GRP_XDP, LOG_GRP_DOH, LOG_GRP_DNSSEC, LOG_GRP_HINT, LOG_GRP_PLAN, LOG_GRP_ITERATOR, LOG_GRP_VALIDATOR, LOG_GRP_RESOLVER, LOG_GRP_SELECTION, LOG_GRP_ZCUT, LOG_GRP_COOKIES, LOG_GRP_STATISTICS, LOG_GRP_REBIND, LOG_GRP_WORKER, LOG_GRP_POLICY, LOG_GRP_TASENTINEL, LOG_GRP_TASIGNALING, LOG_GRP_TAUPDATE, LOG_GRP_DAF, LOG_GRP_DETECTTIMEJUMP, LOG_GRP_DETECTTIMESKEW, LOG_GRP_GRAPHITE, LOG_GRP_PREFILL, LOG_GRP_PRIMING, LOG_GRP_SRVSTALE, LOG_GRP_WATCHDOG, LOG_GRP_NSID, LOG_GRP_DNSTAP, LOG_GRP_TESTS, LOG_GRP_DOTAUTH, LOG_GRP_HTTP, LOG_GRP_CONTROL, LOG_GRP_MODULE, LOG_GRP_DEVEL, LOG_GRP_RENUMBER, LOG_GRP_EDE, LOG_GRP_WEAKPTR, LOG_GRP_REQDBG};
+typedef unsigned long weakptr_t;
kr_layer_t kr_layer_t_static;
_Bool kr_dbg_assertion_abort;
int kr_cache_remove_subtree(struct kr_cache *, const knot_dname_t *, _Bool, int);
int kr_cache_commit(struct kr_cache *);
uint32_t packet_ttl(const knot_pkt_t *, _Bool);
+typedef unsigned long qr_task_weakptr_t;
typedef struct {
int sock_type;
_Bool tls;
};
struct request_ctx {
struct kr_request req;
- struct qr_task *task;
+ qr_task_weakptr_t taskptr;
/* beware: hidden stub, to avoid hardcoding sockaddr lengths */
};
-struct qr_task {
- struct request_ctx *ctx;
- /* beware: hidden stub, to avoid qr_tasklist_t */
-};
-int worker_resolve_exec(struct qr_task *, knot_pkt_t *);
+int worker_resolve_exec(qr_task_weakptr_t, knot_pkt_t *);
+struct kr_request *worker_task_request(qr_task_weakptr_t);
knot_pkt_t *worker_resolve_mk_pkt(const char *, uint16_t, uint16_t, const struct kr_qflags *);
-struct qr_task *worker_resolve_start(knot_pkt_t *, struct kr_qflags);
+qr_task_weakptr_t worker_resolve_start(knot_pkt_t *, struct kr_qflags);
int zi_zone_import(const zi_config_t);
struct engine {
char _stub[];
struct kr_server_selection
kr_log_level_t
enum kr_log_group
+ # lib/weakptr.h
+ typedef weakptr_t
EOF
# static variables; these lines might not be simple to generate
## kresd itself: worker stuff
${CDEFS} ${KRESD} types <<-EOF
+ typedef qr_task_weakptr_t
endpoint_flags_t
# struct args is a bit complex
addr_array_t
echo "struct request_ctx" | ${CDEFS} ${KRESD} types | sed '/struct {/,$ d'
printf "\t/* beware: hidden stub, to avoid hardcoding sockaddr lengths */\n};\n"
-echo "struct qr_task" | ${CDEFS} ${KRESD} types | sed '/pktbuf/,$ d'
-printf "\t/* beware: hidden stub, to avoid qr_tasklist_t */\n};\n"
-
${CDEFS} ${KRESD} functions <<-EOF
worker_resolve_exec
+ worker_task_request
worker_resolve_mk_pkt
worker_resolve_start
zi_zone_import
local task = ffi.C.worker_resolve_start(pkt, options)
-- Deal with finish and init callbacks
+ local request = ffi.C.worker_task_request(task)
+ assert(request ~= nil, 'request was nil for a just-created task')
+
if finish ~= nil then
local finish_cb
finish_cb = ffi.cast('trace_callback_f',
finish(req.answer, req)
finish_cb:free()
end)
- task.ctx.req.trace_finish = finish_cb
+ if request ~= nil then
+ request.trace_finish = finish_cb
+ else
+ finish_cb(nil)
+ end
end
- if init ~= nil then
- init(task.ctx.req)
+ if init ~= nil and request ~= nil then
+ init(request)
end
return ffi.C.worker_resolve_exec(task, pkt) == 0
#include "kresconfig.h"
#include "contrib/ucw/mempool.h"
+#include "daemon/bindings/api.h"
#include "daemon/engine.h"
#include "daemon/io.h"
#include "daemon/network.h"
#include "lib/defines.h"
#include "lib/dnssec.h"
#include "lib/log.h"
+#include "lib/weakptr.h"
#include <arpa/inet.h>
#include <getopt.h>
kr_crypto_init();
+ ret = weakptr_manager_init();
+ if (ret != 0) {
+ kr_log_error(SYSTEM, "failed to initialize weakptr manager: %s\n",
+ kr_strerror(ret));
+ return EXIT_FAILURE;
+ }
+
network_init(uv_default_loop(), TCP_BACKLOG_DEFAULT);
/* Create a server engine. */
kr_log_error(SYSTEM, "failed to initialize resolver: %s\n", kr_strerror(ret));
return EXIT_FAILURE;
}
+
+ /* Register Lua bindings. */
+ kr_bindings_register(the_engine->L);
+
/* Initialize the worker. */
ret = worker_init(the_args->forks);
if (ret != 0) {
if (loop != NULL) {
uv_loop_close(loop);
}
+ weakptr_manager_deinit();
cleanup_args:
args_deinit(the_args);
kr_crypto_cleanup();
struct http_ctx *http_ctx; /**< server side http-related data. */
#endif
- trie_t *tasks; /**< list of tasks associated with given session. */
- queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */
-
- uint8_t *wire_buf; /**< Buffer for DNS message, except for XDP. */
- ssize_t wire_buf_size; /**< Buffer size. */
- ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */
- ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
- uint64_t last_activity; /**< Time of last IO activity (if any occurs).
- * Otherwise session creation time. */
+ trie_t *tasks; /**< list of tasks associated with given session. */
+ queue_t(qr_task_weakptr_t) waiting; /**< list of tasks waiting for sending to upstream. */
+
+ uint8_t *wire_buf; /**< Buffer for DNS message, except for XDP. */
+ ssize_t wire_buf_size; /**< Buffer size. */
+ ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */
+ ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
+ uint64_t last_activity; /**< Time of last IO activity (if any occurs).
+ * Otherwise session creation time. */
};
static void on_session_close(uv_handle_t *handle)
return io_stop_read(session->handle);
}
-int session_waitinglist_push(struct session *session, struct qr_task *task)
+int session_waitinglist_push(struct session *session, qr_task_weakptr_t taskptr)
{
- queue_push(session->waiting, task);
- worker_task_ref(task);
+ if (kr_fails_assert(worker_task_exists(taskptr)))
+ return kr_error(EINVAL);
+ queue_push(session->waiting, taskptr);
return kr_ok();
}
-struct qr_task *session_waitinglist_get(const struct session *session)
+qr_task_weakptr_t session_waitinglist_get(struct session *session)
{
- return (queue_len(session->waiting) > 0) ? (queue_head(session->waiting)) : NULL;
+ do {
+ if (queue_len(session->waiting) == 0)
+ return WEAKPTR_NULL;
+ qr_task_weakptr_t ptr = queue_head(session->waiting);
+ if (!ptr || !worker_task_exists(ptr)) {
+ queue_pop(session->waiting);
+ continue;
+ }
+ return ptr;
+ } while (true);
}
-struct qr_task *session_waitinglist_pop(struct session *session, bool deref)
+qr_task_weakptr_t session_waitinglist_pop(struct session *session)
{
- struct qr_task *t = session_waitinglist_get(session);
+ qr_task_weakptr_t t = session_waitinglist_get(session);
queue_pop(session->waiting);
- if (deref) {
- worker_task_unref(t);
- }
return t;
}
-int session_tasklist_add(struct session *session, struct qr_task *task)
+int session_tasklist_add(struct session *session, qr_task_weakptr_t taskptr)
{
+ if (kr_fails_assert(worker_task_exists(taskptr)))
+ return kr_error(ENOENT);
+
trie_t *t = session->tasks;
uint16_t task_msg_id = 0;
const char *key = NULL;
size_t key_len = 0;
if (session->sflags.outgoing) {
- knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(taskptr);
+ kr_require(pktbuf);
task_msg_id = knot_wire_get_id(pktbuf->wire);
key = (const char *)&task_msg_id;
key_len = sizeof(task_msg_id);
} else {
- key = (const char *)&task;
- key_len = sizeof(char *);
+ key = (const char *)&taskptr;
+ key_len = sizeof(taskptr);
}
trie_val_t *v = trie_get_ins(t, key, key_len);
if (kr_fails_assert(v))
return kr_error(ENOMEM);
if (*v == NULL) {
- *v = task;
- worker_task_ref(task);
- } else if (kr_fails_assert(*v == task)) {
+ *v = (void *) taskptr;
+ } else if (kr_fails_assert((qr_task_weakptr_t) *v == taskptr)) {
return kr_error(EINVAL);
}
return kr_ok();
}
-int session_tasklist_del(struct session *session, struct qr_task *task)
+int session_tasklist_del(struct session *session, qr_task_weakptr_t taskptr)
{
+ if (kr_fails_assert(worker_task_exists(taskptr)))
+ return kr_error(ENOENT);
+
trie_t *t = session->tasks;
uint16_t task_msg_id = 0;
const char *key = NULL;
size_t key_len = 0;
- trie_val_t val;
if (session->sflags.outgoing) {
- knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(taskptr);
task_msg_id = knot_wire_get_id(pktbuf->wire);
key = (const char *)&task_msg_id;
key_len = sizeof(task_msg_id);
} else {
- key = (const char *)&task;
- key_len = sizeof(char *);
- }
- int ret = trie_del(t, key, key_len, &val);
- if (ret == KNOT_EOK) {
- kr_require(val == task);
- worker_task_unref(val);
+ key = (const char *)&taskptr;
+ key_len = sizeof(taskptr);
}
+ int ret = trie_del(t, key, key_len, NULL);
+
return ret;
}
-struct qr_task *session_tasklist_get_first(struct session *session)
+qr_task_weakptr_t session_tasklist_get_first(struct session *session)
{
- trie_val_t *val = trie_get_first(session->tasks, NULL, NULL);
- return val ? (struct qr_task *) *val : NULL;
+ do {
+ trie_val_t *val = trie_get_first(session->tasks, NULL, NULL);
+ if (!val)
+ return WEAKPTR_NULL;
+ if (!worker_task_exists((qr_task_weakptr_t) *val)) {
+ // Task is not valid anymore, fetch next
+ trie_del_first(session->tasks, NULL, NULL, NULL);
+ continue;
+ }
+
+ return (qr_task_weakptr_t) *val;
+ } while (true);
}
-struct qr_task *session_tasklist_del_first(struct session *session, bool deref)
+qr_task_weakptr_t session_tasklist_del_first(struct session *session)
{
- trie_val_t val = NULL;
- int res = trie_del_first(session->tasks, NULL, NULL, &val);
- if (res != KNOT_EOK) {
- val = NULL;
- } else if (deref) {
- worker_task_unref(val);
- }
- return (struct qr_task *)val;
+ do {
+ trie_val_t val = NULL;
+ int res = trie_del_first(session->tasks, NULL, NULL, &val);
+ if (res != KNOT_EOK)
+ return WEAKPTR_NULL;
+ if (!worker_task_exists((qr_task_weakptr_t) val)) {
+ // Task is not valid anymore, delete next
+ continue;
+ }
+ return (qr_task_weakptr_t)val;
+ } while (true);
}
-struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id)
+
+qr_task_weakptr_t session_tasklist_del_msgid(const struct session *session, uint16_t msg_id)
{
if (kr_fails_assert(session->sflags.outgoing))
- return NULL;
+ return WEAKPTR_NULL;
trie_t *t = session->tasks;
- struct qr_task *ret = NULL;
const char *key = (const char *)&msg_id;
size_t key_len = sizeof(msg_id);
trie_val_t val;
int res = trie_del(t, key, key_len, &val);
- if (res == KNOT_EOK) {
- if (worker_task_numrefs(val) > 1) {
- ret = val;
- }
- worker_task_unref(val);
- }
- return ret;
+ if (res != KNOT_EOK || !worker_task_exists((qr_task_weakptr_t)val))
+ return WEAKPTR_NULL;
+ return (qr_task_weakptr_t)val;
}
-struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id)
+qr_task_weakptr_t session_tasklist_find_msgid(const struct session *session, uint16_t msg_id)
{
if (kr_fails_assert(session->sflags.outgoing))
- return NULL;
+ return WEAKPTR_NULL;
trie_t *t = session->tasks;
- struct qr_task *ret = NULL;
trie_val_t *val = trie_get_try(t, (char *)&msg_id, sizeof(msg_id));
if (val) {
- ret = *val;
+ if (!worker_task_exists((qr_task_weakptr_t)*val)) {
+ trie_del(t, (char *)&msg_id, sizeof(msg_id), NULL);
+ return WEAKPTR_NULL;
+ }
+ return (qr_task_weakptr_t)*val;
}
- return ret;
+ return WEAKPTR_NULL;
}
struct session_flags *session_flags(struct session *session)
return session;
}
-size_t session_tasklist_get_len(const struct session *session)
+size_t session_tasklist_get_len(struct session *session)
{
+ session_tasklist_get_first(session); // Clean up no-longer-valid tasks
return trie_weight(session->tasks);
}
-size_t session_waitinglist_get_len(const struct session *session)
+size_t session_waitinglist_get_len(struct session *session)
{
+ session_waitinglist_get(session); // Clean up no-longer-valid tasks
return queue_len(session->waiting);
}
-bool session_tasklist_is_empty(const struct session *session)
+bool session_tasklist_is_empty(struct session *session)
{
return session_tasklist_get_len(session) == 0;
}
-bool session_waitinglist_is_empty(const struct session *session)
+bool session_waitinglist_is_empty(struct session *session)
{
return session_waitinglist_get_len(session) == 0;
}
-bool session_is_empty(const struct session *session)
+bool session_is_empty(struct session *session)
{
return session_tasklist_is_empty(session) &&
session_waitinglist_is_empty(session);
/** Re-drive resolution for every task still waiting on this session.
 *  With @a increase_timeout_cnt set, each task's timeout counter is
 *  bumped before it is stepped again. */
void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt)
{
	while (!session_waitinglist_is_empty(session)) {
		const qr_task_weakptr_t task = session_waitinglist_pop(session);
		if (increase_timeout_cnt) {
			worker_task_timeout_inc(task);
		}
		worker_task_step(task, &session->peer.ip, NULL);
	}
}
/** Finalize every task still waiting on this session with @a status. */
void session_waitinglist_finalize(struct session *session, int status)
{
	while (!session_waitinglist_is_empty(session)) {
		worker_task_finalize(session_waitinglist_pop(session), status);
	}
}
void session_tasklist_finalize(struct session *session, int status)
{
while (session_tasklist_get_len(session) > 0) {
- struct qr_task *t = session_tasklist_del_first(session, false);
- kr_require(worker_task_numrefs(t) > 0);
+ qr_task_weakptr_t t = session_tasklist_del_first(session);
worker_task_finalize(t, status);
- worker_task_unref(t);
}
}
int session_tasklist_finalize_expired(struct session *session)
{
int ret = 0;
- queue_t(struct qr_task *) q;
+ queue_t(qr_task_weakptr_t) q;
uint64_t now = kr_now();
trie_t *t = session->tasks;
trie_it_t *it;
queue_init(q);
for (it = trie_it_begin(t); !trie_it_finished(it); trie_it_next(it)) {
trie_val_t *v = trie_it_val(it);
- struct qr_task *task = (struct qr_task *)*v;
- if ((now - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
- struct kr_request *req = worker_task_request(task);
+ qr_task_weakptr_t taskptr = (qr_task_weakptr_t)*v;
+ if (!worker_task_exists(taskptr))
+ continue;
+
+ if ((now - worker_task_creation_time(taskptr)) >= KR_RESOLVE_TIME_LIMIT
+ && worker_task_exists(taskptr)) {
+ struct kr_request *req = worker_task_request(taskptr);
if (!kr_fails_assert(req))
kr_query_inform_timeout(req, req->current_query);
- queue_push(q, task);
- worker_task_ref(task);
+ queue_push(q, taskptr);
}
}
trie_it_free(it);
- struct qr_task *task = NULL;
+ qr_task_weakptr_t taskptr = WEAKPTR_NULL;
uint16_t msg_id = 0;
- char *key = (char *)&task;
- int32_t keylen = sizeof(struct qr_task *);
+ char *key = (char *)&taskptr;
+ int32_t keylen = sizeof(taskptr);
if (session->sflags.outgoing) {
key = (char *)&msg_id;
keylen = sizeof(msg_id);
}
while (queue_len(q) > 0) {
- task = queue_head(q);
+ taskptr = queue_head(q);
+ if (!worker_task_exists(taskptr)) {
+ queue_pop(q);
+ continue;
+ }
+
if (session->sflags.outgoing) {
- knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+ knot_pkt_t *pktbuf = worker_task_get_pktbuf(taskptr);
+ kr_require(pktbuf); // should never happen - task check above
msg_id = knot_wire_get_id(pktbuf->wire);
}
- int res = trie_del(t, key, keylen, NULL);
- if (!worker_task_finished(task)) {
+ trie_del(t, key, keylen, NULL);
+ if (!worker_task_finished(taskptr)) {
/* task->pending_count must be zero,
* but there are can be followers,
* so run worker_task_subreq_finalize() to ensure retrying
* for all the followers. */
- worker_task_subreq_finalize(task);
- worker_task_finalize(task, KR_STATE_FAIL);
- }
- if (res == KNOT_EOK) {
- worker_task_unref(task);
+ worker_task_subreq_finalize(taskptr);
+ worker_task_finalize(taskptr, KR_STATE_FAIL);
}
queue_pop(q);
- worker_task_unref(task);
++ret;
}
return ret;
}
-void session_kill_ioreq(struct session *session, struct qr_task *task)
+void session_kill_ioreq(struct session *session, qr_task_weakptr_t task)
{
if (!session || session->sflags.closing)
return;
#include <stdbool.h>
#include <uv.h>
#include "lib/defines.h"
+#include "lib/weakptr.h"
-struct qr_task;
+typedef weakptr_t qr_task_weakptr_t;
struct worker_ctx;
struct session;
struct io_comm_data;
/** List of tasks been waiting for IO. */
/** Check if list is empty. */
-bool session_waitinglist_is_empty(const struct session *session);
+bool session_waitinglist_is_empty(struct session *session);
/** Add task to the end of the list. */
-int session_waitinglist_push(struct session *session, struct qr_task *task);
+int session_waitinglist_push(struct session *session, qr_task_weakptr_t taskptr);
/** Get the first element. */
-struct qr_task *session_waitinglist_get(const struct session *session);
+qr_task_weakptr_t session_waitinglist_get(struct session *session);
/** Get the first element and remove it from the list. */
-struct qr_task *session_waitinglist_pop(struct session *session, bool deref);
+qr_task_weakptr_t session_waitinglist_pop(struct session *session);
/** Get the list length. */
-size_t session_waitinglist_get_len(const struct session *session);
+size_t session_waitinglist_get_len(struct session *session);
/** Retry resolution for each task in the list. */
void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt);
/** Finalize all tasks in the list. */
/** List of tasks associated with session. */
/** Check if list is empty. */
-bool session_tasklist_is_empty(const struct session *session);
+bool session_tasklist_is_empty(struct session *session);
/** Get the first element. */
-struct qr_task *session_tasklist_get_first(struct session *session);
+qr_task_weakptr_t session_tasklist_get_first(struct session *session);
/** Get the first element and remove it from the list. */
-struct qr_task *session_tasklist_del_first(struct session *session, bool deref);
+qr_task_weakptr_t session_tasklist_del_first(struct session *session);
/** Get the list length. */
-size_t session_tasklist_get_len(const struct session *session);
+size_t session_tasklist_get_len(struct session *session);
/** Add task to the list. */
-int session_tasklist_add(struct session *session, struct qr_task *task);
+int session_tasklist_add(struct session *session, qr_task_weakptr_t taskptr);
/** Remove task from the list. */
-int session_tasklist_del(struct session *session, struct qr_task *task);
+int session_tasklist_del(struct session *session, qr_task_weakptr_t taskptr);
/** Remove task with given msg_id, session_flags(session)->outgoing must be true. */
-struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id);
+qr_task_weakptr_t session_tasklist_del_msgid(const struct session *session, uint16_t msg_id);
/** Find task with given msg_id */
-struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
+qr_task_weakptr_t session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
/** Finalize all tasks in the list. */
void session_tasklist_finalize(struct session *session, int status);
/** Finalize all expired tasks in the list. */
/** Both of task lists (associated & waiting). */
/** Check if empty. */
-bool session_is_empty(const struct session *session);
+bool session_is_empty(struct session *session);
/** Get pointer to session flags */
struct session_flags *session_flags(struct session *session);
/** Get pointer to peer address. */
knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
-void session_kill_ioreq(struct session *session, struct qr_task *task);
+void session_kill_ioreq(struct session *session, qr_task_weakptr_t taskptr);
/** Update timestamp */
void session_touch(struct session *session);
/** Returns either creation time or time of last IO activity if any occurs. */
struct worker_ctx;
-struct qr_task;
struct network;
struct engine;
#include "lib/generic/array.h"
#include "lib/utils.h"
-struct qr_task;
-
#include <sys/socket.h>
return 0;
}
/* Appease the linker in case this unused call isn't optimized out. */
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+void udp_queue_push(int fd, struct kr_request *req, qr_task_weakptr_t taskptr)
{
abort();
}
int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg() */
struct {
- struct qr_task *task; /**< Links for completion callbacks. */
+ qr_task_weakptr_t taskptr; /**< Links for completion callbacks. */
struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov */
} items[UDP_QUEUE_LEN];
} udp_queue_t;
/* ATM we don't really do anything about failures. */
int err = sent_len < 0 ? errno : EAGAIN /* unknown error, really */;
for (int i = 0; i < q->len; ++i) {
- qr_task_on_send(q->items[i].task, NULL, i < sent_len ? 0 : err);
- worker_task_unref(q->items[i].task);
+ qr_task_on_send(q->items[i].taskptr, NULL, i < sent_len ? 0 : err);
}
q->len = 0;
}
return ret;
}
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+void udp_queue_push(int fd, struct kr_request *req, qr_task_weakptr_t taskptr)
{
if (fd < 0) {
kr_log_error(SYSTEM, "ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
abort();
}
- worker_task_ref(task);
+ if (kr_fails_assert(worker_task_exists(taskptr)))
+ return;
/* Get a valid correct queue. */
if (fd >= state.udp_queues_len) {
const int new_len = fd + 1;
struct sockaddr *sa = (struct sockaddr *)/*const-cast*/req->qsource.comm_addr;
q->msgvec[q->len].msg_hdr.msg_name = sa;
q->msgvec[q->len].msg_hdr.msg_namelen = kr_sockaddr_len(sa);
- q->items[q->len].task = task;
+ q->items[q->len].taskptr = taskptr;
q->items[q->len].msg_iov[0] = (struct iovec){
.iov_base = req->answer->wire,
.iov_len = req->answer->size,
#pragma once
#include <uv.h>
+#include "lib/weakptr.h"
+
struct kr_request;
-struct qr_task;
+typedef weakptr_t qr_task_weakptr_t;
/** Initialize the global state for udp_queue. */
int udp_queue_init_global(uv_loop_t *loop);
/** Send req->answer via UDP, possibly not immediately. */
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task);
+void udp_queue_push(int fd, struct kr_request *req, qr_task_weakptr_t task);
#include <libknot/xdp/xdp.h>
#endif
-#include "daemon/bindings/api.h"
#include "daemon/engine.h"
#include "daemon/io.h"
#include "daemon/proxyv2.h"
{
struct kr_request req;
- struct qr_task *task;
+ qr_task_weakptr_t taskptr;
struct {
/** NULL if the request didn't come over network. */
struct session *session;
uint16_t pending_count;
uint16_t timeouts;
uint16_t iter_count;
- uint32_t refs;
+ qr_task_weakptr_t weakptr; /**< Weak pointer to self. */
bool finished : 1;
bool leading : 1;
uint64_t creation_time;
};
-/* Convenience macros */
-#define qr_task_ref(task) \
- do { ++(task)->refs; } while(0)
-#define qr_task_unref(task) \
- do { \
- if (task) \
- kr_require((task)->refs > 0); \
- if ((task) && --(task)->refs == 0) \
- qr_task_free((task)); \
- } while (0)
-
/* Forward decls */
+static struct session* worker_find_tcp_connected(const struct sockaddr* addr);
+static struct session* worker_find_tcp_waiting(const struct sockaddr* addr);
+static int worker_add_tcp_connected(const struct sockaddr* addr, struct session *session);
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
const struct sockaddr *packet_source,
const struct sockaddr *addr, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
+static void qr_task_assert_weakptr(struct qr_task *task);
static int worker_add_tcp_waiting(const struct sockaddr* addr, struct session *session);
static void on_tcp_connect_timeout(uv_timer_t *timer);
static void on_udp_timeout(uv_timer_t *timer);
struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
struct worker_ctx *the_worker = NULL;
+static struct kr_query *task_get_last_pending_query(struct qr_task *task)
+{
+ if (!task || task->ctx->req.rplan.pending.len == 0) {
+ return NULL;
+ }
+
+ return array_tail(task->ctx->req.rplan.pending);
+}
+
+static inline void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
+{
+ knot_pkt_t *pktbuf = task->pktbuf;
+ knot_wire_set_id(pktbuf->wire, msgid);
+ struct kr_query *q = task_get_last_pending_query(task);
+ q->id = msgid;
+}
+
/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
* socktype is SOCK_* */
static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family, bool has_tls,
static void ioreq_kill_pending(struct qr_task *task)
{
+ qr_task_weakptr_t taskptr = task->weakptr;
for (uint16_t i = 0; i < task->pending_count; ++i) {
- session_kill_ioreq(task->pending[i], task);
+ session_kill_ioreq(task->pending[i], taskptr);
}
task->pending_count = 0;
}
the_worker->stats.rconcurrent -= 1;
}
-static struct qr_task *qr_task_create(struct request_ctx *ctx)
+static struct qr_task *qr_task_create(struct request_ctx *ctx, qr_task_weakptr_t *out_weakptr)
{
/* Choose (initial) pktbuf size. As it is now, pktbuf can be used
* for UDP answers from upstream *and* from cache
}
/* Create resolution task */
- struct qr_task *task = mm_calloc(&ctx->req.pool, 1, sizeof(*task));
- if (!task) {
+ struct qr_task *task;
+ qr_task_weakptr_t taskptr = weakptr_mm_alloc(&ctx->req.pool,
+ sizeof(*task), (void **)&task);
+ if (!taskptr || !task) {
return NULL;
}
+ /* Initialize task to zero + make self-reference with `weakptr` */
+ *task = (struct qr_task) { .weakptr = taskptr };
+
/* Create packet buffers for answer and subrequests */
knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
if (!pktbuf) {
task->ctx = ctx;
task->pktbuf = pktbuf;
array_init(task->waiting);
- task->refs = 0;
- kr_assert(ctx->task == NULL);
- ctx->task = task;
- /* Make the primary reference to task. */
- qr_task_ref(task);
+ kr_assert(ctx->taskptr == WEAKPTR_NULL);
+ ctx->taskptr = taskptr;
task->creation_time = kr_now();
the_worker->stats.concurrent += 1;
+ if (out_weakptr)
+ *out_weakptr = taskptr;
return task;
}
-/* This is called when the task refcount is zero, free memory. */
static void qr_task_free(struct qr_task *task)
{
+ if (!task)
+ return;
+
+ qr_task_assert_weakptr(task);
+
struct request_ctx *ctx = task->ctx;
+ weakptr_mm_free(&ctx->req.pool, task->weakptr);
if (kr_fails_assert(ctx))
return;
- if (ctx->task == NULL) {
+ if (ctx->taskptr == WEAKPTR_NULL) {
request_free(ctx);
}
if (kr_fails_assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP))
return kr_error(EINVAL);
- session_tasklist_add(session, task);
+ session_tasklist_add(session, task->weakptr);
struct request_ctx *ctx = task->ctx;
if (kr_fails_assert(ctx && (ctx->source.session == NULL || ctx->source.session == session)))
if (s) {
kr_require(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
ctx->source.session = NULL;
- session_tasklist_del(s, task);
+ session_tasklist_del(s, task->weakptr);
}
- /* Release primary reference to task. */
- if (ctx->task == task) {
- ctx->task = NULL;
- qr_task_unref(task);
- }
+ if (ctx->taskptr == task->weakptr)
+ ctx->taskptr = WEAKPTR_NULL;
+}
+
+/** A debug function checking if `task->weakptr` is not corrupted, i.e. still
+ * pointing to `task` itself. */
+static inline void qr_task_assert_weakptr(struct qr_task *task)
+{
+#ifndef NDEBUG
+ if (!task)
+ return;
+ struct qr_task *self = weakptr_get(task->weakptr);
+ kr_require(self); /* Weakptr did not exist */
+ kr_require(self == task); /* Weakptr existed but did not point to `task` */
+#endif
}
/* This is called when we send subrequest / answer */
-int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
+static int qr_task_on_send_internal(struct qr_task *task, const uv_handle_t *handle, int status)
{
+ qr_task_assert_weakptr(task);
if (task->finished) {
kr_require(task->leading == false);
qr_task_complete(task);
}
if (!handle || kr_fails_assert(handle->data))
- return status;
+ goto cleanup;
struct session* s = handle->data;
if (handle->type == UV_UDP && session_flags(s)->outgoing) {
// This should ensure that we are only dealing with our question to upstream
if (kr_fails_assert(!knot_wire_get_qr(task->pktbuf->wire)))
- return status;
+ goto cleanup;
// start the timer
struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
if (kr_fails_assert(qry && task->transport))
- return status;
+ goto cleanup;
size_t timeout = task->transport->timeout;
int ret = session_timer_start(s, on_udp_timeout, timeout, 0);
/* Start next step with timeout, fatal if can't start a timer. */
if (ret != 0) {
subreq_finalize(task, &task->transport->address.ip, task->pktbuf);
qr_task_finalize(task, KR_STATE_FAIL);
+ return status;
}
}
peer_str, uv_strerror(status));
}
worker_end_tcp(s);
- return status;
+ goto cleanup;
}
if (session_flags(s)->outgoing || session_flags(s)->closing)
- return status;
+ goto cleanup;
if (session_flags(s)->throttled &&
session_tasklist_get_len(s) < the_worker->tcp_pipeline_max/2) {
}
}
+cleanup:;
+ if (task->finished) {
+ /* Answer has been sent or an error has occurred,
+ * the task is complete, we can free it. */
+ qr_task_free(task);
+ }
return status;
}
+/** Weak-pointer entry point: resolves `taskptr` and delegates to the
+ * internal handler; a stale pointer is reported as ENOENT. */
+int qr_task_on_send(qr_task_weakptr_t taskptr, const uv_handle_t *handle, int status)
+{
+	struct qr_task *task = weakptr_get(taskptr);
+	return (task != NULL)
+		? qr_task_on_send_internal(task, handle, status)
+		: kr_error(ENOENT);
+}
+
static void on_send(uv_udp_send_t *req, int status)
{
-	struct qr_task *task = req->data;
+	/* `req->data` carries a weak task pointer (stored by the send path);
+	 * the task may have been freed while the send was in flight, which
+	 * qr_task_on_send() handles by resolving the pointer first. */
+	qr_task_weakptr_t taskptr = (qr_task_weakptr_t) req->data;
	uv_handle_t *h = (uv_handle_t *)req->handle;
-	qr_task_on_send(task, h, status);
-	qr_task_unref(task);
+	qr_task_on_send(taskptr, h, status);
	free(req);
}
static void on_write(uv_write_t *req, int status)
{
-	struct qr_task *task = req->data;
+	/* `req->data` holds a weak task pointer; qr_task_on_send() resolves
+	 * it and degrades to a no-op (ENOENT) if the task is already gone. */
+	qr_task_weakptr_t taskptr = (qr_task_weakptr_t) req->data;
	uv_handle_t *h = (uv_handle_t *)req->handle;
-	qr_task_on_send(task, h, status);
-	qr_task_unref(task);
+	qr_task_on_send(taskptr, h, status);
	free(req);
}
const struct sockaddr *addr, knot_pkt_t *pkt)
{
if (!session)
- return qr_task_on_send(task, NULL, kr_error(EIO));
+ return qr_task_on_send_internal(task, NULL, kr_error(EIO));
int ret = 0;
struct request_ctx *ctx = task->ctx;
uv_handle_t *handle = session_get_handle(session);
if (kr_fails_assert(handle && handle->data == session))
- return qr_task_on_send(task, NULL, kr_error(EINVAL));
+ return qr_task_on_send_internal(task, NULL, kr_error(EINVAL));
const bool is_stream = handle->type == UV_TCP;
kr_require(is_stream || handle->type == UV_UDP);
addr = session_get_peer(session);
if (pkt == NULL)
- pkt = worker_task_get_pktbuf(task);
+ pkt = task->pktbuf;
if (session_flags(session)->outgoing && handle->type == UV_TCP) {
size_t try_limit = session_tasklist_get_len(session) + 1;
++msg_id;
++try_count;
}
- if (try_count > try_limit)
+ if (try_count > try_limit) {
+ qr_task_free(task);
return kr_error(ENOENT);
+ }
worker_task_pkt_set_msgid(task, msg_id);
}
task->recv_time = 0; // task structure is being reused so we have to zero this out here
/* Send using given protocol */
if (kr_fails_assert(!session_flags(session)->closing))
- return qr_task_on_send(task, NULL, kr_error(EIO));
+ return qr_task_on_send_internal(task, NULL, kr_error(EIO));
uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
if (!ioreq)
- return qr_task_on_send(task, handle, kr_error(ENOMEM));
-
- /* Pending ioreq on current task */
- qr_task_ref(task);
+ return qr_task_on_send_internal(task, handle, kr_error(ENOMEM));
if (session_flags(session)->has_http) {
#if ENABLE_DOH2
uv_write_t *write_req = (uv_write_t *)ioreq;
- write_req->data = task;
+ write_req->data = (void *)task->weakptr;
ret = http_write(write_req, handle, pkt, ctx->req.qsource.stream_id, &on_write);
#else
ret = kr_error(ENOPROTOOPT);
#endif
} else if (session_flags(session)->has_tls) {
uv_write_t *write_req = (uv_write_t *)ioreq;
- write_req->data = task;
+ write_req->data = (void *)task->weakptr;
ret = tls_write(write_req, handle, pkt, &on_write);
} else if (handle->type == UV_UDP) {
uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
- send_req->data = task;
+ send_req->data = (void *)task->weakptr;
ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
} else if (handle->type == UV_TCP) {
uv_write_t *write_req = (uv_write_t *)ioreq;
{ (char *)&pkt->size + lsbi, 1 },
{ (char *)pkt->wire, pkt->size },
};
- write_req->data = task;
+ write_req->data = (void *)task->weakptr;
ret = uv_write(write_req, (uv_stream_t *)handle, buf, 3, &on_write);
} else {
kr_assert(false);
if (ret == 0) {
session_touch(session);
if (session_flags(session)->outgoing) {
- session_tasklist_add(session, task);
+ session_tasklist_add(session, task->weakptr);
}
if (the_worker->too_many_open &&
the_worker->stats.rconcurrent <
}
} else {
free(ioreq);
- qr_task_unref(task);
if (ret == UV_EMFILE) {
the_worker->too_many_open = true;
the_worker->rconcurrent_highwatermark = the_worker->stats.rconcurrent;
return ret;
}
-static struct kr_query *task_get_last_pending_query(struct qr_task *task)
-{
- if (!task || task->ctx->req.rplan.pending.len == 0) {
- return NULL;
- }
-
- return array_tail(task->ctx->req.rplan.pending);
-}
-
static int session_tls_hs_cb(struct session *session, int status)
{
if (kr_fails_assert(session_flags(session)->outgoing))
int ret = kr_ok();
if (status) {
- struct qr_task *task = session_waitinglist_get(session);
- if (task) {
+ qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+ if (taskptr) {
// TLS handshake failed, report it to server selection
+ struct qr_task *task = weakptr_get(taskptr);
struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
qry->server_selection.error(qry, task->transport, KR_SELECTION_TLS_HANDSHAKE_FAILED);
}
}
	if (ret == kr_ok()) {
		while (!session_waitinglist_is_empty(session)) {
-			struct qr_task *t = session_waitinglist_get(session);
-			ret = qr_task_send(t, session, NULL, NULL);
-			if (ret != 0) {
-				break;
+			qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+			struct qr_task *t = weakptr_get(taskptr);
+			/* Stale entries (already-freed tasks) are skipped and
+			 * popped below; only live tasks are sent. */
+			if (t) {
+				ret = qr_task_send(t, session, NULL, NULL);
+				if (ret != 0) {
+					break;
+				}
			}
-			session_waitinglist_pop(session, true);
+			session_waitinglist_pop(session);
		}
	}
} else {
ret = kr_error(EINVAL);
{
int ret = 0;
while (!session_waitinglist_is_empty(session)) {
- struct qr_task *t = session_waitinglist_get(session);
- ret = qr_task_send(t, session, NULL, NULL);
- if (ret != 0) {
- struct sockaddr *peer = session_get_peer(session);
- session_waitinglist_finalize(session, KR_STATE_FAIL);
- session_tasklist_finalize(session, KR_STATE_FAIL);
- worker_del_tcp_connected(peer);
- session_close(session);
- break;
+ qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+ struct qr_task *t = weakptr_get(taskptr);
+ if (t) {
+ ret = qr_task_send(t, session, NULL, NULL);
+ if (ret != 0) {
+ struct sockaddr *peer = session_get_peer(session);
+ session_waitinglist_finalize(session, KR_STATE_FAIL);
+ session_tasklist_finalize(session, KR_STATE_FAIL);
+ worker_del_tcp_connected(peer);
+ session_close(session);
+ break;
+ }
}
- session_waitinglist_pop(session, true);
+ session_waitinglist_pop(session);
}
return ret;
}
peer_str ? peer_str : "", uv_strerror(status));
}
worker_del_tcp_waiting(peer);
- struct qr_task *task = session_waitinglist_get(session);
+ qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+ struct qr_task *task = weakptr_get(taskptr);
if (task && status != UV_ETIMEDOUT) {
/* Penalize upstream.
* In case of UV_ETIMEDOUT upstream has been
struct sockaddr *peer = session_get_peer(session);
worker_del_tcp_waiting(peer);
- struct qr_task *task = session_waitinglist_get(session);
+ qr_task_weakptr_t taskptr = session_waitinglist_get(session);
+ struct qr_task *task = weakptr_get(taskptr);
if (!task) {
/* Normally shouldn't happen. */
const char *peer_str = kr_straddr(peer);
uv_timer_stop(timer);
- struct qr_task *task = session_tasklist_get_first(session);
+ qr_task_weakptr_t taskptr = session_tasklist_get_first(session);
+ struct qr_task *task = weakptr_get(taskptr);
if (!task)
return;
static uv_handle_t *transmit(struct qr_task *task)
{
-	uv_handle_t *ret = NULL;
-	if (task) {
-		struct kr_transport* transport = task->transport;
+	if (!task)
+		return NULL;
+
+	struct kr_transport* transport = task->transport;
+	/* Guard `transport` itself: `&transport->address` is the address of a
+	 * struct member and thus never NULL, so the old `!choice` test could
+	 * not fire, while a NULL `transport` would make the member access
+	 * undefined behavior. */
+	if (!transport || task->pending_count >= MAX_PENDING)
+		return NULL;
+	struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
-		struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
-		if (!choice) {
-			return ret;
-		}
-		if (task->pending_count >= MAX_PENDING) {
-			return ret;
-		}
-		/* Checkout answer before sending it */
-		struct request_ctx *ctx = task->ctx;
-		if (kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf) != 0) {
-			return ret;
-		}
-		ret = ioreq_spawn(SOCK_DGRAM, choice->sin6_family, false, false);
-		if (!ret) {
-			return ret;
-		}
-		struct sockaddr *addr = (struct sockaddr *)choice;
-		struct session *session = ret->data;
-		struct sockaddr *peer = session_get_peer(session);
-		kr_assert(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
-		kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
-		memcpy(peer, addr, kr_sockaddr_len(addr));
-		if (qr_task_send(task, session, (struct sockaddr *)choice,
-				 task->pktbuf) != 0) {
-			session_close(session);
-			ret = NULL;
-		} else {
-			task->pending[task->pending_count] = session;
-			task->pending_count += 1;
-			session_start_read(session); /* Start reading answer */
-		}
+	/* Checkout answer before sending it */
+	struct request_ctx *ctx = task->ctx;
+	if (kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf) != 0)
+		return NULL;
+
+	uv_handle_t *ret = ioreq_spawn(SOCK_DGRAM, choice->sin6_family, false, false);
+	if (!ret)
+		return ret;
+
+	struct sockaddr *addr = (struct sockaddr *)choice;
+	struct session *session = ret->data;
+	struct sockaddr *peer = session_get_peer(session);
+	kr_assert(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
+	kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
+	memcpy(peer, addr, kr_sockaddr_len(addr));
+	if (qr_task_send(task, session, (struct sockaddr *)choice, task->pktbuf) != 0) {
+		session_close(session);
+		return NULL;
	}
+
+	task->pending[task->pending_count] = session;
+	task->pending_count += 1;
+	session_start_read(session); /* Start reading answer */
	return ret;
}
/* Notify waiting tasks. */
struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
for (size_t i = task->waiting.len; i > 0; i--) {
- struct qr_task *follower = task->waiting.at[i - 1];
+ qr_task_weakptr_t followerptr = task->waiting.at[i - 1];
+ struct qr_task *follower = weakptr_get(followerptr);
+ if (!follower)
+ continue;
+
/* Reuse MSGID and 0x20 secret */
if (follower->ctx->req.rplan.pending.len > 0) {
struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
leader_qry->secret = 0; /* Next will be already decoded */
}
qr_task_step(follower, packet_source, pkt);
- qr_task_unref(follower);
}
task->waiting.len = 0;
task->leading = false;
if (!leader /*ENOMEM*/ || !*leader)
return false;
/* Enqueue itself to leader for this subrequest. */
- int ret = array_push_mm((*leader)->waiting, task,
+
+ int ret = array_push_mm((*leader)->waiting, task->weakptr,
kr_memreserve, &(*leader)->ctx->req.pool);
if (unlikely(ret < 0)) /*ENOMEM*/
return false;
- qr_task_ref(task);
return true;
}
struct request_ctx *ctx = task->ctx;
xdp_handle_data_t *xhd = src_handle->data;
if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session))
- return qr_task_on_send(task, src_handle, kr_error(EINVAL));
+ return qr_task_on_send_internal(task, src_handle, kr_error(EINVAL));
knot_xdp_msg_t msg;
#if KNOT_VERSION_HEX >= 0x030100
uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
- return qr_task_on_send(task, src_handle, ret);
+ return qr_task_on_send_internal(task, src_handle, ret);
#else
kr_assert(!EINVAL);
return kr_error(EINVAL);
static int qr_task_finalize(struct qr_task *task, int state)
{
kr_require(task && task->leading == false);
+ qr_task_assert_weakptr(task);
if (task->finished) {
+ qr_task_free(task);
return kr_ok();
}
struct request_ctx *ctx = task->ctx;
task->finished = true;
if (source_session == NULL) {
- (void) qr_task_on_send(task, NULL, kr_error(EIO));
+ (void) qr_task_on_send_internal(task, NULL, kr_error(EIO));
return state == KR_STATE_DONE ? kr_ok() : kr_error(EIO);
}
/* For NO_ANSWER, a well-behaved layer should set the state to FAIL */
kr_assert(!ctx->req.options.NO_ANSWER || (ctx->req.state & KR_STATE_FAIL));
- (void) qr_task_on_send(task, NULL, kr_ok());
+ (void) qr_task_on_send_internal(task, NULL, kr_ok());
return kr_ok();
}
if (session_flags(source_session)->closing ||
- ctx->source.addr.ip.sa_family == AF_UNSPEC)
+ ctx->source.addr.ip.sa_family == AF_UNSPEC) {
+ qr_task_free(task);
return kr_error(EINVAL);
-
- /* Reference task as the callback handler can close it */
- qr_task_ref(task);
+ }
/* Send back answer */
int ret;
const uv_handle_t *src_handle = session_get_handle(source_session);
+ qr_task_assert_weakptr(task);
+ qr_task_weakptr_t taskptr = task->weakptr;
if (kr_fails_assert(src_handle->type == UV_UDP || src_handle->type == UV_TCP
|| src_handle->type == UV_POLL)) {
ret = kr_error(EINVAL);
} else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
int fd;
ret = uv_fileno(src_handle, &fd);
- if (ret == 0)
- udp_queue_push(fd, &ctx->req, task);
- else
+ if (ret == 0) {
+ udp_queue_push(fd, &ctx->req, taskptr);
+ } else {
kr_assert(false);
+ }
} else {
ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
}
	if (ret != kr_ok()) {
-		(void) qr_task_on_send(task, NULL, kr_error(EIO));
+		if (worker_task_exists(taskptr)) /* May have been freed by `qr_task_send` above */
+			(void) qr_task_on_send_internal(task, NULL, kr_error(EIO));
+
		/* Since source session is erroneous detach all tasks. */
		while (!session_tasklist_is_empty(source_session)) {
-			struct qr_task *t = session_tasklist_del_first(source_session);
+			qr_task_weakptr_t tptr = session_tasklist_del_first(source_session);
+			struct qr_task *t = weakptr_get(tptr);
+			/* Entries with stale weak pointers are simply dropped. */
+			if (!t) continue;
+
			struct request_ctx *c = t->ctx;
			kr_assert(c->source.session == source_session);
			c->source.session = NULL;
-			/* Don't finalize them as there can be other tasks
-			 * waiting for answer to this particular task.
-			 * (ie. task->leading is true) */
-			worker_task_unref(t);
		}
		session_close(source_session);
	}
- qr_task_unref(task);
-
if (ret != kr_ok() || state != KR_STATE_DONE)
return kr_error(EIO);
return kr_ok();
return kr_error(EINVAL);
/* Add task to the end of list of waiting tasks.
* It will be notified in on_connect() or qr_task_on_send(). */
- int ret = session_waitinglist_push(session, task);
+ int ret = session_waitinglist_push(session, task->weakptr);
if (ret < 0) {
return kr_error(EINVAL);
}
/* Add task to the end of list of waiting tasks.
* Will be notified either in on_connect() or in qr_task_on_send(). */
- ret = session_waitinglist_push(session, task);
+ ret = session_waitinglist_push(session, task->weakptr);
if (ret < 0) {
session_timer_stop(session);
worker_del_tcp_waiting(addr);
/* Close pending I/O requests */
subreq_finalize(task, packet_source, packet);
- if ((kr_now() - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
- struct kr_request *req = worker_task_request(task);
+ if ((kr_now() - task->creation_time) >= KR_RESOLVE_TIME_LIMIT) {
+ struct kr_request *req = &task->ctx->req;
if (!kr_fails_assert(req))
kr_query_inform_timeout(req, req->current_query);
return qr_task_finalize(task, KR_STATE_FAIL);
return kr_error(ENOMEM);
}
- task = qr_task_create(ctx);
+ task = qr_task_create(ctx, NULL);
if (!task) {
request_free(ctx);
return kr_error(ENOMEM);
}
} else { /* response from upstream */
const uint16_t id = knot_wire_get_id(pkt->wire);
- task = session_tasklist_del_msgid(session, id);
- if (task == NULL) {
+ qr_task_weakptr_t taskptr = session_tasklist_del_msgid(session, id);
+ if (taskptr == WEAKPTR_NULL) {
VERBOSE_MSG(NULL, "=> ignoring packet with mismatching ID %d\n",
(int)id);
return kr_error(ENOENT);
}
+ task = weakptr_get(taskptr);
+ if (task == NULL) {
+ VERBOSE_MSG(NULL, "=> ignoring packet for no longer valid task (ID %d)\n",
+ (int)id);
+ return kr_error(ENOENT);
+ }
if (kr_fails_assert(!session_flags(session)->closing))
return kr_error(EINVAL);
addr = (comm) ? comm->src_addr : NULL;
}
while (!session_waitinglist_is_empty(session)) {
- struct qr_task *task = session_waitinglist_pop(session, false);
- kr_assert(task->refs > 1);
- session_tasklist_del(session, task);
+ qr_task_weakptr_t taskptr = session_waitinglist_pop(session);
+ struct qr_task *task = weakptr_get(taskptr);
+ if (!task) continue;
+
+ session_tasklist_del(session, taskptr);
if (session_flags(session)->outgoing) {
if (task->ctx->req.options.FORWARD) {
/* We are in TCP_FORWARD mode.
kr_assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
}
- worker_task_unref(task);
}
while (!session_tasklist_is_empty(session)) {
- struct qr_task *task = session_tasklist_del_first(session, false);
+ qr_task_weakptr_t taskptr = session_tasklist_del_first(session);
+ struct qr_task *task = weakptr_get(taskptr);
+ if (!task) continue;
+
if (session_flags(session)->outgoing) {
if (task->ctx->req.options.FORWARD) {
struct kr_request *req = &task->ctx->req;
kr_assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
}
- worker_task_unref(task);
}
session_close(session);
return kr_ok();
return worker_resolve_mk_pkt_dname(qname, qtype, qclass, options);
}
-struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options)
+qr_task_weakptr_t worker_resolve_start(knot_pkt_t *query, struct kr_qflags options)
{
	if (kr_fails_assert(the_worker && query))
-		return NULL;
+		return WEAKPTR_NULL;
	struct request_ctx *ctx = request_create(NULL, NULL, NULL, NULL,
						 the_worker->next_request_uid);
	if (!ctx)
-		return NULL;
+		return WEAKPTR_NULL;
	/* Create task */
-	struct qr_task *task = qr_task_create(ctx);
+	qr_task_weakptr_t taskptr;
+	struct qr_task *task = qr_task_create(ctx, &taskptr);
	if (!task) {
		request_free(ctx);
-		return NULL;
+		return WEAKPTR_NULL;
	}
	/* Start task */
	int ret = request_start(ctx, query);
	if (ret != 0) {
		/* task is attached to request context,
-		 * so dereference (and deallocate) it first */
-		ctx->task = NULL;
-		qr_task_unref(task);
-		request_free(ctx);
-		return NULL;
+		 * so deallocate it first.  With `ctx->taskptr` cleared,
+		 * qr_task_free() itself calls request_free(ctx); calling
+		 * request_free() here again would be a double free. */
+		ctx->taskptr = WEAKPTR_NULL;
+		qr_task_free(task);
+		return WEAKPTR_NULL;
	}
	the_worker->next_request_uid += 1;
	/* Set options late, as qr_task_start() -> kr_resolve_begin() rewrite it. */
	kr_qflags_set(&task->ctx->req.options, options);
-	return task;
+	return taskptr;
}
-int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query)
+int worker_resolve_exec(qr_task_weakptr_t taskptr, knot_pkt_t *query)
{
-	if (!task)
+	if (!taskptr)
		return kr_error(EINVAL);
+	/* EINVAL = the caller passed no task at all; ENOENT = the task
+	 * existed but has since been freed (stale weak pointer). */
+	struct qr_task *task = weakptr_get(taskptr);
+	if (!task)
+		return kr_error(ENOENT);
	return qr_task_step(task, NULL, query);
}
-int worker_task_numrefs(const struct qr_task *task)
+bool worker_task_exists(qr_task_weakptr_t taskptr)
{
-	return task->refs;
+	/* A task exists exactly when its weak pointer still resolves. */
+	struct qr_task *task = weakptr_get(taskptr);
+	return task != NULL;
}
-struct kr_request *worker_task_request(struct qr_task *task)
+struct kr_request *worker_task_request(qr_task_weakptr_t taskptr)
{
+	/* Both WEAKPTR_NULL and stale weak pointers yield NULL. */
+	if (!taskptr)
+		return NULL;
+	struct qr_task *task = weakptr_get(taskptr);
	if (!task || !task->ctx)
		return NULL;
	return &task->ctx->req;
}
-int worker_task_finalize(struct qr_task *task, int state)
+int worker_task_finalize(qr_task_weakptr_t taskptr, int state)
{
+	struct qr_task *task = weakptr_get(taskptr);
+	/* The task may already be gone, e.g. freed by an earlier send path. */
+	if (!task)
+		return kr_error(ENOENT);
	return qr_task_finalize(task, state);
}
- int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
- knot_pkt_t *packet)
- {
- return qr_task_step(task, packet_source, packet);
- }
-
-void worker_task_complete(struct qr_task *task)
-{
- qr_task_complete(task);
-}
-
-void worker_task_ref(struct qr_task *task)
+int worker_task_step(qr_task_weakptr_t taskptr, const struct sockaddr *packet_source,
+		     knot_pkt_t *packet)
{
-	qr_task_ref(task);
-}
-
-void worker_task_unref(struct qr_task *task)
-{
-	qr_task_unref(task);
+	/* Resolve the weak pointer; a stale task is a soft failure. */
+	struct qr_task *task = weakptr_get(taskptr);
+	if (!task)
+		return kr_error(ENOENT);
+	return qr_task_step(task, packet_source, packet);
}
-void worker_task_timeout_inc(struct qr_task *task)
+void worker_task_complete(qr_task_weakptr_t taskptr)
{
-	task->timeouts += 1;
+	/* Silently a no-op when the task no longer exists. */
+	struct qr_task *task = weakptr_get(taskptr);
+	if (task)
+		qr_task_complete(task);
}
-knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task)
+void worker_task_timeout_inc(qr_task_weakptr_t taskptr)
{
-	return task->pktbuf;
+	struct qr_task *task = weakptr_get(taskptr);
+	if (task)
+		task->timeouts += 1;
}
-struct request_ctx *worker_task_get_request(struct qr_task *task)
+knot_pkt_t *worker_task_get_pktbuf(qr_task_weakptr_t taskptr)
{
-	return task->ctx;
+	struct qr_task *task = weakptr_get(taskptr);
+	return (task) ? task->pktbuf : NULL;
}
struct session *worker_request_get_source_session(const struct kr_request *req)
return ((struct request_ctx *)req)->source.session;
}
-uint16_t worker_task_pkt_get_msgid(struct qr_task *task)
-{
- knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
- uint16_t msg_id = knot_wire_get_id(pktbuf->wire);
- return msg_id;
-}
-
-void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
-{
- knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
- knot_wire_set_id(pktbuf->wire, msgid);
- struct kr_query *q = task_get_last_pending_query(task);
- q->id = msgid;
-}
-
-uint64_t worker_task_creation_time(struct qr_task *task)
+uint64_t worker_task_creation_time(qr_task_weakptr_t taskptr)
{
-	return task->creation_time;
+	struct qr_task *task = weakptr_get(taskptr);
+	/* UINT64_MAX serves as the "no such task" sentinel. */
+	return (task) ? task->creation_time : UINT64_MAX;
}
-void worker_task_subreq_finalize(struct qr_task *task)
+void worker_task_subreq_finalize(qr_task_weakptr_t taskptr)
{
-	subreq_finalize(task, NULL, NULL);
+	struct qr_task *task = weakptr_get(taskptr);
+	if (task)
+		subreq_finalize(task, NULL, NULL);
}
-bool worker_task_finished(struct qr_task *task)
-bool worker_task_finished(struct qr_task *task)
+bool worker_task_finished(qr_task_weakptr_t taskptr)
{
-	return task->finished;
+	struct qr_task *task = weakptr_get(taskptr);
+	return (task)
+		? task->finished
+		: true; /* A stale weak pointer almost always means the task
+			 * has already finished and been freed, so report
+			 * "finished" rather than failing the caller. */
}
/** Reserve worker buffers. We assume worker's been zeroed. */
{
if (kr_fails_assert(the_worker == NULL))
return kr_error(EINVAL);
- kr_bindings_register(the_engine->L); // TODO move
/* Create main worker. */
the_worker = &the_worker_value;
#include "daemon/engine.h"
#include "lib/generic/array.h"
#include "lib/generic/trie.h"
+#include "lib/weakptr.h"
-/** Query resolution task (opaque). */
-struct qr_task;
-/** Worker state (opaque). */
-struct worker_ctx;
+/** Query resolution task weak pointer. */
+typedef weakptr_t qr_task_weakptr_t;
+
+/** Number of requests within the timeout window. */
+#define MAX_PENDING 4
+
+/** Maximum response time from TCP upstream, milliseconds */
+#define MAX_TCP_INACTIVITY (KR_RESOLVE_TIME_LIMIT + KR_CONN_RTT_MAX)
+
+#ifndef RECVMMSG_BATCH /* see check_bufsize() */
+#define RECVMMSG_BATCH 1
+#endif
+
+/** Various worker statistics. Sync with wrk_stats() */
+struct worker_stats {
+	size_t queries; /**< Total number of requests (from clients and internal ones). */
+	size_t concurrent; /**< The number of requests currently in processing. */
+	size_t rconcurrent; /**< TODO: remove? I see no meaningful difference from .concurrent. */
+	size_t dropped; /**< The number of requests dropped due to being badly formed. See #471. */
+
+	size_t timeout; /**< Number of outbound queries that timed out. */
+	size_t udp; /**< Number of outbound queries over UDP. */
+	size_t tcp; /**< Number of outbound queries over TCP (excluding TLS). */
+	size_t tls; /**< Number of outbound queries over TLS. */
+	size_t ipv4; /**< Number of outbound queries over IPv4.*/
+	size_t ipv6; /**< Number of outbound queries over IPv6. */
+
+	size_t err_udp; /**< Total number of write errors for UDP transport. */
+	size_t err_tcp; /**< Total number of write errors for TCP transport. */
+	size_t err_tls; /**< Total number of write errors for TLS transport. */
+	size_t err_http; /**< Total number of write errors for HTTP(S) transport. */
+};
+
+/** Freelist of available mempools. */
+typedef array_t(struct mempool *) mp_freelist_t;
+
+/** List of query resolution tasks. */
+typedef array_t(qr_task_weakptr_t) qr_tasklist_t;
+
+/** List of HTTP header names. */
+typedef array_t(const char *) doh_headerlist_t;
+
+/** \details Worker state is meant to persist during the whole life of daemon. */
+struct worker_ctx {
+	uv_loop_t *loop;
+	int count; /**< unreliable, does not count systemd instance, do not use */
+	int vars_table_ref;
+	unsigned tcp_pipeline_max;
+
+	/** Addresses to bind for outgoing connections or AF_UNSPEC. */
+	struct sockaddr_in out_addr4;
+	struct sockaddr_in6 out_addr6;
+
+	/** Scratch buffer for incoming packets (RECVMMSG_BATCH datagrams). */
+	uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
+
+	struct worker_stats stats;
+
+	bool too_many_open;
+	size_t rconcurrent_highwatermark;
+	/** List of active outbound TCP sessions */
+	trie_t *tcp_connected;
+	/** List of outbound TCP sessions waiting to be accepted */
+	trie_t *tcp_waiting;
+	/** Subrequest leaders (qr_task_weakptr_t), indexed by qname+qtype+qclass. */
+	trie_t *subreq_out;
+	mp_freelist_t pool_mp;
+	knot_mm_t pkt_pool;
+	unsigned int next_request_uid;
+
+	/* HTTP Headers for DoH. */
+	doh_headerlist_t doh_qry_headers;
+};
+
/** Transport session (opaque). */
struct session;
/** Zone import context (opaque). */
const struct kr_qflags *options);
/**
- * Start query resolution with given query.
+ * Start query resolution with a given query. Creates a new query
+ * resolution task and returns a managed weak pointer to it.
*
- * @return task or NULL
+ * @return task weak pointer or WEAKPTR_NULL
*/
-KR_EXPORT struct qr_task *
-worker_resolve_start(knot_pkt_t *query, struct kr_qflags options);
+KR_EXPORT
+qr_task_weakptr_t worker_resolve_start(knot_pkt_t *query, struct kr_qflags options);
/**
* Execute a request with given query.
- * It expects task to be created with \fn worker_resolve_start.
+ * It expects `taskptr` to be created with \fn worker_resolve_start.
*
* @return 0 or an error code
*/
-KR_EXPORT int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query);
+KR_EXPORT int worker_resolve_exec(qr_task_weakptr_t taskptr, knot_pkt_t *query);
+
+/** Checks whether the task associated with the specified weak pointer still
+ * exists, or if it has been freed. It is safe to pass WEAKPTR_NULL to this
+ * function. */
+bool worker_task_exists(qr_task_weakptr_t taskptr);
/** @return struct kr_request associated with opaque task */
-struct kr_request *worker_task_request(struct qr_task *task);
+KR_EXPORT struct kr_request *worker_task_request(qr_task_weakptr_t taskptr);
-int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
+int worker_task_step(qr_task_weakptr_t taskptr, const struct sockaddr *packet_source,
knot_pkt_t *packet);
-int worker_task_numrefs(const struct qr_task *task);
-
/** Finalize given task */
-int worker_task_finalize(struct qr_task *task, int state);
-
-void worker_task_complete(struct qr_task *task);
-
-void worker_task_ref(struct qr_task *task);
+int worker_task_finalize(qr_task_weakptr_t taskptr, int state);
-void worker_task_unref(struct qr_task *task);
+void worker_task_timeout_inc(qr_task_weakptr_t taskptr);
-void worker_task_timeout_inc(struct qr_task *task);
-
-int worker_add_tcp_connected(const struct sockaddr *addr, struct session *session);
int worker_del_tcp_connected(const struct sockaddr *addr);
int worker_del_tcp_waiting(const struct sockaddr* addr);
-struct session* worker_find_tcp_waiting(const struct sockaddr* addr);
-struct session* worker_find_tcp_connected(const struct sockaddr* addr);
-knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task);
-
-struct request_ctx *worker_task_get_request(struct qr_task *task);
+knot_pkt_t *worker_task_get_pktbuf(qr_task_weakptr_t taskptr);
/** Note: source session is NULL in case the request hasn't come over network. */
KR_EXPORT struct session *worker_request_get_source_session(const struct kr_request *req);
-uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
-void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
-uint64_t worker_task_creation_time(struct qr_task *task);
-void worker_task_subreq_finalize(struct qr_task *task);
-bool worker_task_finished(struct qr_task *task);
+uint64_t worker_task_creation_time(qr_task_weakptr_t taskptr);
+void worker_task_subreq_finalize(qr_task_weakptr_t taskptr);
+bool worker_task_finished(qr_task_weakptr_t taskptr);
/** To be called after sending a DNS message. It mainly deals with cleanups. */
-int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status);
-
-/** Various worker statistics. Sync with wrk_stats() */
-struct worker_stats {
- size_t queries; /**< Total number of requests (from clients and internal ones). */
- size_t concurrent; /**< The number of requests currently in processing. */
- size_t rconcurrent; /*< TODO: remove? I see no meaningful difference from .concurrent. */
- size_t dropped; /**< The number of requests dropped due to being badly formed. See #471. */
-
- size_t timeout; /**< Number of outbound queries that timed out. */
- size_t udp; /**< Number of outbound queries over UDP. */
- size_t tcp; /**< Number of outbound queries over TCP (excluding TLS). */
- size_t tls; /**< Number of outbound queries over TLS. */
- size_t ipv4; /**< Number of outbound queries over IPv4.*/
- size_t ipv6; /**< Number of outbound queries over IPv6. */
-
- size_t err_udp; /**< Total number of write errors for UDP transport. */
- size_t err_tcp; /**< Total number of write errors for TCP transport. */
- size_t err_tls; /**< Total number of write errors for TLS transport. */
- size_t err_http; /**< Total number of write errors for HTTP(S) transport. */
-};
-
-/** @cond internal */
-
-/** Number of request within timeout window. */
-#define MAX_PENDING 4
-
-/** Maximum response time from TCP upstream, milliseconds */
-#define MAX_TCP_INACTIVITY (KR_RESOLVE_TIME_LIMIT + KR_CONN_RTT_MAX)
-
-#ifndef RECVMMSG_BATCH /* see check_bufsize() */
-#define RECVMMSG_BATCH 1
-#endif
-
-/** Freelist of available mempools. */
-typedef array_t(struct mempool *) mp_freelist_t;
-
-/** List of query resolution tasks. */
-typedef array_t(struct qr_task *) qr_tasklist_t;
-
-/** List of HTTP header names. */
-typedef array_t(const char *) doh_headerlist_t;
-
-/** \details Worker state is meant to persist during the whole life of daemon. */
-struct worker_ctx {
- uv_loop_t *loop;
- int count; /** unreliable, does not count systemd instance, do not use */
- int vars_table_ref;
- unsigned tcp_pipeline_max;
-
- /** Addresses to bind for outgoing connections or AF_UNSPEC. */
- struct sockaddr_in out_addr4;
- struct sockaddr_in6 out_addr6;
-
- uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
-
- struct worker_stats stats;
-
- bool too_many_open;
- size_t rconcurrent_highwatermark;
- /** List of active outbound TCP sessions */
- trie_t *tcp_connected;
- /** List of outbound TCP sessions waiting to be accepted */
- trie_t *tcp_waiting;
- /** Subrequest leaders (struct qr_task*), indexed by qname+qtype+qclass. */
- trie_t *subreq_out;
- mp_freelist_t pool_mp;
- knot_mm_t pkt_pool;
- unsigned int next_request_uid;
-
- /* HTTP Headers for DoH. */
- doh_headerlist_t doh_qry_headers;
-};
-
-/** @endcond */
-
+int qr_task_on_send(qr_task_weakptr_t task, const uv_handle_t *handle, int status);
GRP_NAME_ITEM(LOG_GRP_DEVEL),
GRP_NAME_ITEM(LOG_GRP_RENUMBER),
GRP_NAME_ITEM(LOG_GRP_EDE),
+ GRP_NAME_ITEM(LOG_GRP_WEAKPTR),
GRP_NAME_ITEM(LOG_GRP_REQDBG),
{ NULL, LOG_GRP_UNKNOWN },
};
LOG_GRP_DEVEL,
LOG_GRP_RENUMBER,
LOG_GRP_EDE,
+ LOG_GRP_WEAKPTR,
/* ^^ Add new log groups above ^^. */
LOG_GRP_REQDBG, /* Must be first non-displayed entry in enum! */
};
#define LOG_GRP_DEVEL_TAG "devel" /**< ``devel``: for development purposes */
#define LOG_GRP_RENUMBER_TAG "renum" /**< ``renum``: operation related to renumber */
#define LOG_GRP_EDE_TAG "exterr" /**< ``exterr``: extended error module */
+#define LOG_GRP_WEAKPTR_TAG "weakptr" /**< ``weakptr``: weak pointer manager */
#define LOG_GRP_REQDBG_TAG "reqdbg" /**< ``reqdbg``: debug logs enabled by policy actions */
///@}
'selection_forward.c',
'selection_iter.c',
'utils.c',
+ 'weakptr.c',
'zonecut.c',
])
c_src_lint += libkres_src
'selection_forward.h',
'selection_iter.h',
'utils.h',
+ 'weakptr.h',
'zonecut.h',
])
--- /dev/null
+/* Copyright (C) 2015-2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include "contrib/mempattern.h"
+#include "lib/generic/trie.h"
+
+#include "lib/weakptr.h"
+
+/** Global bookkeeping for weak pointers: maps every live weakptr_t to
+ * the naked pointer of the memory it manages. */
+struct weakptr_manager {
+	trie_t *weak_to_mem; /**< Key: weakptr_t;
+			      * Value: Pointer to managed memory */
+	weakptr_t next_val;  /**< Value of the next created weakptr_t */
+};
+
+/* Static storage for the singleton; `the_manager` stays NULL until
+ * weakptr_manager_init() runs, so its non-NULL-ness doubles as an
+ * "is initialized" flag (asserted in init/deinit below). */
+static struct weakptr_manager the_manager_val = {0};
+static struct weakptr_manager *the_manager = NULL;
+
+/** Returns the next candidate weak pointer, post-incrementing the global
+ * counter.  WEAKPTR_NULL (0) is skipped on wrap-around so a valid weak
+ * pointer can never equal the null sentinel.  Collisions with still-live
+ * pointers remain possible after wrap; weakptr_mm_alloc() resolves them. */
+static inline weakptr_t weakptr_next(void)
+{
+	weakptr_t ptr = the_manager->next_val++;
+	if (the_manager->next_val == WEAKPTR_NULL)
+		the_manager->next_val++;
+	return ptr;
+}
+
+/** Initializes the global weak pointer manager.  Must be paired with
+ * weakptr_manager_deinit().  Always returns kr_ok(); trie allocation
+ * failure aborts the process via kr_require(). */
+int weakptr_manager_init(void)
+{
+	/* Double initialization indicates a programming error. */
+	kr_assert(!the_manager);
+	the_manager = &the_manager_val;
+
+	/* Start at 1: WEAKPTR_NULL (0) must never be handed out. */
+	the_manager->next_val = 1;
+	the_manager->weak_to_mem = trie_create(NULL);
+	kr_require(the_manager->weak_to_mem);
+
+	return kr_ok();
+}
+
+/** Tears down the global weak pointer manager.  Entries still present in
+ * the trie are weak pointers that were never freed; they are only logged
+ * for now (see the TODO below). */
+void weakptr_manager_deinit(void)
+{
+	kr_assert(the_manager);
+	size_t leaked = trie_weight(the_manager->weak_to_mem);
+	if (leaked) {
+		kr_log_debug(WEAKPTR, "Leaked %zu weak pointers!\n", leaked);
+		/* TODO: Add an assertion here when proper session cleanup
+		 * is implemented. Without it, these messages are pretty
+		 * much useless false-positives. */
+	}
+	trie_free(the_manager->weak_to_mem);
+	the_manager = NULL;
+}
+
+/** See weakptr.h for the API contract.  Allocates `size` bytes from `mm`
+ * (or malloc() when `mm` is NULL, per mm_alloc semantics) and registers
+ * the memory in the manager's trie under a freshly generated weak
+ * pointer.  Probes successive counter values until an unused slot is
+ * found; returns WEAKPTR_NULL if the whole weakptr_t space is occupied
+ * or on allocation failure. */
+weakptr_t weakptr_mm_alloc(knot_mm_t *mm, size_t size, void **naked_ptr)
+{
+	bool rolled_over = false;
+	weakptr_t initial_ptr = weakptr_next();
+	weakptr_t ptr = initial_ptr;
+	trie_val_t *val = NULL;
+
+	do {
+		/* Create reference to managed memory */
+		val = trie_get_ins(the_manager->weak_to_mem,
+				(const char *)&ptr, sizeof(weakptr_t));
+		if (!val)
+			goto exit_err;
+
+		if (*val) {
+			/* The pointer already exists, find a new one */
+
+			if (ptr == initial_ptr && rolled_over) {
+				/* Ran out of weak-pointer space */
+				return WEAKPTR_NULL;
+			}
+
+			rolled_over = true;
+			ptr = weakptr_next();
+			continue;
+		}
+
+		/* Allocate the managed memory */
+		*val = mm_alloc(mm, size);
+		if (!*val)
+			goto exit_err;
+
+		/* The documented contract allows naked_ptr == NULL
+		 * ("caller not interested"), so only write through it
+		 * when it was actually provided. */
+		if (naked_ptr)
+			*naked_ptr = *val;
+		return ptr;
+	} while (true);
+
+exit_err:
+	/* If trie_get_ins() succeeded but mm_alloc() failed, an empty
+	 * entry for `ptr` was inserted above — remove it again. */
+	if (val) {
+		trie_del(the_manager->weak_to_mem,
+				(const char *)&ptr, sizeof(weakptr_t), NULL);
+	}
+	return WEAKPTR_NULL;
+}
+
+/** Releases the memory behind `ptr` and drops its entry from the
+ * manager's trie.  A WEAKPTR_NULL argument is a no-op; freeing a weak
+ * pointer that is not registered trips an assertion and leaves the
+ * trie and memory untouched. */
+void weakptr_mm_free(knot_mm_t *mm, weakptr_t ptr)
+{
+	if (ptr == WEAKPTR_NULL)
+		return;
+
+	/* Unregister first; trie_del() hands back the naked pointer. */
+	trie_val_t naked;
+	const int ret = trie_del(the_manager->weak_to_mem,
+			(const char *)&ptr, sizeof(weakptr_t), &naked);
+	if (kr_fails_assert(ret == KNOT_EOK))
+		return;
+
+	mm_free(mm, naked);
+}
+
+/** Translates a weak pointer into its current naked pointer.  Returns
+ * NULL when `ptr` is WEAKPTR_NULL or is no longer registered (i.e. the
+ * managed memory has already been freed). */
+void *weakptr_get(weakptr_t ptr)
+{
+	if (!ptr)
+		return NULL;
+
+	trie_val_t *slot = trie_get_try(the_manager->weak_to_mem,
+			(const char *)&ptr, sizeof(weakptr_t));
+	if (!slot)
+		return NULL;
+	return *slot;
+}
--- /dev/null
+/* Copyright (C) 2015-2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <libknot/mm_ctx.h>
+#include "lib/defines.h"
+#include "lib/utils.h"
+
+/** A managed weak pointer to dynamic memory. Its actual pointer value may
+ * be retrieved using `weakptr_get()`.
+ *
+ * A weak pointer is an opaque integer handle, not a machine pointer: it
+ * must never be dereferenced directly, only resolved via `weakptr_get()`.
+ *
+ * Like a normal pointer, a weak pointer may have a special value
+ * of 0 (WEAKPTR_NULL), indicating that the pointer represents no valid memory.
+ *
+ * For readability, it is encouraged to create own `typedef` declarations
+ * for weak pointers to particular data types. Consider `weakptr_t`
+ * an equivalent of `void *`.
+ *
+ * NOTE(review): `unsigned long` is 32-bit on some ABIs (e.g. 64-bit
+ * Windows), which shrinks the handle space there — confirm this is
+ * acceptable, or consider `uint64_t`/`uintptr_t`. */
+typedef unsigned long weakptr_t;
+
+/** Special value for `weakptr_t` indicating that it represents no valid memory. */
+#define WEAKPTR_NULL ((weakptr_t) 0)
+
+/** Initializes the weak pointer manager structure. Must be deinitialized
+ * using `weakptr_manager_deinit()`. Returns `kr_ok()` on success. */
+KR_EXPORT
+int weakptr_manager_init(void);
+
+/** Deinitializes the weak pointer manager structure. Weak pointers that
+ * were never freed are logged, not asserted on. */
+KR_EXPORT
+void weakptr_manager_deinit(void);
+
+/** Allocates weak-pointer-managed dynamic memory of `size` using the memory
+ * pool `mm`. `mm` may be `NULL`, `malloc()` will then be used instead of a memory
+ * pool.
+ *
+ * If `naked_ptr` is not `NULL`, its target is assigned the naked (non-weak)
+ * pointer to the newly allocated memory on success. On error, `*naked_ptr`
+ * remains unchanged.
+ *
+ * Returns `WEAKPTR_NULL` on error. */
+KR_EXPORT
+weakptr_t weakptr_mm_alloc(knot_mm_t *mm, size_t size, void **naked_ptr);
+
+/** Convenience wrapper around `weakptr_mm_alloc()` that always uses
+ * `malloc()` (i.e. a NULL memory pool) to allocate `size` bytes of
+ * weak-pointer-managed memory.
+ *
+ * On success, `*naked_ptr` (when `naked_ptr` is non-NULL) receives the
+ * naked pointer to the new memory; on error `*naked_ptr` is left
+ * untouched and `WEAKPTR_NULL` is returned. */
+static inline weakptr_t weakptr_malloc(size_t size, void **naked_ptr)
+{
+	const weakptr_t wp = weakptr_mm_alloc(NULL, size, naked_ptr);
+	return wp;
+}
+
+/** Frees the specified `ptr` using the memory pool `mm`. `mm` may be `NULL`,
+ * `free()` will then be used instead of a memory pool. */
+KR_EXPORT
+void weakptr_mm_free(knot_mm_t *mm, weakptr_t ptr);
+
+/** Frees the specified `ptr` using `free()`. If `ptr` is `WEAKPTR_NULL`,
+ * no operation is performed.
+ *
+ * Thin wrapper: delegates to `weakptr_mm_free()` with a NULL memory
+ * pool, which both releases the memory and unregisters the handle. */
+static inline void weakptr_free(weakptr_t ptr)
+{
+	weakptr_mm_free(NULL, ptr);
+}
+
+/** Retrieves the value of the `ptr`. Returns `NULL` if the memory pointed to
+ * by `ptr` has already been freed, or if `ptr` itself is `WEAKPTR_NULL`. */
+KR_EXPORT
+void *weakptr_get(weakptr_t ptr);