]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/worker: deduplicate inbound queries
authorMarek Vavrusa <marek@vavrusa.com>
Wed, 11 May 2016 07:40:35 +0000 (00:40 -0700)
committerMarek Vavrusa <marek@vavrusa.com>
Thu, 12 May 2016 17:58:07 +0000 (10:58 -0700)
many clients do frequent retransmits of the query
to avoid network losses and get better service,
but then fail to work properly when a resolver
answers SERVFAIL to some of them because of the
time limit and some of them NOERROR.
it's also a good idea to avoid wasting time
tracking pending tasks to solve the same thing.

daemon/worker.c
daemon/worker.h

index c3fb3feb596603d512fea688e060aa735bcd39b3..1edf36f4e05518618cbb7e3183fdf5017a0a7109 100644 (file)
@@ -200,6 +200,13 @@ static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
        }
 }
 
+/** @internal Get key from current outgoing subrequest. */
+static int subreq_key(char *dst, knot_pkt_t *pkt)
+{
+       assert(pkt);
+       return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
+}
+
 static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
 {
        /* How much can client handle? */
@@ -274,7 +281,7 @@ static void qr_task_free(struct qr_task *task)
                        }
                }
                /* Start reading again if the session is throttled and
-                * the number of outstanding requests is below watermark. */
+                * the number of outgoing requests is below watermark. */
                uv_handle_t *handle = task->source.handle;
                if (handle && session->tasks.len < task->worker->tcp_pipeline_max/2) {
                        if (!uv_is_closing(handle) && session->throttled) {
@@ -330,6 +337,11 @@ static int qr_task_start(struct qr_task *task, knot_pkt_t *query)
        if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
                task->req.options |= QUERY_NO_THROTTLE;
        }
+       /* Track outstanding inbound queries as well for deduplication. */
+       char key[KR_RRKEY_LEN];
+       if (subreq_key(key, query) > 0) {
+               map_set(&task->worker->outstanding, key, task);
+       }
        return 0;
 }
 
@@ -368,6 +380,12 @@ static void qr_task_complete(struct qr_task *task)
        if (task->on_complete) {
                task->on_complete(worker, &task->req, task->baton);
        }
+       char key[KR_RRKEY_LEN];
+       /* Clear outstanding query. */
+       int ret = subreq_key(key, task->req.answer);
+       if (ret > 0) {
+               map_del(&task->worker->outstanding, key);
+       }
        /* Release primary reference to task. */
        qr_task_unref(task);
 }
@@ -556,15 +574,6 @@ static int timer_start(struct qr_task *task, uv_timer_cb cb, uint64_t timeout, u
        return 0;
 }
 
-/** @internal Get key from current outstanding subrequest. */
-static int subreq_key(char *dst, struct qr_task *task)
-{
-       assert(task);
-       knot_pkt_t *pkt = task->pktbuf;
-       assert(knot_wire_get_qr(pkt->wire) == false);
-       return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
-}
-
 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
        /* Close pending timer */
@@ -576,14 +585,14 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
                task->timeout = NULL;
        }
        ioreq_killall(task);
-       /* Clear from outstanding table. */
+       /* Clear from outgoing table. */
        if (!task->leading)
                return;
        char key[KR_RRKEY_LEN];
-       int ret = subreq_key(key, task);
+       int ret = subreq_key(key, task->pktbuf);
        if (ret > 0) {
-               assert(map_get(&task->worker->outstanding, key) == task);
-               map_del(&task->worker->outstanding, key);
+               assert(map_get(&task->worker->outgoing, key) == task);
+               map_del(&task->worker->outgoing, key);
        }
        /* Notify waiting tasks. */
        struct kr_query *leader_qry = array_tail(task->req.rplan.pending);
