]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/worker: partially implemented multiplexed resolution
authorMarek Vavruša <marek.vavrusa@nic.cz>
Fri, 24 Apr 2015 07:13:20 +0000 (09:13 +0200)
committerMarek Vavruša <marek.vavrusa@nic.cz>
Fri, 24 Apr 2015 07:19:06 +0000 (09:19 +0200)
the worker now creates a resolution context copy,
and keeps it if the query requires iterative queries.
the worker_exec() is now a reentrant function that gets
called with incoming data until the resolution is done,
and it sends the answer

daemon/daemon.mk
daemon/io.c
daemon/io.h
daemon/layer/query.c [deleted file]
daemon/layer/query.h [deleted file]
daemon/worker.c
daemon/worker.h

index 677d23d1575abeb54f522168f22c593fa4a1d0b5..a7e52a30d6539453b3e1505bc3a0061e36328058 100644 (file)
@@ -1,5 +1,4 @@
 kresolved_SOURCES := \
-       daemon/layer/query.c \
        daemon/io.c          \
        daemon/network.c     \
        daemon/engine.c      \
index 621a49173ee5db017501a3fe6fee45615b41eb8b..eeebd1dc6608be3321f4931807bbb77728b337e0 100644 (file)
@@ -56,7 +56,7 @@ static void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
        knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
 
        /* Resolve */
-       int ret = worker_exec(worker, answer, query);
+       int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
        if (ret == KNOT_EOK && answer->size > 0) {
                udp_send(handle, answer, addr);
        }
@@ -134,7 +134,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
        knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
 
        /* Resolve */
-       int ret = worker_exec(worker, answer, query);
+       int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
        if (ret == KNOT_EOK && answer->size > 0) {
                tcp_send((uv_handle_t *)handle, answer);
        }
@@ -193,3 +193,31 @@ void tcp_unbind(struct endpoint *ep)
        tcp_unbind_handle((uv_handle_t *)&ep->tcp);
        uv_close((uv_handle_t *)&ep->tcp, NULL);
 }
+
+uv_handle_t *io_create(uv_loop_t *loop, int type)
+{
+       uv_handle_t *handle = NULL;
+       if (type == SOCK_DGRAM) {
+               handle = (uv_handle_t *)udp_create(loop);
+               if (handle) {
+                       uv_udp_recv_start((uv_udp_t *)handle, &buf_get, &udp_recv);
+               }
+
+       } else {
+               handle = (uv_handle_t *)tcp_create(loop);
+               if (handle) {
+                       uv_read_start((uv_stream_t*)handle, buf_get, tcp_recv);
+               }
+       }
+       return handle;
+}
+
+uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect)
+{
+       uv_connect_t* connect = malloc(sizeof(uv_connect_t));
+       if (uv_tcp_connect(connect, (uv_tcp_t *)handle, addr, on_connect) != 0) {
+               free(connect);
+               return NULL;
+       }
+       return connect;
+}
index b5b0116bee29576c29039fc7eccf20703d7e7ad6..073007eba12a9feeeb52ab9f9498913f6d7d08a4 100644 (file)
 #pragma once
 
 #include <uv.h>
+#include <libknot/packet/pkt.h>
+
 struct endpoint;
 
+int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr);
 int udp_bind(struct endpoint *ep, struct sockaddr *addr);
 void udp_unbind(struct endpoint *ep);
 int tcp_bind(struct endpoint *ep, struct sockaddr *addr);
