} as;
};
+/** @internal Number of request within timeout window. */
+#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))
+
+/** @internal Query resolution task. */
+struct qr_task
+{
+ struct kr_request req;
+ struct worker_ctx *worker;
+ knot_pkt_t *pktbuf;
+ uv_handle_t *pending[MAX_PENDING];
+ uint16_t pending_count;
+ uint16_t addrlist_count;
+ uint16_t addrlist_turn;
+ struct sockaddr *addrlist;
+ uv_timer_t retry, timeout;
+ worker_cb_t on_complete;
+ void *baton;
+ struct {
+ union {
+ struct sockaddr_in ip4;
+ struct sockaddr_in6 ip6;
+ } addr;
+ uv_handle_t *handle;
+ } source;
+ uint16_t iter_count;
+ uint16_t refs;
+ uint16_t bytes_remaining;
+};
+
+/* Convenience macros */
+#define qr_task_ref(task) \
+ do { ++(task)->refs; } while(0)
+#define qr_task_unref(task) \
+ do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
+#define qr_valid_handle(task, checked) \
+ (!uv_is_closing((checked)) || (task)->source.handle == (checked))
+
+/* Forward decls */
+static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);
+
+/** @internal Get singleton worker. */
+static inline struct worker_ctx *get_worker(void)
+{
+ return uv_default_loop()->data;
+}
+
static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
{
struct ioreq *req = NULL;
}
}
+static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
+{
+ if (task->pending_count >= MAX_PENDING) {
+ return NULL;
+ }
+ /* Create connection for iterative query */
+ uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker);
+ if (!req) {
+ return NULL;
+ }
+ io_create(task->worker->loop, req, socktype);
+ req->data = task;
+ /* Connect or issue query datagram */
+ task->pending[task->pending_count] = req;
+ task->pending_count += 1;
+ return req;
+}
+
+static void ioreq_on_close(uv_handle_t *handle)
+{
+ struct worker_ctx *worker = get_worker();
+ ioreq_release(worker, (struct ioreq *)handle);
+}
+
+static void ioreq_kill(uv_handle_t *req)
+{
+ assert(req);
+ if (!uv_is_closing(req)) {
+ io_stop_read(req);
+ uv_close(req, ioreq_on_close);
+ }
+}
+
+static void ioreq_killall(struct qr_task *task)
+{
+ for (size_t i = 0; i < task->pending_count; ++i) {
+ ioreq_kill(task->pending[i]);
+ }
+ task->pending_count = 0;
+}
+
static inline struct mempool *pool_take(struct worker_ctx *worker)
{
/* Recycle available mempool if possible */
}
}
-/** @internal Query resolution task. */
-struct qr_task
-{
- struct kr_request req;
- struct worker_ctx *worker;
- knot_pkt_t *pktbuf;
- uv_handle_t *iohandle;
- uv_timer_t timeout;
- worker_cb_t on_complete;
- void *baton;
- struct {
- union {
- struct sockaddr_in ip4;
- struct sockaddr_in6 ip6;
- } addr;
- uv_handle_t *handle;
- } source;
- uint16_t iter_count;
- uint16_t refs;
- uint16_t bytes_remaining;
-};
-
-/* Convenience macros */
-#define qr_task_ref(task) \
- do { ++(task)->refs; } while(0)
-#define qr_task_unref(task) \
- do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
-#define qr_valid_handle(task, checked) \
- ((task)->iohandle == (checked) || (task)->source.handle == (checked))
-
-/* Forward decls */
-static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);
-
-/** @internal Get singleton worker. */
-static inline struct worker_ctx *get_worker(void)
-{
- return uv_default_loop()->data;
-}
-
static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
{
/* How much can client handle? */
}
task->req.answer = answer;
task->pktbuf = pktbuf;
- task->iohandle = NULL;
+ task->addrlist = NULL;
+ task->pending_count = 0;
task->bytes_remaining = 0;
task->iter_count = 0;
task->refs = 1;
task->worker = worker;
task->source.handle = handle;
+ uv_timer_init(worker->loop, &task->retry);
uv_timer_init(worker->loop, &task->timeout);
+ task->retry.data = task;
task->timeout.data = task;
task->on_complete = NULL;
/* Remember query source addr */
return task;
}
+/* This is called when the task refcount is zero, free memory. */
static void qr_task_free(struct qr_task *task)
{
/* Return mempool to ring or free it if it's full */
}
}
+/* This is called when retry timer closes */
+static void retransmit_close(uv_handle_t *handle)
+{
+ struct qr_task *task = handle->data;
+ qr_task_unref(task);
+}
+
+/* This is called when task completes and timeout timer is closed. */
static void qr_task_complete(uv_handle_t *handle)
{
struct qr_task *task = handle->data;
struct worker_ctx *worker = task->worker;
+ /* Kill pending I/O requests */
+ ioreq_killall(task);
/* Run the completion callback. */
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
worker->stats.concurrent -= 1;
}
-static void qr_task_timeout(uv_timer_t *req)
+/* This is called when I/O timeouts */
+static void on_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
if (!uv_is_closing((uv_handle_t *)req)) {
}
}
+/* This is called when we send subrequest / answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
+ /* When NOOP, it means we sent the final answer to originator,
+ * there we start to close timers and finalize task. */
if (task->req.state != KNOT_STATE_NOOP) {
if (status == 0 && handle) {
io_start_read(handle); /* Start reading answer */
}
- } else { /* Finalize task */
+ } else {
+ /* Close retry timer (borrows task) */
+ qr_task_ref(task);
+ uv_timer_stop(&task->retry);
+ uv_close((uv_handle_t *)&task->retry, retransmit_close);
+ /* Close timeout timer (finishes task) */
uv_timer_stop(&task->timeout);
uv_close((uv_handle_t *)&task->timeout, qr_task_complete);
}
return status;
}
-static void on_close(uv_handle_t *handle)
-{
- struct worker_ctx *worker = get_worker();
- ioreq_release(worker, (struct ioreq *)handle);
-}
-
static void on_send(uv_udp_send_t *req, int status)
{
struct worker_ctx *worker = get_worker();
ioreq_release(worker, (struct ioreq *)req);
}
+static void on_retransmit(uv_timer_t *req)
+{
+ struct qr_task *task = req->data;
+ /* Create connection for iterative query */
+ if (!uv_is_closing((uv_handle_t *)req) && task->addrlist) {
+ uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM);
+ if (subreq) {
+ struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
+ if (qr_task_send(task, subreq, (struct sockaddr *)choice, task->pktbuf) == 0) {
+ task->addrlist_turn = (task->addrlist_turn + 1) % task->addrlist_count; /* Round robin */
+ return;
+ }
+ }
+ }
+ /* Not possible to spawn request, stop trying */
+ uv_timer_stop(req);
+}
+
static int qr_task_finalize(struct qr_task *task, int state)
{
kr_resolve_finish(&task->req, state);
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
{
- /* Close subrequest handle. */
+ /* Close pending I/O requests */
+ uv_timer_stop(&task->retry);
uv_timer_stop(&task->timeout);
- if (task->iohandle && !uv_is_closing(task->iohandle)) {
- io_stop_read(task->iohandle);
- uv_close(task->iohandle, on_close);
- task->iohandle = NULL;
- }
+ ioreq_killall(task);
/* Consume input and produce next query */
int sock_type = -1;
- struct sockaddr *addr = NULL;
- knot_pkt_t *pktbuf = task->pktbuf;
+ task->addrlist = NULL;
+ task->addrlist_count = 0;
+ task->addrlist_turn = 0;
int state = kr_resolve_consume(&task->req, packet_source, packet);
while (state == KNOT_STATE_PRODUCE) {
- state = kr_resolve_produce(&task->req, &addr, &sock_type, pktbuf);
+ state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf);
if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
/* We're done, no more iterations needed */
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, state);
- } else if (!addr || sock_type < 0) {
+ } else if (!task->addrlist || sock_type < 0) {
return qr_task_step(task, NULL, NULL);
}
- /* Create connection for iterative query */
- uv_handle_t *subreq = (uv_handle_t *)ioreq_take(task->worker);
- if (!subreq) {
- return qr_task_finalize(task, KNOT_STATE_FAIL);
+ /* Count available address choices */
+ struct sockaddr_in6 *choice = (struct sockaddr_in6 *)task->addrlist;
+ for (size_t i = 0; i < KR_NSREP_MAXADDR && choice->sin6_family != AF_UNSPEC; ++i) {
+ task->addrlist_count += 1;
+ choice += 1;
}
- io_create(task->worker->loop, subreq, sock_type);
- subreq->data = task;
- /* Connect or issue query datagram */
- task->iohandle = subreq;
+ /* Start fast retransmit with UDP, otherwise connect. */
if (sock_type == SOCK_DGRAM) {
- if (qr_task_send(task, subreq, addr, pktbuf) != 0) {
+ uv_timer_start(&task->retry, on_retransmit, 0, KR_CONN_RETRY);
+ } else {
+ struct ioreq *conn = ioreq_take(task->worker);
+ if (!conn) {
return qr_task_step(task, NULL, NULL);
}
- } else {
- struct ioreq *conn_req = ioreq_take(task->worker);
- if (!conn_req) {
+ uv_handle_t *client = ioreq_spawn(task, sock_type);
+ if (!client) {
+ ioreq_release(task->worker, conn);
return qr_task_step(task, NULL, NULL);
}
- conn_req->as.connect.data = task;
- if (uv_tcp_connect(&conn_req->as.connect, (uv_tcp_t *)subreq, addr, on_connect) != 0) {
- ioreq_release(task->worker, conn_req);
+ conn->as.connect.data = task;
+ if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
+ ioreq_release(task->worker, conn);
return qr_task_step(task, NULL, NULL);
}
+ /* Connect request borrows task */
qr_task_ref(task);
}
/* Start next step with timeout */
- uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0);
+ uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
return kr_ok();
}