#include "daemon/network.h"
#include "daemon/worker.h"
-#define ENDPOINT_BUFSIZE 512 /**< This is an artificial limit for DNS query. */
-
-static void buf_get(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
+static void *handle_alloc(uv_loop_t *loop, size_t size)
{
-#warning TODO: freelist from worker allocation
- buf->base = malloc(ENDPOINT_BUFSIZE);
- if (buf->base) {
- buf->len = ENDPOINT_BUFSIZE;
- } else {
- buf->len = 0;
+ uv_handle_t *handle = malloc(size);
+ if (handle) {
+ memset(handle, 0, size);
}
+ return handle;
+}
+
+static void handle_free(uv_handle_t *handle)
+{
+ free(handle);
}
-int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr)
+static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
- uv_buf_t sendbuf = uv_buf_init((char *)answer->wire, answer->size);
- return uv_udp_try_send(handle, &sendbuf, 1, addr);
+ /* Worker has single buffer which is reused for all incoming
+ * datagrams / stream reads, the content of the buffer is
+ * guaranteed to be unchanged only for the duration of
+ * udp_read() and tcp_read().
+ */
+ uv_loop_t *loop = handle->loop;
+ struct worker_ctx *worker = loop->data;
+ buf->base = (char *)worker->bufs.wire;
+ buf->len = sizeof(worker->bufs.wire);
}
-static void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
+void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags)
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* Check the incoming wire length. */
- if (nread < KNOT_WIRE_HEADER_SIZE) {
- return;
+ if (nread > KNOT_WIRE_HEADER_SIZE) {
+ knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
+ worker_exec(worker, (uv_handle_t *)handle, query, addr);
+ knot_pkt_free(&query);
}
- /* Create packets */
- knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
- knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
-
- /* Resolve */
- int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
- if (ret == KNOT_EOK && answer->size > 0) {
- udp_send(handle, answer, addr);
+ /* UDP requests are oneshot, always close afterwards */
+ if (handle->data) { /* Do not free master socket */
+ uv_close((uv_handle_t *)handle, handle_free);
}
-
- /* Cleanup */
- knot_pkt_free(&query);
- knot_pkt_free(&answer);
- free(buf->base);
-}
-
-static uv_udp_t *udp_create(uv_loop_t *loop)
-{
- uv_udp_t *handle = malloc(sizeof(uv_udp_t));
- if (!handle) {
- return handle;
- }
-
- uv_udp_init(loop, handle);
-
- return handle;
}
int udp_bind(struct endpoint *ep, struct sockaddr *addr)
return ret;
}
- return uv_udp_recv_start(handle, &buf_get, &udp_recv);
+ handle->data = NULL;
+ return uv_udp_recv_start(handle, &handle_getbuf, &udp_recv);
}
void udp_unbind(struct endpoint *ep)
uv_read_stop((uv_stream_t *)handle);
}
-static void tcp_send(uv_handle_t *handle, const knot_pkt_t *answer)
-{
- uint16_t pkt_size = htons(answer->size);
- uv_buf_t buf[2];
- buf[0].base = (char *)&pkt_size;
- buf[0].len = sizeof(pkt_size);
- buf[1].base = (char *)answer->wire;
- buf[1].len = answer->size;
-
- uv_try_write((uv_stream_t *)handle, buf, 2);
-}
-
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
- /* Check the incoming wire length (malformed, EOF or error). */
- if (nread < (ssize_t) sizeof(uint16_t)) {
- tcp_unbind_handle((uv_handle_t *)handle);
- uv_close((uv_handle_t *)handle, (uv_close_cb) free);
+ /* Check for connection close */
+ if (nread <= 0) {
+ uv_close((uv_handle_t *)handle, handle_free);
+ return;
+ } else if (nread < 2) {
+ /* Not enough bytes to read length */
return;
}
/* Set packet size */
- nread = wire_read_u16((const uint8_t *)buf->base);
-
- /* Create packets */
- knot_pkt_t *query = knot_pkt_new(buf->base + sizeof(uint16_t), nread, worker->mm);
- knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
+ /** @todo This is not going to work if the packet is fragmented in the stream ! */
+ uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
- /* Resolve */
- int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
- if (ret == KNOT_EOK && answer->size > 0) {
- tcp_send((uv_handle_t *)handle, answer);
+ /* Check if there's enough data and execute */
+ if (nbytes + 2 < nread) {
+ return;
}
-
- /* Cleanup */
+ knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm);
+ worker_exec(worker, (uv_handle_t *)handle, query, NULL);
knot_pkt_free(&query);
- knot_pkt_free(&answer);
- free(buf->base);
-}
-
-static uv_tcp_t *tcp_create(uv_loop_t *loop)
-{
- uv_tcp_t *handle = malloc(sizeof(uv_tcp_t));
- if (!handle) {
- return handle;
- }
-
- uv_tcp_init(loop, handle);
- return handle;
}
static void tcp_accept(uv_stream_t *master, int status)
return;
}
- uv_tcp_t *client = tcp_create(master->loop);
- if (!client || uv_accept(master, (uv_stream_t*)client) != 0) {
- free(client);
+ uv_stream_t *client = (uv_stream_t *)io_create(master->loop, SOCK_STREAM);
+ if (!client || uv_accept(master, client) != 0) {
+ handle_free((uv_handle_t *)client);
return;
}
- uv_read_start((uv_stream_t*)client, buf_get, tcp_recv);
+ uv_read_start(client, handle_getbuf, tcp_recv);
}
int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
return ret;
}
+ handle->data = NULL;
return 0;
}
uv_handle_t *io_create(uv_loop_t *loop, int type)
{
- uv_handle_t *handle = NULL;
if (type == SOCK_DGRAM) {
- handle = (uv_handle_t *)udp_create(loop);
+ uv_udp_t *handle = handle_alloc(loop, sizeof(*handle));
if (handle) {
- uv_udp_recv_start((uv_udp_t *)handle, &buf_get, &udp_recv);
+ uv_udp_init(loop, handle);
+ uv_udp_recv_start(handle, &handle_getbuf, &udp_recv);
}
-
+ return (uv_handle_t *)handle;
} else {
- handle = (uv_handle_t *)tcp_create(loop);
+ uv_tcp_t *handle = handle_alloc(loop, sizeof(*handle));
if (handle) {
- uv_read_start((uv_stream_t*)handle, buf_get, tcp_recv);
+ uv_tcp_init(loop, handle);
}
+ return (uv_handle_t *)handle;
}
- return handle;
-}
-
-uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect)
-{
- uv_connect_t* connect = malloc(sizeof(uv_connect_t));
- if (uv_tcp_connect(connect, (uv_tcp_t *)handle, addr, on_connect) != 0) {
- free(connect);
- return NULL;
- }
- return connect;
}
#include <libknot/packet/pkt.h>
struct endpoint;
-
-int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr);
int udp_bind(struct endpoint *ep, struct sockaddr *addr);
void udp_unbind(struct endpoint *ep);
int tcp_bind(struct endpoint *ep, struct sockaddr *addr);
void tcp_unbind(struct endpoint *ep);
uv_handle_t *io_create(uv_loop_t *loop, int type);
-uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect);
\ No newline at end of file
struct qr_task
{
struct kr_request req;
- knot_pkt_t *pending;
- uv_handle_t *handle;
+ knot_pkt_t *next_query;
+ union {
+ uv_write_t tcp_send;
+ uv_udp_send_t udp_send;
+ uv_connect_t connect;
+ } ioreq;
+ struct {
+ union {
+ struct sockaddr_in ip4;
+ struct sockaddr_in6 ip6;
+ } addr;
+ uv_handle_t *handle;
+ } source;
};
+static int qr_task_step(struct qr_task *task, knot_pkt_t *packet);
+
static int parse_query(knot_pkt_t *query)
{
/* Parse query packet. */
return kr_error(EMSGSIZE);
}
- /* Accept only queries, no authoritative service. */
- if (knot_wire_get_qr(query->wire) || !knot_wire_get_rd(query->wire)) {
- return kr_error(EINVAL); /* Ignore. */
- }
-
return kr_ok();
}
-static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle)
+static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
{
mm_ctx_t pool;
mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE);
return NULL;
}
task->req.pool = pool;
- task->handle = handle;
+ task->source.handle = handle;
+ if (addr) {
+ memcpy(&task->source.addr, addr, sockaddr_len(addr));
+ }
-#warning TODO: devise a better scheme to manage answer buffer, it needs copy each time now
/* Create buffers */
- knot_pkt_t *pending = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
+ knot_pkt_t *next_query = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
- if (!pending || !answer) {
+ if (!next_query || !answer) {
mp_delete(pool.ctx);
return NULL;
}
task->req.answer = answer;
- task->pending = pending;
+ task->next_query = next_query;
/* Start resolution */
kr_resolve_begin(&task->req, &engine->resolver, answer);
return task;
}
-static int qr_task_finalize(struct qr_task *task, knot_pkt_t *dst, int state)
+static void qr_task_on_send(uv_req_t* req, int status)
+{
+ struct qr_task *task = req->data;
+ if (task) {
+ /* Failed to send, invalidate */
+ if (status != 0) {
+ qr_task_step(task, NULL);
+ }
+ if (task->req.overlay.state == KNOT_STATE_NOOP) {
+ mp_delete(task->req.pool.ctx);
+ }
+ }
+}
+
+static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
+{
+ if (handle->type == UV_UDP) {
+ uv_buf_t buf = { (char *)pkt->wire, pkt->size };
+ uv_udp_send_t *req = &task->ioreq.udp_send;
+ req->data = task;
+ return uv_udp_send(req, (uv_udp_t *)handle, &buf, 1, addr, (uv_udp_send_cb)qr_task_on_send);
+ } else {
+ uint16_t pkt_size = htons(pkt->size);
+ uv_buf_t buf[2] = {
+ { (char *)&pkt_size, sizeof(pkt_size) },
+ { (char *)pkt->wire, pkt->size }
+ };
+ uv_write_t *req = &task->ioreq.tcp_send;
+ req->data = task;
+ return uv_write(req, (uv_stream_t *)handle, buf, 2, (uv_write_cb)qr_task_on_send);
+ }
+}
+
+static void qr_task_on_connect(uv_connect_t *connect, int status)
+{
+ uv_stream_t *handle = connect->handle;
+ struct qr_task *task = connect->data;
+ if (status != 0) { /* Failed to connect */
+ qr_task_step(task, NULL);
+ } else {
+ qr_task_send(task, (uv_handle_t *)handle, NULL, task->next_query);
+ }
+}
+
+static int qr_task_finalize(struct qr_task *task, int state)
{
- knot_pkt_t *answer = task->req.answer;
kr_resolve_finish(&task->req, state);
- memcpy(dst->wire, answer->wire, answer->size);
- dst->size = answer->size;
-#warning TODO: send answer asynchronously
- mp_delete(task->req.pool.ctx);
+ qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
-static void qr_task_on_connect(uv_connect_t *connect, int status)
+static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
-#warning TODO: if not connected, retry
-#warning TODO: if connected, send pending query
+ /* Consume input and produce next query */
+ assert(task);
+ int sock_type = -1;
+ struct sockaddr *addr = NULL;
+ knot_pkt_t *next_query = task->next_query;
+ int state = kr_resolve_consume(&task->req, packet);
+ while (state == KNOT_STATE_PRODUCE) {
+ state = kr_resolve_produce(&task->req, &addr, &sock_type, next_query);
+ }
+
+ /* We're done, no more iterations needed */
+ if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
+ return qr_task_finalize(task, state);
+ }
+
+ /* Create connection for iterative query */
+ uv_handle_t *source_handle = task->source.handle;
+ uv_handle_t *next_handle = io_create(source_handle->loop, sock_type);
+ if (next_handle == NULL) {
+ return qr_task_finalize(task, KNOT_STATE_FAIL);
+ }
+
+ /* Connect or issue query datagram */
+ next_handle->data = task;
+ if (sock_type == SOCK_STREAM) {
+ uv_connect_t *connect = &task->ioreq.connect;
+ if (uv_tcp_connect(connect, (uv_tcp_t *)next_handle, addr, qr_task_on_connect) != 0) {
+ uv_close(next_handle, (uv_close_cb) free);
+ return qr_task_step(task, NULL);
+ }
+ connect->data = task;
+ } else {
+ if (qr_task_send(task, next_handle, addr, next_query) != 0) {
+ uv_close(next_handle, (uv_close_cb) free);
+ return qr_task_step(task, NULL);
+ }
+ }
+
+ return kr_ok();
}
-int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query)
+int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
{
if (!worker) {
return kr_error(EINVAL);
return ret;
}
- /* Get pending request or start new */
+ /* Start new task on master sockets, or resume existing */
struct qr_task *task = handle->data;
- if (!task) {
- task = qr_task_create(worker, handle);
+ bool is_master_socket = (!task);
+ if (is_master_socket) {
+ /* Accept only queries */
+ if (knot_wire_get_qr(query->wire)) {
+ return kr_error(EINVAL); /* Ignore. */
+ }
+ task = qr_task_create(worker, handle, addr);
if (!task) {
return kr_error(ENOMEM);
}
}
/* Consume input and produce next query */
- int proto = 0;
- struct sockaddr *addr = NULL;
-#warning TODO: it shouldnt be needed to provide NULL answer if I/O fails
- int state = kr_resolve_consume(&task->req, query);
- while (state == KNOT_STATE_PRODUCE) {
- state = kr_resolve_produce(&task->req, &addr, &proto, task->pending);
- }
- if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
- return qr_task_finalize(task, answer, state);
- }
-
- /* Create connection for iterative query */
- uv_handle_t *next_handle = io_create(handle->loop, proto);
-#warning TODO: improve error checking
- next_handle->data = task;
- if (proto == SOCK_STREAM) {
- uv_connect_t *connect = io_connect(next_handle, addr, qr_task_on_connect);
- if (!connect) {
-#warning TODO: close next_handle
- return kr_error(ENOMEM);
- }
- } else {
- /* Fake connection as libuv doesn't support connected UDP */
- uv_connect_t fake_connect;
- fake_connect.handle = (uv_stream_t *)next_handle;
- qr_task_on_connect(&fake_connect, 0);
- }
-
- return kr_ok();
+ return qr_task_step(task, query);
}
struct engine *engine;
uv_loop_t *loop;
mm_ctx_t *mm;
+ struct {
+ uint8_t wire[KNOT_WIRE_MAX_PKTSIZE];
+ } bufs;
};
/**
* @param handle
* @param answer
* @param query
+ * @param addr
* @return 0, error code
*/
-int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query);
+int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
}
/* Clean up. */
- knot_overlay_reset(&request->overlay);
knot_overlay_deinit(&request->overlay);
+ request->overlay.state = KNOT_STATE_NOOP;
kr_rplan_deinit(&request->rplan);
return KNOT_STATE_DONE;
}