-void tcp_unbind(struct endpoint *ep);
\ No newline at end of file
+void tcp_unbind(struct endpoint *ep);
+uv_handle_t *io_create(uv_loop_t *loop, int type);
+uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect);
\ No newline at end of file
diff --git a/daemon/layer/query.c b/daemon/layer/query.c
deleted file mode 100644 (file)
index be0f836..0000000
+++ /dev/null
@@ -1,91 +0,0 @@
-/*  Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
-
-    This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation, either version 3 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include <libknot/errcode.h>
-
-#include "daemon/layer/query.h"
-#include "lib/resolve.h"
-
-static int reset(knot_layer_t *ctx)
-{
-       return KNOT_STATE_CONSUME;
-}
-
-static int begin(knot_layer_t *ctx, void *module_param)
-{
-       ctx->data = module_param;
-       return reset(ctx);
-}
-
-static int input_query(knot_layer_t *ctx, knot_pkt_t *pkt)
-{
-       assert(pkt && ctx);
-
-       /* Check if at least header is parsed. */
-       if (pkt->parsed < pkt->size) {
-               return KNOT_STATE_FAIL;
-       }
-
-       /* Accept only queries. */
-       if (knot_wire_get_qr(pkt->wire)) {
-               return KNOT_STATE_NOOP; /* Ignore. */
-       }
-
-       /* No authoritative service. */
-       if (!knot_wire_get_rd(pkt->wire)) {
-               return KNOT_STATE_FAIL;
-       }
-
-       return KNOT_STATE_PRODUCE;
-}
-
-static int output_answer(knot_layer_t *ctx, knot_pkt_t *pkt)
-{
-       assert(pkt && ctx);
-
-       /* Prepare for query processing. */
-       int ret = kr_resolve(ctx->data, pkt,
-                            knot_pkt_qname(pkt),
-                            knot_pkt_qclass(pkt),
-                            knot_pkt_qtype(pkt));
-
-       if (ret != KNOT_EOK) {
-               return KNOT_STATE_FAIL;
-       }
-
-       return KNOT_STATE_DONE;
-}
-
-static int output_error(knot_layer_t *ctx, knot_pkt_t *pkt)
-{
-       knot_wire_set_rcode(pkt->wire, KNOT_RCODE_SERVFAIL);
-       return KNOT_STATE_DONE;
-}
-
-/** Module implementation. */
-static const knot_layer_api_t LAYER_QUERY_MODULE = {
-       &begin,
-       NULL,
-       &reset,
-       &input_query,
-       &output_answer,
-       &output_error
-};
-
-const knot_layer_api_t *layer_query_module(void)
-{
-       return &LAYER_QUERY_MODULE;
-}
diff --git a/daemon/layer/query.h b/daemon/layer/query.h
deleted file mode 100644 (file)
index 1925219..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-/*  Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
-
-    This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation, either version 3 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "lib/layer.h"
-
-/* Processing module implementation. */
-const knot_layer_api_t *layer_query_module(void);
-#define LAYER_QUERY layer_query_module()
index 1e037e5d91edbd06870842a0e8538521e2f022dd..5f4fd063abe011a15d32adfbbae0d07c26e66117 100644 (file)
 
 #include <libknot/packet/pkt.h>
 #include <libknot/internal/net.h>
+#include <libknot/internal/mempool.h>
 
 #include "daemon/worker.h"
 #include "daemon/engine.h"
-#include "daemon/layer/query.h"
+#include "daemon/io.h"
 
-int worker_exec(struct worker_ctx *worker, knot_pkt_t *answer, knot_pkt_t *query)
+/** @internal Query resolution task. */
+struct qr_task
 {
-       if (worker == NULL) {
-               return kr_error(EINVAL);
-       }
+       struct kr_request req;
+       knot_pkt_t *pending;
+       uv_handle_t *handle;
+};
 
