From: Grigorii Demidov Date: Tue, 18 Sep 2018 15:51:33 +0000 (+0200) Subject: daemon: fix memory leaks & asan errors; improvements in buffering X-Git-Tag: v3.1.0~10^2~32 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=15c911475e0dedf2c812f04a3b74ea100eca99c7;p=thirdparty%2Fknot-resolver.git daemon: fix memory leaks & asan errors; improvements in buffering --- diff --git a/daemon/io.c b/daemon/io.c index 0615b9452..aedaa9dd9 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -100,6 +100,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread); assert(consumed == nread); session_wirebuf_process(s); + session_wirebuf_discard(s); mp_flush(worker->pkt_pool.ctx); } @@ -222,6 +223,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) } else if (ret > 0 && !session_is_closing(s)) { session_timer_restart(s); } + session_wirebuf_compress(s); mp_flush(worker->pkt_pool.ctx); } diff --git a/daemon/session.c b/daemon/session.c index aef1e3f09..1d84b831b 100644 --- a/daemon/session.c +++ b/daemon/session.c @@ -39,7 +39,8 @@ struct session { uint8_t *wire_buf; /**< Buffer for DNS message. */ ssize_t wire_buf_size; /**< Buffer size. */ - ssize_t wire_buf_idx; /**< The number of bytes in wire_buf filled so far. */ + ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */ + ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */ }; static void on_session_close(uv_handle_t *handle) @@ -466,40 +467,54 @@ int session_timer_stop(struct session *session) ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len) { - if (data != &session->wire_buf[session->wire_buf_idx]) { + if (data != &session->wire_buf[session->wire_buf_end_idx]) { /* shouldn't happen */ return kr_error(EINVAL); } - if (session->wire_buf_idx + len > session->wire_buf_size) { + if (session->wire_buf_end_idx + len > session->wire_buf_size) { /* shouldn't happen */ return kr_error(EINVAL); } - session->wire_buf_idx += len; + session->wire_buf_end_idx += len; return len; } knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm) { session->sflags.wirebuf_error = false; - if (session->wire_buf_idx == 0) { + if (session->wire_buf_end_idx == 0) { + return NULL; + } + + if (session->wire_buf_start_idx == session->wire_buf_end_idx) { + session->wire_buf_start_idx = 0; + session->wire_buf_end_idx = 0; return NULL; } + if (session->wire_buf_start_idx > session->wire_buf_end_idx) { + session->sflags.wirebuf_error = true; + session->wire_buf_start_idx = 0; + session->wire_buf_end_idx = 0; + return NULL; + } + const uv_handle_t *handle = session->handle; - uint8_t *msg_start = session->wire_buf; - uint16_t msg_size = session->wire_buf_idx; + uint8_t *msg_start = &session->wire_buf[session->wire_buf_start_idx]; + ssize_t wirebuf_msg_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx; + uint16_t msg_size = wirebuf_msg_data_size; if (!handle) { session->sflags.wirebuf_error = true; return NULL; } else if (handle->type == UV_TCP) { - if (session->wire_buf_idx < 2) { + if (msg_size < 2) { return NULL; } - msg_size = knot_wire_read_u16(session->wire_buf); - if (msg_size + 2 > session->wire_buf_idx) { + msg_size = knot_wire_read_u16(msg_start); + if (msg_size + 2 > wirebuf_msg_data_size) { session->sflags.wirebuf_error = false; return NULL; } @@ -516,66 +531,104 @@ knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm) int session_discard_packet(struct session *session, const knot_pkt_t *pkt) { uv_handle_t *handle = session->handle; - uint8_t *wirebuf_data_start = session->wire_buf; - size_t wirebuf_msg_data_size = session->wire_buf_idx; - uint8_t *wirebuf_msg_start = session->wire_buf; - size_t wirebuf_msg_size = session->wire_buf_idx; + /* Pointer to data start in wire_buf */ + uint8_t *wirebuf_data_start = &session->wire_buf[session->wire_buf_start_idx]; + /* Number of data bytes in wire_buf */ + size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx; + /* Pointer to message start in wire_buf */ + uint8_t *wirebuf_msg_start = wirebuf_data_start; + /* Number of message bytes in wire_buf. + * For UDP it is the same number as wirebuf_data_size. */ + size_t wirebuf_msg_size = wirebuf_data_size; + /* Wire data from parsed packet. */ uint8_t *pkt_msg_start = pkt->wire; + /* Number of bytes in packet wire buffer. */ size_t pkt_msg_size = pkt->size; - if (pkt->tsig_rr) { + if (knot_pkt_has_tsig(pkt)) { pkt_msg_size += pkt->tsig_wire.len; } session->sflags.wirebuf_error = true; + if (!handle) { return kr_error(EINVAL); - } else if (handle->type == UV_UDP) { - /* Fast check for UDP */ - if (wirebuf_msg_start != pkt_msg_start) { + } else if (handle->type == UV_TCP) { + /* wire_buf contains TCP DNS message. */ + if (wirebuf_data_size < 2) { + /* TCP message length field isn't in buffer, must not happen. */ + assert(0); + session->wire_buf_start_idx = 0; + session->wire_buf_end_idx = 0; + return kr_error(EINVAL); + } + wirebuf_msg_size = knot_wire_read_u16(wirebuf_msg_start); + wirebuf_msg_start += 2; + if (wirebuf_msg_size + 2 > wirebuf_data_size) { + /* TCP message length field is greater then + * number of bytes in buffer, must not happen. */ + assert(0); + session->wire_buf_start_idx = 0; + session->wire_buf_end_idx = 0; return kr_error(EINVAL); } - session->wire_buf_idx = 0; - session->sflags.wirebuf_error = false; - return kr_ok(); - } - - if (session->wire_buf_idx < 2) { - return kr_error(EINVAL); } - wirebuf_msg_size = knot_wire_read_u16(wirebuf_data_start); - wirebuf_msg_start += 2; - wirebuf_msg_data_size = wirebuf_msg_size + 2; if (wirebuf_msg_start != pkt_msg_start) { + /* packet wirebuf must be located at the beginning + * of the session wirebuf, must not happen. */ + assert(0); + session->wire_buf_start_idx = 0; + session->wire_buf_end_idx = 0; return kr_error(EINVAL); } - - if (wirebuf_msg_size != pkt_msg_size) { + if (wirebuf_msg_size < pkt_msg_size) { + /* Message length field is lesser then packet size, + * must not happen. */ + assert(0); + session->wire_buf_start_idx = 0; + session->wire_buf_end_idx = 0; return kr_error(EINVAL); } - if (wirebuf_msg_data_size > session->wire_buf_idx) { - return kr_error(EINVAL); + if (handle->type == UV_TCP) { + session->wire_buf_start_idx += wirebuf_msg_size + 2; + } else { + session->wire_buf_start_idx += pkt_msg_size; } + session->sflags.wirebuf_error = false; - uint16_t wirebuf_data_amount = session->wire_buf_idx - wirebuf_msg_data_size; - if (wirebuf_data_amount) { - if (wirebuf_msg_data_size < wirebuf_data_amount) { - memmove(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size], - wirebuf_data_amount); - } else { - memcpy(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size], - wirebuf_data_amount); - } + if (wirebuf_data_size == 0) { + session_wirebuf_discard(session); + } else if (wirebuf_data_size < KNOT_WIRE_HEADER_SIZE) { + session_wirebuf_compress(session); } - session->wire_buf_idx = wirebuf_data_amount; - session->sflags.wirebuf_error = false; - return kr_ok(); } +void session_wirebuf_discard(struct session *session) +{ + session->wire_buf_start_idx = 0; + session->wire_buf_end_idx = 0; +} + +void session_wirebuf_compress(struct session *session) +{ + if (session->wire_buf_start_idx == 0) { + return; + } + uint8_t *wirebuf_data_start = &session->wire_buf[session->wire_buf_start_idx]; + size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx; + if (session->wire_buf_start_idx < wirebuf_data_size) { + memmove(session->wire_buf, wirebuf_data_start, wirebuf_data_size); + } else { + memcpy(session->wire_buf, wirebuf_data_start, wirebuf_data_size); + } + session->wire_buf_start_idx = 0; + session->wire_buf_end_idx = wirebuf_data_size; +} + bool session_wirebuf_error(struct session *session) { return session->sflags.wirebuf_error; @@ -588,7 +641,7 @@ uint8_t *session_wirebuf_get_start(struct session *session) size_t session_wirebuf_get_len(struct session *session) { - return session->wire_buf_idx; + return session->wire_buf_end_idx; } size_t session_wirebuf_get_size(struct session *session) @@ -598,12 +651,12 @@ size_t session_wirebuf_get_size(struct session *session) uint8_t *session_wirebuf_get_free_start(struct session *session) { - return &session->wire_buf[session->wire_buf_idx]; + return &session->wire_buf[session->wire_buf_end_idx]; } size_t session_wirebuf_get_free_size(struct session *session) { - return session->wire_buf_size - session->wire_buf_idx; + return session->wire_buf_size - session->wire_buf_end_idx; } void session_poison(struct session *session) @@ -619,7 +672,7 @@ void session_unpoison(struct session *session) int session_wirebuf_process(struct session *session) { int ret = 0; - if (session->wire_buf_idx == 0) { + if (session->wire_buf_start_idx == session->wire_buf_end_idx) { return ret; } struct worker_ctx *worker = session_get_handle(session)->loop->data; @@ -632,7 +685,6 @@ int session_wirebuf_process(struct session *session) } ret += 1; } - assert(ret < 100); if (session_wirebuf_error(session)) { ret = -1; } diff --git a/daemon/session.h b/daemon/session.h index f06ba77fa..d33aaa4cf 100644 --- a/daemon/session.h +++ b/daemon/session.h @@ -119,16 +119,20 @@ int session_timer_restart(struct session *session); /** Stop session timer. */ int session_timer_stop(struct session *session); -/** Get start of session buffer for wire data. */ +/** Get pointer to the beginning of session wirebuffer. */ uint8_t *session_wirebuf_get_start(struct session *session); /** Get size of session wirebuffer. */ size_t session_wirebuf_get_size(struct session *session); /** Get length of data in the session wirebuffer. */ size_t session_wirebuf_get_len(struct session *session); -/** Get start of free space in session wirebuffer. */ +/** Get pointer to the beginning of free space in session wirebuffer. */ uint8_t *session_wirebuf_get_free_start(struct session *session); /** Get amount of free space in session wirebuffer. */ size_t session_wirebuf_get_free_size(struct session *session); +/** Discard all data in session wirebuffer. */ +void session_wirebuf_discard(struct session *session); +/** Move all data to the beginning of the buffer. */ +void session_wirebuf_compress(struct session *session); int session_wirebuf_process(struct session *session); ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len); diff --git a/daemon/worker.c b/daemon/worker.c index 244b9cf46..561f78a76 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -623,6 +623,32 @@ static void qr_task_free(struct qr_task *task) worker->stats.concurrent -= 1; } +/*@ Register new qr_task within session. */ +static int qr_task_register(struct qr_task *task, struct session *session) +{ + assert(!session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP); + + session_tasklist_add(session, task); + + struct request_ctx *ctx = task->ctx; + assert(ctx && (ctx->source.session == NULL || ctx->source.session == session)); + ctx->source.session = session; + /* Soft-limit on parallel queries, there is no "slow down" RCODE + * that we could use to signalize to client, but we can stop reading, + * an in effect shrink TCP window size. To get more precise throttling, + * we would need to copy remainder of the unread buffer and reassemble + * when resuming reading. This is NYI. */ + if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) { + uv_handle_t *handle = session_get_handle(session); + if (handle && !session_is_throttled(session) && !session_is_closing(session)) { + io_stop_read(handle); + session_set_throttled(session, true); + } + } + + return 0; +} + static void qr_task_complete(struct qr_task *task) { struct request_ctx *ctx = task->ctx; @@ -810,9 +836,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, } /* Update statistics */ - if (ctx->source.session && - handle != session_get_handle(ctx->source.session) && - addr) { + if (session_is_outgoing(session) && addr) { if (session_has_tls(session)) worker->stats.tls += 1; else if (handle->type == UV_UDP) @@ -825,7 +849,6 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, else if (addr->sa_family == AF_INET) worker->stats.ipv4 += 1; } - return ret; } @@ -1613,6 +1636,10 @@ int worker_submit(struct session *session, knot_pkt_t *query) request_free(ctx); return kr_error(ENOMEM); } + + if (handle->type == UV_TCP && qr_task_register(task, session)) { + return kr_error(ENOMEM); + } } else if (query) { /* response from upstream */ if ((ret != kr_ok() && ret != kr_error(EMSGSIZE)) || !knot_wire_get_qr(query->wire)) { @@ -1762,7 +1789,6 @@ int worker_end_tcp(struct session *session) assert(task->ctx->source.session == session); task->ctx->source.session = NULL; } - qr_task_unref(task); } while (!session_tasklist_is_empty(session)) { struct qr_task *task = session_tasklist_get_first(session);