]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/worker: deduplicate outbound queries
authorMarek Vavruša <marek.vavrusa@nic.cz>
Thu, 3 Dec 2015 18:01:56 +0000 (19:01 +0100)
committerMarek Vavruša <marek.vavrusa@nic.cz>
Thu, 3 Dec 2015 21:54:24 +0000 (22:54 +0100)
worker can track outbound requests and if N resolutions want the same
subrequest, only one will lead it and others will be notified when it
finishes

this massively reduces number of outbound requests for
slow/unresponsive/low ttl requests

daemon/worker.c
daemon/worker.h
lib/utils.c

index 85b1919876dc0a1104d88d5fde7aab4dad0f5887..8872a0be3b330b8b7a25a4cf72d419a9e12ad37b 100644 (file)
@@ -56,6 +56,7 @@ struct qr_task
        struct kr_request req;
        struct worker_ctx *worker;
        knot_pkt_t *pktbuf;
+       array_t(struct qr_task *) waiting;
        uv_handle_t *pending[MAX_PENDING];
        uint16_t pending_count;
        uint16_t addrlist_count;
@@ -74,7 +75,8 @@ struct qr_task
        uint16_t iter_count;
        uint16_t refs;
        uint16_t bytes_remaining;
-       uint16_t finished;
+       bool finished;
+       bool leading;
 };
 
 /* Convenience macros */
@@ -246,12 +248,14 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
        }
        task->req.answer = answer;
        task->pktbuf = pktbuf;
+       array_init(task->waiting);
        task->addrlist = NULL;
        task->pending_count = 0;
        task->bytes_remaining = 0;
        task->iter_count = 0;
        task->refs = 1;
        task->finished = false;
+       task->leading = false;
        task->worker = worker;
        task->source.handle = handle;
        uv_timer_init(worker->loop, &task->retry);
@@ -310,6 +314,8 @@ static void qr_task_complete(uv_handle_t *handle)
        struct worker_ctx *worker = task->worker;
        /* Kill pending I/O requests */
        ioreq_killall(task);
+       assert(task->waiting.len == 0);
+       assert(task->leading == false);
        /* Run the completion callback. */
        if (task->on_complete) {
                task->on_complete(worker, &task->req, task->baton);
@@ -475,16 +481,16 @@ static void on_retransmit(uv_timer_t *req)
        }
 }
 
-static int qr_task_finalize(struct qr_task *task, int state)
+/** @internal Get key from current outstanding subrequest. */
+static int subreq_key(char *dst, struct qr_task *task)
 {
-       kr_resolve_finish(&task->req, state);
-       task->finished = true;
-       /* Send back answer */
-       (void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
-       return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
+       assert(task);
+       knot_pkt_t *pkt = task->pktbuf;
+       assert(knot_wire_get_qr(pkt->wire) == false);
+       return kr_rrmap_key(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
 }
 
-static void cancel_subrequests(struct qr_task *task)
+static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
        /* Close pending I/O requests */
        if (uv_is_active((uv_handle_t *)&task->retry))
@@ -492,6 +498,71 @@ static void cancel_subrequests(struct qr_task *task)
        if (uv_is_active((uv_handle_t *)&task->timeout))
                uv_timer_stop(&task->timeout);
        ioreq_killall(task);
+       /* Clear from outstanding table. */
+       if (!task->leading)
+               return;
+       char key[RRMAP_KEYSIZE];
+       int ret = subreq_key(key, task);
+       if (ret > 0) {
+               assert(map_get(&task->worker->outstanding, key) == task);
+               map_del(&task->worker->outstanding, key);
+       }
+       /* Notify waiting tasks. */
+       struct kr_query *leader_qry = TAIL(task->req.rplan.pending);
+       for (size_t i = task->waiting.len; i --> 0;) {
+               struct qr_task *follower = task->waiting.at[i];
+               struct kr_query *qry = TAIL(follower->req.rplan.pending);
+               /* Reuse MSGID and 0x20 secret */
+               if (qry) {
+                       qry->id = leader_qry->id;
+                       qry->secret = leader_qry->secret;
+                       leader_qry->secret = 0; /* Next will be already decoded */
+               }
+               qr_task_step(follower, packet_source, pkt);
+               qr_task_unref(follower);
+       }
+       task->waiting.len = 0;
+       task->leading = false;
+}
+
+static void subreq_lead(struct qr_task *task)
+{
+       assert(task);
+       char key[RRMAP_KEYSIZE];
+       if (subreq_key(key, task) > 0) {
+               assert(map_contains(&task->worker->outstanding, key) == false);
+               map_set(&task->worker->outstanding, key, task);
+               task->leading = true;
+       }
+}
+
+static bool subreq_enqueue(struct qr_task *task)
+{
+       assert(task);
+       char key[RRMAP_KEYSIZE];
+       if (subreq_key(key, task) > 0) {
+               struct qr_task *leader = map_get(&task->worker->outstanding, key);
+               if (leader) {
+                       /* Enqueue itself to leader for this subrequest. */
+                       int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, mm_reserve, &leader->req.pool);
+                       if (ret == 0) {
+                               array_push(leader->waiting, task);
+                               qr_task_ref(task);
+                               return true;
+                       }
+               }
+       }
+       return false;
+}
+
+static int qr_task_finalize(struct qr_task *task, int state)
+{
+       assert(task && task->leading == false);
+       kr_resolve_finish(&task->req, state);
+       task->finished = true;
+       /* Send back answer */
+       (void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
+       return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
 }
 
 static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
@@ -501,7 +572,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
                return kr_error(ESTALE);
        }
        /* Close pending I/O requests */
