]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/worker: reworked multiplexed worker
authorMarek Vavruša <marek.vavrusa@nic.cz>
Fri, 24 Apr 2015 16:31:10 +0000 (18:31 +0200)
committerMarek Vavruša <marek.vavrusa@nic.cz>
Sun, 26 Apr 2015 21:18:45 +0000 (23:18 +0200)
* each query is assigned a task
* each task contains request, some primitives and mempool
* worker can process multiple tasks at once and
  offload I/O to event loop

Not finished:

* it depends on icmp/system timeouts, #22
* tcp reads are going to be bad if the messages
  arrive fragmented #21

daemon/io.c
daemon/io.h
daemon/worker.c
daemon/worker.h
lib/resolve.c

index eeebd1dc6608be3321f4931807bbb77728b337e0..494df3cfd3d80f0a75ecebde9fc62e0f5d0c776b 100644 (file)
 #include "daemon/network.h"
 #include "daemon/worker.h"
 
-#define ENDPOINT_BUFSIZE 512 /**< This is an artificial limit for DNS query. */
-
-static void buf_get(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
+static void *handle_alloc(uv_loop_t *loop, size_t size)
 {
-#warning TODO: freelist from worker allocation
-       buf->base = malloc(ENDPOINT_BUFSIZE);
-       if (buf->base) {
-               buf->len = ENDPOINT_BUFSIZE;
-       } else {
-               buf->len = 0;
+       uv_handle_t *handle = malloc(size);
+       if (handle) {
+               memset(handle, 0, size);
        }
+       return handle;
+}
+
+static void handle_free(uv_handle_t *handle)
+{
+       free(handle);
 }
 
-int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr)
+static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
 {
-       uv_buf_t sendbuf = uv_buf_init((char *)answer->wire, answer->size);
-       return uv_udp_try_send(handle, &sendbuf, 1, addr);
+       /* Worker has single buffer which is reused for all incoming
+        * datagrams / stream reads, the content of the buffer is
+        * guaranteed to be unchanged only for the duration of
+        * udp_read() and tcp_read().
+        */
+       uv_loop_t *loop = handle->loop;
+       struct worker_ctx *worker = loop->data;
+       buf->base = (char *)worker->bufs.wire;
+       buf->len = sizeof(worker->bufs.wire);
 }
 
-static void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
+void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
        const struct sockaddr *addr, unsigned flags)
 {
        uv_loop_t *loop = handle->loop;
        struct worker_ctx *worker = loop->data;
 
        /* Check the incoming wire length. */
-       if (nread < KNOT_WIRE_HEADER_SIZE) {
-               return;
+       if (nread > KNOT_WIRE_HEADER_SIZE) {
+               knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
+               worker_exec(worker, (uv_handle_t *)handle, query, addr);
+               knot_pkt_free(&query);
        }
 
-       /* Create packets */
-       knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
-       knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
-
-       /* Resolve */
-       int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
-       if (ret == KNOT_EOK && answer->size > 0) {
-               udp_send(handle, answer, addr);
+       /* UDP requests are oneshot, always close afterwards */
+       if (handle->data) { /* Do not free master socket */
+               uv_close((uv_handle_t *)handle, handle_free);
        }
-
-       /* Cleanup */
-       knot_pkt_free(&query);
-       knot_pkt_free(&answer);
-       free(buf->base);
-}
-
-static uv_udp_t *udp_create(uv_loop_t *loop)
-{
-       uv_udp_t *handle = malloc(sizeof(uv_udp_t));
-       if (!handle) {
-               return handle;
-       }
-
-       uv_udp_init(loop, handle);
-
-       return handle;
 }
 
 int udp_bind(struct endpoint *ep, struct sockaddr *addr)
@@ -87,7 +75,8 @@ int udp_bind(struct endpoint *ep, struct sockaddr *addr)
                return ret;
        }
 
-       return uv_udp_recv_start(handle, &buf_get, &udp_recv);
+       handle->data = NULL;
+       return uv_udp_recv_start(handle, &handle_getbuf, &udp_recv);
 }
 
 void udp_unbind(struct endpoint *ep)
@@ -102,58 +91,31 @@ static void tcp_unbind_handle(uv_handle_t *handle)
        uv_read_stop((uv_stream_t *)handle);
 }
 
