]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: fix memory leaks & asan errors; improvements in buffering
authorGrigorii Demidov <grigorii.demidov@nic.cz>
Tue, 18 Sep 2018 15:51:33 +0000 (17:51 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Fri, 12 Oct 2018 15:36:43 +0000 (17:36 +0200)
daemon/io.c
daemon/session.c
daemon/session.h
daemon/worker.c

index 0615b9452de048a906a6ae267d77ffee348bb881..aedaa9dd950f388bdea1e2bbf3e02efc0349a9f6 100644 (file)
@@ -100,6 +100,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
        ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread);
        assert(consumed == nread);
        session_wirebuf_process(s);
+       session_wirebuf_discard(s);
        mp_flush(worker->pkt_pool.ctx);
 }
 
@@ -222,6 +223,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
        } else if (ret > 0 && !session_is_closing(s)) {
                session_timer_restart(s);
        }
+       session_wirebuf_compress(s);
        mp_flush(worker->pkt_pool.ctx);
 }
 
index aef1e3f097b6c84cdd71118910b8ce75c2121b36..1d84b831be291a7769e7050ed035f86956e95e21 100644 (file)
@@ -39,7 +39,8 @@ struct session {
 
        uint8_t *wire_buf;           /**< Buffer for DNS message. */
        ssize_t wire_buf_size;       /**< Buffer size. */
-       ssize_t wire_buf_idx;        /**< The number of bytes in wire_buf filled so far. */
+       ssize_t wire_buf_start_idx;  /**< Data start offset in wire_buf. */
+       ssize_t wire_buf_end_idx;    /**< Data end offset in wire_buf. */
 };
 
 static void on_session_close(uv_handle_t *handle)