-       cancel_subrequests(task);
+       subreq_finalize(task, packet_source, packet);
        /* Consume input and produce next query */
        int sock_type = -1;
        task->addrlist = NULL;
@@ -532,11 +603,20 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
 
        /* Start fast retransmit with UDP, otherwise connect. */
        if (sock_type == SOCK_DGRAM) {
+               /* If such subrequest is outstanding, enqueue to it. */
+               if (subreq_enqueue(task)) {
+                       return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
+               }
+               /* Start transmitting */
                if (retransmit(task)) {
                        uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
                } else {
                        return qr_task_step(task, NULL, NULL);
                }
+               /* Announce and start subrequest.
+                * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
+                */
+               subreq_lead(task);
        } else {
                struct ioreq *conn = ioreq_take(task->worker);
                if (!conn) {
@@ -549,7 +629,6 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
                }
                conn->as.connect.data = task;
                if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
-                       DEBUG_MSG("task conn_start %p => failed\n", task);
                        ioreq_release(task->worker, conn);
                        return qr_task_step(task, NULL, NULL);
                }
@@ -559,9 +638,12 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
 
        /* Start next step with timeout, fatal if can't start a timer. */
        int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
-       if (ret != 0)
+       if (ret != 0) {
+               subreq_finalize(task, packet_source, packet);
                return qr_task_finalize(task, KNOT_STATE_FAIL);
-       return kr_ok();
+       }
+
+       return ret;
 }
 
 static int parse_packet(knot_pkt_t *query)
@@ -695,6 +777,7 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
        memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
        worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
        worker->pkt_pool.alloc = (mm_alloc_t) mp_alloc;
+       worker->outstanding = map_make();
        return kr_ok();
 }
 
@@ -712,6 +795,7 @@ void worker_reclaim(struct worker_ctx *worker)
        reclaim_freelist(worker->ioreqs, struct ioreq, free);
        mp_delete(worker->pkt_pool.ctx);
        worker->pkt_pool.ctx = NULL;
+       map_clear(&worker->outstanding);
 }
 
 #undef DEBUG_MSG
index fa2824dc5fcd052ba95fd842ae94554254017b52..92a1c2f3b72d74a311f1232f74cca8cab8a58565 100644 (file)
@@ -20,6 +20,7 @@
 
 #include "daemon/engine.h"
 #include "lib/generic/array.h"
+#include "lib/generic/map.h"
 
 /* @cond internal Freelist of available mempools. */
 typedef array_t(void *) mp_freelist_t;
@@ -46,6 +47,7 @@ struct worker_ctx {
                size_t dropped;
                size_t timeout;
        } stats;
+       map_t outstanding;
        mp_freelist_t pools;
        mp_freelist_t ioreqs;
        mm_ctx_t pkt_pool;
index 40422f082f0369ca2fc5235946b30fef75344a0c..75cd81a080613a2a79df45115936da4dc991ff55 100644 (file)
@@ -307,7 +307,6 @@ int kr_bitcmp(const char *a, const char *b, int bits)
 int kr_rrmap_key(char *key, const knot_dname_t *owner, uint16_t type, uint8_t rank)
 {
        if (!key || !owner) {
-               printf("key owner %p %p\n", key, owner);
                return kr_error(EINVAL);
        }
        key[0] = (rank << 2) | 0x01; /* Must be non-zero */