From: Marek VavrusÌŒa Date: Fri, 24 Apr 2015 07:13:20 +0000 (+0200) Subject: daemon/worker: partially implemented multiplexed resolution X-Git-Tag: v1.0.0-beta1~228^2~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2c84768675b552a4066752d04a7dae22f322f181;p=thirdparty%2Fknot-resolver.git daemon/worker: partially implemented multiplexed resolution the worker now creates a resolution context copy, and keeps it if the query requires iterative queries. the worker_exec() is now a reentrant function that gets called with incoming data until the resolution is done, and it sends the answer --- diff --git a/daemon/daemon.mk b/daemon/daemon.mk index 677d23d15..a7e52a30d 100644 --- a/daemon/daemon.mk +++ b/daemon/daemon.mk @@ -1,5 +1,4 @@ kresolved_SOURCES := \ - daemon/layer/query.c \ daemon/io.c \ daemon/network.c \ daemon/engine.c \ diff --git a/daemon/io.c b/daemon/io.c index 621a49173..eeebd1dc6 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -56,7 +56,7 @@ static void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm); /* Resolve */ - int ret = worker_exec(worker, answer, query); + int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query); if (ret == KNOT_EOK && answer->size > 0) { udp_send(handle, answer, addr); } @@ -134,7 +134,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm); /* Resolve */ - int ret = worker_exec(worker, answer, query); + int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query); if (ret == KNOT_EOK && answer->size > 0) { tcp_send((uv_handle_t *)handle, answer); } @@ -193,3 +193,31 @@ void tcp_unbind(struct endpoint *ep) tcp_unbind_handle((uv_handle_t *)&ep->tcp); uv_close((uv_handle_t *)&ep->tcp, NULL); } + +uv_handle_t *io_create(uv_loop_t *loop, int type) +{ + uv_handle_t *handle = NULL; + if (type == SOCK_DGRAM) { + handle = (uv_handle_t *)udp_create(loop); + if (handle) { + uv_udp_recv_start((uv_udp_t *)handle, &buf_get, &udp_recv); + } + + } else { + handle = (uv_handle_t *)tcp_create(loop); + if (handle) { + uv_read_start((uv_stream_t*)handle, buf_get, tcp_recv); + } + } + return handle; +} + +uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect) +{ + uv_connect_t* connect = malloc(sizeof(uv_connect_t)); + if (uv_tcp_connect(connect, (uv_tcp_t *)handle, addr, on_connect) != 0) { + free(connect); + return NULL; + } + return connect; +} diff --git a/daemon/io.h b/daemon/io.h index b5b0116be..073007eba 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -17,9 +17,14 @@ #pragma once #include +#include + struct endpoint; +int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr); int udp_bind(struct endpoint *ep, struct sockaddr *addr); void udp_unbind(struct endpoint *ep); int tcp_bind(struct endpoint *ep, struct sockaddr *addr); -void tcp_unbind(struct endpoint *ep); \ No newline at end of file +void tcp_unbind(struct endpoint *ep); +uv_handle_t *io_create(uv_loop_t *loop, int type); +uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect); \ No newline at end of file diff --git a/daemon/layer/query.c b/daemon/layer/query.c deleted file mode 100644 index be0f83636..000000000 --- a/daemon/layer/query.c +++ /dev/null @@ -1,91 +0,0 @@ -/* Copyright (C) 2014 CZ.NIC, z.s.p.o. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . - */ - -#include - -#include "daemon/layer/query.h" -#include "lib/resolve.h" - -static int reset(knot_layer_t *ctx) -{ - return KNOT_STATE_CONSUME; -} - -static int begin(knot_layer_t *ctx, void *module_param) -{ - ctx->data = module_param; - return reset(ctx); -} - -static int input_query(knot_layer_t *ctx, knot_pkt_t *pkt) -{ - assert(pkt && ctx); - - /* Check if at least header is parsed. */ - if (pkt->parsed < pkt->size) { - return KNOT_STATE_FAIL; - } - - /* Accept only queries. */ - if (knot_wire_get_qr(pkt->wire)) { - return KNOT_STATE_NOOP; /* Ignore. */ - } - - /* No authoritative service. */ - if (!knot_wire_get_rd(pkt->wire)) { - return KNOT_STATE_FAIL; - } - - return KNOT_STATE_PRODUCE; -} - -static int output_answer(knot_layer_t *ctx, knot_pkt_t *pkt) -{ - assert(pkt && ctx); - - /* Prepare for query processing. */ - int ret = kr_resolve(ctx->data, pkt, - knot_pkt_qname(pkt), - knot_pkt_qclass(pkt), - knot_pkt_qtype(pkt)); - - if (ret != KNOT_EOK) { - return KNOT_STATE_FAIL; - } - - return KNOT_STATE_DONE; -} - -static int output_error(knot_layer_t *ctx, knot_pkt_t *pkt) -{ - knot_wire_set_rcode(pkt->wire, KNOT_RCODE_SERVFAIL); - return KNOT_STATE_DONE; -} - -/** Module implementation. */ -static const knot_layer_api_t LAYER_QUERY_MODULE = { - &begin, - NULL, - &reset, - &input_query, - &output_answer, - &output_error -}; - -const knot_layer_api_t *layer_query_module(void) -{ - return &LAYER_QUERY_MODULE; -} diff --git a/daemon/layer/query.h b/daemon/layer/query.h deleted file mode 100644 index 1925219b5..000000000 --- a/daemon/layer/query.h +++ /dev/null @@ -1,23 +0,0 @@ -/* Copyright (C) 2014 CZ.NIC, z.s.p.o. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . - */ - -#pragma once - -#include "lib/layer.h" - -/* Processing module implementation. */ -const knot_layer_api_t *layer_query_module(void); -#define LAYER_QUERY layer_query_module() diff --git a/daemon/worker.c b/daemon/worker.c index 1e037e5d9..5f4fd063a 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -18,38 +18,138 @@ #include #include +#include #include "daemon/worker.h" #include "daemon/engine.h" -#include "daemon/layer/query.h" +#include "daemon/io.h" -int worker_exec(struct worker_ctx *worker, knot_pkt_t *answer, knot_pkt_t *query) +/** @internal Query resolution task. */ +struct qr_task { - if (worker == NULL) { - return kr_error(EINVAL); - } + struct kr_request req; + knot_pkt_t *pending; + uv_handle_t *handle; +}; +static int parse_query(knot_pkt_t *query) +{ /* Parse query packet. */ int ret = knot_pkt_parse(query, 0); if (ret != KNOT_EOK) { return kr_error(EPROTO); /* Ignore malformed query. */ } - /* Process query packet. */ - knot_layer_t proc; - memset(&proc, 0, sizeof(knot_layer_t)); - proc.mm = worker->mm; - knot_layer_begin(&proc, LAYER_QUERY, &worker->engine->resolver); - int state = knot_layer_consume(&proc, query); + /* Check if at least header is parsed. */ + if (query->parsed < query->size) { + return kr_error(EMSGSIZE); + } - /* Build an answer. */ - if (state & (KNOT_STATE_PRODUCE|KNOT_STATE_FAIL)) { - knot_pkt_init_response(answer, query); - state = knot_layer_produce(&proc, answer); + /* Accept only queries, no authoritative service. */ + if (knot_wire_get_qr(query->wire) || !knot_wire_get_rd(query->wire)) { + return kr_error(EINVAL); /* Ignore. */ } - /* Cleanup. */ - knot_layer_finish(&proc); + return kr_ok(); +} + +static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle) +{ + mm_ctx_t pool; + mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE); + + /* Create worker task */ + struct engine *engine = worker->engine; + struct qr_task *task = mm_alloc(&pool, sizeof(*task)); + if (!task) { + mp_delete(pool.ctx); + return NULL; + } + task->req.pool = pool; + task->handle = handle; + +#warning TODO: devise a better scheme to manage answer buffer, it needs copy each time now + /* Create buffers */ + knot_pkt_t *pending = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool); + knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool); + if (!pending || !answer) { + mp_delete(pool.ctx); + return NULL; + } + task->req.answer = answer; + task->pending = pending; + + /* Start resolution */ + kr_resolve_begin(&task->req, &engine->resolver, answer); + return task; +} + +static int qr_task_finalize(struct qr_task *task, knot_pkt_t *dst, int state) +{ + knot_pkt_t *answer = task->req.answer; + kr_resolve_finish(&task->req, state); + memcpy(dst->wire, answer->wire, answer->size); + dst->size = answer->size; +#warning TODO: send answer asynchronously + mp_delete(task->req.pool.ctx); + return state == KNOT_STATE_DONE ? 0 : kr_error(EIO); +} + +static void qr_task_on_connect(uv_connect_t *connect, int status) +{ +#warning TODO: if not connected, retry +#warning TODO: if connected, send pending query +} + +int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query) +{ + if (!worker) { + return kr_error(EINVAL); + } + + /* Parse query */ + int ret = parse_query(query); + if (ret != 0) { + return ret; + } + + /* Get pending request or start new */ + struct qr_task *task = handle->data; + if (!task) { + task = qr_task_create(worker, handle); + if (!task) { + return kr_error(ENOMEM); + } + } + + /* Consume input and produce next query */ + int proto = 0; + struct sockaddr *addr = NULL; +#warning TODO: it shouldnt be needed to provide NULL answer if I/O fails + int state = kr_resolve_consume(&task->req, query); + while (state == KNOT_STATE_PRODUCE) { + state = kr_resolve_produce(&task->req, &addr, &proto, task->pending); + } + if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) { + return qr_task_finalize(task, answer, state); + } + + /* Create connection for iterative query */ + uv_handle_t *next_handle = io_create(handle->loop, proto); +#warning TODO: improve error checking + next_handle->data = task; + if (proto == SOCK_STREAM) { + uv_connect_t *connect = io_connect(next_handle, addr, qr_task_on_connect); + if (!connect) { +#warning TODO: close next_handle + return kr_error(ENOMEM); + } + } else { + /* Fake connection as libuv doesn't support connected UDP */ + uv_connect_t fake_connect; + fake_connect.handle = (uv_stream_t *)next_handle; + qr_task_on_connect(&fake_connect, 0); + } return kr_ok(); } diff --git a/daemon/worker.h b/daemon/worker.h index 72e27896c..370eeb473 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -33,8 +33,9 @@ struct worker_ctx { * Resolve query. * * @param worker + * @param handle * @param answer * @param query * @return 0, error code */ -int worker_exec(struct worker_ctx *worker, knot_pkt_t *answer, knot_pkt_t *query); +int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query);