From: Marek Vavruša Date: Thu, 12 Nov 2015 18:16:18 +0000 (+0100) Subject: daemon: fast retransmit address selection X-Git-Tag: v1.0.0-beta2~21 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e25abc9ca4da2dc45f579b0b133e4cde1e6a0eee;p=thirdparty%2Fknot-resolver.git daemon: fast retransmit address selection Instead of a single I/O request per step, the daemon now retries all addresses in the selection with a 300ms timeout between tries. There are len(list) + len(list)/2 tries. The idea is to reduce latency when a UDP request doesn't punch through, or some NSs are overwhelmed/faulty. --- diff --git a/daemon/worker.c b/daemon/worker.c index 0d7f44b7a..3ddb14ab6 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -39,6 +39,52 @@ struct ioreq } as; }; +/** @internal Number of request within timeout window. */ +#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2)) + +/** @internal Query resolution task. */ +struct qr_task +{ + struct kr_request req; + struct worker_ctx *worker; + knot_pkt_t *pktbuf; + uv_handle_t *pending[MAX_PENDING]; + uint16_t pending_count; + uint16_t addrlist_count; + uint16_t addrlist_turn; + struct sockaddr *addrlist; + uv_timer_t retry, timeout; + worker_cb_t on_complete; + void *baton; + struct { + union { + struct sockaddr_in ip4; + struct sockaddr_in6 ip6; + } addr; + uv_handle_t *handle; + } source; + uint16_t iter_count; + uint16_t refs; + uint16_t bytes_remaining; +}; + +/* Convenience macros */ +#define qr_task_ref(task) \ + do { ++(task)->refs; } while(0) +#define qr_task_unref(task) \ + do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0) +#define qr_valid_handle(task, checked) \ + (!uv_is_closing((checked)) || (task)->source.handle == (checked)) + +/* Forward decls */ +static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet); + +/** @internal Get singleton worker. 
*/ +static inline struct worker_ctx *get_worker(void) +{ + return uv_default_loop()->data; +} + static inline struct ioreq *ioreq_take(struct worker_ctx *worker) { struct ioreq *req = NULL; @@ -62,6 +108,47 @@ static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req) } } +static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype) +{ + if (task->pending_count >= MAX_PENDING) { + return NULL; + } + /* Create connection for iterative query */ + uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker); + if (!req) { + return NULL; + } + io_create(task->worker->loop, req, socktype); + req->data = task; + /* Connect or issue query datagram */ + task->pending[task->pending_count] = req; + task->pending_count += 1; + return req; +} + +static void ioreq_on_close(uv_handle_t *handle) +{ + struct worker_ctx *worker = get_worker(); + ioreq_release(worker, (struct ioreq *)handle); +} + +static void ioreq_kill(uv_handle_t *req) +{ + assert(req); + if (!uv_is_closing(req)) { + io_stop_read(req); + uv_close(req, ioreq_on_close); + } +} + +static void ioreq_killall(struct qr_task *task) +{ + for (size_t i = 0; i < task->pending_count; ++i) { + ioreq_kill(task->pending[i]); + } + task->pending_count = 0; +} + static inline struct mempool *pool_take(struct worker_ctx *worker) { /* Recycle available mempool if possible */ @@ -88,45 +175,6 @@ static inline void pool_release(struct worker_ctx *worker, struct mempool *mp) } } -/** @internal Query resolution task. 
*/ -struct qr_task -{ - struct kr_request req; - struct worker_ctx *worker; - knot_pkt_t *pktbuf; - uv_handle_t *iohandle; - uv_timer_t timeout; - worker_cb_t on_complete; - void *baton; - struct { - union { - struct sockaddr_in ip4; - struct sockaddr_in6 ip6; - } addr; - uv_handle_t *handle; - } source; - uint16_t iter_count; - uint16_t refs; - uint16_t bytes_remaining; -}; - -/* Convenience macros */ -#define qr_task_ref(task) \ - do { ++(task)->refs; } while(0) -#define qr_task_unref(task) \ - do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0) -#define qr_valid_handle(task, checked) \ - ((task)->iohandle == (checked) || (task)->source.handle == (checked)) - -/* Forward decls */ -static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet); - -/** @internal Get singleton worker. */ -static inline struct worker_ctx *get_worker(void) -{ - return uv_default_loop()->data; -} - static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr) { /* How much can client handle? */ @@ -165,13 +213,16 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha } task->req.answer = answer; task->pktbuf = pktbuf; - task->iohandle = NULL; + task->addrlist = NULL; + task->pending_count = 0; task->bytes_remaining = 0; task->iter_count = 0; task->refs = 1; task->worker = worker; task->source.handle = handle; + uv_timer_init(worker->loop, &task->retry); uv_timer_init(worker->loop, &task->timeout); + task->retry.data = task; task->timeout.data = task; task->on_complete = NULL; /* Remember query source addr */ @@ -192,6 +243,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha return task; } +/* This is called when the task refcount is zero, free memory. 
*/ static void qr_task_free(struct qr_task *task) { /* Return mempool to ring or free it if it's full */ @@ -209,10 +261,20 @@ static void qr_task_free(struct qr_task *task) } } +/* This is called when retry timer closes */ +static void retransmit_close(uv_handle_t *handle) +{ + struct qr_task *task = handle->data; + qr_task_unref(task); +} + +/* This is called when task completes and timeout timer is closed. */ static void qr_task_complete(uv_handle_t *handle) { struct qr_task *task = handle->data; struct worker_ctx *worker = task->worker; + /* Kill pending I/O requests */ + ioreq_killall(task); /* Run the completion callback. */ if (task->on_complete) { task->on_complete(worker, &task->req, task->baton); @@ -229,7 +291,8 @@ static void qr_task_complete(uv_handle_t *handle) worker->stats.concurrent -= 1; } -static void qr_task_timeout(uv_timer_t *req) +/* This is called when I/O timeouts */ +static void on_timeout(uv_timer_t *req) { struct qr_task *task = req->data; if (!uv_is_closing((uv_handle_t *)req)) { @@ -237,25 +300,27 @@ static void qr_task_timeout(uv_timer_t *req) } } +/* This is called when we send subrequest / answer */ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status) { + /* When NOOP, it means we sent the final answer to originator, + * there we start to close timers and finalize task. 
*/ if (task->req.state != KNOT_STATE_NOOP) { if (status == 0 && handle) { io_start_read(handle); /* Start reading answer */ } - } else { /* Finalize task */ + } else { + /* Close retry timer (borrows task) */ + qr_task_ref(task); + uv_timer_stop(&task->retry); + uv_close((uv_handle_t *)&task->retry, retransmit_close); + /* Close timeout timer (finishes task) */ uv_timer_stop(&task->timeout); uv_close((uv_handle_t *)&task->timeout, qr_task_complete); } return status; } -static void on_close(uv_handle_t *handle) -{ - struct worker_ctx *worker = get_worker(); - ioreq_release(worker, (struct ioreq *)handle); -} - static void on_send(uv_udp_send_t *req, int status) { struct worker_ctx *worker = get_worker(); @@ -342,6 +407,24 @@ static void on_connect(uv_connect_t *req, int status) ioreq_release(worker, (struct ioreq *)req); } +static void on_retransmit(uv_timer_t *req) +{ + struct qr_task *task = req->data; + /* Create connection for iterative query */ + if (!uv_is_closing((uv_handle_t *)req) && task->addrlist) { + uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM); + if (subreq) { + struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn]; + if (qr_task_send(task, subreq, (struct sockaddr *)choice, task->pktbuf) == 0) { + task->addrlist_turn = (task->addrlist_turn + 1) % task->addrlist_count; /* Round robin */ + return; + } + } + } + /* Not possible to spawn request, stop trying */ + uv_timer_stop(req); +} + static int qr_task_finalize(struct qr_task *task, int state) { kr_resolve_finish(&task->req, state); @@ -353,21 +436,19 @@ static int qr_task_finalize(struct qr_task *task, int state) static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet) { - /* Close subrequest handle. 
*/ + /* Close pending I/O requests */ + uv_timer_stop(&task->retry); uv_timer_stop(&task->timeout); - if (task->iohandle && !uv_is_closing(task->iohandle)) { - io_stop_read(task->iohandle); - uv_close(task->iohandle, on_close); - task->iohandle = NULL; - } + ioreq_killall(task); /* Consume input and produce next query */ int sock_type = -1; - struct sockaddr *addr = NULL; - knot_pkt_t *pktbuf = task->pktbuf; + task->addrlist = NULL; + task->addrlist_count = 0; + task->addrlist_turn = 0; int state = kr_resolve_consume(&task->req, packet_source, packet); while (state == KNOT_STATE_PRODUCE) { - state = kr_resolve_produce(&task->req, &addr, &sock_type, pktbuf); + state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf); if (unlikely(++task->iter_count > KR_ITER_LIMIT)) { return qr_task_finalize(task, KNOT_STATE_FAIL); } @@ -376,39 +457,41 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour /* We're done, no more iterations needed */ if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) { return qr_task_finalize(task, state); - } else if (!addr || sock_type < 0) { + } else if (!task->addrlist || sock_type < 0) { return qr_task_step(task, NULL, NULL); } - /* Create connection for iterative query */ - uv_handle_t *subreq = (uv_handle_t *)ioreq_take(task->worker); - if (!subreq) { - return qr_task_finalize(task, KNOT_STATE_FAIL); + /* Count available address choices */ + struct sockaddr_in6 *choice = (struct sockaddr_in6 *)task->addrlist; + for (size_t i = 0; i < KR_NSREP_MAXADDR && choice->sin6_family != AF_UNSPEC; ++i) { + task->addrlist_count += 1; + choice += 1; } - io_create(task->worker->loop, subreq, sock_type); - subreq->data = task; - /* Connect or issue query datagram */ - task->iohandle = subreq; + /* Start fast retransmit with UDP, otherwise connect. 
*/ if (sock_type == SOCK_DGRAM) { - if (qr_task_send(task, subreq, addr, pktbuf) != 0) { + uv_timer_start(&task->retry, on_retransmit, 0, KR_CONN_RETRY); + } else { + struct ioreq *conn = ioreq_take(task->worker); + if (!conn) { return qr_task_step(task, NULL, NULL); } - } else { - struct ioreq *conn_req = ioreq_take(task->worker); - if (!conn_req) { + uv_handle_t *client = ioreq_spawn(task, sock_type); + if (!client) { + ioreq_release(task->worker, conn); return qr_task_step(task, NULL, NULL); } - conn_req->as.connect.data = task; - if (uv_tcp_connect(&conn_req->as.connect, (uv_tcp_t *)subreq, addr, on_connect) != 0) { - ioreq_release(task->worker, conn_req); + conn->as.connect.data = task; + if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) { + ioreq_release(task->worker, conn); return qr_task_step(task, NULL, NULL); } + /* Connect request borrows task */ qr_task_ref(task); } /* Start next step with timeout */ - uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0); + uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0); return kr_ok(); } diff --git a/lib/defines.h b/lib/defines.h index 92c7c6fb9..d2e3e78d7 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -36,6 +36,7 @@ static inline int __attribute__((__cold__)) kr_error(int x) { * @cond internal */ #define KR_CONN_RTT_MAX 3000 /* Timeout for network activity */ +#define KR_CONN_RETRY 300 /* Retry interval for network activity */ #define KR_ITER_LIMIT 50 /* Built-in iterator limit */ /* diff --git a/lib/resolve.c b/lib/resolve.c index 81ca0b2aa..2340c4f9d 100644 --- a/lib/resolve.c +++ b/lib/resolve.c @@ -411,12 +411,6 @@ int kr_resolve_consume(struct kr_request *request, const struct sockaddr *src, k struct kr_query *qry = TAIL(rplan->pending); bool tried_tcp = (qry->flags & QUERY_TCP); if (!packet || packet->size == 0) { - /* Network error, retry over TCP. 
*/ - if (!tried_tcp) { - DEBUG_MSG(qry, "=> NS unreachable, retrying over TCP\n"); - qry->flags |= QUERY_TCP; - return KNOT_STATE_PRODUCE; - } request->state = KNOT_STATE_FAIL; } else { /* Packet cleared, derandomize QNAME. */