}
}
+/** @internal Get key from current outgoing subrequest. */
+static int subreq_key(char *dst, knot_pkt_t *pkt)
+{
+ assert(pkt);
+ return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
+}
+
static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
{
/* How much can client handle? */
}
}
/* Start reading again if the session is throttled and
- * the number of outstanding requests is below watermark. */
+ * the number of outgoing requests is below watermark. */
uv_handle_t *handle = task->source.handle;
if (handle && session->tasks.len < task->worker->tcp_pipeline_max/2) {
if (!uv_is_closing(handle) && session->throttled) {
if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
task->req.options |= QUERY_NO_THROTTLE;
}
+ /* Track outstanding inbound queries as well for deduplication. */
+ char key[KR_RRKEY_LEN];
+ if (subreq_key(key, query) > 0) {
+ map_set(&task->worker->outstanding, key, task);
+ }
return 0;
}
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
}
+ char key[KR_RRKEY_LEN];
+ /* Clear outstanding query. */
+ int ret = subreq_key(key, task->req.answer);
+ if (ret > 0) {
+ map_del(&task->worker->outstanding, key);
+ }
/* Release primary reference to task. */
qr_task_unref(task);
}
return 0;
}
-/** @internal Get key from current outstanding subrequest. */
-static int subreq_key(char *dst, struct qr_task *task)
-{
- assert(task);
- knot_pkt_t *pkt = task->pktbuf;
- assert(knot_wire_get_qr(pkt->wire) == false);
- return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
-}
-
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
/* Close pending timer */
task->timeout = NULL;
}
ioreq_killall(task);
- /* Clear from outstanding table. */
+ /* Clear from outgoing table. */
if (!task->leading)
return;
char key[KR_RRKEY_LEN];
- int ret = subreq_key(key, task);
+ int ret = subreq_key(key, task->pktbuf);
if (ret > 0) {
- assert(map_get(&task->worker->outstanding, key) == task);
- map_del(&task->worker->outstanding, key);
+ assert(map_get(&task->worker->outgoing, key) == task);
+ map_del(&task->worker->outgoing, key);
}
/* Notify waiting tasks. */
struct kr_query *leader_qry = array_tail(task->req.rplan.pending);
{
assert(task);
char key[KR_RRKEY_LEN];
- if (subreq_key(key, task) > 0) {
- assert(map_contains(&task->worker->outstanding, key) == false);
- map_set(&task->worker->outstanding, key, task);
+ if (subreq_key(key, task->pktbuf) > 0) {
+ assert(map_contains(&task->worker->outgoing, key) == false);
+ map_set(&task->worker->outgoing, key, task);
task->leading = true;
}
}
{
assert(task);
char key[KR_RRKEY_LEN];
- if (subreq_key(key, task) > 0) {
- struct qr_task *leader = map_get(&task->worker->outstanding, key);
+ if (subreq_key(key, task->pktbuf) > 0) {
+ struct qr_task *leader = map_get(&task->worker->outgoing, key);
if (leader) {
/* Enqueue itself to leader for this subrequest. */
int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, kr_memreserve, &leader->req.pool);
/* Start fast retransmit with UDP, otherwise connect. */
int ret = 0;
if (sock_type == SOCK_DGRAM) {
- /* If such subrequest is outstanding, enqueue to it. */
+ /* If there is already outgoing query, enqueue to it. */
if (subreq_enqueue(task)) {
- return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
+ return kr_ok(); /* Will be notified when outgoing query finishes. */
}
/* Start transmitting */
if (retransmit(task)) {
if (msg) worker->stats.dropped += 1;
return kr_error(EINVAL); /* Ignore. */
}
+ /* De-duplicate inbound requests.
+ * Many clients do frequent retransmits of the query
+ * in order to avoid network losses and get better service,
+ * but fail to work properly when resolver answers them all
+ * but some of them SERVFAIL because of the time limit and some
+ * of them succeed. It's also a good idea to avoid wasting time
+ * tracking pending tasks to solve the same thing. */
+ char key[KR_RRKEY_LEN];
+ if (subreq_key(key, msg) > 0) {
+ struct qr_task *task = map_get(&worker->outstanding, key);
+ if (task && task->source.handle == handle && task->req.qsource.addr &&
+ addr->sa_family == task->source.addr.ip4.sin_family &&
+ knot_wire_get_id(msg->wire) == knot_wire_get_id(task->req.answer->wire)) {
+ /* Query probably matches, check if it comes from the same origin. */
+ size_t addr_len = sizeof(struct sockaddr_in);
+ if (addr->sa_family == AF_INET6)
+ addr_len = sizeof(struct sockaddr_in6);
+ if (memcmp(&task->source.addr, addr, addr_len) == 0) {
+ return kr_error(EEXIST); /* Ignore query */
+ }
+ }
+ }
task = qr_task_create(worker, handle, addr);
if (!task) {
return kr_error(ENOMEM);
memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
+ worker->outgoing = map_make();
worker->outstanding = map_make();
worker->tcp_pipeline_max = MAX_PIPELINED;
return kr_ok();
reclaim_freelist(worker->pool_sessions, struct session, session_free);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
+ map_clear(&worker->outgoing);
map_clear(&worker->outstanding);
}