]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: fast retransmit address selection
authorMarek Vavruša <marek.vavrusa@nic.cz>
Thu, 12 Nov 2015 18:16:18 +0000 (19:16 +0100)
committerMarek Vavruša <marek.vavrusa@nic.cz>
Sat, 14 Nov 2015 23:33:00 +0000 (00:33 +0100)
Instead of a single I/O request per step, the daemon now retries
all addresses in the selection with a 300 ms timeout between tries.
There are len(list) + len(list)/2 tries.

The idea is to reduce latency when a UDP request doesn't punch through,
or when some name servers (NSs) are overwhelmed or faulty.

daemon/worker.c
lib/defines.h
lib/resolve.c

index 0d7f44b7a36a687a2d7e50181d92dc064decfac4..3ddb14ab61f20ce228c1aab51cca669f0d671614 100644 (file)
@@ -39,6 +39,52 @@ struct ioreq
        } as;
 };
 
+/** @internal Number of requests within the timeout window. */
+#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))
+
+/** @internal Query resolution task. */
+struct qr_task
+{
+       struct kr_request req;
+       struct worker_ctx *worker;
+       knot_pkt_t *pktbuf;
+       uv_handle_t *pending[MAX_PENDING];
+       uint16_t pending_count;
+       uint16_t addrlist_count;
+       uint16_t addrlist_turn;
+       struct sockaddr *addrlist;
+       uv_timer_t retry, timeout;
+       worker_cb_t on_complete;
+       void *baton;
+       struct {
+               union {
+                       struct sockaddr_in ip4;
+                       struct sockaddr_in6 ip6;
+               } addr;
+               uv_handle_t *handle;
+       } source;
+       uint16_t iter_count;
+       uint16_t refs;
+       uint16_t bytes_remaining;
+};
+
+/* Convenience macros */
+#define qr_task_ref(task) \
+       do { ++(task)->refs; } while(0)
+#define qr_task_unref(task) \
+       do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
+#define qr_valid_handle(task, checked) \
+       (!uv_is_closing((checked)) || (task)->source.handle == (checked))
+
+/* Forward decls */
+static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);
+
+/** @internal Get singleton worker. */
+static inline struct worker_ctx *get_worker(void)
+{
+       return uv_default_loop()->data;
+}
+
 static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
 {
        struct ioreq *req = NULL;
@@ -62,6 +108,47 @@ static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
        }
 }
 
+static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
+{
+       if (task->pending_count >= MAX_PENDING) {
+               return NULL;
+       }
+       /* Create connection for iterative query */
+       uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker);
+       if (!req) {
+               return NULL;
+       }
+       io_create(task->worker->loop, req, socktype);
+       req->data = task;
+       /* Connect or issue query datagram */
+       task->pending[task->pending_count] = req;
+       task->pending_count += 1;
+       return req;
+}
+
+static void ioreq_on_close(uv_handle_t *handle)
+{
+       struct worker_ctx *worker = get_worker();
+       ioreq_release(worker, (struct ioreq *)handle);
+}
+
+static void ioreq_kill(uv_handle_t *req)
+{
+       assert(req);
+       if (!uv_is_closing(req)) {
+               io_stop_read(req);
+               uv_close(req, ioreq_on_close);
+       }
+}
+
+static void ioreq_killall(struct qr_task *task)
+{
+       for (size_t i = 0; i < task->pending_count; ++i) {
+               ioreq_kill(task->pending[i]);
+       }
+       task->pending_count = 0;
+}
+
 static inline struct mempool *pool_take(struct worker_ctx *worker)
 {
        /* Recycle available mempool if possible */
@@ -88,45 +175,6 @@ static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
        }
 }
 
-/** @internal Query resolution task. */
-struct qr_task
-{
-       struct kr_request req;
-       struct worker_ctx *worker;
-       knot_pkt_t *pktbuf;
-       uv_handle_t *iohandle;
-       uv_timer_t timeout;
-       worker_cb_t on_complete;
-       void *baton;
-       struct {
-               union {
-                       struct sockaddr_in ip4;
-                       struct sockaddr_in6 ip6;
-               } addr;
-               uv_handle_t *handle;
-       } source;
-       uint16_t iter_count;
-       uint16_t refs;
-       uint16_t bytes_remaining;
-};
-
-/* Convenience macros */
-#define qr_task_ref(task) \
-       do { ++(task)->refs; } while(0)
-#define qr_task_unref(task) \
-       do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
-#define qr_valid_handle(task, checked) \
-       ((task)->iohandle == (checked) || (task)->source.handle == (checked))
-
-/* Forward decls */
-static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);
-
-/** @internal Get singleton worker. */
-static inline struct worker_ctx *get_worker(void)
-{
-       return uv_default_loop()->data;
-}
-
 static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
 {
        /* How much can client handle? */
@@ -165,13 +213,16 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
        }
        task->req.answer = answer;
        task->pktbuf = pktbuf;