-static void tcp_send(uv_handle_t *handle, const knot_pkt_t *answer)
-{
-       uint16_t pkt_size = htons(answer->size);
-       uv_buf_t buf[2];
-       buf[0].base = (char *)&pkt_size;
-       buf[0].len  = sizeof(pkt_size);
-       buf[1].base = (char *)answer->wire;
-       buf[1].len  = answer->size;
-
-       uv_try_write((uv_stream_t *)handle, buf, 2);
-}
-
 static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
 {
        uv_loop_t *loop = handle->loop;
        struct worker_ctx *worker = loop->data;
 
-       /* Check the incoming wire length (malformed, EOF or error). */
-       if (nread < (ssize_t) sizeof(uint16_t)) {
-               tcp_unbind_handle((uv_handle_t *)handle);
-               uv_close((uv_handle_t *)handle, (uv_close_cb) free);
+       /* Check for connection close */
+       if (nread <= 0) {
+               uv_close((uv_handle_t *)handle, handle_free);
+               return;
+       } else if (nread < 2) {
+               /* Not enough bytes to read length */
                return;
        }
 
        /* Set packet size */
-       nread = wire_read_u16((const uint8_t *)buf->base);
-
-       /* Create packets */
-       knot_pkt_t *query = knot_pkt_new(buf->base + sizeof(uint16_t), nread, worker->mm);
-       knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
+       /** @todo This is not going to work if the packet is fragmented in the stream ! */
+       uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
 
-       /* Resolve */
-       int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
-       if (ret == KNOT_EOK && answer->size > 0) {
-               tcp_send((uv_handle_t *)handle, answer);
+       /* Check if there's enough data and execute */
+       if (nbytes + 2 < nread) {
+               return;
        }
-
-       /* Cleanup */
+       knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm);
+       worker_exec(worker, (uv_handle_t *)handle, query, NULL);
        knot_pkt_free(&query);
-       knot_pkt_free(&answer);
-       free(buf->base);
-}
-
-static uv_tcp_t *tcp_create(uv_loop_t *loop)
-{
-       uv_tcp_t *handle = malloc(sizeof(uv_tcp_t));
-       if (!handle) {
-               return handle;
-       }
-
-       uv_tcp_init(loop, handle);
-       return handle;
 }
 
 static void tcp_accept(uv_stream_t *master, int status)
@@ -162,13 +124,13 @@ static void tcp_accept(uv_stream_t *master, int status)
                return;
        }
 
-       uv_tcp_t *client = tcp_create(master->loop);
-       if (!client || uv_accept(master, (uv_stream_t*)client) != 0) {
-               free(client);
+       uv_stream_t *client = (uv_stream_t *)io_create(master->loop, SOCK_STREAM);
+       if (!client || uv_accept(master, client) != 0) {
+               handle_free((uv_handle_t *)client);
                return;
        }
 
-       uv_read_start((uv_stream_t*)client, buf_get, tcp_recv);
+       uv_read_start(client, handle_getbuf, tcp_recv);
 }
 
 int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
@@ -185,6 +147,7 @@ int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
                return ret;
        }
 
+       handle->data = NULL;
        return 0;
 }
 
@@ -196,28 +159,18 @@ void tcp_unbind(struct endpoint *ep)
 
 uv_handle_t *io_create(uv_loop_t *loop, int type)
 {
-       uv_handle_t *handle = NULL;
        if (type == SOCK_DGRAM) {
-               handle = (uv_handle_t *)udp_create(loop);
+               uv_udp_t *handle = handle_alloc(loop, sizeof(*handle));
                if (handle) {
-                       uv_udp_recv_start((uv_udp_t *)handle, &buf_get, &udp_recv);
+                       uv_udp_init(loop, handle);
+                       uv_udp_recv_start(handle, &handle_getbuf, &udp_recv);
                }
-
+               return (uv_handle_t *)handle;
        } else {
-               handle = (uv_handle_t *)tcp_create(loop);
+               uv_tcp_t *handle = handle_alloc(loop, sizeof(*handle));
                if (handle) {
-                       uv_read_start((uv_stream_t*)handle, buf_get, tcp_recv);
+                       uv_tcp_init(loop, handle);
                }
+               return (uv_handle_t *)handle;
        }
-       return handle;
-}
-
-uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect)
-{
-       uv_connect_t* connect = malloc(sizeof(uv_connect_t));
-       if (uv_tcp_connect(connect, (uv_tcp_t *)handle, addr, on_connect) != 0) {
-               free(connect);
-               return NULL;
-       }
-       return connect;
 }
index 073007eba12a9feeeb52ab9f9498913f6d7d08a4..e8579cea17956e3919803ce83059066b876fc4a5 100644 (file)
 #include <libknot/packet/pkt.h>
 
 struct endpoint;
-
-int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr);
 int udp_bind(struct endpoint *ep, struct sockaddr *addr);
 void udp_unbind(struct endpoint *ep);
 int tcp_bind(struct endpoint *ep, struct sockaddr *addr);
 void tcp_unbind(struct endpoint *ep);
 uv_handle_t *io_create(uv_loop_t *loop, int type);
-uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect);
\ No newline at end of file
index 5f4fd063abe011a15d32adfbbae0d07c26e66117..06c2d9e8858c3a9def7eec87a5c8e16e4d95eb8f 100644 (file)
 struct qr_task
 {
        struct kr_request req;
-       knot_pkt_t *pending;
-       uv_handle_t *handle;
+       knot_pkt_t *next_query;
+       union {
+               uv_write_t tcp_send;
+               uv_udp_send_t udp_send;
+               uv_connect_t connect;
+       } ioreq;
+       struct {
+               union {
+                       struct sockaddr_in ip4;
+                       struct sockaddr_in6 ip6;
+               } addr;
+               uv_handle_t *handle;
+       } source;
 };
 
+static int qr_task_step(struct qr_task *task, knot_pkt_t *packet);
+
 static int parse_query(knot_pkt_t *query)
 {
        /* Parse query packet. */
@@ -45,15 +58,10 @@ static int parse_query(knot_pkt_t *query)
                return kr_error(EMSGSIZE);
        }
 
-       /* Accept only queries, no authoritative service. */
-       if (knot_wire_get_qr(query->wire) || !knot_wire_get_rd(query->wire)) {
-               return kr_error(EINVAL); /* Ignore. */
-       }
-
        return kr_ok();
 }
 
-static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle)
+static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
 {
        mm_ctx_t pool;
        mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE);
@@ -66,42 +74,121 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
                return NULL;
        }
        task->req.pool = pool;
-       task->handle = handle;
+       task->source.handle = handle;
+       if (addr) {
+               memcpy(&task->source.addr, addr, sockaddr_len(addr));
+       }
 
-#warning TODO: devise a better scheme to manage answer buffer, it needs copy each time now
        /* Create buffers */
-       knot_pkt_t *pending = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
+       knot_pkt_t *next_query = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
        knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
-       if (!pending || !answer) {
+       if (!next_query || !answer) {
                mp_delete(pool.ctx);
                return NULL;
        }
        task->req.answer = answer;
-       task->pending = pending;
+       task->next_query = next_query;
 
        /* Start resolution */
        kr_resolve_begin(&task->req, &engine->resolver, answer);
        return task;
 }
 
-static int qr_task_finalize(struct qr_task *task, knot_pkt_t *dst, int state)
+static void qr_task_on_send(uv_req_t* req, int status)
+{
+       struct qr_task *task = req->data;
+       if (task) {
+               /* Failed to send, invalidate */
+               if (status != 0) {
+                       qr_task_step(task, NULL);
+               }
+               if (task->req.overlay.state == KNOT_STATE_NOOP) {
+                       mp_delete(task->req.pool.ctx);
+               }
+       }
+}
+
+static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
+{
+       if (handle->type == UV_UDP) {
+               uv_buf_t buf = { (char *)pkt->wire, pkt->size };
+               uv_udp_send_t *req = &task->ioreq.udp_send;
+               req->data = task;
+               return uv_udp_send(req, (uv_udp_t *)handle, &buf, 1, addr, (uv_udp_send_cb)qr_task_on_send);
+       } else {
+               uint16_t pkt_size = htons(pkt->size);
+               uv_buf_t buf[2] = {
+                       { (char *)&pkt_size, sizeof(pkt_size) },
+                       { (char *)pkt->wire, pkt->size }
+               };
+               uv_write_t *req = &task->ioreq.tcp_send;
+               req->data = task;
+               return uv_write(req, (uv_stream_t *)handle, buf, 2, (uv_write_cb)qr_task_on_send);
+       }
+}
+
+static void qr_task_on_connect(uv_connect_t *connect, int status)
+{
+       uv_stream_t *handle = connect->handle;
+       struct qr_task *task = connect->data;
+       if (status != 0) { /* Failed to connect */
+               qr_task_step(task, NULL);
+       } else {
+               qr_task_send(task, (uv_handle_t *)handle, NULL, task->next_query);
+       }
+}
+
+static int qr_task_finalize(struct qr_task *task, int state)
 {
-       knot_pkt_t *answer = task->req.answer;
        kr_resolve_finish(&task->req, state);
-       memcpy(dst->wire, answer->wire, answer->size);
-       dst->size = answer->size;
-#warning TODO: send answer asynchronously
-       mp_delete(task->req.pool.ctx);
+       qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
        return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
 }
 
