From c7d6205587b4d9ecfe796a32d591ca4bebde3ca7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Marek=20Vavru=C5=A1a?= Date: Fri, 24 Apr 2015 18:31:10 +0200 Subject: [PATCH] daemon/worker: reworked multiplexed worker * each query is assigned a task * each task contains request, some primitives and mempool * worker can process multiple tasks at once and offload I/O to event loop Not finished: * it depends on icmp/system timeouts, #22 * tcp reads are going to be bad if the messages arrive fragmented #21 --- daemon/io.c | 157 +++++++++++++++---------------------------- daemon/io.h | 3 - daemon/worker.c | 174 +++++++++++++++++++++++++++++++++--------------- daemon/worker.h | 6 +- lib/resolve.c | 2 +- 5 files changed, 180 insertions(+), 162 deletions(-) diff --git a/daemon/io.c b/daemon/io.c index eeebd1dc6..494df3cfd 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -21,62 +21,50 @@ #include "daemon/network.h" #include "daemon/worker.h" -#define ENDPOINT_BUFSIZE 512 /**< This is an artificial limit for DNS query. */ - -static void buf_get(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) +static void *handle_alloc(uv_loop_t *loop, size_t size) { -#warning TODO: freelist from worker allocation - buf->base = malloc(ENDPOINT_BUFSIZE); - if (buf->base) { - buf->len = ENDPOINT_BUFSIZE; - } else { - buf->len = 0; + uv_handle_t *handle = malloc(size); + if (handle) { + memset(handle, 0, size); } + return handle; +} + +static void handle_free(uv_handle_t *handle) +{ + free(handle); } -int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr) +static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - uv_buf_t sendbuf = uv_buf_init((char *)answer->wire, answer->size); - return uv_udp_try_send(handle, &sendbuf, 1, addr); + /* Worker has single buffer which is reused for all incoming + * datagrams / stream reads, the content of the buffer is + * guaranteed to be unchanged only for the duration of + * udp_read() and tcp_read(). + */ + uv_loop_t *loop = handle->loop; + struct worker_ctx *worker = loop->data; + buf->base = (char *)worker->bufs.wire; + buf->len = sizeof(worker->bufs.wire); } -static void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, +void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; /* Check the incoming wire length. */ - if (nread < KNOT_WIRE_HEADER_SIZE) { - return; + if (nread > KNOT_WIRE_HEADER_SIZE) { + knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm); + worker_exec(worker, (uv_handle_t *)handle, query, addr); + knot_pkt_free(&query); } - /* Create packets */ - knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm); - knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm); - - /* Resolve */ - int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query); - if (ret == KNOT_EOK && answer->size > 0) { - udp_send(handle, answer, addr); + /* UDP requests are oneshot, always close afterwards */ + if (handle->data) { /* Do not free master socket */ + uv_close((uv_handle_t *)handle, handle_free); } - - /* Cleanup */ - knot_pkt_free(&query); - knot_pkt_free(&answer); - free(buf->base); -} - -static uv_udp_t *udp_create(uv_loop_t *loop) -{ - uv_udp_t *handle = malloc(sizeof(uv_udp_t)); - if (!handle) { - return handle; - } - - uv_udp_init(loop, handle); - - return handle; } int udp_bind(struct endpoint *ep, struct sockaddr *addr) @@ -87,7 +75,8 @@ int udp_bind(struct endpoint *ep, struct sockaddr *addr) return ret; } - return uv_udp_recv_start(handle, &buf_get, &udp_recv); + handle->data = NULL; + return uv_udp_recv_start(handle, &handle_getbuf, &udp_recv); } void udp_unbind(struct endpoint *ep) @@ -102,58 +91,31 @@ static void tcp_unbind_handle(uv_handle_t *handle) uv_read_stop((uv_stream_t *)handle); } -static void tcp_send(uv_handle_t *handle, const knot_pkt_t *answer) -{ - uint16_t pkt_size = htons(answer->size); - uv_buf_t buf[2]; - buf[0].base = (char *)&pkt_size; - buf[0].len = sizeof(pkt_size); - buf[1].base = (char *)answer->wire; - buf[1].len = answer->size; - - uv_try_write((uv_stream_t *)handle, buf, 2); -} - static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; - /* Check the incoming wire length (malformed, EOF or error). */ - if (nread < (ssize_t) sizeof(uint16_t)) { - tcp_unbind_handle((uv_handle_t *)handle); - uv_close((uv_handle_t *)handle, (uv_close_cb) free); + /* Check for connection close */ + if (nread <= 0) { + uv_close((uv_handle_t *)handle, handle_free); + return; + } else if (nread < 2) { + /* Not enough bytes to read length */ return; } /* Set packet size */ - nread = wire_read_u16((const uint8_t *)buf->base); - - /* Create packets */ - knot_pkt_t *query = knot_pkt_new(buf->base + sizeof(uint16_t), nread, worker->mm); - knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm); + /** @todo This is not going to work if the packet is fragmented in the stream ! */ + uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base); - /* Resolve */ - int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query); - if (ret == KNOT_EOK && answer->size > 0) { - tcp_send((uv_handle_t *)handle, answer); + /* Check if there's enough data and execute */ + if (nbytes + 2 < nread) { + return; } - - /* Cleanup */ + knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm); + worker_exec(worker, (uv_handle_t *)handle, query, NULL); knot_pkt_free(&query); - knot_pkt_free(&answer); - free(buf->base); -} - -static uv_tcp_t *tcp_create(uv_loop_t *loop) -{ - uv_tcp_t *handle = malloc(sizeof(uv_tcp_t)); - if (!handle) { - return handle; - } - - uv_tcp_init(loop, handle); - return handle; } static void tcp_accept(uv_stream_t *master, int status) @@ -162,13 +124,13 @@ static void tcp_accept(uv_stream_t *master, int status) return; } - uv_tcp_t *client = tcp_create(master->loop); - if (!client || uv_accept(master, (uv_stream_t*)client) != 0) { - free(client); + uv_stream_t *client = (uv_stream_t *)io_create(master->loop, SOCK_STREAM); + if (!client || uv_accept(master, client) != 0) { + handle_free((uv_handle_t *)client); return; } - uv_read_start((uv_stream_t*)client, buf_get, tcp_recv); + uv_read_start(client, handle_getbuf, tcp_recv); } int tcp_bind(struct endpoint *ep, struct sockaddr *addr) @@ -185,6 +147,7 @@ int tcp_bind(struct endpoint *ep, struct sockaddr *addr) return ret; } + handle->data = NULL; return 0; } @@ -196,28 +159,18 @@ void tcp_unbind(struct endpoint *ep) uv_handle_t *io_create(uv_loop_t *loop, int type) { - uv_handle_t *handle = NULL; if (type == SOCK_DGRAM) { - handle = (uv_handle_t *)udp_create(loop); + uv_udp_t *handle = handle_alloc(loop, sizeof(*handle)); if (handle) { - uv_udp_recv_start((uv_udp_t *)handle, &buf_get, &udp_recv); + uv_udp_init(loop, handle); + uv_udp_recv_start(handle, &handle_getbuf, &udp_recv); } - + return (uv_handle_t *)handle; } else { - handle = (uv_handle_t *)tcp_create(loop); + uv_tcp_t *handle = handle_alloc(loop, sizeof(*handle)); if (handle) { - uv_read_start((uv_stream_t*)handle, buf_get, tcp_recv); + uv_tcp_init(loop, handle); } + return (uv_handle_t *)handle; } - return handle; -} - -uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect) -{ - uv_connect_t* connect = malloc(sizeof(uv_connect_t)); - if (uv_tcp_connect(connect, (uv_tcp_t *)handle, addr, on_connect) != 0) { - free(connect); - return NULL; - } - return connect; } diff --git a/daemon/io.h b/daemon/io.h index 073007eba..e8579cea1 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -20,11 +20,8 @@ #include struct endpoint; - -int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr); int udp_bind(struct endpoint *ep, struct sockaddr *addr); void udp_unbind(struct endpoint *ep); int tcp_bind(struct endpoint *ep, struct sockaddr *addr); void tcp_unbind(struct endpoint *ep); uv_handle_t *io_create(uv_loop_t *loop, int type); -uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect); \ No newline at end of file diff --git a/daemon/worker.c b/daemon/worker.c index 5f4fd063a..06c2d9e88 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -28,10 +28,23 @@ struct qr_task { struct kr_request req; - knot_pkt_t *pending; - uv_handle_t *handle; + knot_pkt_t *next_query; + union { + uv_write_t tcp_send; + uv_udp_send_t udp_send; + uv_connect_t connect; + } ioreq; + struct { + union { + struct sockaddr_in ip4; + struct sockaddr_in6 ip6; + } addr; + uv_handle_t *handle; + } source; }; +static int qr_task_step(struct qr_task *task, knot_pkt_t *packet); + static int parse_query(knot_pkt_t *query) { /* Parse query packet. */ @@ -45,15 +58,10 @@ static int parse_query(knot_pkt_t *query) return kr_error(EMSGSIZE); } - /* Accept only queries, no authoritative service. */ - if (knot_wire_get_qr(query->wire) || !knot_wire_get_rd(query->wire)) { - return kr_error(EINVAL); /* Ignore. */ - } - return kr_ok(); } -static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle) +static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr) { mm_ctx_t pool; mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE); @@ -66,42 +74,121 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha return NULL; } task->req.pool = pool; - task->handle = handle; + task->source.handle = handle; + if (addr) { + memcpy(&task->source.addr, addr, sockaddr_len(addr)); + } -#warning TODO: devise a better scheme to manage answer buffer, it needs copy each time now /* Create buffers */ - knot_pkt_t *pending = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool); + knot_pkt_t *next_query = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool); knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool); - if (!pending || !answer) { + if (!next_query || !answer) { mp_delete(pool.ctx); return NULL; } task->req.answer = answer; - task->pending = pending; + task->next_query = next_query; /* Start resolution */ kr_resolve_begin(&task->req, &engine->resolver, answer); return task; } -static int qr_task_finalize(struct qr_task *task, knot_pkt_t *dst, int state) +static void qr_task_on_send(uv_req_t* req, int status) +{ + struct qr_task *task = req->data; + if (task) { + /* Failed to send, invalidate */ + if (status != 0) { + qr_task_step(task, NULL); + } + if (task->req.overlay.state == KNOT_STATE_NOOP) { + mp_delete(task->req.pool.ctx); + } + } +} + +static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt) +{ + if (handle->type == UV_UDP) { + uv_buf_t buf = { (char *)pkt->wire, pkt->size }; + uv_udp_send_t *req = &task->ioreq.udp_send; + req->data = task; + return uv_udp_send(req, (uv_udp_t *)handle, &buf, 1, addr, (uv_udp_send_cb)qr_task_on_send); + } else { + uint16_t pkt_size = htons(pkt->size); + uv_buf_t buf[2] = { + { (char *)&pkt_size, sizeof(pkt_size) }, + { (char *)pkt->wire, pkt->size } + }; + uv_write_t *req = &task->ioreq.tcp_send; + req->data = task; + return uv_write(req, (uv_stream_t *)handle, buf, 2, (uv_write_cb)qr_task_on_send); + } +} + +static void qr_task_on_connect(uv_connect_t *connect, int status) +{ + uv_stream_t *handle = connect->handle; + struct qr_task *task = connect->data; + if (status != 0) { /* Failed to connect */ + qr_task_step(task, NULL); + } else { + qr_task_send(task, (uv_handle_t *)handle, NULL, task->next_query); + } +} + +static int qr_task_finalize(struct qr_task *task, int state) { - knot_pkt_t *answer = task->req.answer; kr_resolve_finish(&task->req, state); - memcpy(dst->wire, answer->wire, answer->size); - dst->size = answer->size; -#warning TODO: send answer asynchronously - mp_delete(task->req.pool.ctx); + qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer); return state == KNOT_STATE_DONE ? 0 : kr_error(EIO); } -static void qr_task_on_connect(uv_connect_t *connect, int status) +static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) { -#warning TODO: if not connected, retry -#warning TODO: if connected, send pending query + /* Consume input and produce next query */ + assert(task); + int sock_type = -1; + struct sockaddr *addr = NULL; + knot_pkt_t *next_query = task->next_query; + int state = kr_resolve_consume(&task->req, packet); + while (state == KNOT_STATE_PRODUCE) { + state = kr_resolve_produce(&task->req, &addr, &sock_type, next_query); + } + + /* We're done, no more iterations needed */ + if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) { + return qr_task_finalize(task, state); + } + + /* Create connection for iterative query */ + uv_handle_t *source_handle = task->source.handle; + uv_handle_t *next_handle = io_create(source_handle->loop, sock_type); + if (next_handle == NULL) { + return qr_task_finalize(task, KNOT_STATE_FAIL); + } + + /* Connect or issue query datagram */ + next_handle->data = task; + if (sock_type == SOCK_STREAM) { + uv_connect_t *connect = &task->ioreq.connect; + if (uv_tcp_connect(connect, (uv_tcp_t *)next_handle, addr, qr_task_on_connect) != 0) { + uv_close(next_handle, (uv_close_cb) free); + return qr_task_step(task, NULL); + } + connect->data = task; + } else { + if (qr_task_send(task, next_handle, addr, next_query) != 0) { + uv_close(next_handle, (uv_close_cb) free); + return qr_task_step(task, NULL); + } + } + + return kr_ok(); } -int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query) +int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr) { if (!worker) { return kr_error(EINVAL); @@ -113,43 +200,20 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answ return ret; } - /* Get pending request or start new */ + /* Start new task on master sockets, or resume existing */ struct qr_task *task = handle->data; - if (!task) { - task = qr_task_create(worker, handle); + bool is_master_socket = (!task); + if (is_master_socket) { + /* Accept only queries */ + if (knot_wire_get_qr(query->wire)) { + return kr_error(EINVAL); /* Ignore. */ + } + task = qr_task_create(worker, handle, addr); if (!task) { return kr_error(ENOMEM); } } /* Consume input and produce next query */ - int proto = 0; - struct sockaddr *addr = NULL; -#warning TODO: it shouldnt be needed to provide NULL answer if I/O fails - int state = kr_resolve_consume(&task->req, query); - while (state == KNOT_STATE_PRODUCE) { - state = kr_resolve_produce(&task->req, &addr, &proto, task->pending); - } - if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) { - return qr_task_finalize(task, answer, state); - } - - /* Create connection for iterative query */ - uv_handle_t *next_handle = io_create(handle->loop, proto); -#warning TODO: improve error checking - next_handle->data = task; - if (proto == SOCK_STREAM) { - uv_connect_t *connect = io_connect(next_handle, addr, qr_task_on_connect); - if (!connect) { -#warning TODO: close next_handle - return kr_error(ENOMEM); - } - } else { - /* Fake connection as libuv doesn't support connected UDP */ - uv_connect_t fake_connect; - fake_connect.handle = (uv_stream_t *)next_handle; - qr_task_on_connect(&fake_connect, 0); - } - - return kr_ok(); + return qr_task_step(task, query); } diff --git a/daemon/worker.h b/daemon/worker.h index 370eeb473..503aba431 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -27,6 +27,9 @@ struct worker_ctx { struct engine *engine; uv_loop_t *loop; mm_ctx_t *mm; + struct { + uint8_t wire[KNOT_WIRE_MAX_PKTSIZE]; + } bufs; }; /** @@ -36,6 +39,7 @@ struct worker_ctx { * @param handle * @param answer * @param query + * @param addr * @return 0, error code */ -int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query); +int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr); diff --git a/lib/resolve.c b/lib/resolve.c index 3cf346fd4..4c1b91200 100644 --- a/lib/resolve.c +++ b/lib/resolve.c @@ -351,8 +351,8 @@ int kr_resolve_finish(struct kr_request *request, int state) } /* Clean up. */ - knot_overlay_reset(&request->overlay); knot_overlay_deinit(&request->overlay); + request->overlay.state = KNOT_STATE_NOOP; kr_rplan_deinit(&request->rplan); return KNOT_STATE_DONE; } -- 2.47.2