-       task->iohandle = NULL;
+       task->addrlist = NULL;
+       task->pending_count = 0;
        task->bytes_remaining = 0;
        task->iter_count = 0;
        task->refs = 1;
        task->worker = worker;
        task->source.handle = handle;
+       uv_timer_init(worker->loop, &task->retry);
        uv_timer_init(worker->loop, &task->timeout);
+       task->retry.data = task;
        task->timeout.data = task;
        task->on_complete = NULL;
        /* Remember query source addr */
@@ -192,6 +243,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
        return task;
 }
 
+/* This is called when the task refcount is zero, free memory. */
 static void qr_task_free(struct qr_task *task)
 {
        /* Return mempool to ring or free it if it's full */
@@ -209,10 +261,20 @@ static void qr_task_free(struct qr_task *task)
        }
 }
 
+/* This is called when retry timer closes */
+static void retransmit_close(uv_handle_t *handle)
+{
+       struct qr_task *task = handle->data;
+       qr_task_unref(task);
+}
+
+/* This is called when task completes and timeout timer is closed. */
 static void qr_task_complete(uv_handle_t *handle)
 {
        struct qr_task *task = handle->data;
        struct worker_ctx *worker = task->worker;
+       /* Kill pending I/O requests */
+       ioreq_killall(task);
        /* Run the completion callback. */
        if (task->on_complete) {
                task->on_complete(worker, &task->req, task->baton);
@@ -229,7 +291,8 @@ static void qr_task_complete(uv_handle_t *handle)
        worker->stats.concurrent -= 1;
 }
 
-static void qr_task_timeout(uv_timer_t *req)
+/* This is called when I/O times out */
+static void on_timeout(uv_timer_t *req)
 {
        struct qr_task *task = req->data;
        if (!uv_is_closing((uv_handle_t *)req)) {
@@ -237,25 +300,27 @@ static void qr_task_timeout(uv_timer_t *req)
        }
 }
 
+/* This is called when we send subrequest / answer */
 static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
 {
+       /* When NOOP, it means we sent the final answer to the originator,
+        * so we start to close the timers and finalize the task. */
        if (task->req.state != KNOT_STATE_NOOP) {
                if (status == 0 && handle) {
                        io_start_read(handle); /* Start reading answer */
                }
-       } else { /* Finalize task */
+       } else {
+               /* Close retry timer (borrows task) */
+               qr_task_ref(task);
+               uv_timer_stop(&task->retry);
+               uv_close((uv_handle_t *)&task->retry, retransmit_close);
+               /* Close timeout timer (finishes task) */
                uv_timer_stop(&task->timeout);
                uv_close((uv_handle_t *)&task->timeout, qr_task_complete);
        }
        return status;
 }
 
-static void on_close(uv_handle_t *handle)
-{
-       struct worker_ctx *worker = get_worker();
-       ioreq_release(worker, (struct ioreq *)handle);
-}
-
 static void on_send(uv_udp_send_t *req, int status)
 {
        struct worker_ctx *worker = get_worker();
@@ -342,6 +407,24 @@ static void on_connect(uv_connect_t *req, int status)
        ioreq_release(worker, (struct ioreq *)req);
 }
 
+static void on_retransmit(uv_timer_t *req)
+{
+       struct qr_task *task = req->data;
+       /* Create connection for iterative query */
+       if (!uv_is_closing((uv_handle_t *)req) && task->addrlist) {
+               uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM);
+               if (subreq) {
+                       struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
+                       if (qr_task_send(task, subreq, (struct sockaddr *)choice, task->pktbuf) == 0) {
+                               task->addrlist_turn = (task->addrlist_turn + 1) % task->addrlist_count; /* Round robin */
+                               return;
+                       }
+               }
+       }
+       /* Not possible to spawn request, stop trying */
+       uv_timer_stop(req);
+}
+
 static int qr_task_finalize(struct qr_task *task, int state)
 {
        kr_resolve_finish(&task->req, state);
@@ -353,21 +436,19 @@ static int qr_task_finalize(struct qr_task *task, int state)
 
 static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
 {
-       /* Close subrequest handle. */
+       /* Close pending I/O requests */
+       uv_timer_stop(&task->retry);
        uv_timer_stop(&task->timeout);
-       if (task->iohandle && !uv_is_closing(task->iohandle)) {
-               io_stop_read(task->iohandle);
-               uv_close(task->iohandle, on_close);
-               task->iohandle = NULL;
-       }
+       ioreq_killall(task);
 
        /* Consume input and produce next query */
        int sock_type = -1;
-       struct sockaddr *addr = NULL;
-       knot_pkt_t *pktbuf = task->pktbuf;
+       task->addrlist = NULL;
+       task->addrlist_count = 0;
+       task->addrlist_turn = 0;
        int state = kr_resolve_consume(&task->req, packet_source, packet);
        while (state == KNOT_STATE_PRODUCE) {
-               state = kr_resolve_produce(&task->req, &addr, &sock_type, pktbuf);
+               state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf);
                if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
                        return qr_task_finalize(task, KNOT_STATE_FAIL);
                }
@@ -376,39 +457,41 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
        /* We're done, no more iterations needed */
        if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
                return qr_task_finalize(task, state);
-       } else if (!addr || sock_type < 0) {
+       } else if (!task->addrlist || sock_type < 0) {
                return qr_task_step(task, NULL, NULL);
        }
 