+static int parse_query(knot_pkt_t *query)
+{
        /* Parse query packet. */
        int ret = knot_pkt_parse(query, 0);
        if (ret != KNOT_EOK) {
                return kr_error(EPROTO); /* Ignore malformed query. */
        }
 
-       /* Process query packet. */
-       knot_layer_t proc;
-       memset(&proc, 0, sizeof(knot_layer_t));
-       proc.mm = worker->mm;
-       knot_layer_begin(&proc, LAYER_QUERY, &worker->engine->resolver);
-       int state = knot_layer_consume(&proc, query);
+       /* Check if at least header is parsed. */
+       if (query->parsed < query->size) {
+               return kr_error(EMSGSIZE);
+       }
 
-       /* Build an answer. */
-       if (state & (KNOT_STATE_PRODUCE|KNOT_STATE_FAIL)) {
-               knot_pkt_init_response(answer, query);
-               state = knot_layer_produce(&proc, answer);
+       /* Accept only queries, no authoritative service. */
+       if (knot_wire_get_qr(query->wire) || !knot_wire_get_rd(query->wire)) {
+               return kr_error(EINVAL); /* Ignore. */
        }
 
-       /* Cleanup. */
-       knot_layer_finish(&proc);
+       return kr_ok();
+}
+
+static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle)
+{
+       mm_ctx_t pool;
+       mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE);
+
+       /* Create worker task */
+       struct engine *engine = worker->engine;
+       struct qr_task *task = mm_alloc(&pool, sizeof(*task));
+       if (!task) {
+               mp_delete(pool.ctx);
+               return NULL;
+       }
+       task->req.pool = pool;
+       task->handle = handle;
+
+#warning TODO: devise a better scheme to manage answer buffer, it needs copy each time now
+       /* Create buffers */
+       knot_pkt_t *pending = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
+       knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
+       if (!pending || !answer) {
+               mp_delete(pool.ctx);
+               return NULL;
+       }
+       task->req.answer = answer;
+       task->pending = pending;
+
+       /* Start resolution */
+       kr_resolve_begin(&task->req, &engine->resolver, answer);
+       return task;
+}
+
+static int qr_task_finalize(struct qr_task *task, knot_pkt_t *dst, int state)
+{
+       knot_pkt_t *answer = task->req.answer;
+       kr_resolve_finish(&task->req, state);
+       memcpy(dst->wire, answer->wire, answer->size);
+       dst->size = answer->size;
+#warning TODO: send answer asynchronously
+       mp_delete(task->req.pool.ctx);
+       return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
+}
+
+static void qr_task_on_connect(uv_connect_t *connect, int status)
+{
+#warning TODO: if not connected, retry
+#warning TODO: if connected, send pending query
+}
+
+int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query)
+{
+       if (!worker) {
+               return kr_error(EINVAL);
+       }
+
+       /* Parse query */
+       int ret = parse_query(query);
+       if (ret != 0) {
+               return ret;
+       }
+
+       /* Get pending request or start new */
+       struct qr_task *task = handle->data;
+       if (!task) {
+               task = qr_task_create(worker, handle);
+               if (!task) {
+                       return kr_error(ENOMEM);
+               }
+       }
+
+       /* Consume input and produce next query */
+       int proto = 0;
+       struct sockaddr *addr = NULL;
+#warning TODO: it shouldnt be needed to provide NULL answer if I/O fails
+       int state = kr_resolve_consume(&task->req, query);
+       while (state == KNOT_STATE_PRODUCE) {
+               state = kr_resolve_produce(&task->req, &addr, &proto, task->pending);
+       }
+       if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
+               return qr_task_finalize(task, answer, state);
+       }
+
+       /* Create connection for iterative query */
+       uv_handle_t *next_handle = io_create(handle->loop, proto);
+#warning TODO: improve error checking  
+       next_handle->data = task;
+       if (proto == SOCK_STREAM) {
+               uv_connect_t *connect = io_connect(next_handle, addr, qr_task_on_connect);
+               if (!connect) {
+#warning TODO: close next_handle                       
+                       return kr_error(ENOMEM);
+               }
+       } else {
+               /* Fake connection as libuv doesn't support connected UDP */
+               uv_connect_t fake_connect;
+               fake_connect.handle = (uv_stream_t *)next_handle;
+               qr_task_on_connect(&fake_connect, 0);
+       }
 
        return kr_ok();
 }
index 72e27896cdf6d7440856eaa93b969e6603e9d60b..370eeb473823167d55c71a6c7a95df2547a32eff 100644 (file)
@@ -33,8 +33,9 @@ struct worker_ctx {
  * Resolve query.
  *
  * @param worker
+ * @param handle
  * @param answer
  * @param query
  * @return 0, error code
  */
-int worker_exec(struct worker_ctx *worker, knot_pkt_t *answer, knot_pkt_t *query);
+int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query);