From 51d52754a7f66b98b8d8dffb032c0f9ec123f3b5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Marek=20Vavru=C5=A1a?= Date: Sat, 2 May 2015 16:36:33 +0200 Subject: [PATCH] daemon/worker: close all handles in worker, pass through errors do all socket closing in worker to avoid double closes with timeout timer, also propagate bad messages to discover errors earlier than timeout --- daemon/io.c | 25 ++++++++----------------- daemon/worker.c | 29 ++++++++++++----------------- 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/daemon/io.c b/daemon/io.c index 548100371..67b927cc3 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -53,18 +53,9 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, { uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; - - /* UDP requests are oneshot, always close afterwards */ - if (handle->data && !uv_is_closing((uv_handle_t *)handle)) { /* Do not free master socket */ - io_close((uv_handle_t *)handle); - } - - /* Check the incoming wire length. */ - if (nread > KNOT_WIRE_HEADER_SIZE) { - knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm); - worker_exec(worker, (uv_handle_t *)handle, query, addr); - knot_pkt_free(&query); - } + knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm); + worker_exec(worker, (uv_handle_t *)handle, query, addr); + knot_pkt_free(&query); } int udp_bind(struct endpoint *ep, struct sockaddr *addr) @@ -94,23 +85,23 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; - /* Check for connection close */ - if (nread <= 0) { + /* Check for originator connection close */ + if (nread <= 0 && handle->data == 0) { io_close((uv_handle_t *)handle); return; } else if (nread < 2) { /* Not enough bytes to read length */ + worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); return; } - /* Set packet size */ /** @todo This is not going to work if the packet is fragmented in the stream ! */ uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base); - - /* Check if there's enough data and execute */ if (nbytes + 2 < nread) { + worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); return; } + knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm); worker_exec(worker, (uv_handle_t *)handle, query, NULL); knot_pkt_free(&query); diff --git a/daemon/worker.c b/daemon/worker.c index 55fbbd9a4..abd8cb5ea 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -110,9 +110,8 @@ static void qr_task_free(uv_handle_t *handle) static void qr_task_timeout(uv_timer_t *req) { struct qr_task *task = req->data; - if (!uv_is_closing(task->next_handle)) { + if (task->next_handle) { io_stop_read(task->next_handle); - uv_close(task->next_handle, (uv_close_cb) free); qr_task_step(task, NULL); } } @@ -127,6 +126,7 @@ static void qr_task_on_send(uv_req_t* req, int status) io_start_read(task->next_handle); } } else { /* Finalize task */ + uv_timer_stop(&task->timeout); uv_close((uv_handle_t *)&task->timeout, qr_task_free); } } @@ -171,9 +171,12 @@ static int qr_task_finalize(struct qr_task *task, int state) static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) { - /* Cancel timeout if active */ - uv_timer_stop(&task->timeout); - task->next_handle = NULL; + /* Cancel timeout if active, close handle. */ + if (task->next_handle) { + uv_close(task->next_handle, (uv_close_cb) free); + uv_timer_stop(&task->timeout); + task->next_handle = NULL; + } /* Consume input and produce next query */ int sock_type = -1; @@ -187,10 +190,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) /* We're done, no more iterations needed */ if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) { return qr_task_finalize(task, state); - } - - /* Iteration limit */ - if (++task->iter_count > KR_ITER_LIMIT) { + } else if (++task->iter_count > KR_ITER_LIMIT) { return qr_task_finalize(task, KNOT_STATE_FAIL); } @@ -206,20 +206,17 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) if (sock_type == SOCK_STREAM) { uv_connect_t *connect = &task->ioreq.connect; if (uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) { - uv_close(task->next_handle, (uv_close_cb) free); return qr_task_step(task, NULL); } connect->data = task; } else { if (qr_task_send(task, task->next_handle, addr, next_query) != 0) { - uv_close(task->next_handle, (uv_close_cb) free); return qr_task_step(task, NULL); } } - /* Start next timeout */ + /* Start next step with timeout */ uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0); - return kr_ok(); } @@ -231,15 +228,13 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer /* Parse query */ int ret = parse_query(query); - if (ret != 0) { - return ret; - } /* Start new task on master sockets, or resume existing */ struct qr_task *task = handle->data; bool is_master_socket = (!task); if (is_master_socket) { - if (knot_wire_get_qr(query->wire)) { + /* Ignore badly formed queries or responses. */ + if (ret != 0 || knot_wire_get_qr(query->wire)) { return kr_error(EINVAL); /* Ignore. */ } task = qr_task_create(worker, handle, addr); -- 2.47.2