-       /* Create connection for iterative query */
-       uv_handle_t *subreq = (uv_handle_t *)ioreq_take(task->worker);
-       if (!subreq) {
-               return qr_task_finalize(task, KNOT_STATE_FAIL);
+       /* Count available address choices */
+       struct sockaddr_in6 *choice = (struct sockaddr_in6 *)task->addrlist;
+       for (size_t i = 0; i < KR_NSREP_MAXADDR && choice->sin6_family != AF_UNSPEC; ++i) {
+               task->addrlist_count += 1;
+               choice += 1;
        }
-       io_create(task->worker->loop, subreq, sock_type);
-       subreq->data = task;
 
-       /* Connect or issue query datagram */
-       task->iohandle = subreq;
+       /* Start fast retransmit with UDP, otherwise connect. */
        if (sock_type == SOCK_DGRAM) {
-               if (qr_task_send(task, subreq, addr, pktbuf) != 0) {
+               uv_timer_start(&task->retry, on_retransmit, 0, KR_CONN_RETRY);
+       } else {
+               struct ioreq *conn = ioreq_take(task->worker);
+               if (!conn) {
                        return qr_task_step(task, NULL, NULL);
                }
-       } else {
-               struct ioreq *conn_req = ioreq_take(task->worker);
-               if (!conn_req) {
+               uv_handle_t *client = ioreq_spawn(task, sock_type);
+               if (!client) {
+                       ioreq_release(task->worker, conn);
                        return qr_task_step(task, NULL, NULL);
                }
-               conn_req->as.connect.data = task;
-               if (uv_tcp_connect(&conn_req->as.connect, (uv_tcp_t *)subreq, addr, on_connect) != 0) {
-                       ioreq_release(task->worker, conn_req);
+               conn->as.connect.data = task;
+               if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
+                       ioreq_release(task->worker, conn);
                        return qr_task_step(task, NULL, NULL);
                }
+               /* Connect request borrows task */
                qr_task_ref(task);
        }
 
        /* Start next step with timeout */
-       uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0);
+       uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
        return kr_ok();
 }
 
index 92c7c6fb9d1c95233c5e4d10f46fc16e26c0527d..d2e3e78d775799f785985755d3fbb28dafefc3c3 100644 (file)
@@ -36,6 +36,7 @@ static inline int __attribute__((__cold__)) kr_error(int x) {
  * @cond internal
  */
 #define KR_CONN_RTT_MAX 3000 /* Timeout for network activity */
+#define KR_CONN_RETRY 300    /* Retry interval for network activity */
 #define KR_ITER_LIMIT 50     /* Built-in iterator limit */
 
 /*
index 81ca0b2aa05786be15e6c40d6f60ff46dba1cca7..2340c4f9d0314c3f6fdbf59a50212a2588716652 100644 (file)
@@ -411,12 +411,6 @@ int kr_resolve_consume(struct kr_request *request, const struct sockaddr *src, k
        struct kr_query *qry = TAIL(rplan->pending);
        bool tried_tcp = (qry->flags & QUERY_TCP);
        if (!packet || packet->size == 0) {
-               /* Network error, retry over TCP. */
-               if (!tried_tcp) {
-                       DEBUG_MSG(qry, "=> NS unreachable, retrying over TCP\n");
-                       qry->flags |= QUERY_TCP;
-                       return KNOT_STATE_PRODUCE;
-               }
                request->state = KNOT_STATE_FAIL;
        } else {
                /* Packet cleared, derandomize QNAME. */