@@ -607,9 +616,9 @@ static void subreq_lead(struct qr_task *task)
 {
        assert(task);
        char key[KR_RRKEY_LEN];
-       if (subreq_key(key, task) > 0) {
-               assert(map_contains(&task->worker->outstanding, key) == false);
-               map_set(&task->worker->outstanding, key, task);
+       if (subreq_key(key, task->pktbuf) > 0) {
+               assert(map_contains(&task->worker->outgoing, key) == false);
+               map_set(&task->worker->outgoing, key, task);
                task->leading = true;
        }
 }
@@ -618,8 +627,8 @@ static bool subreq_enqueue(struct qr_task *task)
 {
        assert(task);
        char key[KR_RRKEY_LEN];
-       if (subreq_key(key, task) > 0) {
-               struct qr_task *leader = map_get(&task->worker->outstanding, key);
+       if (subreq_key(key, task->pktbuf) > 0) {
+               struct qr_task *leader = map_get(&task->worker->outgoing, key);
                if (leader) {
                        /* Enqueue itself to leader for this subrequest. */
                        int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, kr_memreserve, &leader->req.pool);
@@ -681,9 +690,9 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
        /* Start fast retransmit with UDP, otherwise connect. */
        int ret = 0;
        if (sock_type == SOCK_DGRAM) {
-               /* If such subrequest is outstanding, enqueue to it. */
+               /* If there is already outgoing query, enqueue to it. */
                if (subreq_enqueue(task)) {
-                       return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
+                       return kr_ok(); /* Will be notified when outgoing query finishes. */
                }
                /* Start transmitting */
                if (retransmit(task)) {
@@ -762,6 +771,28 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *ms
                        if (msg) worker->stats.dropped += 1;
                        return kr_error(EINVAL); /* Ignore. */
                }
+               /* De-duplicate inbound requests.
+                * Many clients do frequent retransmits of the query
+                * in order to avoid network losses and get better service,
+                * but fail to work properly when resolver answers them all
+                * but some of them SERVFAIL because of the time limit and some
+                * of them succeed. It's also a good idea to avoid wasting time
+                * tracking pending tasks to solve the same thing. */
+               char key[KR_RRKEY_LEN];
+               if (subreq_key(key, msg) > 0) {
+                       struct qr_task *task = map_get(&worker->outstanding, key);
+                       if (task && task->source.handle == handle && task->req.qsource.addr &&
+                               addr->sa_family == task->source.addr.ip4.sin_family &&
+                               knot_wire_get_id(msg->wire) == knot_wire_get_id(task->req.answer->wire)) {
+                               /* Query probably matches, check if it comes from the same origin. */
+                               size_t addr_len = sizeof(struct sockaddr_in);
+                               if (addr->sa_family == AF_INET6)
+                                       addr_len = sizeof(struct sockaddr_in6);
+                               if (memcmp(&task->source.addr, addr, addr_len) == 0) {
+                                       return kr_error(EEXIST); /* Ignore query */
+                               }
+                       }
+               }
                task = qr_task_create(worker, handle, addr);
                if (!task) {
                        return kr_error(ENOMEM);
@@ -962,6 +993,7 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
        memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
        worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
        worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
+       worker->outgoing = map_make();
        worker->outstanding = map_make();
        worker->tcp_pipeline_max = MAX_PIPELINED;
        return kr_ok();
@@ -982,6 +1014,7 @@ void worker_reclaim(struct worker_ctx *worker)
        reclaim_freelist(worker->pool_sessions, struct session, session_free);
        mp_delete(worker->pkt_pool.ctx);
        worker->pkt_pool.ctx = NULL;
+       map_clear(&worker->outgoing);
        map_clear(&worker->outstanding);
 }
 
index aeda6a68b9c5a760f03a5421783eedf35fbe18b9..27d6c184d5df64005c03f6d71d6e2225291434e9 100644 (file)
@@ -50,6 +50,7 @@ struct worker_ctx {
                size_t dropped;
                size_t timeout;
        } stats;
+       map_t outgoing;
        map_t outstanding;
        mp_freelist_t pool_mp;
        mp_freelist_t pool_ioreq;