]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/session: migrate from array_t to trie_t & queue_t; daemon/worker: some simplif...
authorGrigorii Demidov <grigorii.demidov@nic.cz>
Thu, 27 Sep 2018 14:56:02 +0000 (16:56 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Fri, 12 Oct 2018 15:36:45 +0000 (17:36 +0200)
daemon/session.c
daemon/session.h
daemon/worker.c
daemon/worker.h

index 830ca4d77c0a56530b1cdc6e63ab22c49ec876a6..1dbe6f670f046ab5075adeafe647da8847088375 100644 (file)
@@ -8,9 +8,7 @@
 #include "daemon/tls.h"
 #include "daemon/worker.h"
 #include "daemon/io.h"
-
-/** List of tasks. */
-typedef array_t(struct qr_task *) session_tasklist_t;
+#include "lib/generic/queue.h"
 
 /* Per-session (TCP or UDP) persistent structure,
  * that exists between remote counterpart and a local socket.
@@ -24,8 +22,8 @@ struct session {
        struct tls_ctx_t *tls_ctx;   /**< server side tls-related data. */
        struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */
 
-       session_tasklist_t tasks;    /**< list of tasks which assotiated with given session. */
-       session_tasklist_t waiting;  /**< list of tasks been waiting for IO (subset of taska). */
+       trie_t *tasks;               /**< list of tasks assotiated with given session. */
+       queue_t(struct qr_task *) waiting;  /**< list of tasks waiting for sending to upstream. */
 
        uint8_t *wire_buf;           /**< Buffer for DNS message. */
        ssize_t wire_buf_size;       /**< Buffer size. */
@@ -54,7 +52,7 @@ static void on_session_timer_close(uv_handle_t *timer)
 void session_free(struct session *session)
 {
        if (session) {
-               assert(session->tasks.len == 0 && session->waiting.len == 0);
+               assert(session_is_empty(session));
                session_clear(session);
                free(session);
        }
@@ -62,12 +60,13 @@ void session_free(struct session *session)
 
 void session_clear(struct session *session)
 {
-       assert(session->tasks.len == 0 && session->waiting.len == 0);
+       assert(session_is_empty(session));
        if (session->handle && session->handle->type == UV_TCP) {
                free(session->wire_buf);
        }
-       array_clear(session->tasks);
-       array_clear(session->waiting);
+       trie_clear(session->tasks);
+       trie_free(session->tasks);
+       queue_deinit(session->waiting);
        tls_free(session->tls_ctx);
        tls_client_ctx_free(session->tls_client_ctx);
        memset(session, 0, sizeof(*session));
@@ -75,8 +74,7 @@ void session_clear(struct session *session)
 
 void session_close(struct session *session)
 {
-       assert(session->tasks.len == 0 && session->waiting.len == 0);
-
+       assert(session_is_empty(session));
        if (session->sflags.closing) {
                return;
        }
@@ -84,8 +82,7 @@ void session_close(struct session *session)
        uv_handle_t *handle = session->handle;
        io_stop_read(handle);
        session->sflags.closing = true;
-       if (session->sflags.outgoing &&
-           session->peer.ip.sa_family != AF_UNSPEC) {
+       if (session->peer.ip.sa_family != AF_UNSPEC) {
                struct worker_ctx *worker = handle->loop->data;
                struct sockaddr *peer = &session->peer.ip;
                worker_del_tcp_connected(worker, peer);
@@ -111,98 +108,124 @@ int session_start_read(struct session *session)
        return io_start_read(session->handle);
 }
 
-int session_waitinglist_add(struct session *session, struct qr_task *task)
+int session_waitinglist_push(struct session *session, struct qr_task *task)
 {
-       for (int i = 0; i < session->waiting.len; ++i) {
-               if (session->waiting.at[i] == task) {
-                       return i;
-               }
-       }
-       int ret = array_push(session->waiting, task);
-       if (ret >= 0) {
-               worker_task_ref(task);
-       }
-       return ret;
+       queue_push(session->waiting, task);
+       worker_task_ref(task);
+       return kr_ok();
 }
 
-int session_waitinglist_del(struct session *session, struct qr_task *task)
+struct qr_task *session_waitinglist_get(const struct session *session)
 {
-       int ret = kr_error(ENOENT);
-       for (int i = 0; i < session->waiting.len; ++i) {
-               if (session->waiting.at[i] == task) {
-                       array_del(session->waiting, i);
-                       worker_task_unref(task);
-                       ret = kr_ok();
-                       break;
-               }
-       }
-       return ret;
+       return queue_head(session->waiting);
 }
 
-int session_waitinglist_del_index(struct session *session, int index)
+struct qr_task *session_waitinglist_pop(struct session *session, bool deref)
 {
-       int ret = kr_error(ENOENT);
-       if (index < session->waiting.len) {
-               struct qr_task *task = session->waiting.at[index];
-               array_del(session->waiting, index);
-               worker_task_unref(task);
-               ret = kr_ok();
+       struct qr_task *t = session_waitinglist_get(session);
+       queue_pop(session->waiting);
+       if (deref) {
+               worker_task_unref(t);
        }
-       return ret;
+       return t;
 }
 
 int session_tasklist_add(struct session *session, struct qr_task *task)
 {
-       for (int i = 0; i < session->tasks.len; ++i) {
-               if (session->tasks.at[i] == task) {
-                       return i;
-               }
+       trie_t *t = session->tasks;
+       uint16_t task_msg_id = 0;
+       const char *key = NULL;
+       size_t key_len = 0;
+       if (session->sflags.outgoing) {
+               knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+               task_msg_id = knot_wire_get_id(pktbuf->wire);
+               key = (const char *)&task_msg_id;
+               key_len = sizeof(task_msg_id);
+       } else {
+               key = (const char *)task;
+               key_len = sizeof(task);
+       }
+       trie_val_t *v = trie_get_ins(t, key, key_len);
+       if (unlikely(!v)) {
+               assert(false);
+               return kr_error(ENOMEM);
        }
-       int ret = array_push(session->tasks, task);
-       if (ret >= 0) {
+       if (*v == NULL) {
+               *v = task;
                worker_task_ref(task);
+       } else if (*v != task) {
+               assert(false);
+               return kr_error(ENOMEM);
        }
-       return ret;
+       return kr_ok();
 }
 
 int session_tasklist_del(struct session *session, struct qr_task *task)
 {
-       int ret = kr_error(ENOENT);
-       for (int i = 0; i < session->tasks.len; ++i) {
-               if (session->tasks.at[i] == task) {
-                       array_del(session->tasks, i);
-                       worker_task_unref(task);
-                       ret = kr_ok();
-                       break;
-               }
+       trie_t *t = session->tasks;
+       uint16_t task_msg_id = 0;
+       const char *key = NULL;
+       size_t key_len = 0;
+       trie_val_t val;
+       if (session->sflags.outgoing) {
+               knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+               task_msg_id = knot_wire_get_id(pktbuf->wire);
+               key = (const char *)&task_msg_id;
+               key_len = sizeof(task_msg_id);
+       } else {
+               key = (const char *)task;
+               key_len = sizeof(task);
+       }
+       int ret = trie_del(t, key, key_len, &val);
+       if (ret == kr_ok()) {
+               assert(val == task);
+               worker_task_unref(val);
        }
        return ret;
 }
 
-int session_tasklist_del_index(struct session *session, int index)
+struct qr_task *session_tasklist_get_first(struct session *session)
 {
-       int ret = kr_error(ENOENT);
-       if (index < session->tasks.len) {
-               struct qr_task *task = session->tasks.at[index];
-               array_del(session->tasks, index);
-               worker_task_unref(task);
-               ret = kr_ok();
+       trie_val_t *val = trie_get_first(session->tasks, NULL, NULL);
+       return val ? (struct qr_task *) *val : NULL;
+}
+
+struct qr_task *session_tasklist_del_first(struct session *session, bool deref)
+{
+       trie_val_t val = NULL;
+       int res = trie_del_first(session->tasks, NULL, NULL, &val);
+       if (res != kr_ok()) {
+               val = NULL;
+       } else if (deref) {
+               worker_task_unref(val);
+       }
+       return (struct qr_task *)val;
+}
+struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id)
+{
+       trie_t *t = session->tasks;
+       assert(session->sflags.outgoing);
+       struct qr_task *ret = NULL;
+       const char *key = (const char *)&msg_id;
+       size_t key_len = sizeof(msg_id);
+       trie_val_t val;
+       int res = trie_del(t, key, key_len, &val);
+       if (res == kr_ok()) {
+               ret = val;
+               assert(worker_task_numrefs(ret) > 1);
+               worker_task_unref(ret);
        }
        return ret;
 }
 
-struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id)
+struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id)
 {
+       trie_t *t = session->tasks;
+       assert(session->sflags.outgoing);
        struct qr_task *ret = NULL;
-       const session_tasklist_t *tasklist = &session->tasks;
-       for (size_t i = 0; i < tasklist->len; ++i) {
-               struct qr_task *task = tasklist->at[i];
-               knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
-               uint16_t task_msg_id = knot_wire_get_id(pktbuf->wire);
-               if (task_msg_id == msg_id) {
-                       ret = task;
-                       break;
-               }
+       trie_val_t *val = trie_get_try(t, (char *)&msg_id, sizeof(msg_id));
+       if (val) {
+               ret = *val;
        }
        return ret;
 }
@@ -249,6 +272,11 @@ uv_handle_t *session_get_handle(struct session *session)
        return session->handle;
 }
 
+struct session *session_get(uv_handle_t *h)
+{
+       return h->data;
+}
+
 struct session *session_new(uv_handle_t *handle)
 {
        if (!handle) {
@@ -259,6 +287,8 @@ struct session *session_new(uv_handle_t *handle)
                return NULL;
        }
 
+       queue_init(session->waiting);
+       session->tasks = trie_create(NULL);
        if (handle->type == UV_TCP) {
                uint8_t *wire_buf = malloc(KNOT_WIRE_MAX_PKTSIZE);
                if (!wire_buf) {
@@ -293,12 +323,12 @@ struct session *session_new(uv_handle_t *handle)
 
 size_t session_tasklist_get_len(const struct session *session)
 {
-       return session->tasks.len;
+       return trie_weight(session->tasks);
 }
 
 size_t session_waitinglist_get_len(const struct session *session)
 {
-       return session->waiting.len;
+       return queue_len(session->waiting);
 }
 
 bool session_tasklist_is_empty(const struct session *session)
@@ -327,30 +357,10 @@ void session_set_has_tls(struct session *session, bool has_tls)
        session->sflags.has_tls = has_tls;
 }
 
-struct qr_task *session_waitinglist_get_first(const struct session *session)
-{
-       struct qr_task *t = NULL;
-       if (session->waiting.len > 0) {
-               t = session->waiting.at[0];
-       }
-       return t;
-}
-
-struct qr_task *session_tasklist_get_first(const struct session *session)
-{
-       struct qr_task *t = NULL;
-       if (session->tasks.len > 0) {
-               t = session->tasks.at[0];
-       }
-       return t;
-}
-
 void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt)
 {
-       while (session->waiting.len > 0) {
-               struct qr_task *task = session->waiting.at[0];
-               session_tasklist_del(session, task);
-               array_del(session->waiting, 0);
+       while (!session_waitinglist_is_empty(session)) {
+               struct qr_task *task = session_waitinglist_pop(session, false);
                assert(worker_task_numrefs(task) > 1);
                if (increase_timeout_cnt) {
                        worker_task_timeout_inc(task);
@@ -362,10 +372,8 @@ void session_waitinglist_retry(struct session *session, bool increase_timeout_cn
 
 void session_waitinglist_finalize(struct session *session, int status)
 {
-       while (session->waiting.len > 0) {
-               struct qr_task *t = session->waiting.at[0];
-               array_del(session->waiting, 0);
-               session_tasklist_del(session, t);
+       while (!session_waitinglist_is_empty(session)) {
+               struct qr_task *t = session_waitinglist_pop(session, false);
                if (session->sflags.outgoing) {
                        worker_task_finalize(t, status);
                } else {
@@ -379,9 +387,9 @@ void session_waitinglist_finalize(struct session *session, int status)
 
 void session_tasklist_finalize(struct session *session, int status)
 {
-       while (session->tasks.len > 0) {
-               struct qr_task *t = session->tasks.at[0];
-               array_del(session->tasks, 0);
+       while (session_tasklist_get_len(session) > 0) {
+               struct qr_task *t = session_tasklist_del_first(session, false);
+               assert(worker_task_numrefs(t) > 0);
                if (session->sflags.outgoing) {
                        worker_task_finalize(t, status);
                } else {
@@ -690,8 +698,6 @@ void session_kill_ioreq(struct session *s, struct qr_task *task)
        }
        /* TCP-specific code now. */
        if (s->handle->type != UV_TCP) abort();
-       session_waitinglist_del(s, task);
-       session_tasklist_del(s, task);
 
        int res = 0;
 
@@ -714,4 +720,3 @@ void session_kill_ioreq(struct session *s, struct qr_task *task)
                session_close(s);
        }
 }
-
index 096dbd3a54bd881916381ce4cc97535b48e67a1e..182cabec3646d40d99e358c7817d57e7d55b8564 100644 (file)
@@ -18,7 +18,6 @@
 
 #include <stdbool.h>
 #include <uv.h>
-#include "lib/generic/array.h"
 
 struct qr_task;
 struct worker_ctx;
@@ -47,16 +46,14 @@ int session_start_read(struct session *session);
 /** List of tasks been waiting for IO. */
 /** Check if list is empty. */
 bool session_waitinglist_is_empty(const struct session *session);
+/** Add task to the end of the list. */
+int session_waitinglist_push(struct session *session, struct qr_task *task);
 /** Get the first element. */
-struct qr_task *session_waitinglist_get_first(const struct session *session);
+struct qr_task *session_waitinglist_get(const struct session *session);
+/** Get the first element and remove it from the list. */
+struct qr_task *session_waitinglist_pop(struct session *session, bool deref);
 /** Get the list length. */
 size_t session_waitinglist_get_len(const struct session *session);
-/** Add task to the list. */
-int session_waitinglist_add(struct session *session, struct qr_task *task);
-/** Remove task from the list. */
-int session_waitinglist_del(struct session *session, struct qr_task *task);
-/** Remove task from the list by index. */
-int session_waitinglist_del_index(struct session *session, int index);
 /** Retry resolution for each task in the list. */
 void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt);
 /** Finalize all tasks in the list. */
@@ -66,17 +63,19 @@ void session_waitinglist_finalize(struct session *session, int status);
 /** Check if list is empty. */
 bool session_tasklist_is_empty(const struct session *session);
 /** Get the first element. */
-struct qr_task *session_tasklist_get_first(const struct session *session);
+struct qr_task *session_tasklist_get_first(struct session *session);
+/** Get the first element and remove it from the list. */
+struct qr_task *session_tasklist_del_first(struct session *session, bool deref);
 /** Get the list length. */
 size_t session_tasklist_get_len(const struct session *session);
 /** Add task to the list. */
 int session_tasklist_add(struct session *session, struct qr_task *task);
 /** Remove task from the list. */
 int session_tasklist_del(struct session *session, struct qr_task *task);
-/** Remove task from the list by index. */
-int session_tasklist_del_index(struct session *session, int index);
+/** Remove task with given msg_id, session_flags(session)->outgoing must be true. */
+struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id);
 /** Find task with given msg_id */
-struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id);
+struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
 /** Finalize all tasks in the list. */
 void session_tasklist_finalize(struct session *session, int status);
 
@@ -103,6 +102,7 @@ struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session)
 
 /** Get pointer to underlying libuv handle for IO operations. */
 uv_handle_t *session_get_handle(struct session *session);
+struct session *session_get(uv_handle_t *h);
 
 /** Start session timer. */
 int session_timer_start(struct session *session, uv_timer_cb cb,
@@ -139,4 +139,3 @@ knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
 int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
 
 void session_kill_ioreq(struct session *s, struct qr_task *task);
-
index ebf7942c5a68f00b0d0a9cae71323f8a976f080e..c2d2fc3b991aee00465c5c03ed3399108eb9685b 100644 (file)
@@ -110,7 +110,7 @@ static void qr_task_free(struct qr_task *task);
 static int qr_task_step(struct qr_task *task,
                        const struct sockaddr *packet_source,
                        knot_pkt_t *packet);
-static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
+static int qr_task_send(struct qr_task *task, struct session *session,
                        struct sockaddr *addr, knot_pkt_t *pkt);
 static int qr_task_finalize(struct qr_task *task, int state);
 static void qr_task_complete(struct qr_task *task);
@@ -134,7 +134,7 @@ static inline struct worker_ctx *get_worker(void)
 
 /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
  *  socktype is SOCK_* */
-static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family)
+static uv_handle_t *ioreq_spawn(struct worker_ctx *worker, int socktype, sa_family_t family)
 {
        bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
                        && (family == AF_INET  || family == AF_INET6);
@@ -144,11 +144,7 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
                return NULL;
        }
 
-       if (task->pending_count >= MAX_PENDING) {
-               return NULL;
-       }
        /* Create connection for iterative query */
-       struct worker_ctx *worker = task->ctx->worker;
        uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
                                        ? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
        if (!handle) {
@@ -182,20 +178,16 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
                }
        }
 
-       /* Set current handle as a subrequest type. */
-       struct session *session = handle->data;
-       if (ret == 0) {
-               session_flags(session)->outgoing = true;
-               ret = session_tasklist_add(session, task);
-       }
-       if (ret < 0) {
+       if (ret != 0) {
                io_deinit(handle);
                free(handle);
                return NULL;
        }
+
+       /* Set current handle as a subrequest type. */
+       struct session *session = handle->data;
+       session_flags(session)->outgoing = true;
        /* Connect or issue query datagram */
-       task->pending[task->pending_count] = handle;
-       task->pending_count += 1;
        return handle;
 }
 
@@ -585,37 +577,6 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
                                assert (session_get_handle(source_s) == handle);
                        }
                }
-               if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) {
-                       session_waitinglist_del(s, task);
-                       if (session_flags(s)->closing) {
-                               return status;
-                       }
-                       /* Finalize the task, if any errors.
-                        * We can't add it to the end of waiting list for retrying
-                        * since it may lead endless loop in some circumstances
-                        * (for instance: tls; send->tls_push->too many non-critical errors->
-                        * on_send with nonzero status->re-add to waiting->send->etc).*/
-                       if (status != 0) {
-                               if (outgoing) {
-                                       qr_task_finalize(task, KR_STATE_FAIL);
-                               } else {
-                                       assert(task->ctx->source.session == s);
-                                       task->ctx->source.session = NULL;
-                               }
-                               session_tasklist_del(s, task);
-                       }
-                       struct qr_task *waiting_task = session_waitinglist_get_first(s);
-                       if (waiting_task) {
-                               struct sockaddr *peer = session_get_peer(s);
-                               knot_pkt_t *pkt = waiting_task->pktbuf;
-                               int ret = qr_task_send(waiting_task, handle, peer, pkt);
-                               if (ret != kr_ok()) {
-                                       session_tasks_finalize(s, KR_STATE_FAIL);
-                                       session_close(s);
-                                       return status;
-                               }
-                       }
-               }
                if (!session_flags(s)->closing) {
                        io_start_read(handle); /* Start reading new query */
                }
@@ -626,32 +587,60 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
 static void on_send(uv_udp_send_t *req, int status)
 {
        struct qr_task *task = req->data;
-       qr_task_on_send(task, (uv_handle_t *)(req->handle), status);
+       uv_handle_t *h = (uv_handle_t *)req->handle;
+       qr_task_on_send(task, h, status);
        qr_task_unref(task);
        free(req);
 }
-static void on_task_write(uv_write_t *req, int status)
+
+static void on_write(uv_write_t *req, int status)
 {
        struct qr_task *task = req->data;
-       qr_task_on_send(task, (uv_handle_t *)(req->handle), status);
+       uv_handle_t *h = (uv_handle_t *)req->handle;
+       qr_task_on_send(task, h, status);
        qr_task_unref(task);
        free(req);
 }
 
-static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
+static int qr_task_send(struct qr_task *task, struct session *session,
                        struct sockaddr *addr, knot_pkt_t *pkt)
 {
-       if (!handle) {
-               return qr_task_on_send(task, handle, kr_error(EIO));
+       if (!session) {
+               return qr_task_on_send(task, NULL, kr_error(EIO));
        }
 
        int ret = 0;
        struct request_ctx *ctx = task->ctx;
        struct kr_request *req = &ctx->req;
 
+       uv_handle_t *handle = session_get_handle(session);
+       assert(handle && handle->data == session);
        const bool is_stream = handle->type == UV_TCP;
        if (!is_stream && handle->type != UV_UDP) abort();
 
+       if (addr == NULL) {
+               addr = session_get_peer(session);
+       }
+
+       if (pkt == NULL) {
+               pkt = worker_task_get_pktbuf(task);
+       }
+
+       if (session_flags(session)->outgoing) {
+               size_t try_limit = session_tasklist_get_len(session) + 1;
+               uint16_t msg_id = knot_wire_get_id(pkt->wire);
+               int try_count = 0;
+               while (session_tasklist_find_msgid(session, msg_id) &&
+                      try_count <= try_limit) {
+                       ++msg_id;
+                       ++try_count;
+               }
+               if (try_count > try_limit) {
+                       return qr_task_on_send(task, handle, kr_error(EIO));
+               }
+               worker_task_pkt_set_msgid(task, msg_id);
+       }
+
        if (knot_wire_get_qr(pkt->wire) == 0) {
                /*
                 * Query must be finalised using destination address before
@@ -681,13 +670,13 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
        /* Pending ioreq on current task */
        qr_task_ref(task);
 
+       struct worker_ctx *worker = ctx->worker;
        /* Send using given protocol */
-       struct session *session = handle->data;
        assert(!session_flags(session)->closing);
        if (session_flags(session)->has_tls) {
                uv_write_t *write_req = (uv_write_t *)ioreq;
                write_req->data = task;
-               ret = tls_write(write_req, handle, pkt, &on_task_write);
+               ret = tls_write(write_req, handle, pkt, &on_write);
        } else if (handle->type == UV_UDP) {
                uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
                uv_buf_t buf = { (char *)pkt->wire, pkt->size };
@@ -701,13 +690,15 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
                        { (char *)pkt->wire, pkt->size }
                };
                write_req->data = task;
-               ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_task_write);
+               ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
        } else {
                assert(false);
        }
 
-       struct worker_ctx *worker = ctx->worker;
        if (ret == 0) {
+               if (session_flags(session)->outgoing) {
+                       session_tasklist_add(session, task);
+               }
                if (worker->too_many_open &&
                    worker->stats.rconcurrent <
                        worker->rconcurrent_highwatermark - 10) {
@@ -719,6 +710,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
                if (ret == UV_EMFILE) {
                        worker->too_many_open = true;
                        worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
+                       ret = kr_error(UV_EMFILE);
                }
        }
 
@@ -739,18 +731,6 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
        return ret;
 }
 
-static int session_next_waiting_send(struct session *session)
-{
-       int ret = kr_ok();
-       if (!session_waitinglist_is_empty(session)) {
-               struct sockaddr *peer = session_get_peer(session);
-               struct qr_task *task = session_waitinglist_get_first(session);
-               uv_handle_t *handle = session_get_handle(session);
-               ret = qr_task_send(task, handle, peer, task->pktbuf);
-       }
-       return ret;
-}
-
 static int session_tls_hs_cb(struct session *session, int status)
 {
        assert(session_flags(session)->outgoing);
@@ -792,7 +772,14 @@ static int session_tls_hs_cb(struct session *session, int status)
 
        ret = worker_add_tcp_connected(worker, peer, session);
        if (deletion_res == kr_ok() && ret == kr_ok()) {
-               ret = session_next_waiting_send(session);
+               while (!session_waitinglist_is_empty(session)) {
+                       struct qr_task *t = session_waitinglist_get(session);
+                       ret = qr_task_send(t, session, NULL, NULL);
+                       if (ret != 0) {
+                               break;
+                       }
+                       session_waitinglist_pop(session, true);
+               }
        } else {
                ret = kr_error(EINVAL);
        }
@@ -831,20 +818,19 @@ static void on_connect(uv_connect_t *req, int status)
        uv_stream_t *handle = req->handle;
        struct session *session = handle->data;
        struct sockaddr *peer = session_get_peer(session);
+       free(req);
 
        assert(session_flags(session)->outgoing);
 
        if (status == UV_ECANCELED) {
                worker_del_tcp_waiting(worker, peer);
                assert(session_is_empty(session) && session_flags(session)->closing);
-               free(req);
                return;
        }
 
        if (session_flags(session)->closing) {
                worker_del_tcp_waiting(worker, peer);
                assert(session_is_empty(session));
-               free(req);
                return;
        }
 
@@ -852,9 +838,8 @@ static void on_connect(uv_connect_t *req, int status)
 
        if (status != 0) {
                worker_del_tcp_waiting(worker, peer);
-               session_waitinglist_retry(session, false);
                assert(session_tasklist_is_empty(session));
-               free(req);
+               session_waitinglist_retry(session, false);
                session_close(session);
                return;
        }
@@ -867,13 +852,12 @@ static void on_connect(uv_connect_t *req, int status)
                         * something gone wrong */
                        session_waitinglist_finalize(session, KR_STATE_FAIL);
                        assert(session_tasklist_is_empty(session));
-                       free(req);
                        session_close(session);
                        return;
                }
        }
 
-       struct qr_task *task = session_waitinglist_get_first(session);
+       struct qr_task *task = session_waitinglist_get(session);
        struct kr_query *qry = task_get_last_pending_query(task);
        WITH_VERBOSE (qry) {
                struct sockaddr *peer = session_get_peer(session);
@@ -889,7 +873,6 @@ static void on_connect(uv_connect_t *req, int status)
                struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
                ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
                if (ret == kr_error(EAGAIN)) {
-                       free(req);
                        session_start_read(session);
                        session_timer_start(session, on_tcp_watchdog_timeout,
                                            MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
@@ -898,21 +881,19 @@ static void on_connect(uv_connect_t *req, int status)
        } else {
                worker_add_tcp_connected(worker, peer, session);
        }
-
-       if (ret == kr_ok()) {
-               ret = session_next_waiting_send(session);
-               if (ret == kr_ok()) {
-                       session_timer_start(session, on_tcp_watchdog_timeout,
-                                           MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
-                       free(req);
+       while (!session_waitinglist_is_empty(session)) {
+               struct qr_task *t = session_waitinglist_get(session);
+               ret = qr_task_send(t, session, NULL, NULL);
+               if (ret != 0) {
+                       assert(session_tasklist_is_empty(session));
+                       assert(false);
+                       worker_del_tcp_connected(worker, peer);
+                       session_waitinglist_finalize(session, KR_STATE_FAIL);
+                       session_close(session);
                        return;
                }
+               session_waitinglist_pop(session, true);
        }
-
-       session_waitinglist_finalize(session, KR_STATE_FAIL);
-       assert(session_tasklist_is_empty(session));
-       free(req);
-       session_close(session);
 }
 
 static void on_tcp_connect_timeout(uv_timer_t *timer)
@@ -922,12 +903,12 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
        uv_timer_stop(timer);
        struct worker_ctx *worker = get_worker();
 
-       assert (session_waitinglist_get_len(session) == session_tasklist_get_len(session));
+       assert (session_tasklist_is_empty(session));
 
        struct sockaddr *peer = session_get_peer(session);
        worker_del_tcp_waiting(worker, peer);
 
-       struct qr_task *task = session_waitinglist_get_first(session);
+       struct qr_task *task = session_waitinglist_get(session);
        struct kr_query *qry = task_get_last_pending_query(task);
        WITH_VERBOSE (qry) {
                char peer_str[INET6_ADDRSTRLEN];
@@ -948,10 +929,12 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
 static void on_tcp_watchdog_timeout(uv_timer_t *timer)
 {
        struct session *session = timer->data;
-       struct worker_ctx *worker =  timer->loop->data;
-       struct sockaddr *peer = session_get_peer(session);
 
        assert(session_flags(session)->outgoing);
+       assert(!session_flags(session)->closing);
+
+       struct worker_ctx *worker =  timer->loop->data;
+       struct sockaddr *peer = session_get_peer(session);
 
        uv_timer_stop(timer);
 
@@ -1008,7 +991,10 @@ static uv_handle_t *retransmit(struct qr_task *task)
                if (!choice) {
                        return ret;
                }
-               ret = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
+               if (task->pending_count >= MAX_PENDING) {
+                       return ret;
+               }
+               ret = ioreq_spawn(task->ctx->worker, SOCK_DGRAM, choice->sin6_family);
                if (!ret) {
                        return ret;
                }
@@ -1017,8 +1003,13 @@ static uv_handle_t *retransmit(struct qr_task *task)
                struct sockaddr *peer = session_get_peer(session);
                assert (peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
                memcpy(peer, addr, kr_sockaddr_len(addr));
-               if (qr_task_send(task, ret, (struct sockaddr *)choice,
-                                task->pktbuf) == 0) {
+               if (qr_task_send(task, session, (struct sockaddr *)choice,
+                                task->pktbuf) != 0) {
+                       session_close(session);
+                       ret = NULL;
+               } else {
+                       task->pending[task->pending_count] = session_get_handle(session);
+                       task->pending_count += 1;
                        task->addrlist_turn = (task->addrlist_turn + 1) %
                                              task->addrlist_count; /* Round robin */
                }
@@ -1133,31 +1124,29 @@ static int qr_task_finalize(struct qr_task *task, int state)
 
        /* Send back answer */
        struct session *source_session = ctx->source.session;
-       uv_handle_t *handle = session_get_handle(source_session);
        assert(!session_flags(source_session)->closing);
-       assert(handle && handle->data == ctx->source.session);
        assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
-       int res = qr_task_send(task, handle,
+       int res = qr_task_send(task, source_session,
                               (struct sockaddr *)&ctx->source.addr,
                                ctx->req.answer);
        if (res != kr_ok()) {
                (void) qr_task_on_send(task, NULL, kr_error(EIO));
                /* Since source session is erroneous detach all tasks. */
                while (!session_tasklist_is_empty(source_session)) {
-                       struct qr_task *t = session_tasklist_get_first(source_session);
+                       struct qr_task *t = session_tasklist_del_first(source_session, false);
                        struct request_ctx *c = t->ctx;
                        assert(c->source.session == source_session);
                        c->source.session = NULL;
                        /* Don't finalize them as there can be other tasks
                         * waiting for answer to this particular task.
                         * (ie. task->leading is true) */
-                       session_tasklist_del_index(source_session, 0);
+                       worker_task_unref(t);
                }
                session_close(source_session);
-       } else if (handle->type == UV_TCP && ctx->source.session) {
+       } else if (session_get_handle(source_session)->type == UV_TCP) {
                /* Don't try to close source session at least
                 * retry_interval_for_timeout_timer milliseconds */
-               session_timer_restart(ctx->source.session);
+               session_timer_restart(source_session);
        }
 
        qr_task_unref(task);
@@ -1232,7 +1221,8 @@ static int qr_task_step(struct qr_task *task,
                /* Start transmitting */
                uv_handle_t *handle = retransmit(task);
                if (handle == NULL) {
-                       return qr_task_step(task, NULL, NULL);
+                       subreq_finalize(task, packet_source, packet);
+                       return qr_task_finalize(task, KR_STATE_FAIL);
                }
                /* Check current query NSLIST */
                struct kr_query *qry = array_tail(req->rplan.pending);
@@ -1273,78 +1263,53 @@ static int qr_task_step(struct qr_task *task,
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
-                       /* There are waiting tasks.
-                        * It means that connection establishing or data sending
-                        * is coming right now. */
-                       /* Task will be notified in on_connect() or qr_task_on_send(). */
-                       ret = session_waitinglist_add(session, task);
-                       if (ret < 0) {
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-                       ret = session_tasklist_add(session, task);
+                       /* Connection is in the list of waiting connections.
+                        * It means that connection establishing is coming right now.
+                        * Add task to the end of list of waiting tasks..
+                        * It will be notified in on_connect() or qr_task_on_send(). */
+                       ret = session_waitinglist_push(session, task);
                        if (ret < 0) {
-                               session_waitinglist_del(session, task);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
-                       assert(task->pending_count == 0);
-                       task->pending[task->pending_count] = session_get_handle(session);
-                       task->pending_count += 1;
                } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
                        /* Connection has been already established */
                        assert(session_flags(session)->outgoing);
                        if (session_flags(session)->closing) {
-                               session_tasklist_del(session, task);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
 
-                       if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) {
-                               session_tasklist_del(session, task);
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       session_timer_stop(session);
+                       while (!session_waitinglist_is_empty(session)) {
+                               struct qr_task *t = session_waitinglist_get(session);
+                               ret = qr_task_send(t, session, NULL, NULL);
+                               if (ret != 0) {
+                                       session_waitinglist_finalize(session, KR_STATE_FAIL);
+                                       session_tasklist_finalize(session, KR_STATE_FAIL);
+                                       subreq_finalize(task, packet_source, packet);
+                                       session_close(session);
+                                       return qr_task_finalize(task, KR_STATE_FAIL);
+                               }
+                               session_waitinglist_pop(session, true);
                        }
 
-                       /* will be removed in qr_task_on_send() */
-                       ret = session_waitinglist_add(session, task);
-                       if (ret < 0) {
-                               session_tasklist_del(session, task);
+                       ret = qr_task_send(task, session, NULL, NULL);
+                       if (ret != 0 /* && ret != kr_error(EMFILE) */) {
+                               session_tasklist_finalize(session, KR_STATE_FAIL);
                                subreq_finalize(task, packet_source, packet);
+                               session_close(session);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
-                       ret = session_tasklist_add(session, task);
+                       ret = session_timer_start(session, on_tcp_watchdog_timeout,
+                                                 MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
                        if (ret < 0) {
-                               session_waitinglist_del(session, task);
-                               session_tasklist_del(session, task);
+                               session_tasklist_finalize(session, KR_STATE_FAIL);
                                subreq_finalize(task, packet_source, packet);
+                               session_close(session);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
-                       if (session_waitinglist_get_len(session) == 1) {
-                               ret = qr_task_send(task, session_get_handle(session),
-                                                  session_get_peer(session), task->pktbuf);
-                               if (ret < 0) {
-                                       session_waitinglist_del(session, task);
-                                       session_tasklist_del(session, task);
-                                       session_tasklist_finalize(session, KR_STATE_FAIL);
-                                       subreq_finalize(task, packet_source, packet);
-                                       session_close(session);
-                                       return qr_task_finalize(task, KR_STATE_FAIL);
-                               }
-                               if (session_tasklist_get_len(session) == 1) {
-                                       session_timer_stop(session);
-                                       ret = session_timer_start(session, on_tcp_watchdog_timeout,
-                                                                 MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
-                               }
-                               if (ret < 0) {
-                                       session_waitinglist_del(session, task);
-                                       session_tasklist_del(session, task);
-                                       session_tasklist_finalize(session, KR_STATE_FAIL);
-                                       subreq_finalize(task, packet_source, packet);
-                                       session_close(session);
-                                       return qr_task_finalize(task, KR_STATE_FAIL);
-                               }
-                       }
+
                        assert(task->pending_count == 0);
                        task->pending[task->pending_count] = session_get_handle(session);
                        task->pending_count += 1;
@@ -1354,7 +1319,7 @@ static int qr_task_step(struct qr_task *task,
                        if (!conn) {
                                return qr_task_step(task, NULL, NULL);
                        }
-                       uv_handle_t *client = ioreq_spawn(task, sock_type,
+                       uv_handle_t *client = ioreq_spawn(worker, sock_type,
                                                          addr->sa_family);
                        if (!client) {
                                free(conn);
@@ -1364,16 +1329,6 @@ static int qr_task_step(struct qr_task *task,
                        session = client->data;
                        ret = worker_add_tcp_waiting(ctx->worker, addr, session);
                        if (ret < 0) {
-                               session_tasklist_del(session, task);
-                               free(conn);
-                               subreq_finalize(task, packet_source, packet);
-                               return qr_task_finalize(task, KR_STATE_FAIL);
-                       }
-                       /* will be removed in qr_task_on_send() */
-                       ret = session_waitinglist_add(session, task);
-                       if (ret < 0) {
-                               session_tasklist_del(session, task);
-                               worker_del_tcp_waiting(ctx->worker, addr);
                                free(conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
@@ -1388,8 +1343,6 @@ static int qr_task_step(struct qr_task *task,
                                assert(session_tls_get_client_ctx(session) == NULL);
                                struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
                                if (!tls_ctx) {
-                                       session_tasklist_del(session, task);
-                                       session_waitinglist_del(session, task);
                                        worker_del_tcp_waiting(ctx->worker, addr);
                                        free(conn);
                                        subreq_finalize(task, packet_source, packet);
@@ -1407,15 +1360,12 @@ static int qr_task_step(struct qr_task *task,
                        ret = session_timer_start(session, on_tcp_connect_timeout,
                                                  KR_CONN_RTT_MAX, 0);
                        if (ret != 0) {
-                               session_tasklist_del(session, task);
-                               session_waitinglist_del(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
                                free(conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
 
-                       struct qr_task *task = session_waitinglist_get_first(session);
                        struct kr_query *qry = task_get_last_pending_query(task);
                        WITH_VERBOSE (qry) {
                                char peer_str[INET6_ADDRSTRLEN];
@@ -1426,13 +1376,18 @@ static int qr_task_step(struct qr_task *task,
                        if (uv_tcp_connect(conn, (uv_tcp_t *)client,
                                           addr , on_connect) != 0) {
                                session_timer_stop(session);
-                               session_tasklist_del(session, task);
-                               session_waitinglist_del(session, task);
                                worker_del_tcp_waiting(ctx->worker, addr);
                                free(conn);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_step(task, NULL, NULL);
                        }
+
+                       /* will be removed in on_connect() or qr_task_on_send() */
+                       ret = session_waitinglist_push(session, task);
+                       if (ret < 0) {
+                               subreq_finalize(task, packet_source, packet);
+                               return qr_task_finalize(task, KR_STATE_FAIL);
+                       }
                }
        }
        return kr_ok();
@@ -1478,17 +1433,20 @@ int worker_submit(struct session *session, knot_pkt_t *query)
        /* Parse packet */
        int ret = parse_packet(query);
 
+       bool is_query = (knot_wire_get_qr(query->wire) == 0);
+       /* Ignore badly formed queries. */
+       if (!query ||
+           (ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
+           (is_query == session_flags(session)->outgoing)) {
+               if (query) worker->stats.dropped += 1;
+               return kr_error(EILSEQ);
+       }
+
        /* Start new task on listening sockets,
         * or resume if this is subrequest */
        struct qr_task *task = NULL;
        struct sockaddr *addr = NULL;
        if (!session_flags(session)->outgoing) { /* request from a client */
-               /* Ignore badly formed queries. */
-               if (!query || (ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
-                    knot_wire_get_qr(query->wire)) {
-                       if (query) worker->stats.dropped += 1;
-                       return kr_error(EILSEQ);
-               }
                struct request_ctx *ctx = request_create(worker, handle,
                                                         session_get_peer(session));
                if (!ctx) {
@@ -1511,12 +1469,7 @@ int worker_submit(struct session *session, knot_pkt_t *query)
                        return kr_error(ENOMEM);
                }
        } else if (query) { /* response from upstream */
-               if ((ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
-                   !knot_wire_get_qr(query->wire)) {
-                       /* Ignore badly formed responses. */
-                       return kr_error(EILSEQ);
-               }
-               task = session_tasklist_find(session, knot_wire_get_id(query->wire));
+               task = session_tasklist_del_msgid(session, knot_wire_get_id(query->wire));
                if (task == NULL) {
                        return kr_error(ENOENT);
                }
@@ -1622,6 +1575,7 @@ int worker_end_tcp(struct session *session)
        uv_handle_t *handle = session_get_handle(session);
        struct worker_ctx *worker = handle->loop->data;
        struct sockaddr *peer = session_get_peer(session);
+
        worker_del_tcp_connected(worker, peer);
        session_flags(session)->connected = false;
 
@@ -1639,8 +1593,7 @@ int worker_end_tcp(struct session *session)
 
        assert(session_tasklist_get_len(session) >= session_waitinglist_get_len(session));
        while (!session_waitinglist_is_empty(session)) {
-               struct qr_task *task = session_waitinglist_get_first(session);
-               session_waitinglist_del_index(session, 0);
+               struct qr_task *task = session_waitinglist_pop(session, false);
                assert(task->refs > 1);
                session_tasklist_del(session, task);
                if (session_flags(session)->outgoing) {
@@ -1659,10 +1612,10 @@ int worker_end_tcp(struct session *session)
                        assert(task->ctx->source.session == session);
                        task->ctx->source.session = NULL;
                }
+               worker_task_unref(task);
        }
        while (!session_tasklist_is_empty(session)) {
-               struct qr_task *task = session_tasklist_get_first(session);
-               session_tasklist_del_index(session, 0);
+               struct qr_task *task = session_tasklist_del_first(session, false);
                if (session_flags(session)->outgoing) {
                        if (task->ctx->req.options.FORWARD) {
                                struct kr_request *req = &task->ctx->req;
@@ -1675,6 +1628,7 @@ int worker_end_tcp(struct session *session)
                        assert(task->ctx->source.session == session);
                        task->ctx->source.session = NULL;
                }
+               worker_task_unref(task);
        }
        session_close(session);
        return kr_ok();
@@ -1788,6 +1742,19 @@ void worker_request_set_source_session(struct request_ctx *ctx, struct session *
        ctx->source.session = session;
 }
 
+uint16_t worker_task_pkt_get_msgid(struct qr_task *task)
+{
+       knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+       uint16_t msg_id = knot_wire_get_id(pktbuf->wire);
+       return msg_id;
+}
+
+void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
+{
+       knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
+       knot_wire_set_id(pktbuf->wire, msgid);
+}
+
 /** Reserve worker buffers */
 static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 {
index 2231eaab766830c0eb5f09e01b8e6fc16be9cc2d..89d157f1b0db4272d9de2e3cf9a29e50ca706dcb 100644 (file)
@@ -100,6 +100,9 @@ struct session *worker_request_get_source_session(struct request_ctx *);
 
 void worker_request_set_source_session(struct request_ctx *, struct session *session);
 
+uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
+void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
+
 /** @cond internal */
 
 /** Number of request within timeout window. */