From: Marek Vavrusa
Date: Wed, 11 May 2016 07:40:35 +0000 (-0700)
Subject: daemon/worker: deduplicate inbound queries
X-Git-Tag: v1.0.0~19
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e638f9fb6e5aa20e090ebfa52255abc36a619bfd;p=thirdparty%2Fknot-resolver.git

daemon/worker: deduplicate inbound queries

Many clients retransmit a query frequently to work around network
losses and get better service, but then fail to work properly when
the resolver answers SERVFAIL to some of the retransmits (because
of the time limit) and NOERROR to others.

Deduplicating also avoids wasting time on tracking pending tasks
that all resolve the same thing.
---

diff --git a/daemon/worker.c b/daemon/worker.c
index c3fb3feb5..1edf36f4e 100644
--- a/daemon/worker.c
+++ b/daemon/worker.c
@@ -200,6 +200,13 @@ static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
 	}
 }
 
+/** @internal Get key from current outgoing subrequest. */
+static int subreq_key(char *dst, knot_pkt_t *pkt)
+{
+	assert(pkt);
+	return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
+}
+
 static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
 {
 	/* How much can client handle? */
@@ -274,7 +281,7 @@ static void qr_task_free(struct qr_task *task)
 		}
 	}
 	/* Start reading again if the session is throttled and
-	 * the number of outstanding requests is below watermark. */
+	 * the number of outgoing requests is below watermark. */
 	uv_handle_t *handle = task->source.handle;
 	if (handle && session->tasks.len < task->worker->tcp_pipeline_max/2) {
 		if (!uv_is_closing(handle) && session->throttled) {
@@ -330,6 +337,11 @@ static int qr_task_start(struct qr_task *task, knot_pkt_t *query)
 	if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
 		task->req.options |= QUERY_NO_THROTTLE;
 	}
+	/* Track outstanding inbound queries as well for deduplication. */
+	char key[KR_RRKEY_LEN];
+	if (subreq_key(key, query) > 0) {
+		map_set(&task->worker->outstanding, key, task);
+	}
 	return 0;
 }
 
@@ -368,6 +380,12 @@ static void qr_task_complete(struct qr_task *task)
 	if (task->on_complete) {
 		task->on_complete(worker, &task->req, task->baton);
 	}
+	char key[KR_RRKEY_LEN];
+	/* Clear outstanding query. */
+	int ret = subreq_key(key, task->req.answer);
+	if (ret > 0) {
+		map_del(&task->worker->outstanding, key);
+	}
 	/* Release primary reference to task. */
 	qr_task_unref(task);
 }
@@ -556,15 +574,6 @@ static int timer_start(struct qr_task *task, uv_timer_cb cb, uint64_t timeout, u
 	return 0;
 }
 
-/** @internal Get key from current outstanding subrequest. */
-static int subreq_key(char *dst, struct qr_task *task)
-{
-	assert(task);
-	knot_pkt_t *pkt = task->pktbuf;
-	assert(knot_wire_get_qr(pkt->wire) == false);
-	return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
-}
-
 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
 	/* Close pending timer */
@@ -576,14 +585,14 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
 		task->timeout = NULL;
 	}
 	ioreq_killall(task);
-	/* Clear from outstanding table. */
+	/* Clear from outgoing table. */
 	if (!task->leading)
 		return;
 	char key[KR_RRKEY_LEN];
-	int ret = subreq_key(key, task);
+	int ret = subreq_key(key, task->pktbuf);
 	if (ret > 0) {
-		assert(map_get(&task->worker->outstanding, key) == task);
-		map_del(&task->worker->outstanding, key);
+		assert(map_get(&task->worker->outgoing, key) == task);
+		map_del(&task->worker->outgoing, key);
 	}
 	/* Notify waiting tasks. */
 	struct kr_query *leader_qry = array_tail(task->req.rplan.pending);
@@ -607,9 +616,9 @@ static void subreq_lead(struct qr_task *task)
 {
 	assert(task);
 	char key[KR_RRKEY_LEN];
-	if (subreq_key(key, task) > 0) {
-		assert(map_contains(&task->worker->outstanding, key) == false);
-		map_set(&task->worker->outstanding, key, task);
+	if (subreq_key(key, task->pktbuf) > 0) {
+		assert(map_contains(&task->worker->outgoing, key) == false);
+		map_set(&task->worker->outgoing, key, task);
 		task->leading = true;
 	}
 }
@@ -618,8 +627,8 @@ static bool subreq_enqueue(struct qr_task *task)
 {
 	assert(task);
 	char key[KR_RRKEY_LEN];
-	if (subreq_key(key, task) > 0) {
-		struct qr_task *leader = map_get(&task->worker->outstanding, key);
+	if (subreq_key(key, task->pktbuf) > 0) {
+		struct qr_task *leader = map_get(&task->worker->outgoing, key);
 		if (leader) {
 			/* Enqueue itself to leader for this subrequest. */
 			int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, kr_memreserve, &leader->req.pool);
@@ -681,9 +690,9 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
 	/* Start fast retransmit with UDP, otherwise connect. */
 	int ret = 0;
 	if (sock_type == SOCK_DGRAM) {
-		/* If such subrequest is outstanding, enqueue to it. */
+		/* If there is already an outgoing query, enqueue to it. */
 		if (subreq_enqueue(task)) {
-			return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
+			return kr_ok(); /* Will be notified when the outgoing query finishes. */
 		}
 		/* Start transmitting */
 		if (retransmit(task)) {
@@ -762,6 +771,28 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *ms
 		if (msg) worker->stats.dropped += 1;
 		return kr_error(EINVAL); /* Ignore. */
 	}
+	/* De-duplicate inbound requests.
+	 * Many clients retransmit a query frequently in order to work
+	 * around network losses and get better service, but fail to work
+	 * properly when the resolver answers SERVFAIL to some of the
+	 * retransmits (because of the time limit) and NOERROR to others.
+	 * It's also a good idea to avoid wasting time tracking pending
+	 * tasks that solve the same thing. */
+	char key[KR_RRKEY_LEN];
+	if (subreq_key(key, msg) > 0) {
+		struct qr_task *task = map_get(&worker->outstanding, key);
+		if (task && task->source.handle == handle && task->req.qsource.addr &&
+		    addr->sa_family == task->source.addr.ip4.sin_family &&
+		    knot_wire_get_id(msg->wire) == knot_wire_get_id(task->req.answer->wire)) {
+			/* Query probably matches, check if it comes from the same origin. */
+			size_t addr_len = sizeof(struct sockaddr_in);
+			if (addr->sa_family == AF_INET6)
+				addr_len = sizeof(struct sockaddr_in6);
+			if (memcmp(&task->source.addr, addr, addr_len) == 0) {
+				return kr_error(EEXIST); /* Ignore query */
+			}
+		}
+	}
 	task = qr_task_create(worker, handle, addr);
 	if (!task) {
 		return kr_error(ENOMEM);
@@ -962,6 +993,7 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 	memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
 	worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
 	worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
+	worker->outgoing = map_make();
 	worker->outstanding = map_make();
 	worker->tcp_pipeline_max = MAX_PIPELINED;
 	return kr_ok();
@@ -982,6 +1014,7 @@ void worker_reclaim(struct worker_ctx *worker)
 	reclaim_freelist(worker->pool_sessions, struct session, session_free);
 	mp_delete(worker->pkt_pool.ctx);
 	worker->pkt_pool.ctx = NULL;
+	map_clear(&worker->outgoing);
 	map_clear(&worker->outstanding);
 }
 
diff --git a/daemon/worker.h b/daemon/worker.h
index aeda6a68b..27d6c184d 100644
--- a/daemon/worker.h
+++ b/daemon/worker.h
@@ -50,6 +50,7 @@ struct worker_ctx {
 		size_t dropped;
 		size_t timeout;
 	} stats;
+	map_t outgoing;
 	map_t outstanding;
 	mp_freelist_t pool_mp;
 	mp_freelist_t pool_ioreq;
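
For illustration only, a minimal self-contained sketch of the inbound deduplication rule this patch adds to worker_submit(). The types, the fixed-size table and every name below are hypothetical stand-ins, not the resolver's API: a packet is treated as a client retransmit only when the (qname, qtype, qclass) key, the listening handle, the DNS message ID and the full source address all match a query that is already being answered.

#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

/* Hypothetical stand-ins for the worker's per-query bookkeeping. */
struct pending_query {
	char key[270];                 /* (qname, qtype, qclass), a kr_rrkey()-style key */
	const void *handle;            /* listening socket the query arrived on */
	uint16_t msg_id;               /* DNS message ID of the answer in flight */
	struct sockaddr_storage from;  /* original client address */
	bool used;
};

/* Return true if the packet described by (key, handle, msg_id, from) matches
 * a query we are already answering, i.e. it is a client retransmit. */
static bool is_duplicate(const struct pending_query *table, size_t len,
                         const char *key, const void *handle,
                         uint16_t msg_id, const struct sockaddr *from)
{
	for (size_t i = 0; i < len; ++i) {
		const struct pending_query *p = &table[i];
		if (!p->used || strcmp(p->key, key) != 0)
			continue;  /* different (qname, qtype, qclass) */
		if (p->handle != handle || p->msg_id != msg_id)
			continue;  /* same name, but not the same client query */
		const struct sockaddr *stored = (const struct sockaddr *)&p->from;
		size_t addr_len = (from->sa_family == AF_INET6)
		                ? sizeof(struct sockaddr_in6)
		                : sizeof(struct sockaddr_in);
		if (stored->sa_family == from->sa_family &&
		    memcmp(stored, from, addr_len) == 0)
			return true;  /* same origin: drop the retransmit */
	}
	return false;
}

In the patch itself the lookup goes through the worker->outstanding map keyed by subreq_key(); the entry is installed in qr_task_start() and removed again in qr_task_complete().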
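
Similarly, a hedged sketch of the leader/follower pattern that the renamed outgoing map keeps implementing for upstream subrequests, again with hypothetical types and a fixed-size table instead of the worker's map_t: the first task to send a given (qname, qtype, qclass) upstream becomes the leader, and any later task with the same key parks itself on the leader's waiting list until the leader's answer arrives.

#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_FOLLOWERS 16

/* Hypothetical bookkeeping for identical outgoing (upstream) queries. */
struct outgoing_entry {
	char key[270];                  /* (qname, qtype, qclass) key */
	void *leader;                   /* task currently talking to the upstream */
	void *waiting[MAX_FOLLOWERS];   /* tasks to wake up with the leader's answer */
	size_t waiting_len;
	bool used;
};

/* Park 'task' behind an existing leader for 'key', or make it the leader.
 * Returns true if the task was parked and must not send its own query. */
static bool enqueue_or_lead(struct outgoing_entry *table, size_t len,
                            const char *key, void *task)
{
	struct outgoing_entry *free_slot = NULL;
	for (size_t i = 0; i < len; ++i) {
		if (table[i].used && strcmp(table[i].key, key) == 0) {
			if (table[i].waiting_len < MAX_FOLLOWERS) {
				table[i].waiting[table[i].waiting_len++] = task;
				return true;  /* parked: wait for the leader's answer */
			}
			return false;  /* waiting list full: send our own query */
		}
		if (!table[i].used && free_slot == NULL)
			free_slot = &table[i];
	}
	if (free_slot != NULL) {  /* no leader yet: take the lead */
		strncpy(free_slot->key, key, sizeof(free_slot->key) - 1);
		free_slot->key[sizeof(free_slot->key) - 1] = '\0';
		free_slot->leader = task;
		free_slot->waiting_len = 0;
		free_slot->used = true;
	}
	return false;
}

In the patch this corresponds to subreq_lead() and subreq_enqueue(), while subreq_finalize() is where the leader removes its key from worker->outgoing and notifies the waiting tasks.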