-static void qr_task_on_connect(uv_connect_t *connect, int status)
+static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
 {
-#warning TODO: if not connected, retry
-#warning TODO: if connected, send pending query
+       /* Consume input and produce next query */
+       assert(task);
+       int sock_type = -1;
+       struct sockaddr *addr = NULL;
+       knot_pkt_t *next_query = task->next_query;
+       int state = kr_resolve_consume(&task->req, packet);
+       while (state == KNOT_STATE_PRODUCE) {
+               state = kr_resolve_produce(&task->req, &addr, &sock_type, next_query);
+       }
+
+       /* We're done, no more iterations needed */
+       if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
+               return qr_task_finalize(task, state);
+       }
+
+       /* Create connection for iterative query */
+       uv_handle_t *source_handle = task->source.handle;
+       uv_handle_t *next_handle = io_create(source_handle->loop, sock_type);
+       if (next_handle == NULL) {
+               return qr_task_finalize(task, KNOT_STATE_FAIL);
+       }
+
+       /* Connect or issue query datagram */
+       next_handle->data = task;
+       if (sock_type == SOCK_STREAM) {
+               uv_connect_t *connect = &task->ioreq.connect;
+               if (uv_tcp_connect(connect, (uv_tcp_t *)next_handle, addr, qr_task_on_connect) != 0) {
+                       uv_close(next_handle, (uv_close_cb) free);
+                       return qr_task_step(task, NULL);
+               }
+               connect->data = task;
+       } else {
+               if (qr_task_send(task, next_handle, addr, next_query) != 0) {
+                       uv_close(next_handle, (uv_close_cb) free);
+                       return qr_task_step(task, NULL);
+               }
+       }
+
+       return kr_ok();
 }
 
-int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query)
+int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
 {
        if (!worker) {
                return kr_error(EINVAL);
@@ -113,43 +200,20 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answ
                return ret;
        }
 
-       /* Get pending request or start new */
+       /* Start new task on master sockets, or resume existing */
        struct qr_task *task = handle->data;
-       if (!task) {
-               task = qr_task_create(worker, handle);
+       bool is_master_socket = (!task);
+       if (is_master_socket) {
+               /* Accept only queries */
+               if (knot_wire_get_qr(query->wire)) {
+                       return kr_error(EINVAL); /* Ignore. */
+               }
+               task = qr_task_create(worker, handle, addr);
                if (!task) {
                        return kr_error(ENOMEM);
                }
        }
 
        /* Consume input and produce next query */
-       int proto = 0;
-       struct sockaddr *addr = NULL;
-#warning TODO: it shouldnt be needed to provide NULL answer if I/O fails
-       int state = kr_resolve_consume(&task->req, query);
-       while (state == KNOT_STATE_PRODUCE) {
-               state = kr_resolve_produce(&task->req, &addr, &proto, task->pending);
-       }
-       if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
-               return qr_task_finalize(task, answer, state);
-       }
-
-       /* Create connection for iterative query */
-       uv_handle_t *next_handle = io_create(handle->loop, proto);
-#warning TODO: improve error checking  
-       next_handle->data = task;
-       if (proto == SOCK_STREAM) {
-               uv_connect_t *connect = io_connect(next_handle, addr, qr_task_on_connect);
-               if (!connect) {
-#warning TODO: close next_handle                       
-                       return kr_error(ENOMEM);
-               }
-       } else {
-               /* Fake connection as libuv doesn't support connected UDP */
-               uv_connect_t fake_connect;
-               fake_connect.handle = (uv_stream_t *)next_handle;
-               qr_task_on_connect(&fake_connect, 0);
-       }
-
-       return kr_ok();
+       return qr_task_step(task, query);
 }
index 370eeb473823167d55c71a6c7a95df2547a32eff..503aba431d7969f0f2dbfa6f664f423a5ac5dc18 100644 (file)
@@ -27,6 +27,9 @@ struct worker_ctx {
        struct engine *engine;
        uv_loop_t *loop;
        mm_ctx_t *mm;
+    struct {
+        uint8_t wire[KNOT_WIRE_MAX_PKTSIZE];
+    } bufs;
 };
 
 /**
@@ -36,6 +39,7 @@ struct worker_ctx {
  * @param handle
  * @param answer
  * @param query
+ * @param addr
  * @return 0, error code
  */
-int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query);
+int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
index 3cf346fd421fe75be9f9600d0f44e2c1747e9323..4c1b912008452f5a483b796b86943578f7159631 100644 (file)
@@ -351,8 +351,8 @@ int kr_resolve_finish(struct kr_request *request, int state)
        }
 
        /* Clean up. */
-       knot_overlay_reset(&request->overlay);
        knot_overlay_deinit(&request->overlay);
+       request->overlay.state = KNOT_STATE_NOOP;
        kr_rplan_deinit(&request->rplan);
        return KNOT_STATE_DONE;
 }