@@ -466,40 +467,54 @@ int session_timer_stop(struct session *session)
 
 ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len)
 {
-       if (data != &session->wire_buf[session->wire_buf_idx]) {
+       if (data != &session->wire_buf[session->wire_buf_end_idx]) {
                /* shouldn't happen */
                return kr_error(EINVAL);
        }
 
-       if (session->wire_buf_idx + len > session->wire_buf_size) {
+       if (session->wire_buf_end_idx + len > session->wire_buf_size) {
                /* shouldn't happen */
                return kr_error(EINVAL);
        }
 
-       session->wire_buf_idx += len;
+       session->wire_buf_end_idx += len;
        return len;
 }
 
 knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
 {
        session->sflags.wirebuf_error = false;
-       if (session->wire_buf_idx == 0) {
+       if (session->wire_buf_end_idx == 0) {
+               return NULL;
+       }
+
+       if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
+               session->wire_buf_start_idx = 0;
+               session->wire_buf_end_idx = 0;
                return NULL;
        }
        
+       if (session->wire_buf_start_idx > session->wire_buf_end_idx) {
+               session->sflags.wirebuf_error = true;
+               session->wire_buf_start_idx = 0;
+               session->wire_buf_end_idx = 0;
+               return NULL;
+       }
+
        const uv_handle_t *handle = session->handle;
-       uint8_t *msg_start = session->wire_buf;
-       uint16_t msg_size = session->wire_buf_idx;
+       uint8_t *msg_start = &session->wire_buf[session->wire_buf_start_idx];
+       ssize_t wirebuf_msg_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
+       uint16_t msg_size = wirebuf_msg_data_size;
        
        if (!handle) {
                session->sflags.wirebuf_error = true;
                return NULL;
        } else if (handle->type == UV_TCP) {
-               if (session->wire_buf_idx < 2) {
+               if (msg_size < 2) {
                        return NULL;
                }
-               msg_size = knot_wire_read_u16(session->wire_buf);
-               if (msg_size + 2 > session->wire_buf_idx) {
+               msg_size = knot_wire_read_u16(msg_start);
+               if (msg_size + 2 > wirebuf_msg_data_size) {
                        session->sflags.wirebuf_error = false;
                        return NULL;
                }
@@ -516,66 +531,104 @@ knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
 int session_discard_packet(struct session *session, const knot_pkt_t *pkt)
 {
        uv_handle_t *handle = session->handle;
-       uint8_t *wirebuf_data_start = session->wire_buf;
-       size_t wirebuf_msg_data_size = session->wire_buf_idx;
-       uint8_t *wirebuf_msg_start = session->wire_buf;
-       size_t wirebuf_msg_size = session->wire_buf_idx;
+       /* Pointer to data start in wire_buf */
+       uint8_t *wirebuf_data_start = &session->wire_buf[session->wire_buf_start_idx];
+       /* Number of data bytes in wire_buf */
+       size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
+       /* Pointer to message start in wire_buf */
+       uint8_t *wirebuf_msg_start = wirebuf_data_start;
+       /* Number of message bytes in wire_buf.
+        * For UDP it is the same number as wirebuf_data_size. */
+       size_t wirebuf_msg_size = wirebuf_data_size;
+       /* Wire data from parsed packet. */
        uint8_t *pkt_msg_start = pkt->wire;
+       /* Number of bytes in packet wire buffer. */
        size_t pkt_msg_size = pkt->size;
-       if (pkt->tsig_rr) {
+       if (knot_pkt_has_tsig(pkt)) {
                pkt_msg_size += pkt->tsig_wire.len;
        }
 
        session->sflags.wirebuf_error = true;
+
        if (!handle) {
                return kr_error(EINVAL);
-       } else if (handle->type == UV_UDP) {
-               /* Fast check for UDP */
-               if (wirebuf_msg_start != pkt_msg_start) {
+       } else if (handle->type == UV_TCP) {
+               /* wire_buf contains TCP DNS message. */
+               if (wirebuf_data_size < 2) {
+                       /* TCP message length field isn't in buffer, must not happen. */
+                       assert(0);
+                       session->wire_buf_start_idx = 0;
+                       session->wire_buf_end_idx = 0;
+                       return kr_error(EINVAL);
+               }
+               wirebuf_msg_size = knot_wire_read_u16(wirebuf_msg_start);
+               wirebuf_msg_start += 2;
+               if (wirebuf_msg_size + 2 > wirebuf_data_size) {
+                       /* TCP message length field is greater then
+                        * number of bytes in buffer, must not happen. */
+                       assert(0);
+                       session->wire_buf_start_idx = 0;
+                       session->wire_buf_end_idx = 0;
                        return kr_error(EINVAL);
                }
-               session->wire_buf_idx = 0;
-               session->sflags.wirebuf_error = false;
-               return kr_ok();
-       }
-
-       if (session->wire_buf_idx < 2) {
-               return kr_error(EINVAL);
        }
-       wirebuf_msg_size = knot_wire_read_u16(wirebuf_data_start);
-       wirebuf_msg_start += 2;
-       wirebuf_msg_data_size = wirebuf_msg_size + 2;
 
        if (wirebuf_msg_start != pkt_msg_start) {
+               /* packet wirebuf must be located at the beginning
+                * of the session wirebuf, must not happen. */
+               assert(0);
+               session->wire_buf_start_idx = 0;
+               session->wire_buf_end_idx = 0;
                return kr_error(EINVAL);
        }
 
-
-       if (wirebuf_msg_size != pkt_msg_size) {
+       if (wirebuf_msg_size < pkt_msg_size) {
+               /* Message length field is lesser then packet size,
+                * must not happen. */
+               assert(0);
+               session->wire_buf_start_idx = 0;
+               session->wire_buf_end_idx = 0;
                return kr_error(EINVAL);
        }
 
-       if (wirebuf_msg_data_size > session->wire_buf_idx) {
-               return kr_error(EINVAL);
+       if (handle->type == UV_TCP) {
+               session->wire_buf_start_idx += wirebuf_msg_size + 2;
+       } else {
+               session->wire_buf_start_idx += pkt_msg_size;
        }
+       session->sflags.wirebuf_error = false;
        
-       uint16_t wirebuf_data_amount = session->wire_buf_idx - wirebuf_msg_data_size;
-       if (wirebuf_data_amount) {
-               if (wirebuf_msg_data_size < wirebuf_data_amount) {
-                       memmove(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
-                               wirebuf_data_amount);
-               } else {
-                       memcpy(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
-                              wirebuf_data_amount);
-               }
+       if (wirebuf_data_size == 0) {
+               session_wirebuf_discard(session);
+       } else if (wirebuf_data_size < KNOT_WIRE_HEADER_SIZE) {
+               session_wirebuf_compress(session);
        }
 
-       session->wire_buf_idx = wirebuf_data_amount;
-       session->sflags.wirebuf_error = false;
-
        return kr_ok();
 }
 
+void session_wirebuf_discard(struct session *session)
+{
+       session->wire_buf_start_idx = 0;
+       session->wire_buf_end_idx = 0;
+}
+
+void session_wirebuf_compress(struct session *session)
+{
+       if (session->wire_buf_start_idx == 0) {
+               return;
+       }
+       uint8_t *wirebuf_data_start = &session->wire_buf[session->wire_buf_start_idx];
+       size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
+       if (session->wire_buf_start_idx < wirebuf_data_size) {
+               memmove(session->wire_buf, wirebuf_data_start, wirebuf_data_size);
+       } else {
+               memcpy(session->wire_buf, wirebuf_data_start, wirebuf_data_size);
+       }
+       session->wire_buf_start_idx = 0;
+       session->wire_buf_end_idx = wirebuf_data_size;
+}
+
 bool session_wirebuf_error(struct session *session)
 {
        return session->sflags.wirebuf_error;
@@ -588,7 +641,7 @@ uint8_t *session_wirebuf_get_start(struct session *session)
 
 size_t session_wirebuf_get_len(struct session *session)
 {
-       return session->wire_buf_idx;
+       return session->wire_buf_end_idx;
 }
 
 size_t session_wirebuf_get_size(struct session *session)
@@ -598,12 +651,12 @@ size_t session_wirebuf_get_size(struct session *session)
 
 uint8_t *session_wirebuf_get_free_start(struct session *session)
 {
-       return &session->wire_buf[session->wire_buf_idx];
+       return &session->wire_buf[session->wire_buf_end_idx];
 }
 
 size_t session_wirebuf_get_free_size(struct session *session)
 {
-       return session->wire_buf_size - session->wire_buf_idx;
+       return session->wire_buf_size - session->wire_buf_end_idx;
 }
 
 void session_poison(struct session *session)
@@ -619,7 +672,7 @@ void session_unpoison(struct session *session)
 int session_wirebuf_process(struct session *session)
 {
        int ret = 0;
-       if (session->wire_buf_idx == 0) {
+       if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
                return ret;
        }
        struct worker_ctx *worker = session_get_handle(session)->loop->data;
@@ -632,7 +685,6 @@ int session_wirebuf_process(struct session *session)
                }
                ret += 1;
        }
-       assert(ret < 100);
        if (session_wirebuf_error(session)) {
                ret = -1;
        }
index f06ba77fa4979b12f3130f339998b587ab823ac5..d33aaa4cfda244353f8f73f4e7008a765290e3f2 100644 (file)
@@ -119,16 +119,20 @@ int session_timer_restart(struct session *session);
 /** Stop session timer. */
 int session_timer_stop(struct session *session);
 
-/** Get start of session buffer for wire data. */
+/** Get pointer to the beginning of session wirebuffer. */
 uint8_t *session_wirebuf_get_start(struct session *session);
 /** Get size of session wirebuffer. */
 size_t session_wirebuf_get_size(struct session *session);
 /** Get length of data in the session wirebuffer. */
 size_t session_wirebuf_get_len(struct session *session);
-/** Get start of free space in session wirebuffer. */
+/** Get pointer to the beginning of free space in session wirebuffer. */
 uint8_t *session_wirebuf_get_free_start(struct session *session);
 /** Get amount of free space in session wirebuffer. */
 size_t session_wirebuf_get_free_size(struct session *session);
+/** Discard all data in session wirebuffer. */
+void session_wirebuf_discard(struct session *session);
+/** Move all data to the beginning of the buffer. */
+void session_wirebuf_compress(struct session *session);
 int session_wirebuf_process(struct session *session);
 ssize_t session_wirebuf_consume(struct session *session,
                                const uint8_t *data, ssize_t len);
index 244b9cf46bfd71284aacad0da5d989ddbce16560..561f78a76a922494d62bddba8b7084fcf41b97c0 100644 (file)
@@ -623,6 +623,32 @@ static void qr_task_free(struct qr_task *task)
        worker->stats.concurrent -= 1;
 }
 
+/*@ Register new qr_task within session. */
+static int qr_task_register(struct qr_task *task, struct session *session)
+{
+       assert(!session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP);
+
+       session_tasklist_add(session, task);
+
+       struct request_ctx *ctx = task->ctx;
+       assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
+       ctx->source.session = session;
+       /* Soft-limit on parallel queries, there is no "slow down" RCODE
+        * that we could use to signalize to client, but we can stop reading,
+        * an in effect shrink TCP window size. To get more precise throttling,
+        * we would need to copy remainder of the unread buffer and reassemble
+        * when resuming reading. This is NYI.  */
+       if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) {
+               uv_handle_t *handle = session_get_handle(session);
+               if (handle && !session_is_throttled(session) && !session_is_closing(session)) {
+                       io_stop_read(handle);
+                       session_set_throttled(session, true);
+               }
+       }
+
+       return 0;
+}
+
 static void qr_task_complete(struct qr_task *task)
 {
        struct request_ctx *ctx = task->ctx;
@@ -810,9 +836,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
        }
 
        /* Update statistics */
-       if (ctx->source.session &&
-           handle != session_get_handle(ctx->source.session) &&
-           addr) {
+       if (session_is_outgoing(session) && addr) {
                if (session_has_tls(session))
                        worker->stats.tls += 1;
                else if (handle->type == UV_UDP)
@@ -825,7 +849,6 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
                else if (addr->sa_family == AF_INET)
                        worker->stats.ipv4 += 1;
        }
-
        return ret;
 }
 
@@ -1613,6 +1636,10 @@ int worker_submit(struct session *session, knot_pkt_t *query)
                        request_free(ctx);
                        return kr_error(ENOMEM);
                }
+
+               if (handle->type == UV_TCP && qr_task_register(task, session)) {
+                       return kr_error(ENOMEM);
+               }
        } else if (query) { /* response from upstream */
                if ((ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
                    !knot_wire_get_qr(query->wire)) {
@@ -1762,7 +1789,6 @@ int worker_end_tcp(struct session *session)
                        assert(task->ctx->source.session == session);
                        task->ctx->source.session = NULL;
                }
-               qr_task_unref(task);
        }
        while (!session_tasklist_is_empty(session)) {
                struct qr_task *task = session_tasklist_get_first(session);